lib/gush/client.rb in gush-0.4.1 vs lib/gush/client.rb in gush-1.0.0
- old
+ new
@@ -1,17 +1,17 @@
+require 'connection_pool'
+
module Gush
class Client
- attr_reader :configuration, :sidekiq
+ attr_reader :configuration
def initialize(config = Gush.configuration)
@configuration = config
- @sidekiq = build_sidekiq
end
def configure
yield configuration
- @sidekiq = build_sidekiq
end
def create_workflow(name)
begin
name.constantize.create
@@ -109,12 +109,11 @@
connection_pool.with do |redis|
redis.set("gush.jobs.#{workflow_id}.#{job.name}", job.to_json)
end
end
- def load_job(workflow_id, job_id)
- workflow = find_workflow(workflow_id)
+ def find_job(workflow_id, job_id)
job_name_match = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_id)
hypen = '-' if job_name_match.nil?
keys = connection_pool.with do |redis|
redis.keys("gush.jobs.#{workflow_id}.#{job_id}#{hypen}*")
@@ -127,11 +126,11 @@
end
return nil if data.nil?
data = Gush::JSON.decode(data, symbolize_keys: true)
- Gush::Job.from_hash(workflow, data)
+ Gush::Job.from_hash(data)
end
def destroy_workflow(workflow)
connection_pool.with do |redis|
redis.del("gush.workflows.#{workflow.id}")
@@ -143,52 +142,29 @@
connection_pool.with do |redis|
redis.del("gush.jobs.#{workflow_id}.#{job.name}")
end
end
- def worker_report(message)
- report("gush.workers.status", message)
- end
-
- def workflow_report(message)
- report("gush.workflows.status", message)
- end
-
def enqueue_job(workflow_id, job)
job.enqueue!
persist_job(workflow_id, job)
- sidekiq.push(
- 'class' => Gush::Worker,
- 'queue' => configuration.namespace,
- 'args' => [workflow_id, job.name]
- )
+ Gush::Worker.set(queue: configuration.namespace).perform_later(*[workflow_id, job.name])
end
private
def workflow_from_hash(hash, nodes = nil)
- flow = hash[:klass].constantize.new *hash[:arguments]
+ flow = hash[:klass].constantize.new(*hash[:arguments])
flow.jobs = []
flow.stopped = hash.fetch(:stopped, false)
flow.id = hash[:id]
(nodes || hash[:nodes]).each do |node|
- flow.jobs << Gush::Job.from_hash(flow, node)
+ flow.jobs << Gush::Job.from_hash(node)
end
flow
- end
-
- def report(key, message)
- connection_pool.with do |redis|
- redis.publish(key, Gush::JSON.encode(message))
- end
- end
-
-
- def build_sidekiq
- Sidekiq::Client.new(connection_pool)
end
def build_redis
Redis.new(url: configuration.redis_url)
end