module Rworkflow
  class Flow
    STATE_SUCCESSFUL = :successful
    STATE_FAILED = :failed
    STATES_TERMINAL = [STATE_FAILED, STATE_SUCCESSFUL].freeze
    STATES_FAILED = [STATE_FAILED].freeze

    REDIS_NS = 'flow'.freeze
    WORKFLOW_REGISTRY = "#{REDIS_NS}:__registry".freeze

    attr_accessor :id
    attr_reader :lifecycle

    def initialize(id)
      @id = id
      @redis_key = "#{REDIS_NS}:#{id}"
      @storage = RedisRds::Hash.new(@redis_key)
      @flow_data = RedisRds::Hash.new("#{@redis_key}__data")
      @processing = RedisRds::Hash.new("#{@redis_key}__processing")
      load_lifecycle
    end

    def load_lifecycle
      serialized = @storage.get(:lifecycle)
      unless serialized.nil?
        raw = self.class.serializer.load(serialized)
        @lifecycle = Rworkflow::Lifecycle.unserialize(raw) unless raw.nil?
      end
    rescue
      @lifecycle = nil
    end
    private :load_lifecycle

    def lifecycle=(new_lifecycle)
      @lifecycle = new_lifecycle
      @storage.set(:lifecycle, self.class.serializer.dump(@lifecycle.serialize))
    end

    def finished?
      return false unless started?
      total = self.counters.reduce(0) do |sum, pair|
        self.class.terminal?(pair[0]) ? sum : (sum + pair[1].to_i)
      end
      return total == 0
    end

    def status
      status = 'Running'
      status = successful? ? 'Finished' : 'Failed' if finished?
      return status
    end

    def created_at
      return @created_at ||= begin
        Time.zone.at(get(:created_at, 0))
      end
    end

    def started?
      return !get(:start_time).nil?
    end

    def name
      return get(:name, @id)
    end

    def name=(name)
      return set(:name, name)
    end

    def start_time
      return Time.zone.at(get(:start_time, 0))
    end

    def finish_time
      return Time.zone.at(get(:finish_time, 0))
    end

    def expected_duration
      return Float::INFINITY
    end

    def valid?
      return !@lifecycle.nil?
    end

    def count(state)
      return get_list(state).size
    end

    def counters
      the_counters = @storage.get(:counters)
      if !the_counters.nil?
        the_counters = begin
          self.class.serializer.load(the_counters)
        rescue => e
          Rails.logger.error("Error loading stored flow counters: #{e.message}")
          nil
        end
      end

      return the_counters || counters!
    end

    # fetches counters atomically
    def counters!
      the_counters = { processing: 0 }
      names = @lifecycle.states.keys

      results = RedisRds::Object.connection.multi do
        self.class::STATES_TERMINAL.each { |name| get_list(name).size }
        names.each { |name| get_list(name).size }
        @processing.getall
      end

      (self.class::STATES_TERMINAL + names).each do |name|
        the_counters[name] = results.shift.to_i
      end
      the_counters[:processing] = results.shift.reduce(0) { |sum, pair| sum + pair.last.to_i }

      return the_counters
    end
    private :counters!

    # Pops up to the state's cardinality of objects from the state's list,
    # marks the fetcher as processing and yields the deserialized objects.
    # Objects that fail to deserialize are pushed to the failed state.
    def fetch(fetcher_id, state_name)
      @processing.set(fetcher_id, 1)
      list = get_state_list(state_name)

      unless list.nil?
        failed = []
        cardinality = @lifecycle.states[state_name].cardinality
        cardinality = get(:start_count).to_i if cardinality == Lifecycle::CARDINALITY_ALL_STARTED
        force_list_complete = @lifecycle.states[state_name].policy == State::STATE_POLICY_WAIT

        raw_objects = list.lpop(cardinality, force_list_complete)
        unless raw_objects.empty?
          objects = raw_objects.map do |raw_object|
            begin
              self.class.serializer.load(raw_object)
            rescue StandardError => _
              failed << raw_object
              nil
            end
          end.compact

          @processing.set(fetcher_id, objects.size)

          unless failed.empty?
            push(failed, STATE_FAILED)
            Rails.logger.error("Failed to parse #{failed.size} objects in workflow #{@id} for fetcher id #{fetcher_id} at state #{state_name}")
          end

          yield(objects) if block_given?
        end
      end
    ensure
      @processing.remove(fetcher_id)
      terminate if finished?
    end

    def list_objects(state_name, limit = -1)
      list = get_list(state_name)
      return list.get(0, limit).map { |object| self.class.serializer.load(object) }
    end

    def get_state_list(state_name)
      list = nil
      state = @lifecycle.states[state_name]

      if !state.nil?
        list = get_list(state_name)
      else
        Rails.logger.error("Tried accessing invalid state #{state_name} for workflow #{id}")
      end

      return list
    end
    private :get_state_list

    # Finalizes the workflow under a mutex: records the finish time, runs post
    # processing and either persists the final counters (public flows) or
    # removes all of the flow's Redis keys.
    def terminate
      mutex = RedisRds::Mutex.new(self.id)
      mutex.synchronize do
        if !self.cleaned_up?
          set(:finish_time, Time.now.to_i)
          post_process

          if self.public?
            the_counters = self.counters!
            the_counters[:processing] = 0 # Some worker might have increased the processing flag at that time even if there are no more jobs to be done
            @storage.setnx(:counters, self.class.serializer.dump(the_counters))
            states_cleanup
          else
            self.cleanup
          end
        end
      end
    end

    def post_process; end
    protected :post_process

    def metadata_string
      return "Rworkflow: #{self.name}"
    end

    def cleaned_up?
      return states_list.all? { |name| !get_list(name).exists? }
    end

    def states_list
      states = self.class::STATES_TERMINAL
      states += @lifecycle.states.keys if valid?
      return states
    end

    def transition(from_state, name, objects)
      objects = Array.wrap(objects)
      to_state = begin
        lifecycle.transition(from_state, name)
      rescue Rworkflow::StateError => e
        Rails.logger.error("Error transitioning: #{e}")
        nil
      end

      if !to_state.nil?
        push(objects, to_state)
        log(from_state, name, objects.size)
      end
    end

    def logging?
      return get(:logging, false)
    end

    def log(from_state, transition, num_objects)
      logger.incrby("#{from_state}__#{transition}", num_objects.to_i) if logging?
    end

    def logger
      return @logger ||= begin
        RedisRds::Hash.new("#{@redis_key}__logger")
      end
    end

    def logs
      logs = {}
      if valid? && logging?
        state_transition_counters = logger.getall
        state_transition_counters.each do |state_transition, counter|
          state, transition = state_transition.split('__')
          logs[state] = {} unless logs.key?(state)
          logs[state][transition] = counter.to_i
        end
      end

      return logs
    end

    def get_state_cardinality(state_name)
      cardinality = @lifecycle.states[state_name].cardinality
      cardinality = self.get(:start_count).to_i if cardinality == Rworkflow::Lifecycle::CARDINALITY_ALL_STARTED
      return cardinality
    end

    def set(key, value)
      @flow_data.set(key, self.class.serializer.dump(value))
    end

    def get(key, default = nil)
      value = @flow_data.get(key)
      value = value.nil? ? default : self.class.serializer.load(value)
      return value
    end

    def incr(key, value = 1)
      return @flow_data.incrby(key, value)
    end

    def push(objects, state)
      objects = Array.wrap(objects)
      return 0 if objects.empty?

      list = get_list(state)
      list.rpush(objects.map { |object| self.class.serializer.dump(object) })

      return objects.size
    end
    private :push

    def get_list(name)
      return RedisRds::List.new("#{@redis_key}:lists:#{name}")
    end
    private :get_list

    def cleanup
      return if Rails.env.test?
      states_cleanup
      logger.delete
      @processing.delete
      self.class.unregister(self)
      @flow_data.delete
      @storage.delete
    end

    def states_cleanup
      return if Rails.env.test?
      states_list.each { |name| get_list(name).delete }
    end
    protected :states_cleanup

    # Records the start metadata and pushes the given objects into the
    # lifecycle's initial state.
    def start(objects)
      objects = Array.wrap(objects)
      set(:start_time, Time.now.to_i)
      set(:start_count, objects.size)
      push(objects, lifecycle.initial)
      log(lifecycle.initial, 'initial', objects.size)
    end

    def total_objects_processed(counters = nil)
      return (counters || self.counters).reduce(0) do |sum, pair|
        if self.class.terminal?(pair[0])
          sum + pair[1]
        else
          sum
        end
      end
    end

    def total_objects(counters = nil)
      return (counters || self.counters).reduce(0) { |sum, pair| sum + pair[1] }
    end

    def total_objects_failed(counters = nil)
      return (counters || self.counters).reduce(0) do |sum, pair|
        if self.class.failure?(pair[0])
          sum + pair[1]
        else
          sum
        end
      end
    end

    def successful?
      return false if !finished?
      return !failed?
    end

    def failed?
      return false if !finished?
      return total_objects_failed > 0
    end

    def public?
      return @public ||= begin
        get(:public, false)
      end
    end

    class << self
      def create(lifecycle, name = '', options = {})
        id = generate_id(name)
        workflow = new(id)

        workflow.name = name
        workflow.lifecycle = lifecycle
        workflow.set(:created_at, Time.now.to_i)
        workflow.set(:public, options.fetch(:public, false))
        workflow.set(:logging, options.fetch(:logging, true))

        register(workflow)
        return workflow
      end

      def generate_id(workflow_name)
        now = Time.now.to_f
        random = Random.new(now)
        return "#{name}__#{workflow_name}__#{(Time.now.to_f * 1000).to_i}__#{random.rand(now).to_i}"
      end
      private :generate_id

      def cleanup(id)
        workflow = new(id)
        workflow.cleanup
      end

      def get_public_workflows(options = {})
        return registry.public_flows(options.reverse_merge(parent_class: self)).map { |id| load(id) }
      end

      def get_private_workflows(options = {})
        return registry.private_flows(options.reverse_merge(parent_class: self)).map { |id| load(id) }
      end

      def all(options = {})
        return registry.all(options.reverse_merge(parent_class: self)).map { |id| load(id) }
      end

      def load(id, klass = nil)
        workflow = nil
        klass = read_flow_class(id) if klass.nil?
        workflow = klass.new(id) if klass.respond_to?(:new)
        return workflow
      end

      def read_flow_class(id)
        klass = nil
        raw_class = id.split('__').first

        if !raw_class.nil?
          klass = begin
            raw_class.constantize
          rescue NameError => _
            Rails.logger.warn("Unknown flow class for workflow id #{id}")
            nil
          end
        end

        return klass
      end

      def registered?(workflow)
        return registry.include?(workflow)
      end

      def register(workflow)
        registry.add(workflow)
      end

      def unregister(workflow)
        registry.remove(workflow)
      end

      def terminal?(state)
        return self::STATES_TERMINAL.include?(state)
      end

      def failure?(state)
        return self::STATES_FAILED.include?(state)
      end

      def registry
        return @registry ||= begin
          FlowRegistry.new(Rworkflow::VERSION.to_s)
        end
      end

      def serializer
        YAML
      end
    end
  end
end
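
# Usage sketch (illustrative only, not part of the library): `lifecycle` is
# assumed to be a Rworkflow::Lifecycle built elsewhere (its construction is not
# shown in this file), and `records`, `worker_id` and the :parsed transition
# name are hypothetical placeholders. Only Flow methods defined above are used.
#
#   flow = Rworkflow::Flow.create(lifecycle, 'import', logging: true)
#   flow.start(records)                                  # seed the initial state
#   flow.fetch(worker_id, lifecycle.initial) do |objects|
#     # process the objects, then move them along the lifecycle
#     flow.transition(lifecycle.initial, :parsed, objects)
#   end
#   flow.counters    # per-state object counts
#   flow.finished?   # true once no objects remain in non-terminal states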