lib/taskinator/persistence.rb in taskinator-0.0.17 vs lib/taskinator/persistence.rb in taskinator-0.0.18

- old
+ new

@@ -59,24 +59,40 @@ end end end module InstanceMethods - def key - self.class.key_for(self.uuid) - end def save Taskinator.redis do |conn| conn.multi do - RedisSerializationVisitor.new(conn, self).visit - conn.sadd "taskinator:#{self.class.base_key}", self.uuid + visitor = RedisSerializationVisitor.new(conn, self).visit + conn.hmset( + "taskinator:#{self.key}", + :tasks_count, visitor.task_count, + :tasks_failed, 0, + :tasks_completed, 0, + :tasks_cancelled, 0, + ) true end end end + # this is the persistence key + def key + @key ||= self.class.key_for(self.uuid) + end + + # retrieves the root key associated + # with the process or task + def root_key + @root_key ||= Taskinator.redis do |conn| + conn.hget(self.key, :root_key) + end + end + # retrieves the workflow state # this method is called from the workflow gem def load_workflow_state state = Taskinator.redis do |conn| conn.hget(self.key, :state) @@ -114,52 +130,96 @@ conn.hmget(self.key, :error_type, :error_message, :error_backtrace) [error_type, error_message, JSON.parse(error_backtrace)] end end + + def tasks_count + @tasks_count ||= begin + Taskinator.redis do |conn| + conn.hget "taskinator:#{self.root_key}", :tasks_count + end.to_i + end + end + + %w( + failed + cancelled + completed + ).each do |status| + + define_method "count_#{status}" do + Taskinator.redis do |conn| + conn.hget "taskinator:#{self.root_key}", status + end.to_i + end + + define_method "incr_#{status}" do + Taskinator.redis do |conn| + conn.hincrby "taskinator:#{self.root_key}", status, 1 + end + end + + define_method "percentage_#{status}" do + tasks_count > 0 ? (send("count_#{status}") / tasks_count.to_f) * 100.0 : 0.0 + end + + end + end class RedisSerializationVisitor < Visitor::Base # # the redis connection is passed in since it is # in the multi statement mode in order to produce # one roundtrip to the redis server # - def initialize(conn, instance, parent=nil) - @conn = conn - @instance = instance - @key = instance.key - # @parent = parent # not using this yet + attr_reader :instance + + def initialize(conn, instance, base_visitor=self) + @conn = conn + @instance = instance + @key = instance.key + @root_key = base_visitor.instance.key + @base_visitor = base_visitor + @task_count = 0 end # the starting point for serializing the instance def visit @hmset = [] @hmset << @key + @hmset += [:type, @instance.class.name] @instance.accept(self) + # add the root key, for easy access later! + @hmset += [:root_key, @root_key] + # NB: splat args @conn.hmset(*@hmset) + + self end def visit_process(attribute) process = @instance.send(attribute) if process @hmset += [attribute, process.uuid] - RedisSerializationVisitor.new(@conn, process, @instance).visit + RedisSerializationVisitor.new(@conn, process, @base_visitor).visit end end def visit_tasks(tasks) - @hmset += [:task_count, tasks.count] + @hmset += [:task_count, tasks.count] # not used currently, but for informational purposes tasks.each do |task| - RedisSerializationVisitor.new(@conn, task, @instance).visit + RedisSerializationVisitor.new(@conn, task, @base_visitor).visit @conn.rpush "#{@key}:tasks", task.uuid + @base_visitor.incr_task_count unless task.is_a?(Task::SubProcess) end end def visit_attribute(attribute) value = @instance.send(attribute) @@ -184,10 +244,18 @@ def visit_args(attribute) values = @instance.send(attribute) yaml = Taskinator::Persistence.serialize(values) @hmset += [attribute, yaml] end + + def task_count + @task_count + end + + def incr_task_count + @task_count += 1 + end end class RedisDeserializationVisitor < Taskinator::Visitor::Base # @@ -291,12 +359,14 @@ # arbitrary instance to perform it's work # def lazy_instance_for(base, uuid) Taskinator.redis do |conn| type = conn.hget(base.key_for(uuid), :type) + root_key = conn.hget(base.key_for(uuid), :root_key) + klass = Kernel.const_get(type) - LazyLoader.new(klass, uuid, @instance_cache) + LazyLoader.new(klass, uuid, root_key, @instance_cache) end end end # lazily loads the object specified by the type and uuid @@ -308,17 +378,20 @@ # to keep loading the same objects again. # # E.g. this is useful for tasks which refer to their parent processes # - def initialize(type, uuid, instance_cache={}) + def initialize(type, uuid, root_key, instance_cache={}) @type = type @uuid = uuid + @root_key = root_key @instance_cache = instance_cache end - attr_reader :uuid # shadows the real method, but will be the same! + # shadows the real methods, but will be the same! + attr_reader :uuid + attr_reader :root_key # attempts to reload the actual process def reload @instance = nil __getobj__ @@ -341,10 +414,12 @@ } elsif values.is_a?(Hash) values.each {|key, value| values[key] = value.global_id if value.respond_to?(:global_id) } + elsif values.respond_to?(:global_id) + values = values.global_id end YAML.dump(values) end def deserialize(yaml) @@ -355,9 +430,11 @@ } elsif values.is_a?(Hash) values.each {|key, value| values[key] = value.find if value.respond_to?(:model_id) && value.respond_to?(:find) } + elsif values.respond_to?(:model_id) && values.respond_to?(:find) + values = values.find end values end end