spec/gush/client_spec.rb in gush-3.0.0 vs spec/gush/client_spec.rb in gush-4.0.0

- old
+ new

@@ -17,24 +17,53 @@ context "when given workflow exists" do it "returns Workflow object" do expected_workflow = TestWorkflow.create workflow = client.find_workflow(expected_workflow.id) + dependencies = workflow.dependencies expect(workflow.id).to eq(expected_workflow.id) + expect(workflow.persisted).to eq(true) expect(workflow.jobs.map(&:name)).to match_array(expected_workflow.jobs.map(&:name)) + expect(workflow.dependencies).to eq(dependencies) end context "when workflow has parameters" do it "returns Workflow object" do - expected_workflow = ParameterTestWorkflow.create(true) + expected_workflow = ParameterTestWorkflow.create(true, kwarg: 123) workflow = client.find_workflow(expected_workflow.id) expect(workflow.id).to eq(expected_workflow.id) + expect(workflow.arguments).to eq([true]) + expect(workflow.kwargs).to eq({ kwarg: 123 }) expect(workflow.jobs.map(&:name)).to match_array(expected_workflow.jobs.map(&:name)) end end + + context "when workflow has globals" do + it "returns Workflow object" do + expected_workflow = TestWorkflow.create(globals: { global1: 'foo' }) + workflow = client.find_workflow(expected_workflow.id) + + expect(workflow.id).to eq(expected_workflow.id) + expect(workflow.globals[:global1]).to eq('foo') + end + end + + context "when workflow was persisted without job_klasses" do + it "returns Workflow object" do + expected_workflow = TestWorkflow.create + + json = Gush::JSON.encode(expected_workflow.to_hash.except(:job_klasses)) + redis.set("gush.workflows.#{expected_workflow.id}", json) + + workflow = client.find_workflow(expected_workflow.id) + + expect(workflow.id).to eq(expected_workflow.id) + expect(workflow.jobs.map(&:name)).to match_array(expected_workflow.jobs.map(&:name)) + end + end end end describe "#start_workflow" do context "when there is wait parameter configured" do @@ -80,56 +109,324 @@ client.stop_workflow(workflow.id) }.to change{client.find_workflow(workflow.id).stopped?}.from(false).to(true) end end + describe "#next_free_job_id" do + it "returns an id" do + expect(client.next_free_job_id('123', Prepare.to_s)).to match(/^\h{8}-\h{4}-(\h{4})-\h{4}-\h{12}$/) + end + + it "returns an id that doesn't match an existing job id" do + workflow = TestWorkflow.create + job = workflow.jobs.first + + second_try_id = '1234' + allow(SecureRandom).to receive(:uuid).and_return(job.id, second_try_id) + + expect(client.next_free_job_id(workflow.id, job.class.to_s)).to eq(second_try_id) + end + end + + describe "#next_free_workflow_id" do + it "returns an id" do + expect(client.next_free_workflow_id).to match(/^\h{8}-\h{4}-(\h{4})-\h{4}-\h{12}$/) + end + + it "returns an id that doesn't match an existing workflow id" do + workflow = TestWorkflow.create + + second_try_id = '1234' + allow(SecureRandom).to receive(:uuid).and_return(workflow.id, second_try_id) + + expect(client.next_free_workflow_id).to eq(second_try_id) + end + end + describe "#persist_workflow" do it "persists JSON dump of the Workflow and its jobs" do job = double("job", to_json: 'json') workflow = double("workflow", id: 'abcd', jobs: [job, job, job], to_json: '"json"') - expect(client).to receive(:persist_job).exactly(3).times.with(workflow.id, job) + expect(client).to receive(:persist_job).exactly(3).times.with(workflow.id, job, expires_at: nil) expect(workflow).to receive(:mark_as_persisted) client.persist_workflow(workflow) expect(redis.keys("gush.workflows.abcd").length).to eq(1) end + + it "sets created_at index" do + workflow = double("workflow", id: 'abcd', jobs: [], to_json: '"json"') + expect(workflow).to receive(:mark_as_persisted).twice + + freeze_time = Time.now.round # travel_to doesn't support fractions of a second + travel_to(freeze_time) do + client.persist_workflow(workflow) + end + + expect(redis.zrange("gush.idx.workflows.created_at", 0, -1, with_scores: true)) + .to eq([[workflow.id, freeze_time.to_f]]) + + # Persisting the workflow again should not affect its created_at index score + client.persist_workflow(workflow) + expect(redis.zrange("gush.idx.workflows.created_at", 0, -1, with_scores: true)) + .to eq([[workflow.id, freeze_time.to_f]]) + end + + it "sets expires_at index when there is a ttl configured" do + allow_any_instance_of(Gush::Configuration).to receive(:ttl).and_return(1000) + + workflow = double("workflow", id: 'abcd', jobs: [], to_json: '"json"') + expect(workflow).to receive(:mark_as_persisted).twice + + freeze_time = Time.now.round # travel_to doesn't support fractions of a second + travel_to(freeze_time) do + client.persist_workflow(workflow) + end + + expires_at = freeze_time + 1000 + expect(redis.zrange("gush.idx.workflows.expires_at", 0, -1, with_scores: true)) + .to eq([[workflow.id, expires_at.to_f]]) + + # Persisting the workflow again should not affect its expires_at index score + client.persist_workflow(workflow) + expect(redis.zrange("gush.idx.workflows.expires_at", 0, -1, with_scores: true)) + .to eq([[workflow.id, expires_at.to_f]]) + end + + it "does not set expires_at index when there is no ttl configured" do + workflow = double("workflow", id: 'abcd', jobs: [], to_json: '"json"') + expect(workflow).to receive(:mark_as_persisted) + client.persist_workflow(workflow) + + expect(redis.zcard("gush.idx.workflows.expires_at")).to eq(0) + end + + it "does not set expires_at index when updating a pre-existing workflow without a ttl" do + allow_any_instance_of(Gush::Configuration).to receive(:ttl).and_return(1000) + + workflow = double("workflow", id: 'abcd', jobs: [], to_json: '"json"') + expect(workflow).to receive(:mark_as_persisted).twice + + client.persist_workflow(workflow) + + client.expire_workflow(workflow, -1) + expect(redis.zcard("gush.idx.workflows.expires_at")).to eq(0) + + client.persist_workflow(workflow) + expect(redis.zcard("gush.idx.workflows.expires_at")).to eq(0) + end + + it "does not change expires_at index when updating a pre-existing workflow with a non-standard ttl" do + allow_any_instance_of(Gush::Configuration).to receive(:ttl).and_return(1000) + + workflow = double("workflow", id: 'abcd', jobs: [], to_json: '"json"') + expect(workflow).to receive(:mark_as_persisted).twice + + freeze_time = Time.now.round # travel_to doesn't support fractions of a second + travel_to(freeze_time) do + client.persist_workflow(workflow) + + expires_at = freeze_time.to_i + 1234 + client.expire_workflow(workflow, 1234) + expect(redis.zscore("gush.idx.workflows.expires_at", workflow.id)).to eq(expires_at) + + client.persist_workflow(workflow) + expect(redis.zscore("gush.idx.workflows.expires_at", workflow.id)).to eq(expires_at) + end + end end describe "#destroy_workflow" do it "removes all Redis keys related to the workflow" do + allow_any_instance_of(Gush::Configuration).to receive(:ttl).and_return(1000) + workflow = TestWorkflow.create expect(redis.keys("gush.workflows.#{workflow.id}").length).to eq(1) expect(redis.keys("gush.jobs.#{workflow.id}.*").length).to eq(5) + expect(redis.zcard("gush.idx.workflows.created_at")).to eq(1) + expect(redis.zcard("gush.idx.workflows.expires_at")).to eq(1) + expect(redis.zcard("gush.idx.jobs.expires_at")).to eq(5) client.destroy_workflow(workflow) expect(redis.keys("gush.workflows.#{workflow.id}").length).to eq(0) expect(redis.keys("gush.jobs.#{workflow.id}.*").length).to eq(0) + expect(redis.zcard("gush.idx.workflows.created_at")).to eq(0) + expect(redis.zcard("gush.idx.workflows.expires_at")).to eq(0) + expect(redis.zcard("gush.idx.jobs.expires_at")).to eq(0) end end + describe "#expire_workflows" do + it "removes auto-expired workflows" do + allow_any_instance_of(Gush::Configuration).to receive(:ttl).and_return(1000) + + workflow = TestWorkflow.create + + # before workflow's expiration time + client.expire_workflows + + expect(redis.keys("gush.workflows.*").length).to eq(1) + + # after workflow's expiration time + client.expire_workflows(Time.now.to_f + 1001) + + expect(redis.keys("gush.workflows.#{workflow.id}").length).to eq(0) + expect(redis.keys("gush.jobs.#{workflow.id}.*").length).to eq(0) + expect(redis.zcard("gush.idx.workflows.created_at")).to eq(0) + expect(redis.zcard("gush.idx.workflows.expires_at")).to eq(0) + expect(redis.zcard("gush.idx.jobs.expires_at")).to eq(0) + end + + it "removes manually-expired workflows" do + workflow = TestWorkflow.create + + # workflow hasn't been expired + client.expire_workflows(Time.now.to_f + 100_000) + + expect(redis.keys("gush.workflows.*").length).to eq(1) + + client.expire_workflow(workflow, 10) + + # after workflow's expiration time + client.expire_workflows(Time.now.to_f + 20) + + expect(redis.keys("gush.workflows.#{workflow.id}").length).to eq(0) + expect(redis.keys("gush.jobs.#{workflow.id}.*").length).to eq(0) + expect(redis.zcard("gush.idx.workflows.created_at")).to eq(0) + expect(redis.zcard("gush.idx.workflows.expires_at")).to eq(0) + expect(redis.zcard("gush.idx.jobs.expires_at")).to eq(0) + end + end + describe "#expire_workflow" do let(:ttl) { 2000 } - it "sets TTL for all Redis keys related to the workflow" do + it "sets an expiration time for the workflow" do workflow = TestWorkflow.create - client.expire_workflow(workflow, ttl) + freeze_time = Time.now.round # travel_to doesn't support fractions of a second + expires_at = freeze_time.to_f + ttl + travel_to(freeze_time) do + client.expire_workflow(workflow, ttl) + end - expect(redis.ttl("gush.workflows.#{workflow.id}")).to eq(ttl) + expect(redis.zscore("gush.idx.workflows.expires_at", workflow.id)).to eq(expires_at) workflow.jobs.each do |job| - expect(redis.ttl("gush.jobs.#{workflow.id}.#{job.klass}")).to eq(ttl) + expect(redis.zscore("gush.idx.jobs.expires_at", "#{workflow.id}.#{job.klass}")).to eq(expires_at) end end + + it "clears an expiration time for the workflow when given -1" do + workflow = TestWorkflow.create + + client.expire_workflow(workflow, 100) + expect(redis.zscore("gush.idx.workflows.expires_at", workflow.id)).to be > 0 + + client.expire_workflow(workflow, -1) + expect(redis.zscore("gush.idx.workflows.expires_at", workflow.id)).to eq(nil) + + workflow.jobs.each do |job| + expect(redis.zscore("gush.idx.jobs.expires_at", "#{workflow.id}.#{job.klass}")).to eq(nil) + end + end end describe "#persist_job" do it "persists JSON dump of the job in Redis" do job = BobJob.new(name: 'bob', id: 'abcd123') client.persist_job('deadbeef', job) expect(redis.keys("gush.jobs.deadbeef.*").length).to eq(1) + end + + it "sets expires_at index when expires_at is provided" do + job = BobJob.new(name: 'bob', id: 'abcd123') + + freeze_time = Time.now.round # travel_to doesn't support fractions of a second + expires_at = freeze_time.to_f + 1000 + + travel_to(freeze_time) do + client.persist_job('deadbeef', job, expires_at: expires_at) + end + + expect(redis.zrange("gush.idx.jobs.expires_at", 0, -1, with_scores: true)) + .to eq([["deadbeef.#{job.klass}", expires_at]]) + + # Persisting the workflow again should not affect its expires_at index score + client.persist_job('deadbeef', job) + expect(redis.zrange("gush.idx.jobs.expires_at", 0, -1, with_scores: true)) + .to eq([["deadbeef.#{job.klass}", expires_at]]) + end + + it "does not set expires_at index when there is no ttl configured" do + job = BobJob.new(name: 'bob', id: 'abcd123') + client.persist_job('deadbeef', job) + + expect(redis.zcard("gush.idx.jobs.expires_at")).to eq(0) + end + end + + describe "#workflow_ids" do + it "returns a page of registered workflow ids" do + workflow = TestWorkflow.create + ids = client.workflow_ids + expect(ids).to eq([workflow.id]) + end + + it "sorts workflow ids by created time or reverse created time" do + ids = 3.times.map { TestWorkflow.create }.map(&:id) + + expect(client.workflow_ids).to eq(ids) + expect(client.workflow_ids(order: :asc)).to eq(ids) + expect(client.workflow_ids(order: :desc)).to eq(ids.reverse) + end + + it "supports start and stop params" do + ids = 3.times.map { TestWorkflow.create }.map(&:id) + + expect(client.workflow_ids(0, 1)).to eq(ids.slice(0..1)) + expect(client.workflow_ids(1, 1)).to eq(ids.slice(1..1)) + expect(client.workflow_ids(1, 10)).to eq(ids.slice(1..2)) + expect(client.workflow_ids(0, -1)).to eq(ids) + end + + it "supports start and stop params using created timestamps" do + times = [100, 200, 300] + ids = [] + + times.each do |t| + travel_to Time.at(t) do + ids << TestWorkflow.create.id + end + end + + expect(client.workflow_ids(0, 1, by_ts: true)).to be_empty + expect(client.workflow_ids(50, 150, by_ts: true)).to eq(ids.slice(0..0)) + expect(client.workflow_ids(150, 50, by_ts: true, order: :desc)).to eq(ids.slice(0..0)) + expect(client.workflow_ids("-inf", "inf", by_ts: true)).to eq(ids) + end + end + + describe "#workflows" do + it "returns a page of registered workflows" do + workflow = TestWorkflow.create + expect(client.workflows.map(&:id)).to eq([workflow.id]) + end + end + + describe "#workflows_count" do + it "returns a count of registered workflows" do + allow_any_instance_of(Gush::Configuration).to receive(:ttl).and_return(1000) + + expect(client.workflows_count).to eq(0) + + workflow = TestWorkflow.create + expect(client.workflows_count).to eq(1) + + client.expire_workflows(Time.now.to_f + 1001) + expect(client.workflows_count).to eq(0) end end describe "#all_workflows" do it "returns all registered workflows" do