lib/karafka/pro/routing/features/recurring_tasks/builder.rb in karafka-2.4.9 vs lib/karafka/pro/routing/features/recurring_tasks/builder.rb in karafka-2.4.10

- old
+ new

@@ -20,13 +20,11 @@ module Builder # Enabled recurring tasks operations and adds needed topics and other stuff. # # @param active [Boolean] should recurring tasks be active. We use a boolean flag to # have API consistency in the system, so it matches other routing related APIs. - # @param block [Proc] optional reconfiguration of the tasks topic definitions. - # @note Since we cannot provide two blocks, reconfiguration of logs topic can be only - # done if user explicitly redefines it in the routing. + # @param block [Proc] optional reconfiguration of the topics definitions. def recurring_tasks(active = false, &block) return unless active # We only require zlib when we decide to run recurring tasks because it is not needed # otherwise. @@ -37,11 +35,11 @@ topics_cfg = tasks_cfg.topics consumer_group tasks_cfg.group_id do # Registers the primary topic that we use to control schedules execution. This is # the one that we use to trigger recurring tasks. - topic(topics_cfg.schedules) do + schedules_topic = topic(topics_cfg.schedules) do consumer tasks_cfg.consumer_class deserializer tasks_cfg.deserializer # Because the topic method name as well as builder proxy method name is the same # we need to reference it via target directly target.recurring_tasks(true) @@ -83,18 +81,19 @@ interval: App.config.recurring_tasks.interval, during_pause: false, during_retry: false ) - next unless block - - instance_eval(&block) + # If this is the direct schedules redefinition style, we run it + # The second one (see end of this method) allows for linear reconfiguration of + # both the topics + instance_eval(&block) if block && block.arity.zero? end # This topic is to store logs that we can then inspect either from the admin or via # the Web UI - topic(topics_cfg.logs) do + logs_topic = topic(topics_cfg.logs) do active(false) deserializer tasks_cfg.deserializer target.recurring_tasks(true) # Keep cron logs of executions for a week and after that remove. Week should be @@ -102,9 +101,11 @@ config( 'cleanup.policy': 'delete', 'retention.ms': 604_800_000 ) end + + yield(schedules_topic, logs_topic) if block && block.arity.positive? end end # Checks if fugit is present. If not, will try to require it as it might not have # been required but is available. If fails, will crash.