Sha256: 026cf95343bde51d260ad671cdd125bbb9a1a49b15e835afff6a72a29a39d311
Contents?: true
Size: 1.08 KB
Versions: 5
Compression:
Stored size: 1.08 KB
Contents
require 'google/cloud'

module Fluent
  # Thin wrappers around the google-cloud Pub/Sub client: one class for
  # publishing to a topic and one for pulling from a subscription.
  module GcloudPubSub
    # Raised when a requested topic or subscription cannot be resolved.
    class Error < StandardError; end

    # Publishes batches of messages to a single Pub/Sub topic.
    class Publisher
      # Resolves the topic that all later #publish calls will use.
      #
      # @param project passed through to Google::Cloud.new (project identifier)
      # @param key passed through to Google::Cloud.new (credentials/keyfile)
      # @param topic name of the topic to look up
      # @param autocreate_topic forwarded as the `autocreate:` option of the
      #   topic lookup
      # @raise [Fluent::GcloudPubSub::Error] if the topic lookup returns nil
      def initialize(project, key, topic, autocreate_topic)
        pubsub = Google::Cloud.new(project, key).pubsub
        @client = pubsub.topic(topic, autocreate: autocreate_topic)
        raise Fluent::GcloudPubSub::Error, "topic:#{topic} does not exist." if @client.nil?
      end

      # Publishes every element of +messages+ through one batch block.
      #
      # @param messages [#each] payloads handed one by one to the batch
      # @return whatever the underlying topic's `publish` returns
      def publish(messages)
        @client.publish do |batch|
          messages.each { |message| batch.publish(message) }
        end
      end
    end

    # Pulls and acknowledges messages on a single Pub/Sub subscription.
    class Subscriber
      # Resolves the subscription that #pull and #acknowledge operate on.
      #
      # @param project passed through to Google::Cloud.new (project identifier)
      # @param key passed through to Google::Cloud.new (credentials/keyfile)
      # @param topic name of the topic owning the subscription
      # @param subscription name of the subscription to look up
      # @raise [Fluent::GcloudPubSub::Error] if either the topic or the
      #   subscription lookup returns nil
      def initialize(project, key, topic, subscription)
        pubsub = Google::Cloud.new(project, key).pubsub

        # Use a separate local instead of reassigning the `topic` parameter so
        # the requested name remains available for the error message.
        topic_handle = pubsub.topic(topic)
        # Without this guard a missing topic surfaced as NoMethodError on nil
        # rather than this module's own Error (as Publisher already does).
        raise Fluent::GcloudPubSub::Error, "topic:#{topic} does not exist." if topic_handle.nil?

        @client = topic_handle.subscription(subscription)
        raise Fluent::GcloudPubSub::Error, "subscription:#{subscription} does not exist." if @client.nil?
      end

      # Pulls messages from the subscription.
      #
      # @param immediate forwarded as the `immediate:` option of the pull
      # @param max forwarded as the `max:` option of the pull
      # @return whatever the underlying subscription's `pull` returns
      def pull(immediate, max)
        @client.pull(immediate: immediate, max: max)
      end

      # Acknowledges previously pulled messages.
      #
      # @param messages forwarded unchanged to the subscription's `acknowledge`
      # @return whatever the underlying subscription's `acknowledge` returns
      def acknowledge(messages)
        @client.acknowledge(messages)
      end
    end
  end
end
Version data entries
5 entries across 5 versions & 1 rubygems