lib/karafka/processing/jobs/base.rb in karafka-2.2.13 vs lib/karafka/processing/jobs/base.rb in karafka-2.2.14
- old
+ new
@@ -18,15 +18,18 @@
# Creates a new job instance
def initialize
# All jobs are blocking by default and they can release the lock when blocking operations
# are done (if needed)
@non_blocking = false
+ @status = :pending
end
- # When redefined can run any code prior to the job being enqueued
+ # When redefined can run any code prior to the job being scheduled
# @note This will run in the listener thread and not in the worker
- def before_enqueue; end
+ def before_schedule
+ raise NotImplementedError, 'Please implement in a subclass'
+ end
# When redefined can run any code that should run before executing the proper code
def before_call; end
# The main entry-point of a job
@@ -46,9 +49,23 @@
#
# @note Job **needs** to mark itself as non-blocking only **after** it is done with all
# the blocking things (pausing partition, etc).
def non_blocking?
@non_blocking
+ end
+
+ # @return [Boolean] was this job finished.
+ def finished?
+ @status == :finished
+ end
+
+ # Marks the job as finished. Used by the worker to indicate, that this job is done.
+ #
+ # @note Since the scheduler knows exactly when it schedules jobs and when it keeps them
+ # pending, we do not need advanced state tracking and the only information from the
+ # "outside" is whether it was finished or not after it was scheduled for execution.
+ def finish!
+ @status = :finished
end
end
end
end
end