Sha256: b74a02c6a1e5a1daccd2ac8d851b11b4b4da9ea87b213c6c2e61c65e8b9dc83b

Contents?: true

Size: 1.89 KB

Versions: 9

Compression:

Stored size: 1.89 KB

Contents

require 'spec_helper'

describe "Publishing an event in the future" do
  # Fixed points in time used by the example:
  #   now      - the frozen clock at publish time
  #   future   - 60 seconds after now; when the event is scheduled to run
  #   worktime - 1 second after future; the frozen clock when the delayed
  #              job is performed
  let(:now)      { Time.parse("01/01/2013 5:00") }
  let(:future)   { Time.at(now.to_i + 60) }
  let(:worktime) { Time.at(future.to_i + 1) }

  # Bus attributes expected on the delayed (not yet published) payload.
  # NOTE(review): the hostname expression presumably mirrors how the library
  # derives "bus_app_hostname" -- confirm before changing the regex.
  let(:delayed_attrs) do
    {
      "bus_delayed_until" => future.to_i,
      "bus_id"            => "#{now.to_i}-idfhlkj",
      "bus_app_hostname"  => `hostname 2>&1`.strip.sub(/.local/, '')
    }
  end

  # Bus attributes expected once the delayed job has been performed at worktime.
  let(:bus_attrs) { delayed_attrs.merge("bus_published_at" => worktime.to_i) }

  before(:each) do
    Timecop.freeze(now)
    # Stub the UUID so the expected "bus_id" value is predictable.
    QueueBus.stub(:generate_uuid).and_return("idfhlkj")
  end

  after(:each) { Timecop.return }

  it "should add it to Redis then to the real queue" do
    payload = { :one => 1, "two" => "here", "id" => 12 }
    # Event attributes expected in both the delayed and the published payload.
    expected_event = { "bus_event_type" => "event_name", "two" => "here", "one" => 1, "id" => 12 }

    QueueBus.publish_at(future, "event_name", payload)

    # The schedule set holds the timestamp the event was delayed until.
    scheduled = QueueBus.redis { |redis| redis.zrange("delayed_queue_schedule", 0, 1) }
    scheduled.should == [future.to_i.to_s]

    # The delayed list for that timestamp holds a job for the Publisher proxy.
    raw_delayed = QueueBus.redis { |redis| redis.lpop("delayed:#{future.to_i}") }
    delayed_job = JSON.parse(raw_delayed)

    delayed_job["class"].should == "QueueBus::Worker"
    delayed_job["args"].size.should == 1
    delayed_args = JSON.parse(delayed_job["args"].first)
    delayed_args.should == expected_event.merge("bus_class_proxy" => "QueueBus::Publisher").merge(delayed_attrs)
    delayed_job["queue"].should == "bus_incoming"

    # Nothing has been pushed onto the incoming queue yet.
    incoming = QueueBus.redis { |redis| redis.lpop("queue:bus_incoming") }
    incoming.should be_nil

    # Move the clock to worktime and run the delayed job by hand.
    Timecop.freeze(worktime)
    QueueBus::Publisher.perform(JSON.parse(delayed_job["args"].first))

    # Now the incoming queue holds a job for the Driver proxy.
    raw_published = QueueBus.redis { |redis| redis.lpop("queue:bus_incoming") }
    published_job = JSON.parse(raw_published)

    published_job["class"].should == "QueueBus::Worker"
    published_job["args"].size.should == 1
    published_args = JSON.parse(published_job["args"].first)
    published_args.should == expected_event.merge("bus_class_proxy" => "QueueBus::Driver").merge(bus_attrs)
  end
end

Version data entries

9 entries across 9 versions & 1 rubygem

Version Path
resque-bus-0.7.0 spec/adapter/publish_at_spec.rb
resque-bus-0.6.1 spec/adapter/publish_at_spec.rb
resque-bus-0.6.0 spec/adapter/publish_at_spec.rb
resque-bus-0.5.12 spec/adapter/publish_at_spec.rb
resque-bus-0.5.11 spec/adapter/publish_at_spec.rb
resque-bus-0.5.10 spec/adapter/publish_at_spec.rb
resque-bus-0.5.9 spec/adapter/publish_at_spec.rb
resque-bus-0.5.8 spec/adapter/publish_at_spec.rb
resque-bus-0.5.7 spec/adapter/publish_at_spec.rb