lib/karafka/pro/routing/features/recurring_tasks/builder.rb in karafka-2.4.9 vs lib/karafka/pro/routing/features/recurring_tasks/builder.rb in karafka-2.4.10
- old
+ new
@@ -20,13 +20,11 @@
module Builder
# Enabled recurring tasks operations and adds needed topics and other stuff.
#
# @param active [Boolean] should recurring tasks be active. We use a boolean flag to
# have API consistency in the system, so it matches other routing related APIs.
- # @param block [Proc] optional reconfiguration of the tasks topic definitions.
- # @note Since we cannot provide two blocks, reconfiguration of logs topic can be only
- # done if user explicitly redefines it in the routing.
+ # @param block [Proc] optional reconfiguration of the topics definitions.
def recurring_tasks(active = false, &block)
return unless active
# We only require zlib when we decide to run recurring tasks because it is not needed
# otherwise.
@@ -37,11 +35,11 @@
topics_cfg = tasks_cfg.topics
consumer_group tasks_cfg.group_id do
# Registers the primary topic that we use to control schedules execution. This is
# the one that we use to trigger recurring tasks.
- topic(topics_cfg.schedules) do
+ schedules_topic = topic(topics_cfg.schedules) do
consumer tasks_cfg.consumer_class
deserializer tasks_cfg.deserializer
# Because the topic method name as well as builder proxy method name is the same
# we need to reference it via target directly
target.recurring_tasks(true)
@@ -83,18 +81,19 @@
interval: App.config.recurring_tasks.interval,
during_pause: false,
during_retry: false
)
- next unless block
-
- instance_eval(&block)
+ # If this is the direct schedules redefinition style, we run it
+ # The second one (see end of this method) allows for linear reconfiguration of
+ # both the topics
+ instance_eval(&block) if block && block.arity.zero?
end
# This topic is to store logs that we can then inspect either from the admin or via
# the Web UI
- topic(topics_cfg.logs) do
+ logs_topic = topic(topics_cfg.logs) do
active(false)
deserializer tasks_cfg.deserializer
target.recurring_tasks(true)
# Keep cron logs of executions for a week and after that remove. Week should be
@@ -102,9 +101,11 @@
config(
'cleanup.policy': 'delete',
'retention.ms': 604_800_000
)
end
+
+ yield(schedules_topic, logs_topic) if block && block.arity.positive?
end
end
# Checks if fugit is present. If not, will try to require it as it might not have
# been required but is available. If fails, will crash.