# frozen_string_literal: true

require "securerandom"
require "sidekiq/middleware/chain"
require "sidekiq/job_util"

module Sidekiq
  class Client
    include Sidekiq::JobUtil

    ##
    # Define client-side middleware:
    #
    #   client = Sidekiq::Client.new
    #   client.middleware do |chain|
    #     chain.use MyClientMiddleware
    #   end
    #   client.push('class' => 'SomeJob', 'args' => [1,2,3])
    #
    # All client instances default to the globally-defined
    # Sidekiq.client_middleware but you can change as necessary.
    #
    # When a block is given the chain is duplicated first, so changes made in
    # the block only affect this client instance. Returns the chain.
    #
    def middleware(&block)
      if block
        @chain = @chain.dup
        yield @chain
      end
      @chain
    end

    attr_accessor :redis_pool

    # Sidekiq::Client is responsible for pushing job payloads to Redis.
    # Accepts the :pool or :config keyword argument; when neither is given the
    # pool installed by Sidekiq::Client.via (if any) or the default
    # configuration's pool is used.
    #
    #   Sidekiq::Client.new(pool: Sidekiq::RedisConnection.create)
    #
    # Inside the Sidekiq process, you can reuse the configured resources:
    #
    #   Sidekiq::Client.new(config: config)
    #
    # @param pool [ConnectionPool] explicit Redis pool to use
    # @param config [Sidekiq::Config] use the pool and middleware from the given Sidekiq container
    # @param chain [Sidekiq::Middleware::Chain] use the given middleware chain
    # @raise [ArgumentError] if no Redis pool can be determined
    def initialize(*args, **kwargs)
      if args.size == 1 && kwargs.empty?
        warn "Sidekiq::Client.new(pool) is deprecated, please use Sidekiq::Client.new(pool: pool), #{caller(0..3)}"
        # old calling method, accept 1 pool argument
        @redis_pool = args[0]
        @chain = Sidekiq.default_configuration.client_middleware
        @config = Sidekiq.default_configuration
      else
        # new calling method: keyword arguments
        @config = kwargs[:config] || Sidekiq.default_configuration
        @redis_pool = kwargs[:pool] || Thread.current[:sidekiq_redis_pool] || @config&.redis_pool
        @chain = kwargs[:chain] || @config&.client_middleware
        raise ArgumentError, "No Redis pool available for Sidekiq::Client" unless @redis_pool
      end
    end

    ##
    # The main method used to push a job to Redis.  Accepts a number of options:
    #
    #   queue - the named queue to use, default 'default'
    #   class - the job class to call, required
    #   args - an array of simple arguments to the perform method, must be JSON-serializable
    #   at - timestamp to schedule the job (optional), must be Numeric (e.g. Time.now.to_f)
    #   retry - whether to retry this job if it fails, default true or an integer number of retries
    #   backtrace - whether to save any error backtrace, default false
    #
    # If class is set to the class name, the jobs' options will be based on Sidekiq's default
    # job options. Otherwise, they will be based on the job class's options.
    #
    # Any options valid for a job class's sidekiq_options are also available here.
    #
    # All options must be strings, not symbols.  NB: because we are serializing to JSON, all
    # symbols in 'args' will be converted to strings.  Note that +backtrace: true+ can take quite a bit of
    # space in Redis; a large volume of failing jobs can start Redis swapping if you aren't careful.
    #
    # Returns a unique Job ID.  If middleware stops the job, nil will be returned instead.
    #
    # Example:
    #   push('queue' => 'my_queue', 'class' => MyJob, 'args' => ['foo', 1, :bat => 'bar'])
    #
    def push(item)
      normed = normalize_item(item)
      payload = middleware.invoke(item["class"], normed, normed["queue"], @redis_pool) do
        normed
      end
      if payload
        verify_json(payload)
        raw_push([payload])
        payload["jid"]
      end
    end

    ##
    # Push a large number of jobs to Redis. This method cuts out the redis
    # network round trip latency.  I wouldn't recommend pushing more than
    # 1000 per call but YMMV based on network quality, size of job args, etc.
    # A large number of jobs can cause a bit of Redis command processing latency.
    #
    # Takes the same arguments as #push except that args is expected to be
    # an Array of Arrays.  All other keys are duplicated for each job.  Each job
    # is run through the client middleware pipeline and each job gets its own Job ID
    # as normal.
    #
    # 'at' may be a single Numeric timestamp applied to every job or an Array of
    # Numeric timestamps, one per entry in 'args'.
    #
    # Note: the 'at' and 'jid' keys are removed from the given +items+ Hash.
    #
    # Returns an array of the pushed jobs' jids.  The number of jobs pushed can be less
    # than the number given if the middleware stopped processing for one or more jobs.
    #
    # Raises ArgumentError if 'args' is not an Array of Arrays, if 'at' is invalid,
    # or if 'jid' is given when pushing more than one job.
    def push_bulk(items)
      args = items["args"]
      raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" unless args.is_a?(Array) && args.all?(Array)
      return [] if args.empty? # no jobs to push

      at = items.delete("at")
      raise ArgumentError, "Job 'at' must be a Numeric or an Array of Numeric timestamps" if at && (Array(at).empty? || !Array(at).all? { |entry| entry.is_a?(Numeric) })
      raise ArgumentError, "Job 'at' Array must have same size as 'args' Array" if at.is_a?(Array) && at.size != args.size

      # The jid is removed so it is not copied into every job; each job below
      # is assigned a freshly generated jid.
      jid = items.delete("jid")
      raise ArgumentError, "Explicitly passing 'jid' when pushing more than one job is not supported" if jid && args.size > 1

      normed = normalize_item(items)
      payloads = args.map.with_index { |job_args, index|
        copy = normed.merge("args" => job_args, "jid" => SecureRandom.hex(12))
        copy["at"] = (at.is_a?(Array) ? at[index] : at) if at
        result = middleware.invoke(items["class"], copy, copy["queue"], @redis_pool) do
          verify_json(copy)
          copy
        end
        # a falsy middleware result means the job was stopped; drop it
        result || nil
      }.compact

      raw_push(payloads) unless payloads.empty?
      payloads.map { |payload| payload["jid"] }
    end

    # Allows sharding of jobs across any number of Redis instances.  All jobs
    # defined within the block will use the given Redis connection pool.
    #
    #   pool = ConnectionPool.new { Redis.new }
    #   Sidekiq::Client.via(pool) do
    #     SomeJob.perform_async(1,2,3)
    #     SomeOtherJob.perform_async(1,2,3)
    #   end
    #
    # Generally this is only needed for very large Sidekiq installs processing
    # thousands of jobs per second.  I do not recommend sharding unless
    # you cannot scale any other way (e.g. splitting your app into smaller apps).
    #
    # The pool previously stored for the current thread is restored when the
    # block finishes, whether it returns normally or raises.
    #
    # Raises ArgumentError if +pool+ is nil.
    def self.via(pool)
      raise ArgumentError, "No pool given" if pool.nil?

      previous_pool = Thread.current[:sidekiq_redis_pool]
      # The ensure is scoped to the region after the guard above: a rejected
      # nil pool must not overwrite a pool installed by an enclosing `via`.
      begin
        Thread.current[:sidekiq_redis_pool] = pool
        yield
      ensure
        Thread.current[:sidekiq_redis_pool] = previous_pool
      end
    end

    class << self
      # Pushes a single job using a new client instance. See Sidekiq::Client#push.
      def push(item)
        new.push(item)
      end

      # Pushes many jobs using a new client instance. See Sidekiq::Client#push_bulk.
      def push_bulk(items)
        new.push_bulk(items)
      end

      # Resque compatibility helpers.  Note all helpers
      # should go through Sidekiq::Job#client_push.
      #
      # Example usage:
      #   Sidekiq::Client.enqueue(MyJob, 'foo', 1, :bat => 'bar')
      #
      # Messages are enqueued to the 'default' queue.
      #
      def enqueue(klass, *args)
        klass.client_push("class" => klass, "args" => args)
      end

      # Example usage:
      #   Sidekiq::Client.enqueue_to(:queue_name, MyJob, 'foo', 1, :bat => 'bar')
      #
      def enqueue_to(queue, klass, *args)
        klass.client_push("queue" => queue, "class" => klass, "args" => args)
      end

      # Example usage:
      #   Sidekiq::Client.enqueue_to_in(:queue_name, 3.minutes, MyJob, 'foo', 1, :bat => 'bar')
      #
      def enqueue_to_in(queue, interval, klass, *args)
        int = interval.to_f
        now = Time.now.to_f
        # Values below 1_000_000_000 are treated as an offset from now,
        # anything larger as an absolute epoch timestamp.
        ts = ((int < 1_000_000_000) ? now + int : int)

        item = {"class" => klass, "args" => args, "at" => ts, "queue" => queue}
        # a timestamp that is not in the future is pushed without 'at'
        item.delete("at") if ts <= now

        klass.client_push(item)
      end

      # Example usage:
      #   Sidekiq::Client.enqueue_in(3.minutes, MyJob, 'foo', 1, :bat => 'bar')
      #
      def enqueue_in(interval, klass, *args)
        klass.perform_in(interval, *args)
      end
    end

    private

    # Sends the given payload Hashes to Redis in a single pipeline.
    # Retries once, after closing the connection, on a READONLY, NOREPLICAS or
    # UNBLOCKED error; any other error (or a second failure) is re-raised.
    # Returns true.
    def raw_push(payloads)
      @redis_pool.with do |conn|
        retryable = true
        begin
          conn.pipelined do |pipeline|
            atomic_push(pipeline, payloads)
          end
        rescue RedisClient::Error => ex
          # 2550 Failover can cause the server to become a replica, need
          # to disconnect and reopen the socket to get back to the primary.
          # 4495 Use the same logic if we have a "Not enough replicas" error from the primary
          # 4985 Use the same logic when a blocking command is force-unblocked
          # The retry logic is copied from sidekiq.rb
          if retryable && ex.message.match?(/READONLY|NOREPLICAS|UNBLOCKED/)
            conn.close
            retryable = false
            retry
          end
          raise
        end
      end
      true
    end

    # Issues the Redis commands for a batch of payloads on the given
    # connection/pipeline. If the first payload has an 'at' key, all payloads
    # are added to the "schedule" sorted set scored by their 'at' value;
    # otherwise they are stamped with 'enqueued_at' and pushed onto the queue
    # named by the first payload.
    def atomic_push(conn, payloads)
      if payloads.first.key?("at")
        conn.zadd("schedule", payloads.flat_map { |hash|
          at = hash["at"].to_s
          # Serialize a copy without 'at' and leave the payload itself intact:
          # raw_push may call this method again with the same payloads when it
          # retries, and the branch above must still see them as scheduled.
          [at, Sidekiq.dump_json(hash.reject { |key, _| key == "at" })]
        })
      else
        queue = payloads.first["queue"]
        now = Time.now.to_f
        to_push = payloads.map { |entry|
          entry["enqueued_at"] = now
          Sidekiq.dump_json(entry)
        }
        conn.sadd("queues", [queue])
        conn.lpush("queue:#{queue}", to_push)
      end
    end
  end
end