lib/fluent/plugin/bigquery/schema.rb in fluent-plugin-bigquery-3.0.1 vs lib/fluent/plugin/bigquery/schema.rb in fluent-plugin-bigquery-3.1.0

- old
+ new

@@ -21,27 +21,27 @@ @mode = mode end attr_reader :name, :mode - def format(value) + def format(value, is_load: false) case @mode when :nullable - format_one(value) unless value.nil? + format_one(value, is_load: is_load) unless value.nil? when :required if value.nil? log.warn "Required field #{name} cannot be null" nil else - format_one(value) + format_one(value, is_load: is_load) end when :repeated - value.nil? ? [] : value.each_with_object([]) { |v, arr| arr << format_one(v) if v } + value.nil? ? [] : value.each_with_object([]) { |v, arr| arr << format_one(v, is_load: true) if v } end end - def format_one(value) + def format_one(value, is_load: false) raise NotImplementedError, "Must implement in a subclass" end def to_h { @@ -55,55 +55,75 @@ class StringFieldSchema < FieldSchema def type :string end - def format_one(value) + def format_one(value, is_load: false) if value.is_a?(Hash) || value.is_a?(Array) MultiJson.dump(value) else value.to_s end end end + class JsonFieldSchema < FieldSchema + def type + :json + end + + def format_one(value, is_load: false) + if is_load + value + else + MultiJson.dump(value) + end + end + end + + class GeographyFieldSchema < StringFieldSchema + def type + :geography + end + end + class IntegerFieldSchema < FieldSchema def type :integer end - def format_one(value) + def format_one(value, is_load: false) value.to_i end end class FloatFieldSchema < FieldSchema def type :float end - def format_one(value) + def format_one(value, is_load: false) value.to_f end end class NumericFieldSchema < FieldSchema def type :numeric end - def format_one(value) + def format_one(value, is_load: false) value.to_s end end class BooleanFieldSchema < FieldSchema def type :boolean end - def format_one(value) + def format_one(value, is_load: false) !!value end end class TimestampFieldSchema < FieldSchema @@ -112,11 +132,11 @@ def type :timestamp end - def format_one(value) + def format_one(value, is_load: false) case value when Time value.strftime("%Y-%m-%d %H:%M:%S.%6L %:z") when String if value =~ INTEGER_REGEXP @@ -135,11 +155,11 @@ class DateFieldSchema < FieldSchema def type :date end - def format_one(value) + def format_one(value, is_load: false) if value.respond_to?(:strftime) value.strftime("%Y-%m-%d") else value end @@ -149,11 +169,11 @@ class DateTimeFieldSchema < FieldSchema def type :datetime end - def format_one(value) + def format_one(value, is_load: false) if value.respond_to?(:strftime) value.strftime("%Y-%m-%dT%H:%M:%S.%6L") else value end @@ -163,11 +183,11 @@ class TimeFieldSchema < FieldSchema def type :time end - def format_one(value) + def format_one(value, is_load: false) if value.respond_to?(:strftime) value.strftime("%H:%M:%S.%6L") else value end @@ -183,10 +203,12 @@ boolean: BooleanFieldSchema, timestamp: TimestampFieldSchema, date: DateFieldSchema, datetime: DateTimeFieldSchema, time: TimeFieldSchema, + json: JsonFieldSchema, + geography: GeographyFieldSchema, record: RecordSchema }.freeze def initialize(name, mode = :nullable) super(name, mode) @@ -254,15 +276,15 @@ raise ConfigError, "[Bug] Invalid field type #{type}" unless schema @fields[name] = schema.new(name) end end - def format_one(record) + def format_one(record, is_load: false) out = {} record.each do |key, value| next if value.nil? schema = @fields[key] - out[key] = schema ? schema.format(value) : value + out[key] = schema ? schema.format(value, is_load: is_load) : value end out end private