lib/fluent/plugin/gcloud_pubsub/client.rb in fluent-plugin-gcloud-pubsub-custom-subscriber-1.3.2 vs lib/fluent/plugin/gcloud_pubsub/client.rb in fluent-plugin-gcloud-pubsub-custom-subscriber-1.3.3Z

- old
+ new

@@ -1,67 +1,48 @@ require 'google/cloud/pubsub' +require 'retryable' module Fluent module GcloudPubSub class Error < StandardError end class RetryableError < Error end - class Message - attr_reader :message, :attributes - def initialize(message, attributes={}) - @message = message - @attributes = attributes - end - - def bytesize() - attr_size = 0 - @attributes.each do |key, val| - attr_size += key.bytesize + val.bytesize - end - @message.bytesize + attr_size - end - end - class Publisher - def initialize(project, key, autocreate_topic) - @pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key - @autocreate_topic = autocreate_topic - @topics = {} - end + RETRY_COUNT = 5 + RETRYABLE_ERRORS = [Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError] - def topic(topic_name) - return @topics[topic_name] if @topics.has_key? topic_name - - client = @pubsub.topic topic_name - if client.nil? && @autocreate_topic - client = @pubsub.create_topic topic_name + # autocreate_topic is unused + def initialize(project, key, topic_name, skip_lookup) + Retryable.retryable(tries: RETRY_COUNT, on: RETRYABLE_ERRORS) do + pubsub = Google::Cloud::Pubsub.new project: project, keyfile: key + @client = pubsub.topic topic_name, skip_lookup: skip_lookup end - if client.nil? - raise Error.new "topic:#{topic_name} does not exist." - end - - @topics[topic_name] = client - client + raise Error.new "topic:#{topic_name} does not exist." if @client.nil? end - def publish(topic_name, messages) - topic(topic_name).publish do |batch| + def publish(messages) + @client.publish do |batch| messages.each do |m| - batch.publish m.message, m.attributes + batch.publish m end end rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex raise RetryableError.new "Google api returns error:#{ex.class.to_s} message:#{ex.to_s}" end end class Subscriber + RETRY_COUNT = 5 + RETRYABLE_ERRORS = [Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError] + def initialize(project, key, topic_name, subscription_name) - pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key - topic = pubsub.topic topic_name - @client = topic.subscription subscription_name + Retryable.retryable(tries: RETRY_COUNT, on: RETRYABLE_ERRORS) do + pubsub = Google::Cloud::Pubsub.new project: project, keyfile: key + topic = pubsub.topic topic_name + @client = topic.subscription subscription_name + end raise Error.new "subscription:#{subscription_name} does not exist." if @client.nil? end def pull(immediate, max) @client.pull immediate: immediate, max: max