Sha256: 091072905319705387a7e8c358cecabb0a194f513e06cfae7513e9eff536f024

Contents?: true

Size: 1.51 KB

Versions: 2

Compression:

Stored size: 1.51 KB

Contents

module Rpush
  module Daemon
    class Feeder
      extend Reflectable

      def self.start(push_mode = false)
        self.should_stop = false

        @thread = Thread.new do
          push_mode ? feed_all : feed_forever
          Rpush::Daemon.store.release_connection
        end

        @thread.join
      end

      def self.stop
        self.should_stop = true
        interruptible_sleeper.stop
        @thread.join if @thread
      end

      def self.wakeup
        interruptible_sleeper.wakeup
      end

      class << self
        attr_accessor :should_stop
      end

      def self.feed_all
        enqueue_notifications until Rpush::Daemon.store.pending_delivery_count == 0
      end

      def self.feed_forever
        loop do
          enqueue_notifications
          interruptible_sleeper.sleep
          break if should_stop
        end
      end

      def self.enqueue_notifications
        batch_size = Rpush.config.batch_size - Rpush::Daemon::AppRunner.total_queued
        return if batch_size <= 0
        notifications = Rpush::Daemon.store.deliverable_notifications(batch_size)
        Rpush::Daemon::AppRunner.enqueue(notifications)
      rescue StandardError => e
        Rpush.logger.error(e)
        reflect(:error, e)
      end

      def self.interruptible_sleeper
        return @interruptible_sleeper if @interruptible_sleeper
        @interruptible_sleeper = InterruptibleSleep.new(Rpush.config.push_poll)
        @interruptible_sleeper.start
        @interruptible_sleeper
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rpush-2.3.2-java lib/rpush/daemon/feeder.rb
rpush-2.3.2 lib/rpush/daemon/feeder.rb