Sha256: 394293c0171520bd427507e4d73cfcb892c37da751b265bcd2739b89934ee8eb

Contents?: true

Size: 1.22 KB

Versions: 1

Compression:

Stored size: 1.22 KB

Contents

require 'kafka_session/producer'
require 'kafka_session/mock_producer'
require "kafka_session/version"

require 'kafka_session/delivered_message'
require 'kafka_session/message'

require 'securerandom'
require 'active_support/time'
require 'active_support/core_ext/hash'

class KafkaSession
  class << self
    attr_writer :clock, :producer
  end

  def self.configure(options = {})
    producer_options = options.slice(:name, :brokers)

    @producer = Producer.new(producer_options)
  end

  def self.producer
    @producer ||= MockProducer.new
  end

  def self.session_id
    Thread.current[:kafka_session_session_id] ||= SecureRandom.uuid
  end

  def self.session_id=(session_id)
    Thread.current[:kafka_session_session_id] = session_id
  end

  def self.clock
    @clock ||= Time.public_method(:current)
  end

  def self.now
    clock.call
  end

  attr_reader :id
  attr_accessor :producer

  def initialize(id: self.class.session_id, producer: self.class.producer)
    @id       = id
    @producer = producer
  end

  def publish(topic, *message_values)
    messages = message_values.map do |message_value|
      Message.new(message_value, session_id: id).to_json
    end

    producer.publish(topic: topic, messages: messages)
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
kafka_session-0.4.1 lib/kafka_session.rb