lib/gush/worker.rb in gush-0.4.1 vs lib/gush/worker.rb in gush-1.0.0

- old
+ new

@@ -1,103 +1,75 @@ -require 'sidekiq' +require 'active_job' module Gush - class Worker - include ::Sidekiq::Worker - sidekiq_options retry: false - + class Worker < ::ActiveJob::Base def perform(workflow_id, job_id) setup_job(workflow_id, job_id) - job.payloads_hash = incoming_payloads + job.payloads = incoming_payloads - start = Time.now - report(:started, start) - - failed = false error = nil mark_as_started begin - job.work - rescue Exception => error + job.perform + rescue StandardError => error mark_as_failed - report(:failed, start, error.message) raise error else mark_as_finished - report(:finished, start) - enqueue_outgoing_jobs end end private - attr_reader :client, :workflow, :job + attr_reader :client, :workflow_id, :job + def client @client ||= Gush::Client.new(Gush.configuration) end def setup_job(workflow_id, job_id) - @workflow ||= client.find_workflow(workflow_id) - @job ||= workflow.find_job(job_id) + @workflow_id = workflow_id + @job ||= client.find_job(workflow_id, job_id) end def incoming_payloads - payloads = {} - job.incoming.each do |job_name| - job = client.load_job(workflow.id, job_name) - payloads[job.klass.to_s] ||= [] - payloads[job.klass.to_s] << {:id => job.name, :payload => job.output_payload} + job.incoming.map do |job_name| + job = client.find_job(workflow_id, job_name) + { + id: job.name, + class: job.klass.to_s, + output: job.output_payload + } end - payloads end def mark_as_finished job.finish! - client.persist_job(workflow.id, job) + client.persist_job(workflow_id, job) end def mark_as_failed job.fail! - client.persist_job(workflow.id, job) + client.persist_job(workflow_id, job) end def mark_as_started job.start! - client.persist_job(workflow.id, job) + client.persist_job(workflow_id, job) end - def report_workflow_status - client.workflow_report({ - workflow_id: workflow.id, - status: workflow.status, - started_at: workflow.started_at, - finished_at: workflow.finished_at - }) - end - - def report(status, start, error = nil) - message = { - status: status, - workflow_id: workflow.id, - job: job.name, - duration: elapsed(start) - } - message[:error] = error if error - client.worker_report(message) - end - def elapsed(start) (Time.now - start).to_f.round(3) end def enqueue_outgoing_jobs job.outgoing.each do |job_name| - out = client.load_job(workflow.id, job_name) + out = client.find_job(workflow_id, job_name) if out.ready_to_start? - client.enqueue_job(workflow.id, out) + client.enqueue_job(workflow_id, out) end end end end end