Sha256: 11ec123ee5cc123c720d0a3b4236fcec3ba034dd73e94d9db6b2007ef72ce183
Contents?: true
Size: 994 Bytes
Versions: 1
Compression:
Stored size: 994 Bytes
Contents
require 'poseidon'
require 'securerandom'
require 'citrus/event_bus/configuration'

module Citrus
  module EventBus
    class Subscriber
      def initialize(configuration = Configuration.new)
        @configuration = configuration
        @consumer = Poseidon::PartitionConsumer.new(
          SecureRandom.uuid,
          configuration.host,
          configuration.port,
          configuration.topic,
          0,
          :latest_offset
        )
        @messages_cache = []
      end

      def call
        @messages_cache += simulate_blocking_io_because_poseidon_is_not_my_application if @messages_cache.empty?
        message = @messages_cache.shift
        @configuration.event_serializer.load(message.value)
      end

      def simulate_blocking_io_because_poseidon_is_not_my_application
        loop do
          messages = @consumer.fetch(:max_wait => 10_000, :min_bytes => 1)
          break(messages) if messages.any?
        end
      end
    end
  end
end
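The subscriber above keeps a local message cache and, when the cache is empty, polls the consumer until a non-empty batch arrives. The sketch below illustrates that pattern in isolation so it can run without Kafka or the poseidon gem. `FakeMessage`, `FakeConsumer`, and `SketchSubscriber` are hypothetical stand-ins introduced only for illustration, and using `JSON` as the serializer is an assumption; the gem's actual `event_serializer` is not shown in this file.

```ruby
require 'json'

# Hypothetical stand-in for a Kafka message: only #value is needed here.
FakeMessage = Struct.new(:value)

# Hypothetical stand-in for Poseidon::PartitionConsumer. Each call to #fetch
# returns the next queued batch; an empty batch simulates a fetch that timed
# out without receiving any messages.
class FakeConsumer
  def initialize(batches)
    @batches = batches
  end

  def fetch(_options = {})
    @batches.shift || []
  end
end

# Illustrative copy of the cache-then-poll logic, with the consumer and the
# serializer injected instead of being built from a configuration object.
class SketchSubscriber
  def initialize(consumer, serializer)
    @consumer = consumer
    @serializer = serializer
    @messages_cache = []
  end

  # Returns one deserialized event per call, refilling the cache when empty.
  def call
    @messages_cache += fetch_until_non_empty if @messages_cache.empty?
    @serializer.load(@messages_cache.shift.value)
  end

  private

  # Polls until a fetch returns at least one message. With FakeConsumer this
  # would spin forever once every queued batch has been consumed.
  def fetch_until_non_empty
    loop do
      messages = @consumer.fetch
      break(messages) if messages.any?
    end
  end
end

consumer = FakeConsumer.new([
  [], # first fetch times out with nothing
  [FakeMessage.new('{"id":1}'), FakeMessage.new('{"id":2}')]
])
subscriber = SketchSubscriber.new(consumer, JSON) # JSON is an assumed serializer

first = subscriber.call  # polls twice, then serves from the fetched batch
second = subscriber.call # served from the cache, no fetch needed
```

Injecting the consumer is a choice made for this sketch only; the original class constructs its own `Poseidon::PartitionConsumer` on partition 0 starting from `:latest_offset`.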
Version data entries
1 entry across 1 version and 1 rubygem
Version | Path |
---|---|
citrus-event-bus-0.0.1 | lib/citrus/event_bus/subscriber.rb |