lib/embulk/output/bigquery/value_converter_factory.rb in embulk-output-bigquery-0.4.2 vs lib/embulk/output/bigquery/value_converter_factory.rb in embulk-output-bigquery-0.4.3
- old
+ new
@@ -1,7 +1,7 @@
require 'time'
-require 'tzinfo'
+require 'time_with_zone'
require 'json'
require_relative 'helper'
module Embulk
module Output
@@ -21,12 +21,12 @@
# @option task [Hash] column_options user defined column types
# @param [Schema] schema embulk defined column types
# @return [Array] an arary whose key is column_index, and value is its converter (Proc)
def self.create_converters(task, schema)
column_options_map = Helper.column_options_map(task['column_options'])
- default_timestamp_format = task['default_timestamp_format']
- default_timezone = task['default_timezone']
+ default_timestamp_format = task['default_timestamp_format'] || DEFAULT_TIMESTAMP_FORMAT
+ default_timezone = task['default_timezone'] || DEFAULT_TIMEZONE
schema.map do |column|
column_name = column[:name]
embulk_type = column[:type]
column_option = column_options_map[column_name] || {}
self.new(
@@ -51,11 +51,11 @@
@embulk_type = embulk_type
@type = (type || Helper.bq_type_from_embulk_type(embulk_type)).upcase
@timestamp_format = timestamp_format
@default_timestamp_format = default_timestamp_format
@timezone = timezone || default_timezone
- @zone_offset = get_zone_offset(@timezone) if @timezone
+ @zone_offset = TimeWithZone.zone_offset(@timezone)
@strict = strict.nil? ? true : strict
end
def create_converter
case embulk_type
@@ -192,11 +192,11 @@
when 'TIMESTAMP'
if @timestamp_format
Proc.new {|val|
next nil if val.nil?
with_typecast_error(val) do |val|
- strptime_with_zone(val, @timestamp_format, zone_offset).to_f
+ TimeWithZone.set_zone_offset(Time.strptime(val, @timestamp_format), zone_offset).strftime("%Y-%m-%d %H:%M:%S.%6N %:z")
end
}
else
Proc.new {|val|
next nil if val.nil?
@@ -236,11 +236,11 @@
end
}
when 'TIMESTAMP'
Proc.new {|val|
next nil if val.nil?
- val.to_f # BigQuery supports UNIX timestamp
+ val.strftime("%Y-%m-%d %H:%M:%S.%6N %:z")
}
else
raise NotSupportedType, "cannot take column type #{type} for timestamp column"
end
end
@@ -257,34 +257,9 @@
Proc.new {|val|
val
}
else
raise NotSupportedType, "cannot take column type #{type} for json column"
- end
- end
-
- private
-
- # [+-]HH:MM, [+-]HHMM, [+-]HH
- NUMERIC_PATTERN = %r{\A[+-]\d\d(:?\d\d)?\z}
-
- # Region/Zone, Region/Zone/Zone
- NAME_PATTERN = %r{\A[^/]+/[^/]+(/[^/]+)?\z}
-
- def strptime_with_zone(date, timestamp_format, zone_offset)
- time = Time.strptime(date, timestamp_format)
- utc_offset = time.utc_offset
- time.localtime(zone_offset) + utc_offset - zone_offset
- end
-
- def get_zone_offset(timezone)
- if NUMERIC_PATTERN === timezone
- Time.zone_offset(timezone)
- elsif NAME_PATTERN === timezone || 'UTC' == timezone
- tz = TZInfo::Timezone.get(timezone)
- tz.period_for_utc(Time.now).utc_total_offset
- else
- raise ArgumentError, "timezone format is invalid: #{timezone}"
end
end
end
end
end