lib/fluent/plugin/gcloud_pubsub/client.rb in fluent-plugin-gcloud-pubsub-custom-subscriber-1.3.2 vs lib/fluent/plugin/gcloud_pubsub/client.rb in fluent-plugin-gcloud-pubsub-custom-subscriber-1.3.3Z
- old
+ new
@@ -1,67 +1,48 @@
require 'google/cloud/pubsub'
+require 'retryable'
module Fluent
module GcloudPubSub
class Error < StandardError
end
class RetryableError < Error
end
- class Message
- attr_reader :message, :attributes
- def initialize(message, attributes={})
- @message = message
- @attributes = attributes
- end
-
- def bytesize()
- attr_size = 0
- @attributes.each do |key, val|
- attr_size += key.bytesize + val.bytesize
- end
- @message.bytesize + attr_size
- end
- end
-
class Publisher
- def initialize(project, key, autocreate_topic)
- @pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key
- @autocreate_topic = autocreate_topic
- @topics = {}
- end
+ RETRY_COUNT = 5
+ RETRYABLE_ERRORS = [Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError]
- def topic(topic_name)
- return @topics[topic_name] if @topics.has_key? topic_name
-
- client = @pubsub.topic topic_name
- if client.nil? && @autocreate_topic
- client = @pubsub.create_topic topic_name
+ # autocreate_topic is unused
+ def initialize(project, key, topic_name, skip_lookup)
+ Retryable.retryable(tries: RETRY_COUNT, on: RETRYABLE_ERRORS) do
+ pubsub = Google::Cloud::Pubsub.new project: project, keyfile: key
+ @client = pubsub.topic topic_name, skip_lookup: skip_lookup
end
- if client.nil?
- raise Error.new "topic:#{topic_name} does not exist."
- end
-
- @topics[topic_name] = client
- client
+ raise Error.new "topic:#{topic_name} does not exist." if @client.nil?
end
- def publish(topic_name, messages)
- topic(topic_name).publish do |batch|
+ def publish(messages)
+ @client.publish do |batch|
messages.each do |m|
- batch.publish m.message, m.attributes
+ batch.publish m
end
end
rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => ex
raise RetryableError.new "Google api returns error:#{ex.class.to_s} message:#{ex.to_s}"
end
end
class Subscriber
+ RETRY_COUNT = 5
+ RETRYABLE_ERRORS = [Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError]
+
def initialize(project, key, topic_name, subscription_name)
- pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key
- topic = pubsub.topic topic_name
- @client = topic.subscription subscription_name
+ Retryable.retryable(tries: RETRY_COUNT, on: RETRYABLE_ERRORS) do
+ pubsub = Google::Cloud::Pubsub.new project: project, keyfile: key
+ topic = pubsub.topic topic_name
+ @client = topic.subscription subscription_name
+ end
raise Error.new "subscription:#{subscription_name} does not exist." if @client.nil?
end
def pull(immediate, max)
@client.pull immediate: immediate, max: max