Sha256: 6e3dd349a28916558bfaf895f43dcd93fa92dd7158da917ba9d57db0935d47c1
Contents?: true
Size: 1.86 KB
Versions: 2
Compression:
Stored size: 1.86 KB
Contents
module DeliveryBoy # A fake implementation that is useful for testing. class Fake FakeMessage = Struct.new(:value, :topic, :key, :headers, :offset, :partition, :partition_key, :create_time) do def bytesize key.to_s.bytesize + value.to_s.bytesize end end def initialize @messages = Hash.new {|h, k| h[k] = [] } @buffer = Hash.new {|h, k| h[k] = [] } @delivery_lock = Mutex.new end def deliver(value, topic:, key: nil, headers: {}, partition: nil, partition_key: nil, create_time: Time.now) @delivery_lock.synchronize do offset = @messages[topic].count message = FakeMessage.new(value, topic, key, headers, offset, partition, partition_key, create_time) @messages[topic] << message end nil end alias deliver_async! deliver def produce(value, topic:, key: nil, headers: {}, partition: nil, partition_key: nil, create_time: Time.now) @delivery_lock.synchronize do offset = @buffer[topic].count message = FakeMessage.new(value, topic, key, headers, offset, partition, partition_key, create_time) @buffer[topic] << message end nil end def deliver_messages @delivery_lock.synchronize do @buffer.each do |topic, messages| @messages[topic].push(*messages) end @buffer.clear end end def shutdown clear end # Clear all messages stored in memory. def clear @delivery_lock.synchronize do @messages.clear @buffer.clear end end # Return all messages written to the specified topic. def messages_for(topic) @delivery_lock.synchronize do # Return a clone so that the list of messages can be traversed # without worrying about a concurrent modification @messages[topic].clone end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
delivery_boy-1.0.1 | lib/delivery_boy/fake.rb |
delivery_boy-1.0.0 | lib/delivery_boy/fake.rb |