Sha256: a58b4d3d9f566cdecbf0d535a1067cece50da8614b6c6e50e855a8ca97ff808f

Contents?: true

Size: 950 Bytes

Versions: 3

Compression:

Stored size: 950 Bytes

Contents

require "kafka_session/version"
require 'kafka_session/delivered_message'
require 'kafka_session/message'

require 'securerandom'
require 'active_support/time'

# Wraps a producer so that every published value is turned into a Message
# carrying a session id before being handed to the producer.
#
# Class-level configuration:
# * +producer+   - default producer given to new instances.
# * +clock+      - callable invoked by +now+; can be replaced via +clock=+.
# * +session_id+ - value kept in Thread.current[:kafka_session_session_id].
class KafkaSession
  class << self
    attr_writer :clock
    attr_accessor :producer

    # Returns the session id stored on the current thread, generating and
    # storing a fresh UUID the first time no (truthy) value is present.
    def session_id
      store = Thread.current
      store[:kafka_session_session_id] ||= SecureRandom.uuid
    end

    # Overwrites the session id stored on the current thread.
    def session_id=(session_id)
      Thread.current[:kafka_session_session_id] = session_id
    end

    # The configured clock. When none has been assigned, it is lazily set to
    # the Method object for Time.current (this file requires
    # active_support/time for that method).
    def clock
      @clock ||= Time.public_method(:current)
    end

    # Invokes the clock and returns whatever it returns.
    def now
      clock.call
    end
  end

  attr_reader :id
  attr_accessor :producer

  # @param id       session id attached to published messages; defaults to
  #                 the class-level session id.
  # @param producer object responding to publish(topic:, messages:);
  #                 defaults to the class-level producer.
  def initialize(id: self.class.session_id, producer: self.class.producer)
    @id, @producer = id, producer
  end

  # Wraps each given value in a Message tagged with this session's id,
  # serialises it with +to_json+, and sends the whole batch to the producer
  # in a single call.
  #
  # @param topic          passed through to the producer unchanged.
  # @param message_values zero or more values to publish.
  # @return whatever the producer's +publish+ returns.
  def publish(topic, *message_values)
    payloads = message_values.map { |value| Message.new(value, session_id: id).to_json }

    producer.publish(topic: topic, messages: payloads)
  end
end

Version data entries

3 entries across 3 versions & 1 rubygem

Version Path
kafka_session-0.3.0 lib/kafka_session.rb
kafka_session-0.2.1 lib/kafka_session.rb
kafka_session-0.2.0 lib/kafka_session.rb