lib/crono_trigger/schedulable.rb in crono_trigger-0.3.2 vs lib/crono_trigger/schedulable.rb in crono_trigger-0.3.4
- old
+ new
@@ -9,33 +9,41 @@
module Schedulable
DEFAULT_RETRY_LIMIT = 10
DEFAULT_RETRY_INTERVAL = 4
DEFAULT_EXECUTE_LOCK_TIMEOUT = 600
+ class NoRestrictedUnlockError < StandardError; end
+
+ @included_by = []
+
+ def self.included_by
+ @included_by
+ end
+
class AbortExecution < StandardError; end
extend ActiveSupport::Concern
include ActiveSupport::Callbacks
included do
+ CronoTrigger::Schedulable.included_by << self
class_attribute :crono_trigger_options, :executable_conditions
self.crono_trigger_options ||= {}
self.executable_conditions ||= []
define_model_callbacks :execute
- scope :executables, ->(from: Time.current, primary_key_offset: nil, limit: 1000) do
+ scope :executables, ->(from: Time.current, limit: CronoTrigger.config.executor_thread * 3 || 100, including_locked: false) do
t = arel_table
rel = where(t[crono_trigger_column_name(:next_execute_at)].lteq(from))
- .where(t[crono_trigger_column_name(:execute_lock)].lteq(from.to_i - execute_lock_timeout))
+ rel = rel.where(t[crono_trigger_column_name(:execute_lock)].lteq(from.to_i - execute_lock_timeout)) unless including_locked
rel = rel.where(t[crono_trigger_column_name(:started_at)].lteq(from)) if column_names.include?(crono_trigger_column_name(:started_at))
rel = rel.where(t[crono_trigger_column_name(:finished_at)].gt(from).or(t[crono_trigger_column_name(:finished_at)].eq(nil))) if column_names.include?(crono_trigger_column_name(:finished_at))
- rel = rel.where(t[primary_key].gt(primary_key_offset)) if primary_key_offset
- rel = rel.order(Arel.sql("#{quoted_table_name}.#{quoted_primary_key} ASC")).limit(limit)
+ rel = rel.order(crono_trigger_column_name(:next_execute_at) => :asc).limit(limit)
rel = executable_conditions.reduce(rel) do |merged, pr|
if pr.arity == 0
merged.merge(instance_exec(&pr))
else
@@ -50,16 +58,19 @@
validate :validate_cron_format
end
module ClassMethods
- def executables_with_lock(primary_key_offset: nil, limit: 1000)
+ def executables_with_lock(limit: CronoTrigger.config.executor_thread * 3 || 100)
records = nil
transaction do
- records = executables(primary_key_offset: primary_key_offset, limit: limit).lock.to_a
+ records = executables(limit: limit).lock.to_a
unless records.empty?
- where(id: records).update_all(crono_trigger_column_name(:execute_lock) => Time.current.to_i)
+ where(id: records).update_all(
+ crono_trigger_column_name(:execute_lock) => Time.current.to_i,
+ crono_trigger_column_name(:locked_by) => CronoTrigger.config.worker_id
+ )
end
records
end
end
@@ -69,10 +80,22 @@
def execute_lock_timeout
(crono_trigger_options[:execute_lock_timeout] || DEFAULT_EXECUTE_LOCK_TIMEOUT)
end
+ def crono_trigger_unlock_all!
+ wheres = all.where_values_hash
+ if wheres.empty?
+ raise NoRestrictedUnlockError, "Need `where` filter at least one, because this method is danger"
+ else
+ update_all(
+ crono_trigger_column_name(:execute_lock) => 0,
+ crono_trigger_column_name(:locked_by) => nil,
+ )
+ end
+ end
+
private
def add_executable_conditions(pr)
self.executable_conditions << pr
end
@@ -135,11 +158,15 @@
def retry!
logger.info "Retry #{self.class}-#{id}" if logger
now = Time.current
wait = crono_trigger_options[:exponential_backoff] ? retry_interval * [2 * (retry_count - 1), 1].max : retry_interval
- attributes = {crono_trigger_column_name(:next_execute_at) => now + wait, crono_trigger_column_name(:execute_lock) => 0}
+ attributes = {
+ crono_trigger_column_name(:next_execute_at) => now + wait,
+ crono_trigger_column_name(:execute_lock) => 0,
+ crono_trigger_column_name(:locked_by) => nil,
+ }
if self.class.column_names.include?("retry_count")
attributes.merge!(retry_count: retry_count.to_i + 1)
end
@@ -147,11 +174,15 @@
end
def reset!(update_last_executed_at = true)
logger.info "Reset execution schedule #{self.class}-#{id}" if logger
- attributes = {crono_trigger_column_name(:next_execute_at) => calculate_next_execute_at, crono_trigger_column_name(:execute_lock) => 0}
+ attributes = {
+ crono_trigger_column_name(:next_execute_at) => calculate_next_execute_at,
+ crono_trigger_column_name(:execute_lock) => 0,
+ crono_trigger_column_name(:locked_by) => nil,
+ }
if update_last_executed_at && self.class.column_names.include?(crono_trigger_column_name(:last_executed_at))
attributes.merge!(crono_trigger_column_name(:last_executed_at) => Time.current)
end
@@ -160,21 +191,42 @@
end
update_columns(attributes)
end
- def assume_executing?
- execute_lock_timeout = self.class.execute_lock_timeout
- locking? &&
- self[crono_trigger_column_name(:execute_lock)] + execute_lock_timeout >= Time.now.to_i
+ def crono_trigger_unlock!
+ update_columns(
+ crono_trigger_column_name(:execute_lock) => 0,
+ crono_trigger_column_name(:locked_by) => nil,
+ )
end
- def locking?
- self[crono_trigger_column_name(:execute_lock)] > 0
+ def crono_trigger_status
+ case
+ when locking?
+ :locked
+ when waiting?
+ :waiting
+ when not_scheduled?
+ :not_scheduled
+ end
end
- def idling?
- !locking?
+ def waiting?
+ !!self[crono_trigger_column_name(:next_execute_at)]
+ end
+
+ def not_scheduled?
+ self[crono_trigger_column_name(:next_execute_at)].nil? && last_executed_at.nil?
+ end
+
+ def locking?(at: Time.now)
+ self[crono_trigger_column_name(:execute_lock)] > 0 &&
+ self[crono_trigger_column_name(:execute_lock)] > at.to_i - self.class.execute_lock_timeout
+ end
+
+ def assume_executing?
+ locking?
end
def crono_trigger_column_name(name)
self.class.crono_trigger_column_name(name)
end