Sha256: 31c5f2051f9181d08903e9550fa0d8ed7b622e7ad7aec00f65336421304f696e

Contents?: true

Size: 1.91 KB

Versions: 1

Compression:

Stored size: 1.91 KB

Contents

require 'google/cloud/pubsub'

module Fluent
  module GcloudPubSub
    # Base class for the errors raised by the Pub/Sub client wrappers in this
    # module (e.g. when a topic or subscription cannot be found).
    class Error < StandardError; end

    # Raised by the client wrappers in place of Google API "unavailable",
    # "deadline exceeded" and "internal" errors; the name marks the failure
    # as one the caller may retry.
    class RetryableError < Error; end

    # Wrapper around a single Pub/Sub topic used for publishing messages.
    class Publisher
      # Looks up the topic and keeps it as the publishing client.
      #
      # @param project [Object] passed to Google::Cloud::Pubsub.new as +project:+
      # @param key [Object] passed to Google::Cloud::Pubsub.new as +keyfile:+
      # @param topic_name [String] name of the topic to publish to
      # @param autocreate_topic [Boolean] when truthy, a topic whose lookup
      #   returns nil is created instead of being reported as an error
      # @raise [Error] if the topic lookup returns nil and autocreate_topic is falsy
      def initialize(project, key, topic_name, autocreate_topic)
        pubsub = Google::Cloud::Pubsub.new project: project, keyfile: key
        @client = pubsub.topic topic_name
        return unless @client.nil?

        raise Error, "topic:#{topic_name} does not exist." unless autocreate_topic

        @client = pubsub.create_topic topic_name
      end

      # Publishes every element of +messages+ through one batch block on the topic.
      #
      # @param messages [#each] messages handed one by one to the batch
      # @return [Object] whatever the topic's +publish+ call returns
      # @raise [RetryableError] when the Google client raises an unavailable,
      #   deadline-exceeded or internal error
      def publish(messages)
        @client.publish { |batch| messages.each { |message| batch.publish message } }
      rescue Google::Cloud::UnavailableError,
             Google::Cloud::DeadlineExceededError,
             Google::Cloud::InternalError => e
        raise RetryableError, "Google api returns error:#{e.class} message:#{e}"
      end
    end

    # Wrapper around a single Pub/Sub subscription used for pulling and
    # acknowledging messages.
    class Subscriber
      # Looks up the topic, then the subscription on it, and keeps the
      # subscription as the client.
      #
      # @param project [Object] passed to Google::Cloud::Pubsub.new as +project:+
      # @param key [Object] passed to Google::Cloud::Pubsub.new as +keyfile:+
      # @param topic_name [String] name of the topic owning the subscription
      # @param subscription_name [String] name of the subscription to use
      # @raise [Error] if either the topic or the subscription lookup returns nil
      def initialize(project, key, topic_name, subscription_name)
        pubsub = Google::Cloud::Pubsub.new project: project, keyfile: key
        topic = pubsub.topic topic_name
        # Without this guard a nil topic would surface as a NoMethodError on
        # the next line instead of this module's own Error.
        raise Error, "topic:#{topic_name} does not exist." if topic.nil?

        @client = topic.subscription subscription_name
        raise Error, "subscription:#{subscription_name} does not exist." if @client.nil?
      end

      # Pulls messages from the subscription.
      #
      # @param immediate [Object] forwarded as the +immediate:+ option of the pull call
      # @param max [Object] forwarded as the +max:+ option of the pull call
      # @return [Object] whatever the subscription's +pull+ call returns
      # @raise [RetryableError] when the Google client raises an unavailable,
      #   deadline-exceeded or internal error
      def pull(immediate, max)
        @client.pull immediate: immediate, max: max
      rescue Google::Cloud::UnavailableError,
             Google::Cloud::DeadlineExceededError,
             Google::Cloud::InternalError => e
        raise RetryableError, "Google pull api returns error:#{e.class} message:#{e}"
      end

      # Acknowledges the given messages on the subscription.
      #
      # @param messages [Object] forwarded unchanged to the subscription's +acknowledge+
      # @return [Object] whatever the subscription's +acknowledge+ call returns
      # @raise [RetryableError] when the Google client raises an unavailable,
      #   deadline-exceeded or internal error
      def acknowledge(messages)
        @client.acknowledge messages
      rescue Google::Cloud::UnavailableError,
             Google::Cloud::DeadlineExceededError,
             Google::Cloud::InternalError => e
        raise RetryableError, "Google acknowledge api returns error:#{e.class} message:#{e}"
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
fluent-plugin-gcloud-pubsub-custom-1.0.2 lib/fluent/plugin/gcloud_pubsub/client.rb