Sha256: f7a572d7936b1062258c5f7d4c326305ff52d35f8d93d856bfed14c20eaf7f56

Contents?: true

Size: 739 Bytes

Versions: 1

Compression:

Stored size: 739 Bytes

Contents

module Dynamiq
  class Client < Sidekiq::Client
    def push_message(message)
      redis_pool.with do |conn|
        atomic_push conn, [ JSON.parse(message) ]
      end
    end

  private

    def atomic_push(conn, payloads)
      if payloads.first['at']
        payload = payloads.map do |hash|
          [ hash.delete('at').to_s, Sidekiq.dump_json(hash) ]
        end
        
        conn.zadd 'schedule', payload
      else
        q = payloads.first['queue']
        to_push = payloads.map do |entry| 
          [ entry.delete('score').to_i, Sidekiq.dump_json(entry) ]
        end

        conn.sadd :queues, q
        conn.sadd :dynamic_queues, q
        conn.zadd [:dynamic_queue, q].join(':'), to_push
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
dynamiq-0.1.0 lib/dynamiq/client.rb