Sha256: 652daa876c7f27f46b22fbc03ec588a44416e93e0650cf53b0c5dae8fcf2a656

Contents?: true

Size: 1.02 KB

Versions: 5

Compression:

Stored size: 1.02 KB

Contents

module Qwirk
  module Adapter
    module InMemory

      class Topic
        def initialize(name)
          @name              = name
          @worker_hash_mutex = Mutex.new
          @worker_hash       = {}
        end

        def get_worker_queue(worker_name, queue_max_size)
          @worker_hash_mutex.synchronize do
            queue = @worker_hash[worker_name] ||= Queue.new("#{@name}:#{worker_name}")
            queue.max_size = queue_max_size
            return queue
          end
        end

        def stop
          @worker_hash_mutex.synchronize do
            @worker_hash.each_value do |queue|
              queue.stop
            end
          end
        end

        def read
          raise "topic should not have been read for #{name}"
        end

        def write(obj)
          @worker_hash_mutex.synchronize do
            @worker_hash.each_value do |queue|
              queue.write(obj)
            end
          end
        end

        def to_s
          "topic:#{@name}"
        end
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
qwirk-0.2.4 lib/qwirk/adapter/in_memory/topic.rb
qwirk-0.2.3 lib/qwirk/adapter/in_memory/topic.rb
qwirk-0.2.2 lib/qwirk/adapter/in_memory/topic.rb
qwirk-0.2.1 lib/qwirk/adapter/in_memory/topic.rb
qwirk-0.2.0 lib/qwirk/adapter/in_memory/topic.rb