Sha256: 7bbe6e4af7523987dd5dc143fc0365fe63fb8f8ff521167c1f2c0fb1cca4363d

Contents?: true

Size: 1.89 KB

Versions: 9

Compression:

Stored size: 1.89 KB

Contents

# frozen_string_literal: true

begin
  require 'kafka'
rescue LoadError # rubocop:disable Lint/SuppressedException
end

module PubSubModelSync
  # Kafka-backed transport: publishes payloads to `config.topic_name` and
  # consumes messages from the same topic through a consumer group.
  #
  # NOTE(review): SERVICE_KEY and #perform_message are not defined in this
  # file; presumably they are inherited from ServiceBase -- confirm.
  class ServiceKafka < ServiceBase
    # Producer shared by every instance of this class (created lazily by the
    # private #producer reader below).
    cattr_accessor :producer

    attr_accessor :service, :consumer
    attr_accessor :config
    CONSUMER_GROUP = 'service_model_sync'

    # Builds the Kafka client from the splatted `config.kafka_connection`
    # arguments.
    def initialize
      @config = PubSubModelSync::Config
      @service = Kafka.new(*config.kafka_connection)
    end

    # Subscribes to the configured topic and hands every received message to
    # #process_message.
    #
    # PubSubModelSync::Runner::ShutDown is re-raised so the generic rescue
    # below does not swallow it; any other StandardError is only logged.
    def listen_messages
      log('Listener starting...')
      start_consumer
      consumer.each_message(&method(:process_message))
    rescue PubSubModelSync::Runner::ShutDown
      raise
    rescue => e
      log("Error listening message: #{[e.message, e.backtrace]}", :error)
    end

    # Serializes `data` and `attributes` into one JSON payload, queues it on
    # the shared producer and delivers it immediately.
    #
    # Failures are logged rather than raised (best-effort publishing).
    #
    # @param data [Object] value stored under the payload's `data` key
    # @param attributes [Object] value stored under the payload's `attributes` key
    def publish(data, attributes)
      log("Publishing: #{[data, attributes]}")
      payload = { data: data, attributes: attributes }
      producer.produce(payload.to_json, message_settings)
      producer.deliver_messages
    rescue => e
      info = [data, attributes, e.message, e.backtrace]
      log("Error publishing: #{info}", :error)
    end

    # Stops the consumer, if one was started.
    #
    # The consumer only exists after #listen_messages has run, so a nil-safe
    # call keeps an early #stop from raising NoMethodError.
    def stop
      log('Listener stopping...')
      consumer&.stop
    end

    private

    # Options passed to `producer.produce`: destination topic plus a header
    # flag that #process_message uses to recognise messages from this service.
    def message_settings
      { topic: config.topic_name, headers: { SERVICE_KEY => true } }
    end

    # Creates the group consumer and subscribes it to the configured topic.
    def start_consumer
      @consumer = service.consumer(group_id: CONSUMER_GROUP)
      consumer.subscribe(config.topic_name)
    end

    # Returns the class-level producer, creating it on first use.
    #
    # The producer is created BEFORE the at_exit hook is registered: if
    # `service.producer` raises, no hook is left behind to call `shutdown`
    # on nil when the process exits. The hook is nil-safe for the same reason.
    def producer
      return self.class.producer if self.class.producer

      new_producer = service.producer
      at_exit { self.class.producer&.shutdown }
      self.class.producer = new_producer
    end

    # Handles one consumed message: messages without the SERVICE_KEY header
    # are ignored, the rest have their value passed to #perform_message.
    # Errors are logged so a single bad message does not abort the listener.
    def process_message(message)
      return unless message.headers[SERVICE_KEY]

      perform_message(message.value)
    rescue => e
      error = [message, e.message, e.backtrace]
      log("Error processing message: #{error}", :error)
    end

    # Forwards `msg` to `config.log`, prefixed so Kafka entries are easy to spot.
    #
    # @param msg [String] text to log
    # @param kind [Symbol] log level passed through to `config.log`
    def log(msg, kind = :info)
      config.log("Kafka Service ==> #{msg}", kind)
    end
  end
end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
pub_sub_model_sync-0.4.0 lib/pub_sub_model_sync/service_kafka.rb
pub_sub_model_sync-0.3.1 lib/pub_sub_model_sync/service_kafka.rb
pub_sub_model_sync-0.3.0 lib/pub_sub_model_sync/service_kafka.rb
pub_sub_model_sync-0.2.4 lib/pub_sub_model_sync/service_kafka.rb
pub_sub_model_sync-0.2.3 lib/pub_sub_model_sync/service_kafka.rb
pub_sub_model_sync-0.2.2 lib/pub_sub_model_sync/service_kafka.rb
pub_sub_model_sync-0.2.1 lib/pub_sub_model_sync/service_kafka.rb
pub_sub_model_sync-0.2.0 lib/pub_sub_model_sync/service_kafka.rb
pub_sub_model_sync-0.1.5 lib/pub_sub_model_sync/service_kafka.rb