lib/crono_trigger/schedulable.rb in crono_trigger-0.3.2 vs lib/crono_trigger/schedulable.rb in crono_trigger-0.3.4

- old
+ new

@@ -9,33 +9,41 @@ module Schedulable DEFAULT_RETRY_LIMIT = 10 DEFAULT_RETRY_INTERVAL = 4 DEFAULT_EXECUTE_LOCK_TIMEOUT = 600 + class NoRestrictedUnlockError < StandardError; end + + @included_by = [] + + def self.included_by + @included_by + end + class AbortExecution < StandardError; end extend ActiveSupport::Concern include ActiveSupport::Callbacks included do + CronoTrigger::Schedulable.included_by << self class_attribute :crono_trigger_options, :executable_conditions self.crono_trigger_options ||= {} self.executable_conditions ||= [] define_model_callbacks :execute - scope :executables, ->(from: Time.current, primary_key_offset: nil, limit: 1000) do + scope :executables, ->(from: Time.current, limit: CronoTrigger.config.executor_thread * 3 || 100, including_locked: false) do t = arel_table rel = where(t[crono_trigger_column_name(:next_execute_at)].lteq(from)) - .where(t[crono_trigger_column_name(:execute_lock)].lteq(from.to_i - execute_lock_timeout)) + rel = rel.where(t[crono_trigger_column_name(:execute_lock)].lteq(from.to_i - execute_lock_timeout)) unless including_locked rel = rel.where(t[crono_trigger_column_name(:started_at)].lteq(from)) if column_names.include?(crono_trigger_column_name(:started_at)) rel = rel.where(t[crono_trigger_column_name(:finished_at)].gt(from).or(t[crono_trigger_column_name(:finished_at)].eq(nil))) if column_names.include?(crono_trigger_column_name(:finished_at)) - rel = rel.where(t[primary_key].gt(primary_key_offset)) if primary_key_offset - rel = rel.order(Arel.sql("#{quoted_table_name}.#{quoted_primary_key} ASC")).limit(limit) + rel = rel.order(crono_trigger_column_name(:next_execute_at) => :asc).limit(limit) rel = executable_conditions.reduce(rel) do |merged, pr| if pr.arity == 0 merged.merge(instance_exec(&pr)) else @@ -50,16 +58,19 @@ validate :validate_cron_format end module ClassMethods - def executables_with_lock(primary_key_offset: nil, limit: 1000) + def executables_with_lock(limit: CronoTrigger.config.executor_thread * 3 || 100) records = nil transaction do - records = executables(primary_key_offset: primary_key_offset, limit: limit).lock.to_a + records = executables(limit: limit).lock.to_a unless records.empty? - where(id: records).update_all(crono_trigger_column_name(:execute_lock) => Time.current.to_i) + where(id: records).update_all( + crono_trigger_column_name(:execute_lock) => Time.current.to_i, + crono_trigger_column_name(:locked_by) => CronoTrigger.config.worker_id + ) end records end end @@ -69,10 +80,22 @@ def execute_lock_timeout (crono_trigger_options[:execute_lock_timeout] || DEFAULT_EXECUTE_LOCK_TIMEOUT) end + def crono_trigger_unlock_all! + wheres = all.where_values_hash + if wheres.empty? + raise NoRestrictedUnlockError, "Need `where` filter at least one, because this method is danger" + else + update_all( + crono_trigger_column_name(:execute_lock) => 0, + crono_trigger_column_name(:locked_by) => nil, + ) + end + end + private def add_executable_conditions(pr) self.executable_conditions << pr end @@ -135,11 +158,15 @@ def retry! logger.info "Retry #{self.class}-#{id}" if logger now = Time.current wait = crono_trigger_options[:exponential_backoff] ? retry_interval * [2 * (retry_count - 1), 1].max : retry_interval - attributes = {crono_trigger_column_name(:next_execute_at) => now + wait, crono_trigger_column_name(:execute_lock) => 0} + attributes = { + crono_trigger_column_name(:next_execute_at) => now + wait, + crono_trigger_column_name(:execute_lock) => 0, + crono_trigger_column_name(:locked_by) => nil, + } if self.class.column_names.include?("retry_count") attributes.merge!(retry_count: retry_count.to_i + 1) end @@ -147,11 +174,15 @@ end def reset!(update_last_executed_at = true) logger.info "Reset execution schedule #{self.class}-#{id}" if logger - attributes = {crono_trigger_column_name(:next_execute_at) => calculate_next_execute_at, crono_trigger_column_name(:execute_lock) => 0} + attributes = { + crono_trigger_column_name(:next_execute_at) => calculate_next_execute_at, + crono_trigger_column_name(:execute_lock) => 0, + crono_trigger_column_name(:locked_by) => nil, + } if update_last_executed_at && self.class.column_names.include?(crono_trigger_column_name(:last_executed_at)) attributes.merge!(crono_trigger_column_name(:last_executed_at) => Time.current) end @@ -160,21 +191,42 @@ end update_columns(attributes) end - def assume_executing? - execute_lock_timeout = self.class.execute_lock_timeout - locking? && - self[crono_trigger_column_name(:execute_lock)] + execute_lock_timeout >= Time.now.to_i + def crono_trigger_unlock! + update_columns( + crono_trigger_column_name(:execute_lock) => 0, + crono_trigger_column_name(:locked_by) => nil, + ) end - def locking? - self[crono_trigger_column_name(:execute_lock)] > 0 + def crono_trigger_status + case + when locking? + :locked + when waiting? + :waiting + when not_scheduled? + :not_scheduled + end end - def idling? - !locking? + def waiting? + !!self[crono_trigger_column_name(:next_execute_at)] + end + + def not_scheduled? + self[crono_trigger_column_name(:next_execute_at)].nil? && last_executed_at.nil? + end + + def locking?(at: Time.now) + self[crono_trigger_column_name(:execute_lock)] > 0 && + self[crono_trigger_column_name(:execute_lock)] > at.to_i - self.class.execute_lock_timeout + end + + def assume_executing? + locking? end def crono_trigger_column_name(name) self.class.crono_trigger_column_name(name) end