Sha256: ed414b23e639ec5e38cfcfdf384f46c4e57c02c75e5eec9e2b12224e3e30c22a

Contents?: true

Size: 1.63 KB

Versions: 2

Compression:

Stored size: 1.63 KB

Contents

# frozen_string_literal: true

require 'avro_schema_registry-client'
require 'private_attr'
require 'procto'

module Avrolution
  # Registers the Avro schemas found in a list of schema files with the
  # schema registry located at Avrolution.deployment_schema_registry_url.
  #
  # For each schema, the compatibility breaks file is consulted for an entry
  # keyed by the schema's full name and fingerprint; when one exists, its
  # register options are forwarded to the registry client.
  #
  # NOTE(review): `include Procto.call` presumably also exposes a class-level
  # `RegisterSchemas.call(schema_files)` entry point -- confirm against Procto.
  class RegisterSchemas
    extend PrivateAttr
    include Procto.call

    # @return [Array] the schema files given to the constructor, as an Array
    attr_reader :schema_files

    private_attr_reader :compatibility_breaks, :schema_registry

    # Raised when registering a schema fails with Excon::Error::Conflict.
    class IncompatibleSchemaError < StandardError
      # @param name [String] full name of the schema that could not be registered
      def initialize(name)
        super("incompatible schema #{name}")
      end
    end

    # @param schema_files [String, Array<String>, nil] one or more schema file
    #   paths; coerced with Array(), so a single path or nil is accepted
    def initialize(schema_files)
      @schema_files = Array(schema_files)
      @compatibility_breaks = Avrolution::CompatibilityBreaksFile.load
      @schema_registry = build_schema_registry
    end

    # Registers every schema that could be loaded from #schema_files.
    #
    # @return [Array<Array>] the [json, schema] pairs that were processed
    # @raise [IncompatibleSchemaError] when a registration fails with
    #   Excon::Error::Conflict
    def call
      schemas.each do |(json, schema)|
        register_schema(schema, json)
      end
    end

    private

    # Registers a single schema, forwarding the register options of a matching
    # compatibility break when there is one.
    #
    # @param schema [Avro::Schema] the parsed schema
    # @param json [String] the raw schema JSON read from the file
    # @raise [IncompatibleSchemaError] when the registry client raises
    #   Excon::Error::Conflict
    def register_schema(schema, json)
      fullname = schema.fullname
      fingerprint = schema.sha256_resolution_fingerprint.to_s(16)

      # Compatibility breaks are looked up by [fullname, hex fingerprint].
      compatibility_break = compatibility_breaks[[fullname, fingerprint]]

      # Safe navigation covers the "no compatibility break" (nil) case without
      # relying on ActiveSupport's Object#try, which this file never requires.
      register_options = compatibility_break&.register_options || {}

      begin
        schema_registry.register_without_lookup(fullname, json, register_options)
      rescue Excon::Error::Conflict
        # NOTE(review): presumably the registry's "incompatible schema"
        # response -- confirm against the registry client.
        raise IncompatibleSchemaError, fullname
      end
    end

    # Reads and parses each schema file that exists on disk; paths that do not
    # exist are skipped silently. The result is memoized.
    #
    # @return [Array<Array>] [json, Avro::Schema] pairs
    def schemas
      @schemas ||= schema_files.map do |schema_file|
        next unless File.exist?(schema_file)

        json = File.read(schema_file)
        [json, Avro::Schema.parse(json)]
      end.compact
    end

    # Builds the registry client for the deployment schema registry URL,
    # logging through Avrolution.logger.
    def build_schema_registry
      AvroSchemaRegistry::Client.new(Avrolution.deployment_schema_registry_url,
                                     logger: Avrolution.logger)
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
avrolution-0.7.1 lib/avrolution/register_schemas.rb
avrolution-0.7.0 lib/avrolution/register_schemas.rb