Sha256: 1b4345e106afcb8a492b1abc5fa81333cdb911adef887e5ff726c675a78bacba
Contents?: true
Size: 1.81 KB
Versions: 1
Compression:
Stored size: 1.81 KB
Contents
module DeliveryBoy # A fake implementation that is useful for testing. class Fake FakeMessage = Struct.new(:value, :topic, :key, :offset, :partition, :partition_key, :create_time) do def bytesize key.to_s.bytesize + value.to_s.bytesize end end def initialize @messages = Hash.new {|h, k| h[k] = [] } @buffer = Hash.new {|h, k| h[k] = [] } @delivery_lock = Mutex.new end def deliver(value, topic:, key: nil, partition: nil, partition_key: nil, create_time: Time.now) @delivery_lock.synchronize do offset = @messages[topic].count message = FakeMessage.new(value, topic, key, offset, partition, partition_key, create_time) @messages[topic] << message end nil end alias deliver_async! deliver def produce(value, topic:, key: nil, partition: nil, partition_key: nil, create_time: Time.now) @delivery_lock.synchronize do offset = @buffer[topic].count message = FakeMessage.new(value, topic, key, offset, partition, partition_key, create_time) @buffer[topic] << message end nil end def deliver_messages @delivery_lock.synchronize do @buffer.each do |topic, messages| @messages[topic].push(*messages) end @buffer.clear end end def shutdown clear end # Clear all messages stored in memory. def clear @delivery_lock.synchronize do @messages.clear @buffer.clear end end # Return all messages written to the specified topic. def messages_for(topic) @delivery_lock.synchronize do # Return a clone so that the list of messages can be traversed # without worrying about a concurrent modification @messages[topic].clone end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
delivery_boy-0.2.8 | lib/delivery_boy/fake.rb |