Sha256: ebcf28ec27dd7f05bf634458564fdc736544ce5f97238ed58b98eeb061bdc161
Contents?: true
Size: 1.63 KB
Versions: 2
Compression:
Stored size: 1.63 KB
Contents
# encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" require "azure" # Reads events from Azure topics class LogStash::Inputs::Azuretopic < LogStash::Inputs::Base class Interrupted < StandardError; end config_name "azuretopic" milestone 1 default :codec, "json" config :namespace, :validate => :string config :access_key, :validate => :string config :subscription, :validate => :string config :topic, :validate => :string config :deliverycount, :validate => :number, :default => 10 def initialize(*args) super(*args) end # def initialize public def register Azure.configure do |config| config.sb_namespace = @namespace config.sb_access_key = @access_key end @azure_service_bus = Azure::ServiceBus::ServiceBusService.new end # def register def process(output_queue) message = @azure_service_bus.receive_subscription_message(@topic ,@subscription, { :peek_lock => true, :timeout => 1 } ) if message codec.decode(message.body) do |event| decorate(event) output_queue << event end # codec.decode @azure_service_bus.delete_subscription_message(message) end rescue LogStash::ShutdownSignal => e raise e rescue => e @logger.error("Oh My, An error occurred.", :exception => e) if message and message.delivery_count > @deliverycount @azure_service_bus.delete_subscription_message(message) end end # def process public def run(output_queue) while !stop? process(output_queue) end # loop end # def run public def teardown end # def teardown end # class LogStash::Inputs::Azuretopic
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
logstash-input-azuretopic-0.9.7 | lib/logstash/inputs/azuretopic.rb |
logstash-input-azuretopic-0.9.6 | lib/logstash/inputs/azuretopic.rb |