Sha256: 80e08b8fb9f56cc3909d75ee077f076423d298812634e930ae13b308d4a93cb3

Contents?: true

Size: 806 Bytes

Versions: 6

Compression:

Stored size: 806 Bytes

Contents

# frozen_string_literal: true

require "sidekiq"

module SidekiqPublisher
  class Client < Sidekiq::Client
    def bulk_push(items)
      payloads = items.map do |item|
        normed = normalize_item(item)
        middleware.invoke(item["class"], normed, normed["queue"], @redis_pool) do
          verify_json(normed) if respond_to?(:verify_json) # needed here as of v6.4.2, formerly done by normalize_item
          normed
        end || nil
      end.compact

      pushed = 0
      with_connection do |conn|
        conn.multi do |transaction|
          payloads.each do |payload|
            atomic_push(transaction, [payload])
            pushed += 1
          end
        end
      end

      pushed
    end

    private

    def with_connection(&blk)
      @redis_pool.with(&blk)
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
sidekiq_publisher-6.0.0 lib/sidekiq_publisher/client.rb
sidekiq_publisher-5.0.0 lib/sidekiq_publisher/client.rb
sidekiq_publisher-4.0.0 lib/sidekiq_publisher/client.rb
sidekiq_publisher-3.0.0 lib/sidekiq_publisher/client.rb
sidekiq_publisher-2.4.0 lib/sidekiq_publisher/client.rb
sidekiq_publisher-2.3.0 lib/sidekiq_publisher/client.rb