lib/gush/client.rb in gush-0.3.2 vs lib/gush/client.rb in gush-0.3.3

- old
+ new

@@ -1,19 +1,17 @@ module Gush class Client - attr_reader :configuration + attr_reader :configuration, :sidekiq def initialize(config = Gush.configuration) @configuration = config @sidekiq = build_sidekiq - @redis = build_redis end def configure yield configuration @sidekiq = build_sidekiq - @redis = build_redis end def create_workflow(name) begin name.constantize.create @@ -47,78 +45,106 @@ def next_free_job_id(workflow_id,job_klass) job_identifier = nil loop do id = SecureRandom.uuid job_identifier = "#{job_klass}-#{id}" - break if !redis.exists("gush.jobs.#{workflow_id}.#{job_identifier}") + available = connection_pool.with do |redis| + !redis.exists("gush.jobs.#{workflow_id}.#{job_identifier}") + end + + break if available end job_identifier end def next_free_workflow_id id = nil loop do id = SecureRandom.uuid - break if !redis.exists("gush.workflow.#{id}") + available = connection_pool.with do |redis| + !redis.exists("gush.workflow.#{id}") + end + + break if available end id end def all_workflows - redis.keys("gush.workflows.*").map do |key| - id = key.sub("gush.workflows.", "") - find_workflow(id) + connection_pool.with do |redis| + redis.keys("gush.workflows.*").map do |key| + id = key.sub("gush.workflows.", "") + find_workflow(id) + end end end def find_workflow(id) - data = redis.get("gush.workflows.#{id}") - unless data.nil? - hash = Gush::JSON.decode(data, symbolize_keys: true) - keys = redis.keys("gush.jobs.#{id}.*") - nodes = redis.mget(*keys).map { |json| Gush::JSON.decode(json, symbolize_keys: true) } - workflow_from_hash(hash, nodes) - else - raise WorkflowNotFound.new("Workflow with given id doesn't exist") + connection_pool.with do |redis| + data = redis.get("gush.workflows.#{id}") + + unless data.nil? + hash = Gush::JSON.decode(data, symbolize_keys: true) + keys = redis.keys("gush.jobs.#{id}.*") + nodes = redis.mget(*keys).map { |json| Gush::JSON.decode(json, symbolize_keys: true) } + workflow_from_hash(hash, nodes) + else + raise WorkflowNotFound.new("Workflow with given id doesn't exist") + end end end def persist_workflow(workflow) - redis.set("gush.workflows.#{workflow.id}", workflow.to_json) + connection_pool.with do |redis| + redis.set("gush.workflows.#{workflow.id}", workflow.to_json) + end + workflow.jobs.each {|job| persist_job(workflow.id, job) } workflow.mark_as_persisted true end def persist_job(workflow_id, job) - redis.set("gush.jobs.#{workflow_id}.#{job.name}", job.to_json) + connection_pool.with do |redis| + redis.set("gush.jobs.#{workflow_id}.#{job.name}", job.to_json) + end end def load_job(workflow_id, job_id) workflow = find_workflow(workflow_id) job_name_match = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_id) hypen = '-' if job_name_match.nil? - keys = redis.keys("gush.jobs.#{workflow_id}.#{job_id}#{hypen}*") + keys = connection_pool.with do |redis| + redis.keys("gush.jobs.#{workflow_id}.#{job_id}#{hypen}*") + end + return nil if keys.nil? - data = redis.get(keys.first) + data = connection_pool.with do |redis| + redis.get(keys.first) + end + return nil if data.nil? data = Gush::JSON.decode(data, symbolize_keys: true) Gush::Job.from_hash(workflow, data) end def destroy_workflow(workflow) - redis.del("gush.workflows.#{workflow.id}") + connection_pool.with do |redis| + redis.del("gush.workflows.#{workflow.id}") + end workflow.jobs.each {|job| destroy_job(workflow.id, job) } end def destroy_job(workflow_id, job) - redis.del("gush.jobs.#{workflow_id}.#{job.name}") + connection_pool.with do |redis| + redis.del("gush.jobs.#{workflow_id}.#{job.name}") + end end def worker_report(message) report("gush.workers.status", message) end @@ -138,12 +164,10 @@ ) end private - attr_reader :sidekiq, :redis - def workflow_from_hash(hash, nodes = nil) flow = hash[:klass].constantize.new *hash[:arguments] flow.jobs = [] flow.stopped = hash.fetch(:stopped, false) flow.id = hash[:id] @@ -154,11 +178,13 @@ flow end def report(key, message) - redis.publish(key, Gush::JSON.encode(message)) + connection_pool.with do |redis| + redis.publish(key, Gush::JSON.encode(message)) + end end def build_sidekiq Sidekiq::Client.new(connection_pool) @@ -167,9 +193,9 @@ def build_redis Redis.new(url: configuration.redis_url) end def connection_pool - ConnectionPool.new(size: configuration.concurrency, timeout: 1) { build_redis } + @connection_pool ||= ConnectionPool.new(size: configuration.concurrency, timeout: 1) { build_redis } end end end