lib/taskinator/persistence.rb in taskinator-0.2.0 vs lib/taskinator/persistence.rb in taskinator-0.3.0

- old
+ new

@@ -33,21 +33,10 @@ # returns the storage key for the given identifier def key_for(uuid) "taskinator:#{base_key}:#{uuid}" end - # retrieves the workflow state for the given identifier - # this prevents to need to load the entire object when - # querying for the status of an instance - def state_for(uuid) - key = key_for(uuid) - state = Taskinator.redis do |conn| - conn.hget(key, :state) || 'initial' - end - state.to_sym - end - # fetches the instance for given identifier # optionally, provide a hash to use for the instance cache # this argument is defaulted, so top level callers don't # need to provide this. def fetch(uuid, instance_cache={}) @@ -66,14 +55,15 @@ Taskinator.redis do |conn| conn.multi do visitor = RedisSerializationVisitor.new(conn, self).visit conn.hmset( Taskinator::Process.key_for(uuid), - :tasks_count, visitor.task_count, - :tasks_failed, 0, - :tasks_completed, 0, - :tasks_cancelled, 0, + :tasks_count, visitor.task_count, + :tasks_failed, 0, + :tasks_processing, 0, + :tasks_completed, 0, + :tasks_cancelled, 0, ) true end end end @@ -97,21 +87,36 @@ # retrieves the workflow state # this method is called from the workflow gem def load_workflow_state state = Taskinator.redis do |conn| - conn.hget(self.key, :state) + conn.hget(self.key, :state) || 'initial' end - (state || 'initial').to_sym + state.to_sym end # persists the workflow state # this method is called from the workflow gem def persist_workflow_state(new_state) + @updated_at = Time.now.utc Taskinator.redis do |conn| - conn.hset(self.key, :state, new_state) + process_key = self.process_key + conn.multi do + conn.hmset( + self.key, + :state, new_state, + :updated_at, @updated_at + ) + + # also update the "root" process + conn.hset( + process_key, + :updated_at, @updated_at + ) + end end + new_state end # persists the error information def fail(error=nil) return unless error && error.is_a?(Exception) @@ -119,11 +124,12 @@ Taskinator.redis do |conn| conn.hmset( self.key, :error_type, error.class.name, :error_message, error.message, - :error_backtrace, JSON.generate(error.backtrace || []) + :error_backtrace, JSON.generate(error.backtrace || []), + :updated_at, Time.now.utc ) end end # retrieves the error type, message and backtrace @@ -131,11 +137,11 @@ def error @error ||= Taskinator.redis do |conn| error_type, error_message, error_backtrace = conn.hmget(self.key, :error_type, :error_message, :error_backtrace) - [error_type, error_message, JSON.parse(error_backtrace)] + [error_type, error_message, JSON.parse(error_backtrace || '[]')] end end def tasks_count @tasks_count ||= begin @@ -147,23 +153,28 @@ end %w( failed cancelled + processing completed ).each do |status| define_method "count_#{status}" do count = Taskinator.redis do |conn| - conn.hget self.process_key, status + conn.hget self.process_key, "tasks_#{status}" end count.to_i end define_method "incr_#{status}" do Taskinator.redis do |conn| - conn.hincrby self.process_key, status, 1 + process_key = self.process_key + conn.multi do + conn.hincrby process_key, "tasks_#{status}", 1 + conn.hset process_key, :updated_at, Time.now.utc + end end end define_method "percentage_#{status}" do tasks_count > 0 ? (send("count_#{status}") / tasks_count.to_f) * 100.0 : 0.0 @@ -181,38 +192,10 @@ end yaml ? Taskinator::Persistence.deserialize(yaml) : {} end end - # prepairs the meta data for instrumentation events - def instrumentation_payload(additional={}) - - # need to cache here, since this method hits redis, so can't be part of multi statement following - process_key = self.process_key - - tasks_count, completed_count, cancelled_count, failed_count = Taskinator.redis do |conn| - conn.hmget process_key, :tasks_count, :completed, :cancelled, :failed - end - - tasks_count = tasks_count.to_f - completed_percent = tasks_count > 0 ? (completed_count.to_i / tasks_count) * 100.0 : 0.0 - cancelled_percent = tasks_count > 0 ? (cancelled_count.to_i / tasks_count) * 100.0 : 0.0 - failed_percent = tasks_count > 0 ? (failed_count.to_i / tasks_count) * 100.0 : 0.0 - - return { - :type => self.class.name, - :process_uuid => process_uuid, - :process_options => process_options, - :uuid => uuid, - :percentage_failed => failed_percent, - :percentage_cancelled => cancelled_percent, - :percentage_completed => completed_percent, - :tasks_count => tasks_count - }.merge(additional) - - end - end class RedisSerializationVisitor < Visitor::Base # @@ -242,10 +225,13 @@ @instance.accept(self) # add the process uuid and root key, for easy access later! @hmset += [:process_uuid, @root.uuid] + # add the default state + @hmset += [:state, :initial] + # NB: splat args @conn.hmset(*@hmset) self end @@ -257,11 +243,10 @@ RedisSerializationVisitor.new(@conn, process, @base_visitor).visit end end def visit_tasks(tasks) - @hmset += [:task_count, tasks.count] # not used currently, but for informational purposes tasks.each do |task| RedisSerializationVisitor.new(@conn, task, @base_visitor).visit @conn.rpush "#{@key}:tasks", task.uuid @base_visitor.incr_task_count unless task.is_a?(Task::SubProcess) end @@ -270,10 +255,18 @@ def visit_attribute(attribute) value = @instance.send(attribute) @hmset += [attribute, value] if value end + def visit_attribute_time(attribute) + visit_attribute(attribute) + end + + def visit_attribute_enum(attribute, type) + visit_attribute(attribute) + end + def visit_process_reference(attribute) process = @instance.send(attribute) @hmset += [attribute, process.uuid] if process end @@ -372,13 +365,36 @@ @instance.instance_variable_set("@#{attribute}", lazy_instance_for(Task, uuid)) if uuid end def visit_attribute(attribute) value = @attribute_values[attribute] - @instance.instance_variable_set("@#{attribute}", value) if value + if value + # converted block given? + if block_given? + @instance.instance_variable_set("@#{attribute}", yield(value)) + else + @instance.instance_variable_set("@#{attribute}", value) + end + end end + def visit_attribute_time(attribute) + visit_attribute(attribute) do |value| + Time.parse(value) + end + end + + # NB: assumes the enum type's members have integer values! + def visit_attribute_enum(attribute, type) + visit_attribute(attribute) do |value| + const_value = type.constants.select {|c| type.const_get(c) == value.to_i }.first + const_value ? + type.const_get(const_value) : + (defined?(type::Default) ? type::Default : nil) + end + end + def visit_type(attribute) value = @attribute_values[attribute] if value type = Kernel.const_get(value) @instance.instance_variable_set("@#{attribute}", type) @@ -403,15 +419,15 @@ # time, but also prevents need to load the entire # object graph everytime a worker fetches an # arbitrary instance to perform it's work # def lazy_instance_for(base, uuid) - type, process_uuid = Taskinator.redis do |conn| - conn.hmget(base.key_for(uuid), :type, :process_uuid) + type = Taskinator.redis do |conn| + conn.hget(base.key_for(uuid), :type) end klass = Kernel.const_get(type) - LazyLoader.new(klass, uuid, process_uuid, @instance_cache) + LazyLoader.new(klass, uuid, @instance_cache) end end # lazily loads the object specified by the type and uuid class LazyLoader < Delegator @@ -422,19 +438,14 @@ # to keep loading the same objects again. # # E.g. this is useful for tasks which refer to their parent processes # - def initialize(type, uuid, process_uuid, instance_cache={}) + def initialize(type, uuid, instance_cache={}) @type = type @uuid = uuid - @process_uuid = process_uuid @instance_cache = instance_cache end - - # shadows the real methods, but will be the same! - attr_reader :process_uuid - attr_reader :uuid def __getobj__ # only fetch the object as needed # and memoize for subsequent calls @instance ||= @type.fetch(@uuid, @instance_cache)