Sha256: c45d48fd8eb82be3c6ba77540e503020a3b2b79d3e58dc9b7494d229e1dc843a

Contents?: true

Size: 1.08 KB

Versions: 3

Compression:

Stored size: 1.08 KB

Contents

module SalesforceStreamer
  class Server
    attr_writer :push_topics

    def initialize(push_topics: [])
      @push_topics = push_topics
    end

    def run
      Log.info 'Starting Server'
      catch_signals
      start_em
    end

    private

    def catch_signals
      %w[INT USR1 USR2 TERM TTIN TSTP].each do |sig|
        trap sig do
          puts "Caught signal #{sig}. Shutting down..."
          exit 0
        end
      end
    end

    def client
      return @client if @client
      @client = Restforce.new
      @client.authenticate!
      @client.faye.add_extension ReplayIdErrorExtension.new
      @client
    end

    def start_em
      EM.run do
        @push_topics.map do |topic|
          replay_id = Configuration.instance.replay_adapter.call(topic)
          client.subscribe topic.name, replay: replay_id.to_i do |message|
            replay_id = message.dig('event', 'replayId')
            Log.info "Message #{replay_id} received from topic #{topic.name}"
            topic.handle message
            topic.id = replay_id
          end
        end
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
salesforce_streamer-2.0.0 lib/salesforce_streamer/server.rb
salesforce_streamer-2.0.0.rc2 lib/salesforce_streamer/server.rb
salesforce_streamer-2.0.0.rc1 lib/salesforce_streamer/server.rb