lib/gush/client.rb in gush-2.0.1 vs lib/gush/client.rb in gush-2.0.2
- old
+ new
@@ -1,11 +1,24 @@
-require 'connection_pool'
+require 'redis'
+require 'concurrent-ruby'
module Gush
class Client
attr_reader :configuration
+ @@redis_connection = Concurrent::ThreadLocalVar.new(nil)
+
+ def self.redis_connection(config)
+ cached = (@@redis_connection.value ||= { url: config.redis_url, connection: nil })
+ return cached[:connection] if !cached[:connection].nil? && config.redis_url == cached[:url]
+
+ Redis.new(url: config.redis_url).tap do |instance|
+ RedisClassy.redis = instance
+ @@redis_connection.value = { url: config.redis_url, connection: instance }
+ end
+ end
+
def initialize(config = Gush.configuration)
@configuration = config
end
def configure
@@ -45,13 +58,11 @@
def next_free_job_id(workflow_id, job_klass)
job_id = nil
loop do
job_id = SecureRandom.uuid
- available = connection_pool.with do |redis|
- !redis.hexists("gush.jobs.#{workflow_id}.#{job_klass}", job_id)
- end
+ available = !redis.hexists("gush.jobs.#{workflow_id}.#{job_klass}", job_id)
break if available
end
job_id
@@ -59,63 +70,53 @@
def next_free_workflow_id
id = nil
loop do
id = SecureRandom.uuid
- available = connection_pool.with do |redis|
- !redis.exists("gush.workflow.#{id}")
- end
+ available = !redis.exists("gush.workflow.#{id}")
break if available
end
id
end
def all_workflows
- connection_pool.with do |redis|
- redis.scan_each(match: "gush.workflows.*").map do |key|
- id = key.sub("gush.workflows.", "")
- find_workflow(id)
- end
+ redis.scan_each(match: "gush.workflows.*").map do |key|
+ id = key.sub("gush.workflows.", "")
+ find_workflow(id)
end
end
def find_workflow(id)
- connection_pool.with do |redis|
- data = redis.get("gush.workflows.#{id}")
+ data = redis.get("gush.workflows.#{id}")
- unless data.nil?
- hash = Gush::JSON.decode(data, symbolize_keys: true)
- keys = redis.scan_each(match: "gush.jobs.#{id}.*")
+ unless data.nil?
+ hash = Gush::JSON.decode(data, symbolize_keys: true)
+ keys = redis.scan_each(match: "gush.jobs.#{id}.*")
- nodes = keys.each_with_object([]) do |key, array|
- array.concat redis.hvals(key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
- end
-
- workflow_from_hash(hash, nodes)
- else
- raise WorkflowNotFound.new("Workflow with given id doesn't exist")
+ nodes = keys.each_with_object([]) do |key, array|
+ array.concat redis.hvals(key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
end
+
+ workflow_from_hash(hash, nodes)
+ else
+ raise WorkflowNotFound.new("Workflow with given id doesn't exist")
end
end
def persist_workflow(workflow)
- connection_pool.with do |redis|
- redis.set("gush.workflows.#{workflow.id}", workflow.to_json)
- end
+ redis.set("gush.workflows.#{workflow.id}", workflow.to_json)
workflow.jobs.each {|job| persist_job(workflow.id, job) }
workflow.mark_as_persisted
true
end
def persist_job(workflow_id, job)
- connection_pool.with do |redis|
- redis.hset("gush.jobs.#{workflow_id}.#{job.klass}", job.id, job.to_json)
- end
+ redis.hset("gush.jobs.#{workflow_id}.#{job.klass}", job.id, job.to_json)
end
def find_job(workflow_id, job_name)
job_name_match = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_name)
@@ -130,35 +131,27 @@
data = Gush::JSON.decode(data, symbolize_keys: true)
Gush::Job.from_hash(data)
end
def destroy_workflow(workflow)
- connection_pool.with do |redis|
- redis.del("gush.workflows.#{workflow.id}")
- end
+ redis.del("gush.workflows.#{workflow.id}")
workflow.jobs.each {|job| destroy_job(workflow.id, job) }
end
def destroy_job(workflow_id, job)
- connection_pool.with do |redis|
- redis.del("gush.jobs.#{workflow_id}.#{job.klass}")
- end
+ redis.del("gush.jobs.#{workflow_id}.#{job.klass}")
end
def expire_workflow(workflow, ttl=nil)
ttl = ttl || configuration.ttl
- connection_pool.with do |redis|
- redis.expire("gush.workflows.#{workflow.id}", ttl)
- end
+ redis.expire("gush.workflows.#{workflow.id}", ttl)
workflow.jobs.each {|job| expire_job(workflow.id, job, ttl) }
end
def expire_job(workflow_id, job, ttl=nil)
ttl = ttl || configuration.ttl
- connection_pool.with do |redis|
- redis.expire("gush.jobs.#{workflow_id}.#{job.name}", ttl)
- end
+ redis.expire("gush.jobs.#{workflow_id}.#{job.klass}", ttl)
end
def enqueue_job(workflow_id, job)
job.enqueue!
persist_job(workflow_id, job)
@@ -170,20 +163,15 @@
private
def find_job_by_klass_and_id(workflow_id, job_name)
job_klass, job_id = job_name.split('|')
- connection_pool.with do |redis|
- redis.hget("gush.jobs.#{workflow_id}.#{job_klass}", job_id)
- end
+ redis.hget("gush.jobs.#{workflow_id}.#{job_klass}", job_id)
end
def find_job_by_klass(workflow_id, job_name)
- new_cursor, result = connection_pool.with do |redis|
- redis.hscan("gush.jobs.#{workflow_id}.#{job_name}", 0, count: 1)
- end
-
+ new_cursor, result = redis.hscan("gush.jobs.#{workflow_id}.#{job_name}", 0, count: 1)
return nil if result.empty?
job_id, job = *result[0]
job
@@ -200,16 +188,10 @@
end
flow
end
- def build_redis
- Redis.new(url: configuration.redis_url).tap do |instance|
- RedisClassy.redis = instance
- end
- end
-
- def connection_pool
- @connection_pool ||= ConnectionPool.new(size: configuration.concurrency, timeout: 1) { build_redis }
+ def redis
+ self.class.redis_connection(configuration)
end
end
end