lib/crono_trigger/schedulable.rb in crono_trigger-0.4.0 vs lib/crono_trigger/schedulable.rb in crono_trigger-0.5.0

- old
+ new

@@ -2,10 +2,11 @@ require "active_support/core_ext/object" require "chrono" require "tzinfo" require "crono_trigger/exception_handler" +require "crono_trigger/execution_tracker" module CronoTrigger module Schedulable DEFAULT_RETRY_LIMIT = 10 DEFAULT_RETRY_INTERVAL = 4 @@ -24,21 +25,24 @@ extend ActiveSupport::Concern include ActiveSupport::Callbacks included do CronoTrigger::Schedulable.included_by << self - class_attribute :crono_trigger_options, :executable_conditions + class_attribute :crono_trigger_options, :executable_conditions, :track_execution self.crono_trigger_options ||= {} self.executable_conditions ||= [] + self.track_execution ||= false + has_many :crono_trigger_executions, class_name: "CronoTrigger::Models::Execution", as: :schedule, inverse_of: :schedule + define_model_callbacks :execute, :retry scope :executables, ->(from: Time.current, limit: CronoTrigger.config.executor_thread * 3 || 100, including_locked: false) do t = arel_table rel = where(t[crono_trigger_column_name(:next_execute_at)].lteq(from)) - rel = rel.where(t[crono_trigger_column_name(:execute_lock)].lteq(from.to_i - execute_lock_timeout)) unless including_locked + rel = rel.where(t[crono_trigger_column_name(:execute_lock)].lt(from.to_i - execute_lock_timeout)) unless including_locked rel = rel.where(t[crono_trigger_column_name(:started_at)].lteq(from)) if column_names.include?(crono_trigger_column_name(:started_at)) rel = rel.where(t[crono_trigger_column_name(:finished_at)].gt(from).or(t[crono_trigger_column_name(:finished_at)].eq(nil))) if column_names.include?(crono_trigger_column_name(:finished_at)) rel = rel.order(crono_trigger_column_name(:next_execute_at) => :asc).limit(limit) @@ -59,21 +63,22 @@ validate :validate_cron_format end module ClassMethods def executables_with_lock(limit: CronoTrigger.config.executor_thread * 3 || 100) - records = nil - transaction do - records = executables(limit: limit).lock.to_a - unless records.empty? - where(id: records).update_all( - crono_trigger_column_name(:execute_lock) => Time.current.to_i, - crono_trigger_column_name(:locked_by) => CronoTrigger.config.worker_id - ) + ids = executables(limit: limit).pluck(:id) + records = [] + ids.each do |id| + transaction do + r = all.lock.find(id) + unless r.locking? + r.crono_trigger_lock! + records << r + end end - records end + records end def crono_trigger_column_name(name) crono_trigger_options["#{name}_column_name".to_sym].try(:to_s) || name.to_s end @@ -104,16 +109,19 @@ self.executable_conditions.clear end end def do_execute + execution_tracker = ExecutionTracker.new(self) run_callbacks :execute do catch(:ok_without_reset) do catch(:ok) do catch(:retry) do catch(:abort) do - execute + execution_tracker.track do + execute + end throw :ok end raise AbortExecution end retry! @@ -149,30 +157,38 @@ end if new_record? self.attributes = attributes else + merge_updated_at_for_crono_trigger!(attributes) update_columns(attributes) end + + self end - def retry! + def retry!(immediately: false) run_callbacks :retry do logger.info "Retry #{self.class}-#{id}" if logger now = Time.current - wait = crono_trigger_options[:exponential_backoff] ? retry_interval * [2 * (retry_count - 1), 1].max : retry_interval + if immediately + wait = 0 + else + wait = crono_trigger_options[:exponential_backoff] ? retry_interval * [2 * (retry_count - 1), 1].max : retry_interval + end attributes = { crono_trigger_column_name(:next_execute_at) => now + wait, crono_trigger_column_name(:execute_lock) => 0, crono_trigger_column_name(:locked_by) => nil, } if self.class.column_names.include?("retry_count") attributes.merge!(retry_count: retry_count.to_i + 1) end + merge_updated_at_for_crono_trigger!(attributes, now) update_columns(attributes) end end def reset!(update_last_executed_at = true) @@ -182,26 +198,40 @@ crono_trigger_column_name(:next_execute_at) => calculate_next_execute_at, crono_trigger_column_name(:execute_lock) => 0, crono_trigger_column_name(:locked_by) => nil, } + now = Time.current + if update_last_executed_at && self.class.column_names.include?(crono_trigger_column_name(:last_executed_at)) - attributes.merge!(crono_trigger_column_name(:last_executed_at) => Time.current) + attributes.merge!(crono_trigger_column_name(:last_executed_at) => now) end if self.class.column_names.include?("retry_count") attributes.merge!(retry_count: 0) end + merge_updated_at_for_crono_trigger!(attributes, now) update_columns(attributes) end + def crono_trigger_lock! + attributes = { + crono_trigger_column_name(:execute_lock) => Time.current.to_i, + crono_trigger_column_name(:locked_by) => CronoTrigger.config.worker_id + } + merge_updated_at_for_crono_trigger!(attributes) + update_columns(attributes) + end + def crono_trigger_unlock! - update_columns( + attributes = { crono_trigger_column_name(:execute_lock) => 0, crono_trigger_column_name(:locked_by) => nil, - ) + } + merge_updated_at_for_crono_trigger!(attributes) + update_columns(attributes) end def crono_trigger_status case when locking? @@ -221,11 +251,11 @@ self[crono_trigger_column_name(:next_execute_at)].nil? && last_executed_at.nil? end def locking?(at: Time.now) self[crono_trigger_column_name(:execute_lock)] > 0 && - self[crono_trigger_column_name(:execute_lock)] > at.to_i - self.class.execute_lock_timeout + self[crono_trigger_column_name(:execute_lock)] >= at.to_f - self.class.execute_lock_timeout end def assume_executing? locking? end @@ -296,9 +326,16 @@ if columns.include?("last_error_time") attributes.merge!(last_error_time: now) end + merge_updated_at_for_crono_trigger!(attributes) update_columns(attributes) unless attributes.empty? + end + + def merge_updated_at_for_crono_trigger!(attributes, time = Time.current) + if self.class.column_names.include?("updated_at") + attributes.merge!("updated_at" => time) + end end end end