lib/taskinator/persistence.rb in taskinator-0.2.0 vs lib/taskinator/persistence.rb in taskinator-0.3.0
- old
+ new
@@ -33,21 +33,10 @@
# returns the storage key for the given identifier
def key_for(uuid)
"taskinator:#{base_key}:#{uuid}"
end
- # retrieves the workflow state for the given identifier
- # this prevents to need to load the entire object when
- # querying for the status of an instance
- def state_for(uuid)
- key = key_for(uuid)
- state = Taskinator.redis do |conn|
- conn.hget(key, :state) || 'initial'
- end
- state.to_sym
- end
-
# fetches the instance for given identifier
# optionally, provide a hash to use for the instance cache
# this argument is defaulted, so top level callers don't
# need to provide this.
def fetch(uuid, instance_cache={})
@@ -66,14 +55,15 @@
Taskinator.redis do |conn|
conn.multi do
visitor = RedisSerializationVisitor.new(conn, self).visit
conn.hmset(
Taskinator::Process.key_for(uuid),
- :tasks_count, visitor.task_count,
- :tasks_failed, 0,
- :tasks_completed, 0,
- :tasks_cancelled, 0,
+ :tasks_count, visitor.task_count,
+ :tasks_failed, 0,
+ :tasks_processing, 0,
+ :tasks_completed, 0,
+ :tasks_cancelled, 0,
)
true
end
end
end
@@ -97,21 +87,36 @@
# retrieves the workflow state
# this method is called from the workflow gem
def load_workflow_state
state = Taskinator.redis do |conn|
- conn.hget(self.key, :state)
+ conn.hget(self.key, :state) || 'initial'
end
- (state || 'initial').to_sym
+ state.to_sym
end
# persists the workflow state
# this method is called from the workflow gem
def persist_workflow_state(new_state)
+ @updated_at = Time.now.utc
Taskinator.redis do |conn|
- conn.hset(self.key, :state, new_state)
+ process_key = self.process_key
+ conn.multi do
+ conn.hmset(
+ self.key,
+ :state, new_state,
+ :updated_at, @updated_at
+ )
+
+ # also update the "root" process
+ conn.hset(
+ process_key,
+ :updated_at, @updated_at
+ )
+ end
end
+ new_state
end
# persists the error information
def fail(error=nil)
return unless error && error.is_a?(Exception)
@@ -119,11 +124,12 @@
Taskinator.redis do |conn|
conn.hmset(
self.key,
:error_type, error.class.name,
:error_message, error.message,
- :error_backtrace, JSON.generate(error.backtrace || [])
+ :error_backtrace, JSON.generate(error.backtrace || []),
+ :updated_at, Time.now.utc
)
end
end
# retrieves the error type, message and backtrace
@@ -131,11 +137,11 @@
def error
@error ||= Taskinator.redis do |conn|
error_type, error_message, error_backtrace =
conn.hmget(self.key, :error_type, :error_message, :error_backtrace)
- [error_type, error_message, JSON.parse(error_backtrace)]
+ [error_type, error_message, JSON.parse(error_backtrace || '[]')]
end
end
def tasks_count
@tasks_count ||= begin
@@ -147,23 +153,28 @@
end
%w(
failed
cancelled
+ processing
completed
).each do |status|
define_method "count_#{status}" do
count = Taskinator.redis do |conn|
- conn.hget self.process_key, status
+ conn.hget self.process_key, "tasks_#{status}"
end
count.to_i
end
define_method "incr_#{status}" do
Taskinator.redis do |conn|
- conn.hincrby self.process_key, status, 1
+ process_key = self.process_key
+ conn.multi do
+ conn.hincrby process_key, "tasks_#{status}", 1
+ conn.hset process_key, :updated_at, Time.now.utc
+ end
end
end
define_method "percentage_#{status}" do
tasks_count > 0 ? (send("count_#{status}") / tasks_count.to_f) * 100.0 : 0.0
@@ -181,38 +192,10 @@
end
yaml ? Taskinator::Persistence.deserialize(yaml) : {}
end
end
- # prepairs the meta data for instrumentation events
- def instrumentation_payload(additional={})
-
- # need to cache here, since this method hits redis, so can't be part of multi statement following
- process_key = self.process_key
-
- tasks_count, completed_count, cancelled_count, failed_count = Taskinator.redis do |conn|
- conn.hmget process_key, :tasks_count, :completed, :cancelled, :failed
- end
-
- tasks_count = tasks_count.to_f
- completed_percent = tasks_count > 0 ? (completed_count.to_i / tasks_count) * 100.0 : 0.0
- cancelled_percent = tasks_count > 0 ? (cancelled_count.to_i / tasks_count) * 100.0 : 0.0
- failed_percent = tasks_count > 0 ? (failed_count.to_i / tasks_count) * 100.0 : 0.0
-
- return {
- :type => self.class.name,
- :process_uuid => process_uuid,
- :process_options => process_options,
- :uuid => uuid,
- :percentage_failed => failed_percent,
- :percentage_cancelled => cancelled_percent,
- :percentage_completed => completed_percent,
- :tasks_count => tasks_count
- }.merge(additional)
-
- end
-
end
class RedisSerializationVisitor < Visitor::Base
#
@@ -242,10 +225,13 @@
@instance.accept(self)
# add the process uuid and root key, for easy access later!
@hmset += [:process_uuid, @root.uuid]
+ # add the default state
+ @hmset += [:state, :initial]
+
# NB: splat args
@conn.hmset(*@hmset)
self
end
@@ -257,11 +243,10 @@
RedisSerializationVisitor.new(@conn, process, @base_visitor).visit
end
end
def visit_tasks(tasks)
- @hmset += [:task_count, tasks.count] # not used currently, but for informational purposes
tasks.each do |task|
RedisSerializationVisitor.new(@conn, task, @base_visitor).visit
@conn.rpush "#{@key}:tasks", task.uuid
@base_visitor.incr_task_count unless task.is_a?(Task::SubProcess)
end
@@ -270,10 +255,18 @@
def visit_attribute(attribute)
value = @instance.send(attribute)
@hmset += [attribute, value] if value
end
+ def visit_attribute_time(attribute)
+ visit_attribute(attribute)
+ end
+
+ def visit_attribute_enum(attribute, type)
+ visit_attribute(attribute)
+ end
+
def visit_process_reference(attribute)
process = @instance.send(attribute)
@hmset += [attribute, process.uuid] if process
end
@@ -372,13 +365,36 @@
@instance.instance_variable_set("@#{attribute}", lazy_instance_for(Task, uuid)) if uuid
end
def visit_attribute(attribute)
value = @attribute_values[attribute]
- @instance.instance_variable_set("@#{attribute}", value) if value
+ if value
+ # converted block given?
+ if block_given?
+ @instance.instance_variable_set("@#{attribute}", yield(value))
+ else
+ @instance.instance_variable_set("@#{attribute}", value)
+ end
+ end
end
+ def visit_attribute_time(attribute)
+ visit_attribute(attribute) do |value|
+ Time.parse(value)
+ end
+ end
+
+ # NB: assumes the enum type's members have integer values!
+ def visit_attribute_enum(attribute, type)
+ visit_attribute(attribute) do |value|
+ const_value = type.constants.select {|c| type.const_get(c) == value.to_i }.first
+ const_value ?
+ type.const_get(const_value) :
+ (defined?(type::Default) ? type::Default : nil)
+ end
+ end
+
def visit_type(attribute)
value = @attribute_values[attribute]
if value
type = Kernel.const_get(value)
@instance.instance_variable_set("@#{attribute}", type)
@@ -403,15 +419,15 @@
# time, but also prevents need to load the entire
# object graph everytime a worker fetches an
# arbitrary instance to perform it's work
#
def lazy_instance_for(base, uuid)
- type, process_uuid = Taskinator.redis do |conn|
- conn.hmget(base.key_for(uuid), :type, :process_uuid)
+ type = Taskinator.redis do |conn|
+ conn.hget(base.key_for(uuid), :type)
end
klass = Kernel.const_get(type)
- LazyLoader.new(klass, uuid, process_uuid, @instance_cache)
+ LazyLoader.new(klass, uuid, @instance_cache)
end
end
# lazily loads the object specified by the type and uuid
class LazyLoader < Delegator
@@ -422,19 +438,14 @@
# to keep loading the same objects again.
#
# E.g. this is useful for tasks which refer to their parent processes
#
- def initialize(type, uuid, process_uuid, instance_cache={})
+ def initialize(type, uuid, instance_cache={})
@type = type
@uuid = uuid
- @process_uuid = process_uuid
@instance_cache = instance_cache
end
-
- # shadows the real methods, but will be the same!
- attr_reader :process_uuid
- attr_reader :uuid
def __getobj__
# only fetch the object as needed
# and memoize for subsequent calls
@instance ||= @type.fetch(@uuid, @instance_cache)