Sha256: b49d789cf1aa68de8a8ce4012b4679758fe18898b0fc40f8fc49541a1c6f2e53

Contents?: true

Size: 966 Bytes

Versions: 2

Compression:

Stored size: 966 Bytes

Contents

# frozen_string_literal: true

module SalesforceStreamer
  # Subscribes a Restforce client to a list of push topics inside an
  # EventMachine reactor and forwards every received message to the handler
  # attached to its topic.
  #
  # Each push topic is expected to respond to +name+, +replay+ and
  # +handler_constant+ (the latter must respond to +call+).
  class Server
    # Signals for which #run installs a handler that prints a notice and
    # exits the process with status 0.
    SHUTDOWN_SIGNALS = %w[INT USR1 USR2 TERM TTIN TSTP].freeze

    attr_writer :push_topics

    # @param config [#logger] object supplying the logger used by the server
    # @param push_topics [Array] topics to subscribe to once #run is called
    def initialize(config:, push_topics: [])
      @logger = config.logger
      @push_topics = push_topics
      @client = Restforce.new
    end

    # Authenticates the client, installs the signal handlers and starts the
    # EventMachine reactor with one subscription per push topic.
    #
    # @return whatever +EM.run+ returns
    def run
      @client.authenticate!
      @logger.info 'Starting Server'
      catch_signals
      start_em
    end

    private

    # Installs an exit-on-signal handler for every entry of SHUTDOWN_SIGNALS.
    def catch_signals
      SHUTDOWN_SIGNALS.each do |sig|
        trap(sig) do
          # NOTE(review): written with puts rather than @logger — presumably
          # because a Logger may not be usable from trap context; confirm.
          puts "Caught signal #{sig}. Shutting down..."
          exit 0
        end
      end
    end

    # Starts the reactor and registers a subscription for each push topic.
    def start_em
      EM.run do
        # each, not map: the subscriptions are side effects and the collected
        # results were never used.
        @push_topics.each { |topic| subscribe(topic) }
      end
    end

    # Subscribes the client to +topic+; +topic.replay+ is coerced with +to_i+
    # (so +nil+ becomes 0) before being passed as the +replay+ option.
    def subscribe(topic)
      @client.subscribe(topic.name, replay: topic.replay.to_i) do |msg|
        process(topic, msg)
      end
    end

    # Logs +msg+ at debug level, hands it to the topic's handler and then logs
    # the channel name and the message's event replayId at info level.
    def process(topic, msg)
      @logger.debug(msg)
      topic.handler_constant.call(msg)
      @logger.info("Message processed: channel=#{topic.name} replayId=#{msg.dig('event', 'replayId')}")
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
salesforce_streamer-0.1.1 lib/salesforce_streamer/server.rb
salesforce_streamer-0.1.0 lib/salesforce_streamer/server.rb