Sha256: c5a3a86dd5e8a0bf4481cedbcce91ebb5a338b724e5aa1239daf8378c4e559fb
Contents?: true
Size: 1.2 KB
Versions: 1
Compression:
Stored size: 1.2 KB
Contents
require 'kafka_session/producer' require 'kafka_session/mock_producer' require "kafka_session/version" require 'kafka_session/delivered_message' require 'kafka_session/message' require 'securerandom' class KafkaSession class << self attr_writer :clock, :producer end def self.configure(options = {}) producer_options = { name: options.fetch(:name), brokers: options.fetch(:brokers) } @producer = Producer.new(producer_options) end def self.producer @producer ||= MockProducer.new end def self.session_id Thread.current[:kafka_session_session_id] ||= SecureRandom.uuid end def self.session_id=(session_id) Thread.current[:kafka_session_session_id] = session_id end def self.clock @clock ||= proc { Time.now.utc } end def self.now clock.call end attr_reader :id attr_accessor :producer def initialize(id: self.class.session_id, producer: self.class.producer) @id = id @producer = producer end def publish(topic, *message_values) messages = message_values.map do |message_value| Message.new(message_value, session_id: id).to_json end producer.publish(topic: topic, messages: messages) end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
kafka_session-0.5.0 | lib/kafka_session.rb |