Sha256: c0ed27b3742d82634d04288eac806293598821c125a1d9533b7ffdb686e4dd03
Contents?: true
Size: 1.18 KB
Versions: 4
Compression:
Stored size: 1.18 KB
Contents
module NulogyMessageBusConsumer
  module Steps
    # A generic class to run a "task" on a timer (in a separate thread!)
    # This class runs the code, the Task does the work
    #
    # A Task must implement the methods called in this class:
    # - #extract_args(kwargs)
    #   Called with the keyword arguments (kwargs) that is passed to this step.
    #   This is a chance to pull out references to pipeline variables (e.g. kafka_consumer)
    # - #call
    #   The work the Task should perform
    # - #interval
    #   The time, in seconds, between invocations of #call
    class TimedTask
      def initialize(task)
        @task = task
      end

      def call(**kwargs)
        @task.extract_args(**kwargs)

        # Ensure that the process is terminated if there is a problem getting the consumption lag.
        # This also ensures that the process will terminate on-boot if it cannot connect to Kafka,
        # allowing the container to be terminated by ECS.
        Thread.abort_on_exception = true
        Thread.new { run }

        yield
      end

      private

      def run
        loop do
          @task.call
          sleep @task.interval
        end
      end
    end
  end
end
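The task contract described in the comments above (`#extract_args`, `#call`, `#interval`) can be illustrated with a small standalone sketch. `HeartbeatTask`, its `label:` keyword, and the 0.01 second interval are hypothetical names chosen for illustration and are not part of the gem. `TimedTask` is copied here without its namespace so the example runs on its own.

```ruby
# Standalone copy of the TimedTask step, without the gem's module nesting.
class TimedTask
  def initialize(task)
    @task = task
  end

  def call(**kwargs)
    @task.extract_args(**kwargs)
    # Note: this setting is process-wide, not limited to the thread below.
    Thread.abort_on_exception = true
    Thread.new { run }
    yield
  end

  private

  def run
    loop do
      @task.call
      sleep @task.interval
    end
  end
end

# Hypothetical task (an assumption for illustration): records a message
# on a thread-safe queue each time it is invoked.
class HeartbeatTask
  attr_reader :beats, :label

  def initialize
    @beats = Queue.new
  end

  # Pull out only the keyword this task cares about; ignore the rest.
  def extract_args(label: "unnamed", **_rest)
    @label = label
  end

  def call
    @beats << "#{@label} beat"
  end

  # Seconds between invocations of #call.
  def interval
    0.01
  end
end

task = HeartbeatTask.new
step = TimedTask.new(task)

# The block stands in for the rest of the pipeline; it runs on the calling
# thread while the task ticks in the background.
result = step.call(label: "demo", kafka_consumer: nil) do
  [task.beats.pop, task.beats.pop] # Queue#pop blocks until a beat arrives
end
```

Because `Thread.abort_on_exception` is set to `true`, an exception raised inside the task's `#call` terminates the whole process rather than only the background thread, which matches the intent stated in the source comments.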