Sha256: 6e3dd349a28916558bfaf895f43dcd93fa92dd7158da917ba9d57db0935d47c1

Contents?: true

Size: 1.86 KB

Versions: 2

Compression:

Stored size: 1.86 KB

Contents

module DeliveryBoy

  # A fake implementation that is useful for testing.
  #
  # Messages are kept in memory, grouped by topic. `deliver` makes a message
  # visible through `messages_for` right away, while `produce` only buffers it
  # until `deliver_messages` is called. All access to the in-memory state goes
  # through a single mutex.
  class Fake
    # In-memory record of a single message.
    FakeMessage = Struct.new(:value, :topic, :key, :headers, :offset, :partition, :partition_key, :create_time) do
      # Combined byte length of the key and the value, each converted with
      # `to_s` first (so a nil key contributes zero bytes).
      def bytesize
        key.to_s.bytesize + value.to_s.bytesize
      end
    end

    def initialize
      # Delivered messages, keyed by topic.
      @messages = Hash.new {|h, k| h[k] = [] }
      # Messages that were produced but not yet delivered, keyed by topic.
      @buffer = Hash.new {|h, k| h[k] = [] }
      @delivery_lock = Mutex.new
    end

    # Store a message for `topic` so that it is immediately returned by
    # `messages_for`. The message's offset is the number of messages already
    # delivered to that topic.
    #
    # @return [nil]
    def deliver(value, topic:, key: nil, headers: {}, partition: nil, partition_key: nil, create_time: Time.now)
      @delivery_lock.synchronize do
        offset = @messages[topic].count
        message = FakeMessage.new(value, topic, key, headers, offset, partition, partition_key, create_time)

        @messages[topic] << message
      end

      nil
    end

    alias deliver_async! deliver

    # Buffer a message for `topic` without delivering it. Buffered messages
    # are not returned by `messages_for` until `deliver_messages` is called.
    #
    # The offset assigned here is only the message's position within the
    # buffer; it is replaced with the position in the topic's message list
    # when the buffer is delivered.
    #
    # @return [nil]
    def produce(value, topic:, key: nil, headers: {}, partition: nil, partition_key: nil, create_time: Time.now)
      @delivery_lock.synchronize do
        offset = @buffer[topic].count
        message = FakeMessage.new(value, topic, key, headers, offset, partition, partition_key, create_time)

        @buffer[topic] << message
      end

      nil
    end

    # Move every buffered message to the delivered messages of its topic,
    # preserving the order in which the messages were produced, then empty
    # the buffer.
    def deliver_messages
      @delivery_lock.synchronize do
        @buffer.each do |topic, messages|
          delivered = @messages[topic]

          messages.each do |message|
            # Renumber on delivery: the buffer-relative offset would collide
            # with the offsets of messages already delivered to this topic.
            message.offset = delivered.count
            delivered << message
          end
        end
        @buffer.clear
      end
    end

    # Discards all stored state; equivalent to calling `clear`.
    def shutdown
      clear
    end

    # Clear all messages stored in memory, both delivered and buffered.
    def clear
      @delivery_lock.synchronize do
        @messages.clear
        @buffer.clear
      end
    end

    # Return all messages written to the specified topic.
    #
    # @return [Array<FakeMessage>] a copy of the topic's message list; empty
    #   when nothing has been delivered to the topic.
    def messages_for(topic)
      @delivery_lock.synchronize do
        # Return a clone so that the list of messages can be traversed
        # without worrying about a concurrent modification
        @messages[topic].clone
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygem

Version Path
delivery_boy-1.0.1 lib/delivery_boy/fake.rb
delivery_boy-1.0.0 lib/delivery_boy/fake.rb