app/services/katello/agent/dispatcher.rb in katello-4.0.0 vs app/services/katello/agent/dispatcher.rb in katello-4.0.1
- old
+ new
@@ -12,39 +12,33 @@
register_message(:update_package, Katello::Agent::UpdatePackageMessage)
register_message(:install_errata, Katello::Agent::InstallErrataMessage)
register_message(:install_package_group, Katello::Agent::InstallPackageGroupMessage)
register_message(:remove_package_group, Katello::Agent::RemovePackageGroupMessage)
- def self.dispatch(message_type, host_ids, args)
+ def self.dispatch(message_type, histories, args)
message_class = @supported_messages[message_type]
+ fail("Unsupported message type: #{message_type}") unless message_class
- fail("Unsupported message type") unless message_class
-
- uuid_data = ::Katello::Host::ContentFacet.where(host_id: host_ids).pluck(:host_id, :uuid)
- fail("Couldn't find all hosts specified") unless host_ids.size == uuid_data.size
-
- host_data = uuid_data.map do |host_id, consumer_id|
- {
- host_id: host_id,
- consumer_id: consumer_id,
- history: Katello::Agent::DispatchHistory.new(host_id: host_id),
- message: message_class.new(**args.merge(consumer_id: consumer_id))
- }
+ messages = histories.map do |history|
+ message = message_class.new(**args.merge(consumer_id: history.host.subscription_facet.uuid))
+ message.dispatch_history_id = history.id
+ message.recipient_address = settings[:client_queue_format] % history.host.subscription_facet.uuid
+ message.reply_to = settings[:event_queue_name]
+ message
end
- histories = host_data.map { |attrs| attrs[:history] }
- ActiveRecord::Base.transaction do
- Katello::Agent::DispatchHistory.import(histories)
+ connection = Connection.new
+ connection.send_messages(messages)
- host_data.each do |d|
- d[:message].dispatch_history_id = d[:history].id
- d[:message].recipient_address = settings[:client_queue_format] % [d[:consumer_id]]
- d[:message].reply_to = settings[:event_queue_name]
- end
+ histories
+ end
- connection = Connection.new
- connection.send_messages(host_data.map { |d| d[:message] })
+ def self.create_histories(host_ids:)
+ histories = host_ids.map do |id|
+ Katello::Agent::DispatchHistory.new(host_id: id)
end
+
+ Katello::Agent::DispatchHistory.import(histories)
histories
end
def self.delete_client_queue(queue_name:)