Sha256: 8702c231125e23817da8eae2126a20a61e14ba0c0108ce79116156017ec62fa2

Contents?: true

Size: 1.52 KB

Versions: 1

Compression:

Stored size: 1.52 KB

Contents

require 'avro_turf'
require 'avro_turf/confluent_schema_registry'
require 'avro-resolution_canonical_form'

module AvroSchemaRegistry
  # Registry client built on AvroTurf::ConfluentSchemaRegistry. It adds a
  # fingerprint-based schema lookup and lets callers pass extra params along
  # with register and compatibility requests.
  class Client < AvroTurf::ConfluentSchemaRegistry
    # Finds the id of a schema under a subject using the schema's SHA-256
    # resolution fingerprint, rendered in hexadecimal.
    #
    # @param subject [String] subject name interpolated into the request path
    # @param schema [String, Object] a schema string (parsed with
    #   Avro::Schema.parse) or an object that already responds to
    #   `sha256_resolution_fingerprint`
    # @return the value stored under 'id' in the response
    def lookup_subject_schema(subject, schema)
      parsed = schema.is_a?(String) ? Avro::Schema.parse(schema) : schema
      fingerprint = parsed.sha256_resolution_fingerprint.to_s(16)

      get("/subjects/#{subject}/fingerprints/#{fingerprint}").fetch('id').tap do |schema_id|
        @logger.info("Found schema for subject `#{subject}`; id = #{schema_id}")
      end
    end

    # Registers a schema under a subject, but only after a fingerprint lookup
    # fails with Excon::Errors::NotFound; an existing registration's id is
    # returned as-is. Any extra keyword params are merged into the request
    # body next to the schema.
    #
    # @param subject [String] subject name interpolated into the request path
    # @param schema [#to_s] the schema to register
    # @param params [Hash] additional entries for the request body
    # @return the value stored under 'id' in the lookup or register response
    def register(subject, schema, **params)
      lookup_subject_schema(subject, schema)
    rescue Excon::Errors::NotFound
      versions_path = "/subjects/#{subject}/versions"
      payload = { schema: schema.to_s }.merge(params).to_json

      post(versions_path, body: payload).fetch('id').tap do |schema_id|
        @logger.info("Registered schema for subject `#{subject}`; id = #{schema_id}")
      end
    end

    # Compatibility check that also accepts additional params, which are
    # merged into the request body next to the schema.
    #
    # @param subject [String] subject name interpolated into the request path
    # @param schema [#to_s] the schema to check
    # @param version [String] version to compare against, 'latest' by default
    # @param params [Hash] additional entries for the request body
    # @return nil when the response contains an 'error_code' key; otherwise
    #   the response's 'is_compatible' value, or false when that key is absent
    def compatible?(subject, schema, version = 'latest', **params)
      compatibility_path = "/compatibility/subjects/#{subject}/versions/#{version}"
      payload = { schema: schema.to_s }.merge(params).to_json

      # Both 200 and 404 are passed as expected statuses for this request.
      response = post(compatibility_path, expects: [200, 404], body: payload)
      return if response.key?('error_code')

      response.fetch('is_compatible', false)
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
avro_schema_registry-client-0.1.0 lib/avro_schema_registry/client.rb