lib/taskinator/persistence.rb in taskinator-0.0.17 vs lib/taskinator/persistence.rb in taskinator-0.0.18
- old
+ new
@@ -59,24 +59,40 @@
end
end
end
module InstanceMethods
- def key
- self.class.key_for(self.uuid)
- end
def save
Taskinator.redis do |conn|
conn.multi do
- RedisSerializationVisitor.new(conn, self).visit
- conn.sadd "taskinator:#{self.class.base_key}", self.uuid
+ visitor = RedisSerializationVisitor.new(conn, self).visit
+ conn.hmset(
+ "taskinator:#{self.key}",
+ :tasks_count, visitor.task_count,
+ :tasks_failed, 0,
+ :tasks_completed, 0,
+ :tasks_cancelled, 0,
+ )
true
end
end
end
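For illustration, a minimal sketch (not part of the gem source) of reading back the counter fields that #save now initializes inside the MULTI block above; `process` stands for any saved process instance, and the example values are assumed:

    Taskinator.redis do |conn|
      conn.hgetall("taskinator:#{process.key}")
      # => { "tasks_count" => "3", "tasks_failed" => "0",
      #      "tasks_completed" => "0", "tasks_cancelled" => "0",
      #      ... plus the fields written by RedisSerializationVisitor }
    end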
+ # this is the persistence key
+ def key
+ @key ||= self.class.key_for(self.uuid)
+ end
+
+ # retrieves the root key associated
+ # with the process or task
+ def root_key
+ @root_key ||= Taskinator.redis do |conn|
+ conn.hget(self.key, :root_key)
+ end
+ end
+
# retrieves the workflow state
# this method is called from the workflow gem
def load_workflow_state
state = Taskinator.redis do |conn|
conn.hget(self.key, :state)
@@ -114,52 +130,96 @@
conn.hmget(self.key, :error_type, :error_message, :error_backtrace)
[error_type, error_message, JSON.parse(error_backtrace)]
end
end
+
+ def tasks_count
+ @tasks_count ||= begin
+ Taskinator.redis do |conn|
+ conn.hget "taskinator:#{self.root_key}", :tasks_count
+ end.to_i
+ end
+ end
+
+ %w(
+ failed
+ cancelled
+ completed
+ ).each do |status|
+
+ define_method "count_#{status}" do
+ Taskinator.redis do |conn|
+ conn.hget "taskinator:#{self.root_key}", status
+ end.to_i
+ end
+
+ define_method "incr_#{status}" do
+ Taskinator.redis do |conn|
+ conn.hincrby "taskinator:#{self.root_key}", status, 1
+ end
+ end
+
+ define_method "percentage_#{status}" do
+ tasks_count > 0 ? (send("count_#{status}") / tasks_count.to_f) * 100.0 : 0.0
+ end
+
+ end
+
end
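For illustration, a sketch (not part of the gem source) of the per-status helpers defined by the %w(failed cancelled completed) loop above; `task` stands for any persisted task whose root process was saved with three tasks:

    task.incr_completed        # HINCRBY taskinator:<root_key> completed 1
    task.count_completed       # => 1
    task.tasks_count           # => 3, read from the root process hash
    task.percentage_completed  # => 33.33... (count / tasks_count * 100)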
class RedisSerializationVisitor < Visitor::Base
#
# the redis connection is passed in since it is
# in the multi statement mode in order to produce
# one roundtrip to the redis server
#
- def initialize(conn, instance, parent=nil)
- @conn = conn
- @instance = instance
- @key = instance.key
- # @parent = parent # not using this yet
+ attr_reader :instance
+
+ def initialize(conn, instance, base_visitor=self)
+ @conn = conn
+ @instance = instance
+ @key = instance.key
+ @root_key = base_visitor.instance.key
+ @base_visitor = base_visitor
+ @task_count = 0
end
# the starting point for serializing the instance
def visit
@hmset = []
@hmset << @key
+
@hmset += [:type, @instance.class.name]
@instance.accept(self)
+ # add the root key, for easy access later!
+ @hmset += [:root_key, @root_key]
+
# NB: splat args
@conn.hmset(*@hmset)
+
+ self
end
def visit_process(attribute)
process = @instance.send(attribute)
if process
@hmset += [attribute, process.uuid]
- RedisSerializationVisitor.new(@conn, process, @instance).visit
+ RedisSerializationVisitor.new(@conn, process, @base_visitor).visit
end
end
def visit_tasks(tasks)
- @hmset += [:task_count, tasks.count]
+ @hmset += [:task_count, tasks.count] # not used currently, but for informational purposes
tasks.each do |task|
- RedisSerializationVisitor.new(@conn, task, @instance).visit
+ RedisSerializationVisitor.new(@conn, task, @base_visitor).visit
@conn.rpush "#{@key}:tasks", task.uuid
+ @base_visitor.incr_task_count unless task.is_a?(Task::SubProcess)
end
end
def visit_attribute(attribute)
value = @instance.send(attribute)
@@ -184,10 +244,18 @@
def visit_args(attribute)
values = @instance.send(attribute)
yaml = Taskinator::Persistence.serialize(values)
@hmset += [attribute, yaml]
end
+
+ def task_count
+ @task_count
+ end
+
+ def incr_task_count
+ @task_count += 1
+ end
end
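For illustration, a hypothetical accept method showing the other half of the visitor protocol used above: #visit calls @instance.accept(self), and the instance calls back into the visit_* methods defined on this class. The attribute names here are made up:

    def accept(visitor)
      visitor.visit_attribute(:uuid)
      visitor.visit_attribute(:name)
      visitor.visit_args(:options)
      visitor.visit_tasks(tasks)  # recurses into each task and bumps the base visitor's task count
    end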
class RedisDeserializationVisitor < Taskinator::Visitor::Base
#
@@ -291,12 +359,14 @@
arbitrary instance to perform its work
#
def lazy_instance_for(base, uuid)
Taskinator.redis do |conn|
type = conn.hget(base.key_for(uuid), :type)
+ root_key = conn.hget(base.key_for(uuid), :root_key)
+
klass = Kernel.const_get(type)
- LazyLoader.new(klass, uuid, @instance_cache)
+ LazyLoader.new(klass, uuid, root_key, @instance_cache)
end
end
end
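For illustration, a sketch (not part of the gem source) of how the lazy loader built above behaves: uuid and root_key are answered from the values captured here, without another Redis round trip, while any other call loads the real instance. The uuid and root_key values are assumed:

    loader = LazyLoader.new(Taskinator::Process, uuid, root_key)
    loader.uuid       # answered locally, no Redis access
    loader.root_key   # answered locally, no Redis access
    loader.tasks      # any other call loads the real instance via __getobj__ and delegates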
# lazily loads the object specified by the type and uuid
@@ -308,17 +378,20 @@
# to keep loading the same objects again.
#
# E.g. this is useful for tasks which refer to their parent processes
#
- def initialize(type, uuid, instance_cache={})
+ def initialize(type, uuid, root_key, instance_cache={})
@type = type
@uuid = uuid
+ @root_key = root_key
@instance_cache = instance_cache
end
- attr_reader :uuid # shadows the real method, but will be the same!
+ # shadows the real methods, but will be the same!
+ attr_reader :uuid
+ attr_reader :root_key
# attempts to reload the actual process
def reload
@instance = nil
__getobj__
@@ -341,10 +414,12 @@
}
elsif values.is_a?(Hash)
values.each {|key, value|
values[key] = value.global_id if value.respond_to?(:global_id)
}
+ elsif values.respond_to?(:global_id)
+ values = values.global_id
end
YAML.dump(values)
end
def deserialize(yaml)
@@ -355,9 +430,11 @@
}
elsif values.is_a?(Hash)
values.each {|key, value|
values[key] = value.find if value.respond_to?(:model_id) && value.respond_to?(:find)
}
+ elsif values.respond_to?(:model_id) && values.respond_to?(:find)
+ values = values.find
end
values
end
end
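For illustration, a sketch of the new scalar branch in serialize/deserialize: a single object that responds to global_id (e.g. an ActiveRecord model with GlobalID) is dumped as its GlobalID rather than the full record, and re-fetched on deserialize via GlobalID#find. The User model is hypothetical, and deserialize is assumed to be exposed the same way serialize is used at visit_args above:

    user = User.find(42)                              # hypothetical GlobalID-capable model
    yaml = Taskinator::Persistence.serialize(user)    # YAML of user.global_id, not the record itself
    Taskinator::Persistence.deserialize(yaml)         # => User.find(42), looked up through the GlobalID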