Sha256: c5a3a86dd5e8a0bf4481cedbcce91ebb5a338b724e5aa1239daf8378c4e559fb

Contents?: true

Size: 1.2 KB

Versions: 1

Compression:

Stored size: 1.2 KB

Contents

require 'kafka_session/producer'
require 'kafka_session/mock_producer'
require "kafka_session/version"

require 'kafka_session/delivered_message'
require 'kafka_session/message'

require 'securerandom'

class KafkaSession
  class << self
    attr_writer :clock, :producer
  end

  def self.configure(options = {})
    producer_options = {
      name:    options.fetch(:name),
      brokers: options.fetch(:brokers)
    }

    @producer = Producer.new(producer_options)
  end

  def self.producer
    @producer ||= MockProducer.new
  end

  def self.session_id
    Thread.current[:kafka_session_session_id] ||= SecureRandom.uuid
  end

  def self.session_id=(session_id)
    Thread.current[:kafka_session_session_id] = session_id
  end

  def self.clock
    @clock ||= proc { Time.now.utc }
  end

  def self.now
    clock.call
  end

  attr_reader :id
  attr_accessor :producer

  def initialize(id: self.class.session_id, producer: self.class.producer)
    @id       = id
    @producer = producer
  end

  def publish(topic, *message_values)
    messages = message_values.map do |message_value|
      Message.new(message_value, session_id: id).to_json
    end

    producer.publish(topic: topic, messages: messages)
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
kafka_session-0.5.0 lib/kafka_session.rb