lib/karafka/scheduler.rb in karafka-2.0.0.beta1 vs lib/karafka/scheduler.rb in karafka-2.0.0.beta2
- old
+ new
@@ -1,21 +1,20 @@
# frozen_string_literal: true
module Karafka
# FIFO scheduler for messages coming from various topics and partitions
class Scheduler
- # Yields messages from partitions in the fifo order
+ # Schedules jobs in the fifo order
#
- # @param messages_buffer [Karafka::Connection::MessagesBuffer] messages buffer with data from
- # multiple topics and partitions
- # @yieldparam [String] topic name
- # @yieldparam [Integer] partition number
- # @yieldparam [Array<Rdkafka::Consumer::Message>] topic partition aggregated results
- def call(messages_buffer)
- messages_buffer.each do |topic, partitions|
- partitions.each do |partition, messages|
- yield(topic, partition, messages)
- end
+ # @param queue [Karafka::Processing::JobsQueue] queue where we want to put the jobs
+ # @param jobs_array [Array<Karafka::Processing::Jobs::Base>] jobs we want to schedule
+ def schedule_consumption(queue, jobs_array)
+ jobs_array.each do |job|
+ queue << job
end
end
+
+ # Both revocation and shutdown jobs can also run in fifo by default
+ alias schedule_revocation schedule_consumption
+ alias schedule_shutdown schedule_consumption
end
end