lib/avro/schema.rb in avro-1.9.2 vs lib/avro/schema.rb in avro-1.10.0
- old
+ new
@@ -27,10 +27,12 @@
PRIMITIVE_TYPES_SYM = Set.new(PRIMITIVE_TYPES.map(&:to_sym))
NAMED_TYPES_SYM = Set.new(NAMED_TYPES.map(&:to_sym))
VALID_TYPES_SYM = Set.new(VALID_TYPES.map(&:to_sym))
+ NAME_REGEX = /^([A-Za-z_][A-Za-z0-9_]*)(\.([A-Za-z_][A-Za-z0-9_]*))*$/
+
INT_MIN_VALUE = -(1 << 31)
INT_MAX_VALUE = (1 << 31) - 1
LONG_MIN_VALUE = -(1 << 63)
LONG_MAX_VALUE = (1 << 63) - 1
@@ -51,27 +53,38 @@
raise SchemaParseError, "Unknown type: #{type}"
end
type_sym = type.to_sym
if PRIMITIVE_TYPES_SYM.include?(type_sym)
- return PrimitiveSchema.new(type_sym, logical_type)
-
+ case type_sym
+ when :bytes
+ precision = json_obj['precision']
+ scale = json_obj['scale']
+ return BytesSchema.new(type_sym, logical_type, precision, scale)
+ else
+ return PrimitiveSchema.new(type_sym, logical_type)
+ end
elsif NAMED_TYPES_SYM.include? type_sym
name = json_obj['name']
+ if !Avro.disable_schema_name_validation && name !~ NAME_REGEX
+ raise SchemaParseError, "Name #{name} is invalid for type #{type}!"
+ end
namespace = json_obj.include?('namespace') ? json_obj['namespace'] : default_namespace
+ aliases = json_obj['aliases']
case type_sym
when :fixed
size = json_obj['size']
- return FixedSchema.new(name, namespace, size, names, logical_type)
+ return FixedSchema.new(name, namespace, size, names, logical_type, aliases)
when :enum
symbols = json_obj['symbols']
doc = json_obj['doc']
- return EnumSchema.new(name, namespace, symbols, names, doc)
+ default = json_obj['default']
+ return EnumSchema.new(name, namespace, symbols, names, doc, default, aliases)
when :record, :error
fields = json_obj['fields']
doc = json_obj['doc']
- return RecordSchema.new(name, namespace, fields, names, type_sym, doc)
+ return RecordSchema.new(name, namespace, fields, names, type_sym, doc, aliases)
else
raise SchemaParseError.new("Unknown named type: #{type}")
end
else
@@ -129,10 +142,53 @@
def sha256_fingerprint
parsing_form = SchemaNormalization.to_parsing_form(self)
Digest::SHA256.hexdigest(parsing_form).to_i(16)
end
+ CRC_EMPTY = 0xc15d213aa4d7a795
+
+ # The Java library caches this value after it is initialized, so this
+ # pattern mimics that.
+ @@fp_table = nil
+ def initFPTable
+ @@fp_table = Array.new(256)
+ 256.times do |i|
+ fp = i
+ 8.times do
+ fp = (fp >> 1) ^ ( CRC_EMPTY & -( fp & 1 ) )
+ end
+ @@fp_table[i] = fp
+ end
+ end
+
+ def crc_64_avro_fingerprint
+ parsing_form = Avro::SchemaNormalization.to_parsing_form(self)
+ data_bytes = parsing_form.unpack("C*")
+
+ initFPTable unless @@fp_table
+
+ fp = CRC_EMPTY
+ data_bytes.each do |b|
+ fp = (fp >> 8) ^ @@fp_table[ (fp ^ b) & 0xff ]
+ end
+ fp
+ end
+
+ SINGLE_OBJECT_MAGIC_NUMBER = [0xC3, 0x01]
+ def single_object_encoding_header
+ [SINGLE_OBJECT_MAGIC_NUMBER, single_object_schema_fingerprint].flatten
+ end
+ def single_object_schema_fingerprint
+ working = crc_64_avro_fingerprint
+ bytes = Array.new(8)
+ 8.times do |i|
+ bytes[7 - i] = (working & 0xff)
+ working = working >> 8
+ end
+ bytes
+ end
+
def read?(writers_schema)
SchemaCompatibility.can_read?(writers_schema, self)
end
def be_read?(other_schema)
@@ -141,15 +197,15 @@
def mutual_read?(other_schema)
SchemaCompatibility.mutual_read?(other_schema, self)
end
- def ==(other, seen=nil)
+ def ==(other, _seen=nil)
other.is_a?(Schema) && type_sym == other.type_sym
end
- def hash(seen=nil)
+ def hash(_seen=nil)
type_sym.hash
end
def subparse(json_obj, names=nil, namespace=nil)
if json_obj.is_a?(String) && names
@@ -163,80 +219,112 @@
raise e if e.is_a? SchemaParseError
raise SchemaParseError, "Sub-schema for #{self.class.name} not a valid Avro schema. Bad schema: #{json_obj}"
end
end
- def to_avro(names=nil)
+ def to_avro(_names=nil)
props = {'type' => type}
props['logicalType'] = logical_type if logical_type
props
end
def to_s
MultiJson.dump to_avro
end
+ def validate_aliases!
+ unless aliases.nil? ||
+ (aliases.is_a?(Array) && aliases.all? { |a| a.is_a?(String) })
+
+ raise Avro::SchemaParseError,
+ "Invalid aliases value #{aliases.inspect} for #{type} #{name}. Must be an array of strings."
+ end
+ end
+ private :validate_aliases!
+
class NamedSchema < Schema
- attr_reader :name, :namespace
+ attr_reader :name, :namespace, :aliases
- def initialize(type, name, namespace=nil, names=nil, doc=nil, logical_type=nil)
+ def initialize(type, name, namespace=nil, names=nil, doc=nil, logical_type=nil, aliases=nil)
super(type, logical_type)
@name, @namespace = Name.extract_namespace(name, namespace)
- @doc = doc
- names = Name.add_name(names, self)
+ @doc = doc
+ @aliases = aliases
+ validate_aliases! if aliases
+ Name.add_name(names, self)
end
def to_avro(names=Set.new)
if @name
return fullname if names.include?(fullname)
names << fullname
end
props = {'name' => @name}
props.merge!('namespace' => @namespace) if @namespace
- props.merge!('doc' => @doc) if @doc
+ props['namespace'] = @namespace if @namespace
+ props['doc'] = @doc if @doc
+ props['aliases'] = aliases if aliases && aliases.any?
super.merge props
end
def fullname
@fullname ||= Name.make_fullname(@name, @namespace)
end
+
+ def fullname_aliases
+ @fullname_aliases ||= if aliases
+ aliases.map { |a| Name.make_fullname(a, namespace) }
+ else
+ []
+ end
+ end
+
+ def match_fullname?(name)
+ name == fullname || fullname_aliases.include?(name)
+ end
end
class RecordSchema < NamedSchema
attr_reader :fields, :doc
def self.make_field_objects(field_data, names, namespace=nil)
- field_objects, field_names = [], Set.new
- field_data.each_with_index do |field, i|
+ field_objects, field_names, alias_names = [], Set.new, Set.new
+ field_data.each do |field|
if field.respond_to?(:[]) # TODO(jmhodges) wtffffff
type = field['type']
name = field['name']
default = field.key?('default') ? field['default'] : :no_default
order = field['order']
doc = field['doc']
- new_field = Field.new(type, name, default, order, names, namespace, doc)
+ aliases = field['aliases']
+ new_field = Field.new(type, name, default, order, names, namespace, doc, aliases)
# make sure field name has not been used yet
if field_names.include?(new_field.name)
raise SchemaParseError, "Field name #{new_field.name.inspect} is already in use"
end
field_names << new_field.name
+ # make sure alias has not been used yet
+ if new_field.aliases && alias_names.intersect?(new_field.aliases.to_set)
+ raise SchemaParseError, "Alias #{(alias_names & new_field.aliases).to_a} already in use"
+ end
+ alias_names.merge(new_field.aliases) if new_field.aliases
else
raise SchemaParseError, "Not a valid field: #{field}"
end
field_objects << new_field
end
field_objects
end
- def initialize(name, namespace, fields, names=nil, schema_type=:record, doc=nil)
+ def initialize(name, namespace, fields, names=nil, schema_type=:record, doc=nil, aliases=nil)
if schema_type == :request || schema_type == 'request'
@type_sym = schema_type.to_sym
@namespace = namespace
@name = nil
@doc = nil
else
- super(schema_type, name, namespace, names, doc)
+ super(schema_type, name, namespace, names, doc, nil, aliases)
end
@fields = if fields
RecordSchema.make_field_objects(fields, names, self.namespace)
else
{}
@@ -245,10 +333,20 @@
def fields_hash
@fields_hash ||= fields.inject({}){|hsh, field| hsh[field.name] = field; hsh }
end
+ def fields_by_alias
+ @fields_by_alias ||= fields.each_with_object({}) do |field, hash|
+ if field.aliases
+ field.aliases.each do |a|
+ hash[a] = field
+ end
+ end
+ end
+ end
+
def to_avro(names=Set.new)
hsh = super
return hsh unless hsh.is_a?(Hash)
hsh['fields'] = @fields.map {|f| f.to_avro(names) }
if type_sym == :request
@@ -311,24 +409,45 @@
schemas.map {|schema| schema.to_avro(names) }
end
end
class EnumSchema < NamedSchema
- attr_reader :symbols, :doc
+ SYMBOL_REGEX = /^[A-Za-z_][A-Za-z0-9_]*$/
- def initialize(name, space, symbols, names=nil, doc=nil)
+ attr_reader :symbols, :doc, :default
+
+ def initialize(name, space, symbols, names=nil, doc=nil, default=nil, aliases=nil)
if symbols.uniq.length < symbols.length
fail_msg = "Duplicate symbol: #{symbols}"
raise Avro::SchemaParseError, fail_msg
end
- super(:enum, name, space, names, doc)
+
+ if !Avro.disable_enum_symbol_validation
+ invalid_symbols = symbols.select { |symbol| symbol !~ SYMBOL_REGEX }
+
+ if invalid_symbols.any?
+ raise SchemaParseError,
+ "Invalid symbols for #{name}: #{invalid_symbols.join(', ')} don't match #{SYMBOL_REGEX.inspect}"
+ end
+ end
+
+ if default && !symbols.include?(default)
+ raise Avro::SchemaParseError, "Default '#{default}' is not a valid symbol for enum #{name}"
+ end
+
+ super(:enum, name, space, names, doc, nil, aliases)
+ @default = default
@symbols = symbols
end
- def to_avro(names=Set.new)
+ def to_avro(_names=Set.new)
avro = super
- avro.is_a?(Hash) ? avro.merge('symbols' => symbols) : avro
+ if avro.is_a?(Hash)
+ avro['symbols'] = symbols
+ avro['default'] = default if default
+ end
+ avro
end
end
# Valid primitive types are in PRIMITIVE_TYPES.
class PrimitiveSchema < Schema
@@ -346,36 +465,56 @@
hsh = super
hsh.size == 1 ? type : hsh
end
end
+ class BytesSchema < PrimitiveSchema
+ attr_reader :precision, :scale
+ def initialize(type, logical_type=nil, precision=nil, scale=nil)
+ super(type.to_sym, logical_type)
+ @precision = precision
+ @scale = scale
+ end
+
+ def to_avro(names=nil)
+ avro = super
+ return avro if avro.is_a?(String)
+
+ avro['precision'] = precision if precision
+ avro['scale'] = scale if scale
+ avro
+ end
+ end
+
class FixedSchema < NamedSchema
attr_reader :size
- def initialize(name, space, size, names=nil, logical_type=nil)
+ def initialize(name, space, size, names=nil, logical_type=nil, aliases=nil)
# Ensure valid constructor args
unless size.is_a?(Integer)
raise AvroError, 'Fixed Schema requires a valid integer for size property.'
end
- super(:fixed, name, space, names, nil, logical_type)
+ super(:fixed, name, space, names, nil, logical_type, aliases)
@size = size
end
def to_avro(names=Set.new)
avro = super
avro.is_a?(Hash) ? avro.merge('size' => size) : avro
end
end
class Field < Schema
- attr_reader :type, :name, :default, :order, :doc
+ attr_reader :type, :name, :default, :order, :doc, :aliases
- def initialize(type, name, default=:no_default, order=nil, names=nil, namespace=nil, doc=nil)
+ def initialize(type, name, default=:no_default, order=nil, names=nil, namespace=nil, doc=nil, aliases=nil)
@type = subparse(type, names, namespace)
@name = name
@default = default
@order = order
@doc = doc
+ @aliases = aliases
+ validate_aliases! if aliases
validate_default! if default? && !Avro.disable_field_default_validation
end
def default?
@default != :no_default
@@ -385,9 +524,13 @@
{'name' => name, 'type' => type.to_avro(names)}.tap do |avro|
avro['default'] = default if default?
avro['order'] = order if order
avro['doc'] = doc if doc
end
+ end
+
+ def alias_names
+ @alias_names ||= Array(aliases)
end
private
def validate_default!