Sha256: 11ec123ee5cc123c720d0a3b4236fcec3ba034dd73e94d9db6b2007ef72ce183

Contents?: true

Size: 994 Bytes

Versions: 1

Compression:

Stored size: 994 Bytes

Contents

require 'poseidon'
require 'securerandom'
require 'citrus/event_bus/configuration'

module Citrus
  module EventBus
    # Reads messages from a Poseidon partition consumer and hands them out
    # one deserialized event at a time.
    #
    # Messages are fetched in batches; anything not yet handed out is kept
    # in an in-memory buffer and served by subsequent calls to #call.
    class Subscriber
      # Builds the subscriber and its underlying Poseidon consumer.
      #
      # @param configuration [Configuration] must respond to +host+, +port+,
      #   +topic+ and +event_serializer+. Defaults to a fresh Configuration.
      def initialize(configuration = Configuration.new)
        @configuration  = configuration
        @messages_cache = []
        # A random UUID is passed as the first argument so every subscriber
        # instance identifies itself differently. The trailing arguments
        # (0, :latest_offset) are presumably the partition number and the
        # starting offset -- confirm against the Poseidon documentation.
        @consumer = Poseidon::PartitionConsumer.new(
          SecureRandom.uuid,
          configuration.host,
          configuration.port,
          configuration.topic,
          0,
          :latest_offset
        )
      end

      # Returns the next event, refilling the buffer first when it is empty.
      #
      # @return [Object] whatever the configured event serializer's +load+
      #   returns for the raw value of the next message.
      def call
        if @messages_cache.empty?
          fetched = simulate_blocking_io_because_poseidon_is_not_my_application
          @messages_cache = @messages_cache + fetched
        end

        next_message = @messages_cache.shift
        @configuration.event_serializer.load(next_message.value)
      end

      # Keeps asking the consumer for messages until a fetch returns a
      # non-empty batch, turning the consumer's possibly-empty fetch into a
      # call that only returns once there is something to deliver.
      #
      # @return [Array] the first non-empty batch returned by the consumer.
      def simulate_blocking_io_because_poseidon_is_not_my_application
        loop do
          batch = @consumer.fetch(max_wait: 10_000, min_bytes: 1)
          return batch if batch.any?
        end
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
citrus-event-bus-0.0.1 lib/citrus/event_bus/subscriber.rb