lib/fluent/plugin/bigquery/schema.rb in fluent-plugin-bigquery-3.0.1 vs lib/fluent/plugin/bigquery/schema.rb in fluent-plugin-bigquery-3.1.0
- old
+ new
@@ -21,27 +21,27 @@
@mode = mode
end
attr_reader :name, :mode
- def format(value)
+ def format(value, is_load: false)
case @mode
when :nullable
- format_one(value) unless value.nil?
+ format_one(value, is_load: is_load) unless value.nil?
when :required
if value.nil?
log.warn "Required field #{name} cannot be null"
nil
else
- format_one(value)
+ format_one(value, is_load: is_load)
end
when :repeated
- value.nil? ? [] : value.each_with_object([]) { |v, arr| arr << format_one(v) if v }
+ value.nil? ? [] : value.each_with_object([]) { |v, arr| arr << format_one(v, is_load: true) if v }
end
end
- def format_one(value)
+ def format_one(value, is_load: false)
raise NotImplementedError, "Must implement in a subclass"
end
def to_h
{
@@ -55,55 +55,75 @@
class StringFieldSchema < FieldSchema
def type
:string
end
- def format_one(value)
+ def format_one(value, is_load: false)
if value.is_a?(Hash) || value.is_a?(Array)
MultiJson.dump(value)
else
value.to_s
end
end
end
+ class JsonFieldSchema < FieldSchema
+ def type
+ :json
+ end
+
+ def format_one(value, is_load: false)
+ if is_load
+ value
+ else
+ MultiJson.dump(value)
+ end
+ end
+ end
+
+ class GeographyFieldSchema < StringFieldSchema
+ def type
+ :geography
+ end
+ end
+
class IntegerFieldSchema < FieldSchema
def type
:integer
end
- def format_one(value)
+ def format_one(value, is_load: false)
value.to_i
end
end
class FloatFieldSchema < FieldSchema
def type
:float
end
- def format_one(value)
+ def format_one(value, is_load: false)
value.to_f
end
end
class NumericFieldSchema < FieldSchema
def type
:numeric
end
- def format_one(value)
+ def format_one(value, is_load: false)
value.to_s
end
end
class BooleanFieldSchema < FieldSchema
def type
:boolean
end
- def format_one(value)
+ def format_one(value, is_load: false)
!!value
end
end
class TimestampFieldSchema < FieldSchema
@@ -112,11 +132,11 @@
def type
:timestamp
end
- def format_one(value)
+ def format_one(value, is_load: false)
case value
when Time
value.strftime("%Y-%m-%d %H:%M:%S.%6L %:z")
when String
if value =~ INTEGER_REGEXP
@@ -135,11 +155,11 @@
class DateFieldSchema < FieldSchema
def type
:date
end
- def format_one(value)
+ def format_one(value, is_load: false)
if value.respond_to?(:strftime)
value.strftime("%Y-%m-%d")
else
value
end
@@ -149,11 +169,11 @@
class DateTimeFieldSchema < FieldSchema
def type
:datetime
end
- def format_one(value)
+ def format_one(value, is_load: false)
if value.respond_to?(:strftime)
value.strftime("%Y-%m-%dT%H:%M:%S.%6L")
else
value
end
@@ -163,11 +183,11 @@
class TimeFieldSchema < FieldSchema
def type
:time
end
- def format_one(value)
+ def format_one(value, is_load: false)
if value.respond_to?(:strftime)
value.strftime("%H:%M:%S.%6L")
else
value
end
@@ -183,10 +203,12 @@
boolean: BooleanFieldSchema,
timestamp: TimestampFieldSchema,
date: DateFieldSchema,
datetime: DateTimeFieldSchema,
time: TimeFieldSchema,
+ json: JsonFieldSchema,
+ geography: GeographyFieldSchema,
record: RecordSchema
}.freeze
def initialize(name, mode = :nullable)
super(name, mode)
@@ -254,15 +276,15 @@
raise ConfigError, "[Bug] Invalid field type #{type}" unless schema
@fields[name] = schema.new(name)
end
end
- def format_one(record)
+ def format_one(record, is_load: false)
out = {}
record.each do |key, value|
next if value.nil?
schema = @fields[key]
- out[key] = schema ? schema.format(value) : value
+ out[key] = schema ? schema.format(value, is_load: is_load) : value
end
out
end
private