lib/gush/client.rb in gush-2.0.1 vs lib/gush/client.rb in gush-2.0.2

- old
+ new

@@ -1,11 +1,24 @@ -require 'connection_pool' +require 'redis' +require 'concurrent-ruby' module Gush class Client attr_reader :configuration + @@redis_connection = Concurrent::ThreadLocalVar.new(nil) + + def self.redis_connection(config) + cached = (@@redis_connection.value ||= { url: config.redis_url, connection: nil }) + return cached[:connection] if !cached[:connection].nil? && config.redis_url == cached[:url] + + Redis.new(url: config.redis_url).tap do |instance| + RedisClassy.redis = instance + @@redis_connection.value = { url: config.redis_url, connection: instance } + end + end + def initialize(config = Gush.configuration) @configuration = config end def configure @@ -45,13 +58,11 @@ def next_free_job_id(workflow_id, job_klass) job_id = nil loop do job_id = SecureRandom.uuid - available = connection_pool.with do |redis| - !redis.hexists("gush.jobs.#{workflow_id}.#{job_klass}", job_id) - end + available = !redis.hexists("gush.jobs.#{workflow_id}.#{job_klass}", job_id) break if available end job_id @@ -59,63 +70,53 @@ def next_free_workflow_id id = nil loop do id = SecureRandom.uuid - available = connection_pool.with do |redis| - !redis.exists("gush.workflow.#{id}") - end + available = !redis.exists("gush.workflow.#{id}") break if available end id end def all_workflows - connection_pool.with do |redis| - redis.scan_each(match: "gush.workflows.*").map do |key| - id = key.sub("gush.workflows.", "") - find_workflow(id) - end + redis.scan_each(match: "gush.workflows.*").map do |key| + id = key.sub("gush.workflows.", "") + find_workflow(id) end end def find_workflow(id) - connection_pool.with do |redis| - data = redis.get("gush.workflows.#{id}") + data = redis.get("gush.workflows.#{id}") - unless data.nil? - hash = Gush::JSON.decode(data, symbolize_keys: true) - keys = redis.scan_each(match: "gush.jobs.#{id}.*") + unless data.nil? + hash = Gush::JSON.decode(data, symbolize_keys: true) + keys = redis.scan_each(match: "gush.jobs.#{id}.*") - nodes = keys.each_with_object([]) do |key, array| - array.concat redis.hvals(key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) } - end - - workflow_from_hash(hash, nodes) - else - raise WorkflowNotFound.new("Workflow with given id doesn't exist") + nodes = keys.each_with_object([]) do |key, array| + array.concat redis.hvals(key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) } end + + workflow_from_hash(hash, nodes) + else + raise WorkflowNotFound.new("Workflow with given id doesn't exist") end end def persist_workflow(workflow) - connection_pool.with do |redis| - redis.set("gush.workflows.#{workflow.id}", workflow.to_json) - end + redis.set("gush.workflows.#{workflow.id}", workflow.to_json) workflow.jobs.each {|job| persist_job(workflow.id, job) } workflow.mark_as_persisted true end def persist_job(workflow_id, job) - connection_pool.with do |redis| - redis.hset("gush.jobs.#{workflow_id}.#{job.klass}", job.id, job.to_json) - end + redis.hset("gush.jobs.#{workflow_id}.#{job.klass}", job.id, job.to_json) end def find_job(workflow_id, job_name) job_name_match = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_name) @@ -130,35 +131,27 @@ data = Gush::JSON.decode(data, symbolize_keys: true) Gush::Job.from_hash(data) end def destroy_workflow(workflow) - connection_pool.with do |redis| - redis.del("gush.workflows.#{workflow.id}") - end + redis.del("gush.workflows.#{workflow.id}") workflow.jobs.each {|job| destroy_job(workflow.id, job) } end def destroy_job(workflow_id, job) - connection_pool.with do |redis| - redis.del("gush.jobs.#{workflow_id}.#{job.klass}") - end + redis.del("gush.jobs.#{workflow_id}.#{job.klass}") end def expire_workflow(workflow, ttl=nil) ttl = ttl || configuration.ttl - connection_pool.with do |redis| - redis.expire("gush.workflows.#{workflow.id}", ttl) - end + redis.expire("gush.workflows.#{workflow.id}", ttl) workflow.jobs.each {|job| expire_job(workflow.id, job, ttl) } end def expire_job(workflow_id, job, ttl=nil) ttl = ttl || configuration.ttl - connection_pool.with do |redis| - redis.expire("gush.jobs.#{workflow_id}.#{job.name}", ttl) - end + redis.expire("gush.jobs.#{workflow_id}.#{job.klass}", ttl) end def enqueue_job(workflow_id, job) job.enqueue! persist_job(workflow_id, job) @@ -170,20 +163,15 @@ private def find_job_by_klass_and_id(workflow_id, job_name) job_klass, job_id = job_name.split('|') - connection_pool.with do |redis| - redis.hget("gush.jobs.#{workflow_id}.#{job_klass}", job_id) - end + redis.hget("gush.jobs.#{workflow_id}.#{job_klass}", job_id) end def find_job_by_klass(workflow_id, job_name) - new_cursor, result = connection_pool.with do |redis| - redis.hscan("gush.jobs.#{workflow_id}.#{job_name}", 0, count: 1) - end - + new_cursor, result = redis.hscan("gush.jobs.#{workflow_id}.#{job_name}", 0, count: 1) return nil if result.empty? job_id, job = *result[0] job @@ -200,16 +188,10 @@ end flow end - def build_redis - Redis.new(url: configuration.redis_url).tap do |instance| - RedisClassy.redis = instance - end - end - - def connection_pool - @connection_pool ||= ConnectionPool.new(size: configuration.concurrency, timeout: 1) { build_redis } + def redis + self.class.redis_connection(configuration) end end end