Sha256: 441c3096a4e9d9312856a46dc8b4409d30d6215559c16d367a459131c45678fa

Contents?: true

Size: 1.14 KB

Versions: 22

Compression:

Stored size: 1.14 KB

Contents

require 'bbk/app/dispatcher/message_stream'

module BBK
  module App
    class Dispatcher
      class QueueStreamStrategy

        def initialize(pool, logger:)
          @pool = pool
          @logger = logger
        end

        def run(consumers, &block)
          @unblocker = Queue.new
          @stream = BBK::App::Dispatcher::MessageStream.new(size: 10)

          consumers.each {|cons| cons.run(@stream) }
          @stream.each do |msg|
            @logger.debug "[#{self.class}] Consumed message #{msg.headers}"
            @pool.post(msg) do |m|
              block.call(m)
            end
          end

          begin
            @pool.shutdown
          rescue StandardError
            nil
          end
          @pool.kill unless @pool.wait_for_termination(@stop_queue_timeout)
        ensure
          @unblocker.push(:ok)
        end

        def push(*args)
          @stream.push(*args)
        end

        def stop(timeout = 5)
          @stop_queue_timeout = timeout

          begin
            @stream.close
          rescue StandardError
            nil
          end
          @unblocker.pop
        end

      end
    end
  end
end

Version data entries

22 entries across 22 versions & 1 rubygems

Version Path
bbk-app-1.1.1.273627 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.1.1.273338 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.1.1.273312 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.1.1.273294 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.1.0.219406 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.1.0.216998 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.1.0.204569 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.1.0.200751 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.1.0.200186 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.1.0.199675 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.1.0.199604 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.1.0.199389 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.1.0.199383 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.0.0.152254 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.0.0.141716 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.0.0.80957 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.0.0.79514 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.0.0.79241 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.0.0.78020 lib/bbk/app/dispatcher/queue_stream_strategy.rb
bbk-app-1.0.0.72920 lib/bbk/app/dispatcher/queue_stream_strategy.rb