require "kafka_session/version"
require 'kafka_session/delivered_message'
require 'kafka_session/message'
require 'securerandom'
require 'active_support/time'

# Publishes values to a topic through a pluggable producer, wrapping each
# value in a Message tagged with this session's id.
#
# Class-level settings (+session_id+, +clock+, +producer+) serve as shared
# defaults; each instance may be given its own +id+ and +producer+.
class KafkaSession
  class << self
    # Writers for the class-level session id and clock; the matching readers
    # below fall back to a lazily computed default.
    attr_writer :session_id, :clock

    # Default producer handed to new instances that are not given one.
    attr_accessor :producer

    # Class-level session id. Generated with SecureRandom.uuid on first
    # read and memoized, unless one was assigned through +session_id=+.
    #
    # @return [String]
    def session_id
      @session_id ||= SecureRandom.uuid
    end

    # Callable that yields the current time. Defaults to the bound method
    # +Time.current+ (presumably supplied by active_support/time).
    #
    # @return [#call]
    def clock
      @clock ||= Time.public_method(:current)
    end

    # Current time according to the configured clock.
    #
    # @return [Object] whatever +clock.call+ returns
    def now
      clock.call
    end
  end

  # Session id attached to every message this instance publishes.
  attr_reader :id

  # Object that receives +publish(topic:, messages:)+ calls.
  attr_accessor :producer

  # @param id [String] session id; defaults to the class-level session id
  # @param producer [#publish] delivery backend; defaults to the
  #   class-level producer
  def initialize(id: self.class.session_id, producer: self.class.producer)
    @producer = producer
    @id = id
  end

  # Wraps each value in a Message carrying this session's id, serializes it
  # with +to_json+, and hands the whole batch to the producer in one call.
  #
  # @param topic [Object] destination topic, passed through unchanged
  # @param message_values [Array<Object>] zero or more message payloads
  # @return [Object] whatever the producer's +publish+ returns
  def publish(topic, *message_values)
    serialized = message_values.map { |value| Message.new(value, session_id: id).to_json }
    producer.publish(topic: topic, messages: serialized)
  end
end