Sha256: aa13a7eb61374ef014e96127f9d8011370ff421b41eb805415ef621c6257403a

Contents?: true

Size: 828 Bytes

Versions: 4

Compression:

Stored size: 828 Bytes

Contents

require "delegate"

begin
  require "avro_turf/messaging"
rescue LoadError
  raise "Can't find 'avro_turf' gem. Please add it to your Gemfile or install it."
end

module GlassOctopus
  module Middleware
    class AvroParser
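      # app: the next middleware in the chain.
      # schema_registry_url: URL of the schema registry used to look up Avro schemas.
      # options[:logger]: optional logger handed to AvroTurf.
      def initialize(app, schema_registry_url, options = {})
        @app = app
        @decoder = AvroTurf::Messaging.new(registry_url: schema_registry_url, logger: options[:logger])
      end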

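      # Decodes the raw message value and passes a wrapped context, which
      # exposes the decoded payload as #params, to the next middleware.
      def call(ctx)
        message = @decoder.decode(ctx.message.value)
        ctx = ContextWithAvroParsedMessage.new(ctx, message)
        @app.call(ctx)
      end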

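      # Delegates everything to the wrapped context and adds #params.
      class ContextWithAvroParsedMessage < SimpleDelegator
        attr_reader :params

        def initialize(wrapped_ctx, params)
          super(wrapped_ctx)
          @params = params
        end
      end

      # A bare `private` does not apply to nested classes; hide the constant explicitly.
      private_constant :ContextWithAvroParsedMessage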
    end
  end
end
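
The middleware above wraps the incoming context in a SimpleDelegator so that downstream code sees the original context plus a `params` reader. The following is a minimal sketch of that pattern that runs without Kafka or a schema registry. `FakeMessage`, `FakeContext`, `UpcaseDecoder`, `ContextWithParams` and `ParserMiddleware` are hypothetical stand-ins invented for illustration; they are not part of glass_octopus or avro_turf, and the decoder only mimics the `#decode` call used above.

```ruby
require "delegate"

# Hypothetical stand-ins for the consumer context and its message.
FakeMessage = Struct.new(:value)
FakeContext = Struct.new(:message, :logger)

# Same shape as ContextWithAvroParsedMessage: delegate everything, add #params.
class ContextWithParams < SimpleDelegator
  attr_reader :params

  def initialize(wrapped_ctx, params)
    super(wrapped_ctx)
    @params = params
  end
end

# Hypothetical decoder standing in for AvroTurf::Messaging; it only needs #decode.
class UpcaseDecoder
  def decode(data)
    { "body" => data.upcase }
  end
end

# Same flow as AvroParser#call, with the decoder injected for the sketch.
class ParserMiddleware
  def initialize(app, decoder)
    @app = app
    @decoder = decoder
  end

  def call(ctx)
    decoded = @decoder.decode(ctx.message.value)
    @app.call(ContextWithParams.new(ctx, decoded))
  end
end

# The "app" at the end of the chain reads both the new and the delegated methods.
app = ->(ctx) { [ctx.params, ctx.message.value, ctx.logger] }
context = FakeContext.new(FakeMessage.new("hi"), :log)
result = ParserMiddleware.new(app, UpcaseDecoder.new).call(context)
```

Because the wrapper delegates unknown methods to the original context, the downstream app can keep calling `ctx.message` and `ctx.logger` unchanged while also reading the decoded payload from `ctx.params`.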

Version data entries

4 entries across 4 versions & 1 rubygem

Version Path
glass_octopus-2.2.0 lib/glass_octopus/middleware/avro_parser.rb
glass_octopus-2.1.0 lib/glass_octopus/middleware/avro_parser.rb
glass_octopus-2.0.0 lib/glass_octopus/middleware/avro_parser.rb
glass_octopus-1.1.0 lib/glass_octopus/middleware/avro_parser.rb