lib/avro/schema.rb in avro-1.9.2 vs lib/avro/schema.rb in avro-1.10.0

- old
+ new

@@ -27,10 +27,12 @@ PRIMITIVE_TYPES_SYM = Set.new(PRIMITIVE_TYPES.map(&:to_sym)) NAMED_TYPES_SYM = Set.new(NAMED_TYPES.map(&:to_sym)) VALID_TYPES_SYM = Set.new(VALID_TYPES.map(&:to_sym)) + NAME_REGEX = /^([A-Za-z_][A-Za-z0-9_]*)(\.([A-Za-z_][A-Za-z0-9_]*))*$/ + INT_MIN_VALUE = -(1 << 31) INT_MAX_VALUE = (1 << 31) - 1 LONG_MIN_VALUE = -(1 << 63) LONG_MAX_VALUE = (1 << 63) - 1 @@ -51,27 +53,38 @@ raise SchemaParseError, "Unknown type: #{type}" end type_sym = type.to_sym if PRIMITIVE_TYPES_SYM.include?(type_sym) - return PrimitiveSchema.new(type_sym, logical_type) - + case type_sym + when :bytes + precision = json_obj['precision'] + scale = json_obj['scale'] + return BytesSchema.new(type_sym, logical_type, precision, scale) + else + return PrimitiveSchema.new(type_sym, logical_type) + end elsif NAMED_TYPES_SYM.include? type_sym name = json_obj['name'] + if !Avro.disable_schema_name_validation && name !~ NAME_REGEX + raise SchemaParseError, "Name #{name} is invalid for type #{type}!" + end namespace = json_obj.include?('namespace') ? json_obj['namespace'] : default_namespace + aliases = json_obj['aliases'] case type_sym when :fixed size = json_obj['size'] - return FixedSchema.new(name, namespace, size, names, logical_type) + return FixedSchema.new(name, namespace, size, names, logical_type, aliases) when :enum symbols = json_obj['symbols'] doc = json_obj['doc'] - return EnumSchema.new(name, namespace, symbols, names, doc) + default = json_obj['default'] + return EnumSchema.new(name, namespace, symbols, names, doc, default, aliases) when :record, :error fields = json_obj['fields'] doc = json_obj['doc'] - return RecordSchema.new(name, namespace, fields, names, type_sym, doc) + return RecordSchema.new(name, namespace, fields, names, type_sym, doc, aliases) else raise SchemaParseError.new("Unknown named type: #{type}") end else @@ -129,10 +142,53 @@ def sha256_fingerprint parsing_form = SchemaNormalization.to_parsing_form(self) Digest::SHA256.hexdigest(parsing_form).to_i(16) end + CRC_EMPTY = 0xc15d213aa4d7a795 + + # The java library caches this value after initialized, so this pattern + # mimics that. + @@fp_table = nil + def initFPTable + @@fp_table = Array.new(256) + 256.times do |i| + fp = i + 8.times do + fp = (fp >> 1) ^ ( CRC_EMPTY & -( fp & 1 ) ) + end + @@fp_table[i] = fp + end + end + + def crc_64_avro_fingerprint + parsing_form = Avro::SchemaNormalization.to_parsing_form(self) + data_bytes = parsing_form.unpack("C*") + + initFPTable unless @@fp_table + + fp = CRC_EMPTY + data_bytes.each do |b| + fp = (fp >> 8) ^ @@fp_table[ (fp ^ b) & 0xff ] + end + fp + end + + SINGLE_OBJECT_MAGIC_NUMBER = [0xC3, 0x01] + def single_object_encoding_header + [SINGLE_OBJECT_MAGIC_NUMBER, single_object_schema_fingerprint].flatten + end + def single_object_schema_fingerprint + working = crc_64_avro_fingerprint + bytes = Array.new(8) + 8.times do |i| + bytes[7 - i] = (working & 0xff) + working = working >> 8 + end + bytes + end + def read?(writers_schema) SchemaCompatibility.can_read?(writers_schema, self) end def be_read?(other_schema) @@ -141,15 +197,15 @@ def mutual_read?(other_schema) SchemaCompatibility.mutual_read?(other_schema, self) end - def ==(other, seen=nil) + def ==(other, _seen=nil) other.is_a?(Schema) && type_sym == other.type_sym end - def hash(seen=nil) + def hash(_seen=nil) type_sym.hash end def subparse(json_obj, names=nil, namespace=nil) if json_obj.is_a?(String) && names @@ -163,80 +219,112 @@ raise e if e.is_a? SchemaParseError raise SchemaParseError, "Sub-schema for #{self.class.name} not a valid Avro schema. Bad schema: #{json_obj}" end end - def to_avro(names=nil) + def to_avro(_names=nil) props = {'type' => type} props['logicalType'] = logical_type if logical_type props end def to_s MultiJson.dump to_avro end + def validate_aliases! + unless aliases.nil? || + (aliases.is_a?(Array) && aliases.all? { |a| a.is_a?(String) }) + + raise Avro::SchemaParseError, + "Invalid aliases value #{aliases.inspect} for #{type} #{name}. Must be an array of strings." + end + end + private :validate_aliases! + class NamedSchema < Schema - attr_reader :name, :namespace + attr_reader :name, :namespace, :aliases - def initialize(type, name, namespace=nil, names=nil, doc=nil, logical_type=nil) + def initialize(type, name, namespace=nil, names=nil, doc=nil, logical_type=nil, aliases=nil) super(type, logical_type) @name, @namespace = Name.extract_namespace(name, namespace) - @doc = doc - names = Name.add_name(names, self) + @doc = doc + @aliases = aliases + validate_aliases! if aliases + Name.add_name(names, self) end def to_avro(names=Set.new) if @name return fullname if names.include?(fullname) names << fullname end props = {'name' => @name} props.merge!('namespace' => @namespace) if @namespace - props.merge!('doc' => @doc) if @doc + props['namespace'] = @namespace if @namespace + props['doc'] = @doc if @doc + props['aliases'] = aliases if aliases && aliases.any? super.merge props end def fullname @fullname ||= Name.make_fullname(@name, @namespace) end + + def fullname_aliases + @fullname_aliases ||= if aliases + aliases.map { |a| Name.make_fullname(a, namespace) } + else + [] + end + end + + def match_fullname?(name) + name == fullname || fullname_aliases.include?(name) + end end class RecordSchema < NamedSchema attr_reader :fields, :doc def self.make_field_objects(field_data, names, namespace=nil) - field_objects, field_names = [], Set.new - field_data.each_with_index do |field, i| + field_objects, field_names, alias_names = [], Set.new, Set.new + field_data.each do |field| if field.respond_to?(:[]) # TODO(jmhodges) wtffffff type = field['type'] name = field['name'] default = field.key?('default') ? field['default'] : :no_default order = field['order'] doc = field['doc'] - new_field = Field.new(type, name, default, order, names, namespace, doc) + aliases = field['aliases'] + new_field = Field.new(type, name, default, order, names, namespace, doc, aliases) # make sure field name has not been used yet if field_names.include?(new_field.name) raise SchemaParseError, "Field name #{new_field.name.inspect} is already in use" end field_names << new_field.name + # make sure alias has not be been used yet + if new_field.aliases && alias_names.intersect?(new_field.aliases.to_set) + raise SchemaParseError, "Alias #{(alias_names & new_field.aliases).to_a} already in use" + end + alias_names.merge(new_field.aliases) if new_field.aliases else raise SchemaParseError, "Not a valid field: #{field}" end field_objects << new_field end field_objects end - def initialize(name, namespace, fields, names=nil, schema_type=:record, doc=nil) + def initialize(name, namespace, fields, names=nil, schema_type=:record, doc=nil, aliases=nil) if schema_type == :request || schema_type == 'request' @type_sym = schema_type.to_sym @namespace = namespace @name = nil @doc = nil else - super(schema_type, name, namespace, names, doc) + super(schema_type, name, namespace, names, doc, nil, aliases) end @fields = if fields RecordSchema.make_field_objects(fields, names, self.namespace) else {} @@ -245,10 +333,20 @@ def fields_hash @fields_hash ||= fields.inject({}){|hsh, field| hsh[field.name] = field; hsh } end + def fields_by_alias + @fields_by_alias ||= fields.each_with_object({}) do |field, hash| + if field.aliases + field.aliases.each do |a| + hash[a] = field + end + end + end + end + def to_avro(names=Set.new) hsh = super return hsh unless hsh.is_a?(Hash) hsh['fields'] = @fields.map {|f| f.to_avro(names) } if type_sym == :request @@ -311,24 +409,45 @@ schemas.map {|schema| schema.to_avro(names) } end end class EnumSchema < NamedSchema - attr_reader :symbols, :doc + SYMBOL_REGEX = /^[A-Za-z_][A-Za-z0-9_]*$/ - def initialize(name, space, symbols, names=nil, doc=nil) + attr_reader :symbols, :doc, :default + + def initialize(name, space, symbols, names=nil, doc=nil, default=nil, aliases=nil) if symbols.uniq.length < symbols.length fail_msg = "Duplicate symbol: #{symbols}" raise Avro::SchemaParseError, fail_msg end - super(:enum, name, space, names, doc) + + if !Avro.disable_enum_symbol_validation + invalid_symbols = symbols.select { |symbol| symbol !~ SYMBOL_REGEX } + + if invalid_symbols.any? + raise SchemaParseError, + "Invalid symbols for #{name}: #{invalid_symbols.join(', ')} don't match #{SYMBOL_REGEX.inspect}" + end + end + + if default && !symbols.include?(default) + raise Avro::SchemaParseError, "Default '#{default}' is not a valid symbol for enum #{name}" + end + + super(:enum, name, space, names, doc, nil, aliases) + @default = default @symbols = symbols end - def to_avro(names=Set.new) + def to_avro(_names=Set.new) avro = super - avro.is_a?(Hash) ? avro.merge('symbols' => symbols) : avro + if avro.is_a?(Hash) + avro['symbols'] = symbols + avro['default'] = default if default + end + avro end end # Valid primitive types are in PRIMITIVE_TYPES. class PrimitiveSchema < Schema @@ -346,36 +465,56 @@ hsh = super hsh.size == 1 ? type : hsh end end + class BytesSchema < PrimitiveSchema + attr_reader :precision, :scale + def initialize(type, logical_type=nil, precision=nil, scale=nil) + super(type.to_sym, logical_type) + @precision = precision + @scale = scale + end + + def to_avro(names=nil) + avro = super + return avro if avro.is_a?(String) + + avro['precision'] = precision if precision + avro['scale'] = scale if scale + avro + end + end + class FixedSchema < NamedSchema attr_reader :size - def initialize(name, space, size, names=nil, logical_type=nil) + def initialize(name, space, size, names=nil, logical_type=nil, aliases=nil) # Ensure valid cto args unless size.is_a?(Integer) raise AvroError, 'Fixed Schema requires a valid integer for size property.' end - super(:fixed, name, space, names, nil, logical_type) + super(:fixed, name, space, names, nil, logical_type, aliases) @size = size end def to_avro(names=Set.new) avro = super avro.is_a?(Hash) ? avro.merge('size' => size) : avro end end class Field < Schema - attr_reader :type, :name, :default, :order, :doc + attr_reader :type, :name, :default, :order, :doc, :aliases - def initialize(type, name, default=:no_default, order=nil, names=nil, namespace=nil, doc=nil) + def initialize(type, name, default=:no_default, order=nil, names=nil, namespace=nil, doc=nil, aliases=nil) @type = subparse(type, names, namespace) @name = name @default = default @order = order @doc = doc + @aliases = aliases + validate_aliases! if aliases validate_default! if default? && !Avro.disable_field_default_validation end def default? @default != :no_default @@ -385,9 +524,13 @@ {'name' => name, 'type' => type.to_avro(names)}.tap do |avro| avro['default'] = default if default? avro['order'] = order if order avro['doc'] = doc if doc end + end + + def alias_names + @alias_names ||= Array(aliases) end private def validate_default!