Sha256: 15a75192746f21e44c31ebf918bd086a776a333509336505217ff3bc521e05cf

Contents?: true

Size: 1.14 KB

Versions: 1

Compression:

Stored size: 1.14 KB

Contents

module Dynamiq
  # Mixin for job classes. Including it wires the class up with
  # Sidekiq::Worker and replaces the enqueue helpers with variants that take a
  # leading +score+ argument and push through Dynamiq::Client.
  #
  #   class SomeJob
  #     include Dynamiq::Worker
  #   end
  #
  #   SomeJob.perform_async(score, *args)
  #   SomeJob.perform_in(interval, score, *args)
  module Worker
    # Job id, readable and writable on worker instances.
    attr_accessor :jid

    # Hook invoked when Dynamiq::Worker is included.
    #
    # @param base [Class] the class including this module
    def self.included(base)
      # Mix Sidekiq::Worker into the including class itself. `base.class` is
      # `Class`, so including there would leak Sidekiq::Worker into every class
      # in the process. `send` keeps this working on Rubies where
      # Module#include is still private.
      base.send(:include, Sidekiq::Worker)
      base.extend Sidekiq::Worker::ClassMethods
      # Extended last so the score-aware methods below take precedence over
      # any same-named methods from Sidekiq::Worker::ClassMethods.
      base.extend ClassMethods

      base.class_attribute :sidekiq_options_hash
      base.class_attribute :sidekiq_retry_in_block
      base.class_attribute :sidekiq_retries_exhausted_block
    end

    # Class-level enqueue API added to every including class.
    module ClassMethods
      # Enqueue a job for immediate processing.
      #
      # @param score [Object] value sent under the 'score' key of the payload
      # @param args [Array] arguments sent under the 'args' key
      # @return whatever Dynamiq::Client#push returns
      def perform_async(score, *args)
        client_push score: score, class: self, args: args
      end

      # Enqueue a job to run later.
      #
      # @param interval [#to_f] values below 1_000_000_000 are treated as an
      #   offset in seconds from now; larger values as an absolute epoch
      #   timestamp
      # @param score [Object] value sent under the 'score' key of the payload
      # @param args [Array] arguments sent under the 'args' key
      # @return whatever Dynamiq::Client#push returns
      def perform_in(interval, score, *args)
        int = interval.to_f
        now = Time.now.to_f
        ts = (int < 1_000_000_000 ? now + int : int)

        item = { score: score, class: self, args: args }

        # Optimization: a job due now or in the past is pushed without a
        # timestamp so it is enqueued right away. The key is a Symbol here;
        # client_push stringifies all keys before handing the item over.
        item[:at] = ts if ts > now

        client_push item
      end
      alias_method :perform_at, :perform_in

      # Push a job payload through Dynamiq::Client.
      #
      # The connection pool is the first one available of: the thread-local
      # :sidekiq_via_pool, the 'pool' entry of get_sidekiq_options, and
      # Sidekiq.redis_pool.
      #
      # @param item [Hash] job payload; keys are stringified before the push
      def client_push(item) # :nodoc:
        # NOTE(review): get_sidekiq_options is not defined in this file; it is
        # presumably supplied by Sidekiq::Worker::ClassMethods - confirm.
        pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
        Dynamiq::Client.new(pool).push(item.stringify_keys)
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
dynamiq-0.1.0 lib/dynamiq/worker.rb