lib/crono_trigger/schedulable.rb in crono_trigger-0.4.0 vs lib/crono_trigger/schedulable.rb in crono_trigger-0.5.0
- old
+ new
@@ -2,10 +2,11 @@
require "active_support/core_ext/object"
require "chrono"
require "tzinfo"
require "crono_trigger/exception_handler"
+require "crono_trigger/execution_tracker"
module CronoTrigger
module Schedulable
DEFAULT_RETRY_LIMIT = 10
DEFAULT_RETRY_INTERVAL = 4
@@ -24,21 +25,24 @@
extend ActiveSupport::Concern
include ActiveSupport::Callbacks
included do
CronoTrigger::Schedulable.included_by << self
- class_attribute :crono_trigger_options, :executable_conditions
+ class_attribute :crono_trigger_options, :executable_conditions, :track_execution
self.crono_trigger_options ||= {}
self.executable_conditions ||= []
+ self.track_execution ||= false
+ has_many :crono_trigger_executions, class_name: "CronoTrigger::Models::Execution", as: :schedule, inverse_of: :schedule
+
define_model_callbacks :execute, :retry
scope :executables, ->(from: Time.current, limit: CronoTrigger.config.executor_thread * 3 || 100, including_locked: false) do
t = arel_table
rel = where(t[crono_trigger_column_name(:next_execute_at)].lteq(from))
- rel = rel.where(t[crono_trigger_column_name(:execute_lock)].lteq(from.to_i - execute_lock_timeout)) unless including_locked
+ rel = rel.where(t[crono_trigger_column_name(:execute_lock)].lt(from.to_i - execute_lock_timeout)) unless including_locked
rel = rel.where(t[crono_trigger_column_name(:started_at)].lteq(from)) if column_names.include?(crono_trigger_column_name(:started_at))
rel = rel.where(t[crono_trigger_column_name(:finished_at)].gt(from).or(t[crono_trigger_column_name(:finished_at)].eq(nil))) if column_names.include?(crono_trigger_column_name(:finished_at))
rel = rel.order(crono_trigger_column_name(:next_execute_at) => :asc).limit(limit)
@@ -59,21 +63,22 @@
validate :validate_cron_format
end
module ClassMethods
def executables_with_lock(limit: CronoTrigger.config.executor_thread * 3 || 100)
- records = nil
- transaction do
- records = executables(limit: limit).lock.to_a
- unless records.empty?
- where(id: records).update_all(
- crono_trigger_column_name(:execute_lock) => Time.current.to_i,
- crono_trigger_column_name(:locked_by) => CronoTrigger.config.worker_id
- )
+ ids = executables(limit: limit).pluck(:id)
+ records = []
+ ids.each do |id|
+ transaction do
+ r = all.lock.find(id)
+ unless r.locking?
+ r.crono_trigger_lock!
+ records << r
+ end
end
- records
end
+ records
end
def crono_trigger_column_name(name)
crono_trigger_options["#{name}_column_name".to_sym].try(:to_s) || name.to_s
end
@@ -104,16 +109,19 @@
self.executable_conditions.clear
end
end
def do_execute
+ execution_tracker = ExecutionTracker.new(self)
run_callbacks :execute do
catch(:ok_without_reset) do
catch(:ok) do
catch(:retry) do
catch(:abort) do
- execute
+ execution_tracker.track do
+ execute
+ end
throw :ok
end
raise AbortExecution
end
retry!
@@ -149,30 +157,38 @@
end
if new_record?
self.attributes = attributes
else
+ merge_updated_at_for_crono_trigger!(attributes)
update_columns(attributes)
end
+
+ self
end
- def retry!
+ def retry!(immediately: false)
run_callbacks :retry do
logger.info "Retry #{self.class}-#{id}" if logger
now = Time.current
- wait = crono_trigger_options[:exponential_backoff] ? retry_interval * [2 * (retry_count - 1), 1].max : retry_interval
+ if immediately
+ wait = 0
+ else
+ wait = crono_trigger_options[:exponential_backoff] ? retry_interval * [2 * (retry_count - 1), 1].max : retry_interval
+ end
attributes = {
crono_trigger_column_name(:next_execute_at) => now + wait,
crono_trigger_column_name(:execute_lock) => 0,
crono_trigger_column_name(:locked_by) => nil,
}
if self.class.column_names.include?("retry_count")
attributes.merge!(retry_count: retry_count.to_i + 1)
end
+ merge_updated_at_for_crono_trigger!(attributes, now)
update_columns(attributes)
end
end
def reset!(update_last_executed_at = true)
@@ -182,26 +198,40 @@
crono_trigger_column_name(:next_execute_at) => calculate_next_execute_at,
crono_trigger_column_name(:execute_lock) => 0,
crono_trigger_column_name(:locked_by) => nil,
}
+ now = Time.current
+
if update_last_executed_at && self.class.column_names.include?(crono_trigger_column_name(:last_executed_at))
- attributes.merge!(crono_trigger_column_name(:last_executed_at) => Time.current)
+ attributes.merge!(crono_trigger_column_name(:last_executed_at) => now)
end
if self.class.column_names.include?("retry_count")
attributes.merge!(retry_count: 0)
end
+ merge_updated_at_for_crono_trigger!(attributes, now)
update_columns(attributes)
end
+ def crono_trigger_lock!
+ attributes = {
+ crono_trigger_column_name(:execute_lock) => Time.current.to_i,
+ crono_trigger_column_name(:locked_by) => CronoTrigger.config.worker_id
+ }
+ merge_updated_at_for_crono_trigger!(attributes)
+ update_columns(attributes)
+ end
+
def crono_trigger_unlock!
- update_columns(
+ attributes = {
crono_trigger_column_name(:execute_lock) => 0,
crono_trigger_column_name(:locked_by) => nil,
- )
+ }
+ merge_updated_at_for_crono_trigger!(attributes)
+ update_columns(attributes)
end
def crono_trigger_status
case
when locking?
@@ -221,11 +251,11 @@
self[crono_trigger_column_name(:next_execute_at)].nil? && last_executed_at.nil?
end
def locking?(at: Time.now)
self[crono_trigger_column_name(:execute_lock)] > 0 &&
- self[crono_trigger_column_name(:execute_lock)] > at.to_i - self.class.execute_lock_timeout
+ self[crono_trigger_column_name(:execute_lock)] >= at.to_f - self.class.execute_lock_timeout
end
def assume_executing?
locking?
end
@@ -296,9 +326,16 @@
if columns.include?("last_error_time")
attributes.merge!(last_error_time: now)
end
+ merge_updated_at_for_crono_trigger!(attributes)
update_columns(attributes) unless attributes.empty?
+ end
+
+ def merge_updated_at_for_crono_trigger!(attributes, time = Time.current)
+ if self.class.column_names.include?("updated_at")
+ attributes.merge!("updated_at" => time)
+ end
end
end
end