lib/gush/client.rb in gush-0.3.3 vs lib/gush/client.rb in gush-0.4
- old
+ new
@@ -1,17 +1,19 @@
module Gush
class Client
- attr_reader :configuration, :sidekiq
+ attr_reader :configuration
def initialize(config = Gush.configuration)
@configuration = config
@sidekiq = build_sidekiq
+ @redis = build_redis
end
def configure
yield configuration
@sidekiq = build_sidekiq
+ @redis = build_redis
end
def create_workflow(name)
begin
name.constantize.create
@@ -45,106 +47,78 @@
def next_free_job_id(workflow_id,job_klass)
job_identifier = nil
loop do
id = SecureRandom.uuid
job_identifier = "#{job_klass}-#{id}"
- available = connection_pool.with do |redis|
- !redis.exists("gush.jobs.#{workflow_id}.#{job_identifier}")
- end
-
- break if available
+ break if !redis.exists("gush.jobs.#{workflow_id}.#{job_identifier}")
end
job_identifier
end
def next_free_workflow_id
id = nil
loop do
id = SecureRandom.uuid
- available = connection_pool.with do |redis|
- !redis.exists("gush.workflow.#{id}")
- end
-
- break if available
+ break if !redis.exists("gush.workflow.#{id}")
end
id
end
def all_workflows
- connection_pool.with do |redis|
- redis.keys("gush.workflows.*").map do |key|
- id = key.sub("gush.workflows.", "")
- find_workflow(id)
- end
+ redis.keys("gush.workflows.*").map do |key|
+ id = key.sub("gush.workflows.", "")
+ find_workflow(id)
end
end
def find_workflow(id)
- connection_pool.with do |redis|
- data = redis.get("gush.workflows.#{id}")
-
- unless data.nil?
- hash = Gush::JSON.decode(data, symbolize_keys: true)
- keys = redis.keys("gush.jobs.#{id}.*")
- nodes = redis.mget(*keys).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
- workflow_from_hash(hash, nodes)
- else
- raise WorkflowNotFound.new("Workflow with given id doesn't exist")
- end
+ data = redis.get("gush.workflows.#{id}")
+ unless data.nil?
+ hash = Gush::JSON.decode(data, symbolize_keys: true)
+ keys = redis.keys("gush.jobs.#{id}.*")
+ nodes = redis.mget(*keys).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
+ workflow_from_hash(hash, nodes)
+ else
+ raise WorkflowNotFound.new("Workflow with given id doesn't exist")
end
end
def persist_workflow(workflow)
- connection_pool.with do |redis|
- redis.set("gush.workflows.#{workflow.id}", workflow.to_json)
- end
-
+ redis.set("gush.workflows.#{workflow.id}", workflow.to_json)
workflow.jobs.each {|job| persist_job(workflow.id, job) }
workflow.mark_as_persisted
true
end
def persist_job(workflow_id, job)
- connection_pool.with do |redis|
- redis.set("gush.jobs.#{workflow_id}.#{job.name}", job.to_json)
- end
+ redis.set("gush.jobs.#{workflow_id}.#{job.name}", job.to_json)
end
def load_job(workflow_id, job_id)
workflow = find_workflow(workflow_id)
job_name_match = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_id)
hypen = '-' if job_name_match.nil?
- keys = connection_pool.with do |redis|
- redis.keys("gush.jobs.#{workflow_id}.#{job_id}#{hypen}*")
- end
-
+ keys = redis.keys("gush.jobs.#{workflow_id}.#{job_id}#{hypen}*")
return nil if keys.nil?
- data = connection_pool.with do |redis|
- redis.get(keys.first)
- end
-
+ data = redis.get(keys.first)
return nil if data.nil?
data = Gush::JSON.decode(data, symbolize_keys: true)
Gush::Job.from_hash(workflow, data)
end
def destroy_workflow(workflow)
- connection_pool.with do |redis|
- redis.del("gush.workflows.#{workflow.id}")
- end
+ redis.del("gush.workflows.#{workflow.id}")
workflow.jobs.each {|job| destroy_job(workflow.id, job) }
end
def destroy_job(workflow_id, job)
- connection_pool.with do |redis|
- redis.del("gush.jobs.#{workflow_id}.#{job.name}")
- end
+ redis.del("gush.jobs.#{workflow_id}.#{job.name}")
end
def worker_report(message)
report("gush.workers.status", message)
end
@@ -164,10 +138,12 @@
)
end
private
+ attr_reader :sidekiq, :redis
+
def workflow_from_hash(hash, nodes = nil)
flow = hash[:klass].constantize.new *hash[:arguments]
flow.jobs = []
flow.stopped = hash.fetch(:stopped, false)
flow.id = hash[:id]
@@ -178,13 +154,11 @@
flow
end
def report(key, message)
- connection_pool.with do |redis|
- redis.publish(key, Gush::JSON.encode(message))
- end
+ redis.publish(key, Gush::JSON.encode(message))
end
def build_sidekiq
Sidekiq::Client.new(connection_pool)
@@ -193,9 +167,9 @@
def build_redis
Redis.new(url: configuration.redis_url)
end
def connection_pool
- @connection_pool ||= ConnectionPool.new(size: configuration.concurrency, timeout: 1) { build_redis }
+ ConnectionPool.new(size: configuration.concurrency, timeout: 1) { build_redis }
end
end
end