lib/gush/client.rb in gush-3.0.0 vs lib/gush/client.rb in gush-4.0.0
- old
+ new
@@ -70,18 +70,53 @@
def next_free_workflow_id
id = nil
loop do
id = SecureRandom.uuid
- available = !redis.exists?("gush.workflow.#{id}")
+ available = !redis.exists?("gush.workflows.#{id}")
break if available
end
id
end
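
(Note: the old check probed "gush.workflow.#{id}", a key this file never
writes, so the availability test always passed; the corrected prefix matches
the "gush.workflows.#{id}" keys read and written below.)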
+ # Returns the specified range of workflow ids, sorted by created timestamp.
+ #
+ # @param start [Integer] start of the range; see
+ #   https://redis.io/docs/latest/commands/zrange/#index-ranges for details
+ #   on how Redis interprets start and stop
+ # @param stop [Integer] inclusive end of the range
+ # @param by_ts [Boolean] if true, start and stop are treated as timestamps
+ #   rather than as element indexes, which allows the workflows to be indexed
+ #   by created timestamp
+ # @param order [Symbol] if :asc, finds ids in ascending created-timestamp
+ #   order; if :desc, in descending created-timestamp order
+ # @return [Array<String>] array of workflow ids
+ def workflow_ids(start=nil, stop=nil, by_ts: false, order: :asc)
+ start ||= 0
+ stop ||= 99
+
+ redis.zrange(
+ "gush.idx.workflows.created_at",
+ start,
+ stop,
+ by_score: by_ts,
+ rev: order&.to_sym == :desc
+ )
+ end
+
+ def workflows(start=nil, stop=nil, **kwargs)
+ workflow_ids(start, stop, **kwargs).map { |id| find_workflow(id) }
+ end
+
+ def workflows_count
+ redis.zcard('gush.idx.workflows.created_at')
+ end
+
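For orientation, a usage sketch of the new pagination API; the client setup
and the 25-per-page range are illustrative, not part of this file:

    client = Gush::Client.new
    newest = client.workflows(0, 24, order: :desc)  # newest 25 workflows
    total  = client.workflows_count
    # or select by created-at window instead of by index:
    hour_ago = (Time.now - 3600).to_f
    recent_ids = client.workflow_ids(hour_ago, Time.now.to_f, by_ts: true)
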
+ # @deprecated Not performant when there are a large number of workflows or
+ #   when the Redis keyspace is large, since it must scan every key. Use
+ #   {#workflows} with pagination instead.
def all_workflows
redis.scan_each(match: "gush.workflows.*").map do |key|
id = key.sub("gush.workflows.", "")
find_workflow(id)
end
@@ -90,32 +125,48 @@
def find_workflow(id)
data = redis.get("gush.workflows.#{id}")
unless data.nil?
hash = Gush::JSON.decode(data, symbolize_keys: true)
- keys = redis.scan_each(match: "gush.jobs.#{id}.*")
+ if hash[:job_klasses]
+ keys = hash[:job_klasses].map { |klass| "gush.jobs.#{id}.#{klass}" }
+ else
+ # For backwards compatibility, get job keys via a full keyspace scan
+ keys = redis.scan_each(match: "gush.jobs.#{id}.*")
+ end
+
nodes = keys.each_with_object([]) do |key, array|
- array.concat redis.hvals(key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
+ array.concat(redis.hvals(key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) })
end
workflow_from_hash(hash, nodes)
else
raise WorkflowNotFound.new("Workflow with given id doesn't exist")
end
end
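
The lookup no longer needs a keyspace scan when the persisted JSON includes
job_klasses; a sketch of the resulting key layout (id and class name
illustrative):

    # gush.workflows.<id>         => workflow JSON, now carrying :job_klasses
    # gush.jobs.<id>.<job klass>  => Redis hash of job id => serialized job
    redis.hvals("gush.jobs.1234-abcd.FetchJob")  # every serialized FetchJob
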
def persist_workflow(workflow)
+ created_at = Time.now.to_f
+ added = redis.zadd("gush.idx.workflows.created_at", created_at, workflow.id, nx: true)
+
+ # expires_at stays nil when the id was already indexed or no TTL is configured
+ if added && configuration.ttl&.positive?
+ expires_at = created_at + configuration.ttl
+ redis.zadd("gush.idx.workflows.expires_at", expires_at, workflow.id, nx: true)
+ end
+
redis.set("gush.workflows.#{workflow.id}", workflow.to_json)
- workflow.jobs.each {|job| persist_job(workflow.id, job) }
+ workflow.jobs.each {|job| persist_job(workflow.id, job, expires_at: expires_at) }
workflow.mark_as_persisted
true
end

- def persist_job(workflow_id, job)
+ def persist_job(workflow_id, job, expires_at: nil)
+ redis.zadd("gush.idx.jobs.expires_at", expires_at, "#{workflow_id}.#{job.klass}", nx: true) if expires_at
+
redis.hset("gush.jobs.#{workflow_id}.#{job.klass}", job.id, job.to_json)
end
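
Taken together, persist_workflow and persist_job now maintain sorted-set
indexes rather than relying on per-key EXPIRE; a sketch of what persisting a
workflow with a one-hour TTL leaves behind (timestamps illustrative):

    # ZSCORE gush.idx.workflows.created_at <id>        => 1700000000.0
    # ZSCORE gush.idx.workflows.expires_at <id>        => 1700003600.0
    # ZSCORE gush.idx.jobs.expires_at "<id>.FetchJob"  => 1700003600.0
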
def find_job(workflow_id, job_name)
job_name_match = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_name)
@@ -132,39 +183,71 @@
Gush::Job.from_hash(data)
end

def destroy_workflow(workflow)
redis.del("gush.workflows.#{workflow.id}")
+ redis.zrem("gush.idx.workflows.created_at", workflow.id)
+ redis.zrem("gush.idx.workflows.expires_at", workflow.id)
workflow.jobs.each {|job| destroy_job(workflow.id, job) }
end

def destroy_job(workflow_id, job)
redis.del("gush.jobs.#{workflow_id}.#{job.klass}")
+ redis.zrem("gush.idx.jobs.expires_at", "#{workflow_id}.#{job.klass}")
end
+
+ def expire_workflows(expires_at=nil)
+ expires_at ||= Time.now.to_f
+
+ ids = redis.zrange("gush.idx.workflows.expires_at", "-inf", expires_at, by_score: true)
+ return if ids.empty?
+
+ redis.del(ids.map { |id| "gush.workflows.#{id}" })
+ redis.zrem("gush.idx.workflows.created_at", ids)
+ redis.zrem("gush.idx.workflows.expires_at", ids)
+
+ expire_jobs(expires_at)
+ end
+
+ def expire_jobs(expires_at=nil)
+ expires_at ||= Time.now.to_f
+
+ keys = redis.zrange("gush.idx.jobs.expires_at", "-inf", expires_at, by_score: true)
+ return if keys.empty?
+
+ redis.del(keys.map { |key| "gush.jobs.#{key}" })
+ redis.zrem("gush.idx.jobs.expires_at", keys)
+ end
+
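Expiry is now pull-based: nothing in Redis removes these keys on its own, so
the host application is expected to call expire_workflows periodically. A
minimal sketch, assuming some recurring scheduler in the host app (the
scheduling mechanism itself is not part of this file):

    # e.g. from a clock process, cron task, or scheduled ActiveJob:
    Gush::Client.new.expire_workflows  # deletes everything expired as of now
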
def expire_workflow(workflow, ttl=nil)
- ttl = ttl || configuration.ttl
- redis.expire("gush.workflows.#{workflow.id}", ttl)
+ ttl ||= configuration.ttl
+
+ if ttl&.positive?
+ redis.zadd("gush.idx.workflows.expires_at", Time.now.to_f + ttl, workflow.id)
+ else
+ redis.zrem("gush.idx.workflows.expires_at", workflow.id)
+ end
+
workflow.jobs.each {|job| expire_job(workflow.id, job, ttl) }
end

def expire_job(workflow_id, job, ttl=nil)
- ttl = ttl || configuration.ttl
- redis.expire("gush.jobs.#{workflow_id}.#{job.klass}", ttl)
+ ttl ||= configuration.ttl
+
+ if ttl&.positive?
+ redis.zadd("gush.idx.jobs.expires_at", Time.now.to_f + ttl, "#{workflow_id}.#{job.klass}")
+ else
+ redis.zrem("gush.idx.jobs.expires_at", "#{workflow_id}.#{job.klass}")
+ end
end
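
An explicit ttl still overrides the configured default, while a ttl that is
missing or non-positive (after falling back to configuration.ttl) now clears
the expiry rather than being handed to EXPIRE; a sketch, with client assumed
to be a Gush::Client:

    client.expire_workflow(workflow, 7 * 24 * 3600)  # keep one more week
    client.expire_workflow(workflow, -1)             # pin: drop from expiry index
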
def enqueue_job(workflow_id, job)
job.enqueue!
persist_job(workflow_id, job)
- queue = job.queue || configuration.namespace
- wait = job.wait
-
- if wait.present?
- Gush::Worker.set(queue: queue, wait: wait).perform_later(*[workflow_id, job.name])
- else
- Gush::Worker.set(queue: queue).perform_later(*[workflow_id, job.name])
- end
+
+ options = { queue: configuration.namespace }.merge(job.worker_options)
+ job.enqueue_worker!(options)
end
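
Per-job queue and wait handling moves off the client and onto the job. A
sketch of the intent, assuming Gush::Job#worker_options surfaces the job's
ActiveJob options (such as queue and wait) and #enqueue_worker! forwards them
to the worker, roughly equivalent to the branching removed above:

    # options ends up as e.g. { queue: "gush", wait: 300 }
    # and enqueue_worker! performs something like:
    #   Gush::Worker.set(options).perform_later(workflow_id, job.name)
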
private

def find_job_by_klass_and_id(workflow_id, job_name)
@@ -181,19 +264,28 @@
job
end

def workflow_from_hash(hash, nodes = [])
- flow = hash[:klass].constantize.new(*hash[:arguments])
- flow.jobs = []
- flow.stopped = hash.fetch(:stopped, false)
- flow.id = hash[:id]
-
- flow.jobs = nodes.map do |node|
+ jobs = nodes.map do |node|
Gush::Job.from_hash(node)
end
- flow
+ internal_state = {
+ persisted: true,
+ jobs: jobs,
+ # For backwards compatibility, setup can only be skipped for a persisted
+ # workflow if there is no data missing from the persistence.
+ # 2024-07-23: dependencies added to persistence
+ skip_setup: !hash[:dependencies].nil?
+ }.merge(hash)
+
+ hash[:klass].constantize.new(
+ *hash[:arguments],
+ **hash[:kwargs],
+ globals: hash[:globals],
+ internal_state: internal_state
+ )
end
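
Rehydration now goes through the constructor: internal_state carries the
persisted jobs and, when the stored hash includes :dependencies (persisted
since 2024-07-23), sets skip_setup so the workflow's configure block is not
re-run. Older hashes lack :dependencies, so setup still runs to rebuild the
graph; the distinction, with hash contents illustrative:

    # old-format hash (no :dependencies)  => skip_setup: false, configure re-runs
    # new-format hash (:dependencies set) => skip_setup: true, graph restored as persisted
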
def redis
self.class.redis_connection(configuration)
end