Sha256: 0fe9d257415fccbfadcbe56c9acc8daec27c9543c1128a6063cd6ee0186a297d

Contents?: true

Size: 1.78 KB

Versions: 2

Compression:

Stored size: 1.78 KB

Contents

module Shoryuken
  module Waiter
    class Querier
      include Celluloid
      include Util

      def initialize
        delay = Shoryuken::Waiter.poll_delay
        logger.debug { "[Shoryuken::Waiter] Checking for delayed messages every #{delay} seconds" }
        @timer = every(delay) { poll }
      end

      private

      def poll
        Shoryuken::Waiter.tables.each { |table| poll_table(table) }
      end

      def poll_table(table)
        logger.debug { "[Shoryuken::Waiter] Looking for delayed messages in '#{table.table_name}' ready to be queued" }

        query_results(table).each do |response|
          items = response.items
          logger.debug { "[Shoryuken::Waiter] Found #{items.count} delayed messages in '#{table.table_name}'" }
          Shoryuken::Waiter::Scheduler.schedule_items(table, items)
        end
      end

      def query_results(table)
        threshold = (Time.now + Shoryuken::Waiter::MAX_QUEUE_DELAY).to_f
        table.query(query_options(threshold))
      end

      def query_options(threshold)
        {
          index_name: "scheduler-perform_at-index",
          select: "SPECIFIC_ATTRIBUTES",
          consistent_read: true,
          projection_expression: [
            "perform_at",
            "sqs_message_body",
            "sqs_message_attributes"
          ].join(","),
          return_consumed_capacity: "NONE",
          key_condition_expression: [
            "#H = :hashval",
            "#R < :rangeval"
          ].join(" AND "),
          expression_attribute_names: {
            "#H": "scheduler",
            "#R": "perform_at"
          },
          expression_attribute_values: {
            ":hashval": Shoryuken::Waiter::TABLE_PRIMARY_ITEM_KEY_VALUE,
            ":rangeval": threshold
          }
        }
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
shoryuken-waiter-0.0.2 lib/shoryuken/waiter/querier.rb
shoryuken-waiter-0.0.1 lib/shoryuken/waiter/querier.rb