Sha256: 7a490db90bc6b8cf46e8a7526e7c9e2278f33bd3ec7424276211dae42aa76d9f

Contents?: true

Size: 1.57 KB

Versions: 2

Compression:

Stored size: 1.57 KB

Contents

require 'erb'
require 'json'
require 'logger'

require 'excon'

# Small HTTP client for a schema registry, built on an Excon connection.
#
# Every request carries the schema-registry JSON content type, and every
# response body is parsed as JSON before it is handed back to the caller.
class AvroTurf::SchemaRegistry
  CONTENT_TYPE = "application/vnd.schemaregistry.v1+json".freeze

  # url    - base URL of the registry; passed straight to Excon.new.
  # logger - object responding to #info. Defaults to a Logger on $stdout.
  def initialize(url, logger: Logger.new($stdout))
    @logger = logger
    @connection = Excon.new(url, headers: {
      "Content-Type" => CONTENT_TYPE,
    })
  end

  # Fetch the schema registered under the given id.
  #
  # Returns the value stored under the "schema" key of the response.
  # Raises KeyError if the response has no "schema" key.
  def fetch(id)
    @logger.info "Fetching schema with id #{id}"
    data = get("/schemas/ids/#{id}")
    data.fetch("schema")
  end

  # Register a schema under a subject.
  #
  # subject - subject name; it is percent-encoded before use in the path.
  # schema  - anything whose #to_s yields the schema definition.
  #
  # Returns the id reported by the registry.
  # Raises KeyError if the response has no "id" key.
  def register(subject, schema)
    data = post("#{subject_path(subject)}/versions", body: {
      schema: schema.to_s
    }.to_json)

    id = data.fetch("id")

    @logger.info "Registered schema for subject `#{subject}`; id = #{id}"

    id
  end

  # List all subjects
  def subjects
    get('/subjects')
  end

  # List all versions for a subject
  def subject_versions(subject)
    get("#{subject_path(subject)}/versions")
  end

  # Get a specific version for a subject
  def subject_version(subject, version = 'latest')
    get("#{subject_path(subject)}/versions/#{version}")
  end

  # Check if a schema exists. Returns nil if not found.
  #
  # A 404 is accepted alongside 200 so that a "not found" answer comes back
  # as a parsed body instead of an exception; any body carrying an
  # "error_code" key is treated as "not found".
  def check(subject, schema)
    data = post(subject_path(subject),
                expects: [200, 404],
                body: { schema: schema.to_s }.to_json)
    data unless data.key?("error_code")
  end

  private

  # Build the "/subjects/<subject>" path prefix with the subject
  # percent-encoded, so names containing "/", "?", "#", spaces or non-ASCII
  # characters cannot alter the request path. Names made only of unreserved
  # characters (letters, digits, "-", "_", ".", "~") are left unchanged.
  def subject_path(subject)
    "/subjects/#{ERB::Util.url_encode(subject)}"
  end

  def get(path, **options)
    request(path, method: :get, **options)
  end

  def post(path, **options)
    request(path, method: :post, **options)
  end

  # Send a request over the shared connection and parse the body as JSON.
  #
  # `expects` defaults to 200 and can be overridden by the caller; the check
  # itself is delegated to Excon via the `expects` option.
  def request(path, **options)
    options = { expects: 200 }.merge(options)
    response = @connection.request(path: path, **options)
    JSON.parse(response.body)
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
avro_turf-0.7.0 lib/avro_turf/schema_registry.rb
avro_turf-0.6.2 lib/avro_turf/schema_registry.rb