Sha256: 026cf95343bde51d260ad671cdd125bbb9a1a49b15e835afff6a72a29a39d311

Contents?: true

Size: 1.08 KB

Versions: 5

Compression:

Stored size: 1.08 KB

Contents

require 'google/cloud'

module Fluent
  module GcloudPubSub
    # Plugin-specific error type; raised by the client wrappers in this
    # namespace when a requested Pub/Sub resource lookup returns nil.
    class Error < StandardError
    end

    # Wraps a single Pub/Sub topic handle and publishes batches of messages
    # to it.
    class Publisher
      # Looks up the topic through `Google::Cloud.new(project, key).pubsub`.
      #
      # @param project first argument handed to Google::Cloud.new
      #   (presumably the GCP project ID - confirm against caller)
      # @param key second argument handed to Google::Cloud.new
      #   (presumably a keyfile path or credentials - confirm against caller)
      # @param topic name used for the topic lookup
      # @param autocreate_topic forwarded as the `autocreate:` option of the
      #   topic lookup
      # @raise [Fluent::GcloudPubSub::Error] when the topic lookup returns nil
      def initialize(project, key, topic, autocreate_topic)
        service = Google::Cloud.new(project, key).pubsub
        @client = service.topic(topic, autocreate: autocreate_topic)
        return unless @client.nil?

        raise Fluent::GcloudPubSub::Error, "topic:#{topic} does not exist."
      end

      # Publishes every element of +messages+ through one batch block on the
      # topic handle.
      #
      # @param messages enumerable of payloads; each one is passed unchanged
      #   to the batch object's `publish`
      # @return whatever the topic handle's `publish` returns
      def publish(messages)
        @client.publish { |batch| messages.each { |message| batch.publish message } }
      end
    end

    # Wraps a single Pub/Sub subscription handle and exposes pull and
    # acknowledge on it.
    class Subscriber
      # Looks up the topic through `Google::Cloud.new(project, key).pubsub`,
      # then the subscription on that topic.
      #
      # @param project first argument handed to Google::Cloud.new
      #   (presumably the GCP project ID - confirm against caller)
      # @param key second argument handed to Google::Cloud.new
      #   (presumably a keyfile path or credentials - confirm against caller)
      # @param topic name used for the topic lookup
      # @param subscription name used for the subscription lookup on the topic
      # @raise [Fluent::GcloudPubSub::Error] when either the topic lookup or
      #   the subscription lookup returns nil
      def initialize(project, key, topic, subscription)
        pubsub = Google::Cloud.new(project, key).pubsub

        # The topic lookup can yield nil (Publisher guards the same call);
        # fail with the plugin's own error instead of a NoMethodError on nil.
        topic_client = pubsub.topic topic
        raise Fluent::GcloudPubSub::Error, "topic:#{topic} does not exist." if topic_client.nil?

        @client = topic_client.subscription subscription
        raise Fluent::GcloudPubSub::Error, "subscription:#{subscription} does not exist." if @client.nil?
      end

      # Pulls messages from the subscription.
      #
      # @param immediate forwarded as the `immediate:` option of the
      #   subscription's `pull`
      # @param max forwarded as the `max:` option of the subscription's `pull`
      # @return whatever the subscription's `pull` returns
      def pull(immediate, max)
        @client.pull immediate: immediate, max: max
      end

      # Acknowledges previously pulled messages.
      #
      # @param messages passed unchanged to the subscription's `acknowledge`
      # @return whatever the subscription's `acknowledge` returns
      def acknowledge(messages)
        @client.acknowledge messages
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
fluent-plugin-gcloud-pubsub-custom-0.2.0 lib/fluent/plugin/gcloud_pubsub/client.rb
fluent-plugin-gcloud-pubsub-custom-0.1.4 lib/fluent/plugin/gcloud_pubsub/client.rb
fluent-plugin-gcloud-pubsub-custom-0.1.3 lib/fluent/plugin/gcloud_pubsub/client.rb
fluent-plugin-gcloud-pubsub-custom-0.1.2 lib/fluent/plugin/gcloud_pubsub/client.rb
fluent-plugin-gcloud-pubsub-custom-0.1.1 lib/fluent/plugin/gcloud_pubsub/client.rb