Sha256: 85cbda0de81d80c2ab57069f140fd807c9de15fae75800d2c5200192999fdb46

Contents?: true

Size: 1.45 KB

Versions: 5

Compression:

Stored size: 1.45 KB

Contents

# frozen_string_literal: true

begin
  require 'kafka'
rescue LoadError # rubocop:disable Lint/SuppressedException
end

module PubSubModelSync
  class ServiceKafka < ServiceBase
    cattr_accessor :producer
    attr_accessor :config, :service, :consumer

    CONSUMER_GROUP = 'service_model_sync'

    def initialize
      @config = PubSubModelSync::Config
      @service = Kafka.new(*config.kafka_connection)
    end

    def listen_messages
      log('Listener starting...')
      start_consumer
      consumer.each_message(&method(:process_message))
    rescue PubSubModelSync::Runner::ShutDown
      log('Listener stopped')
    rescue => e
      log("Error listening message: #{[e.message, e.backtrace]}", :error)
    end

    def publish(payload)
      producer.produce(payload.to_json, message_settings)
      producer.deliver_messages
    end

    def stop
      log('Listener stopping...')
      consumer.stop
    end

    private

    def message_settings
      { topic: config.topic_name, headers: { SERVICE_KEY => true } }
    end

    def start_consumer
      @consumer = service.consumer(group_id: CONSUMER_GROUP)
      consumer.subscribe(config.topic_name)
    end

    def producer
      return self.class.producer if self.class.producer

      at_exit { self.class.producer.shutdown }
      self.class.producer = service.producer
    end

    def process_message(message)
      return unless message.headers[SERVICE_KEY]

      super(message.value)
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
pub_sub_model_sync-0.5.2 lib/pub_sub_model_sync/service_kafka.rb
pub_sub_model_sync-0.5.1.1 lib/pub_sub_model_sync/service_kafka.rb
pub_sub_model_sync-0.5.1 lib/pub_sub_model_sync/service_kafka.rb
pub_sub_model_sync-0.5.0.1 lib/pub_sub_model_sync/service_kafka.rb
pub_sub_model_sync-0.5.0 lib/pub_sub_model_sync/service_kafka.rb