# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "stud/interval"
require "socket" # for Socket.gethostname
require "time"
require "azure"

class LogStash::Inputs::Azuretable < LogStash::Inputs::Base
  class Interrupted < StandardError; end

  config_name "azuretable"

  # If undefined, Logstash will complain, even if codec is unused.
  default :codec, "plain"

  # Shared access signature (SAS) token used to authenticate against the storage account.
  config :storage_sas_token, :validate => :string

  # Name of the Azure table to read entities from.
  config :table_name, :validate => :string

  # Maximum number of entities to fetch per query.
  config :entity_count_to_process, :validate => :string, :default => 100

  # Point in time (UTC, ISO 8601) from which to start collecting entities.
  config :collection_start_time_utc, :validate => :string, :default => Time.now.utc.iso8601

  # Pretty print the ETW EventMessage field using the values in the Message field.
  config :etw_pretty_print, :validate => :boolean, :default => false

  # Seconds to sleep between queries when no new results are found.
  config :idle_delay_seconds, :validate => :number, :default => 15

  config :endpoint, :validate => :string, :default => "core.windows.net"

  # Default 1 minute delay to ensure all data is published to the table before querying.
  # See issue #23 for more: https://github.com/Azure/azure-diagnostics-tools/issues/23
  config :data_latency_minutes, :validate => :number, :default => 1

  # .NET ticks (100-nanosecond intervals) between 0001-01-01 and the Unix epoch.
  # This constant is negative and is used by to_ticks below.
  TICKS_SINCE_EPOCH = Time.utc(0001, 01, 01).to_i * 10000000

  public
  def register
    @host = Socket.gethostname
    Azure.configure do |config|
      config.storage_sas_token = @storage_sas_token
    end
    @azure_table_service = Azure::Table::TableService.new
    @last_timestamp = @collection_start_time_utc
    @idle_delay = @idle_delay_seconds
    @continuation_token = nil
  end # def register

  def run(output_queue)
    while !stop?
      @logger.debug("Starting process method @" + Time.now.to_s)
      process(output_queue)
      @logger.debug("Starting delay of: " + @idle_delay.to_s + " seconds @" + Time.now.to_s)
      sleep @idle_delay
    end # while
  end # def run

  def stop
    # nothing to do in this case so it is not necessary to define stop
    # examples of common "stop" tasks:
    #  * close sockets (unblocking blocking reads/accepts)
    #  * cleanup temporary files
    #  * terminate spawned threads
  end

  def build_latent_query
    @logger.debug("from #{@last_timestamp} to #{@until_timestamp}")
    query_filter = "(PartitionKey gt '#{partitionkey_from_datetime(@last_timestamp)}' and PartitionKey lt '#{partitionkey_from_datetime(@until_timestamp)}')"
    for i in 0..99
      query_filter << " or (PartitionKey gt '#{i.to_s.rjust(19, '0')}___#{partitionkey_from_datetime(@last_timestamp)}' and PartitionKey lt '#{i.to_s.rjust(19, '0')}___#{partitionkey_from_datetime(@until_timestamp)}')"
    end # for block
    query_filter = query_filter.gsub('"', '')
    query_filter
  end

  def build_zero_latency_query
    @logger.debug("from #{@last_timestamp} to most recent data")
    # query data using start_from_time
    query_filter = "(PartitionKey gt '#{partitionkey_from_datetime(@last_timestamp)}')"
    for i in 0..99
      query_filter << " or (PartitionKey gt '#{i.to_s.rjust(19, '0')}___#{partitionkey_from_datetime(@last_timestamp)}' and PartitionKey lt '#{i.to_s.rjust(19, '0')}___9999999999999999999')"
    end # for block
    query_filter = query_filter.gsub('"', '')
    query_filter
  end
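
  # For illustration only (timestamps assumed, not taken from a live table): with
  # @last_timestamp = "2017-01-01T00:00:00Z" and @until_timestamp = "2017-01-01T00:05:00Z",
  # build_latent_query produces a filter whose first clause is
  #
  #   (PartitionKey gt '0636188256000000000' and PartitionKey lt '0636188259000000000')
  #
  # followed by one "or" clause per 19-digit instance prefix 0..99, e.g. for i = 0:
  #
  #   or (PartitionKey gt '0000000000000000000___0636188256000000000'
  #       and PartitionKey lt '0000000000000000000___0636188259000000000')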
  def process(output_queue)
    if @data_latency_minutes > 0
      @until_timestamp = (Time.now - (60 * @data_latency_minutes)).iso8601 unless @continuation_token
      query_filter = build_latent_query
    else
      query_filter = build_zero_latency_query
    end
    @logger.debug("Query filter: " + query_filter)
    query = { :top => @entity_count_to_process, :filter => query_filter, :continuation_token => @continuation_token }
    result = @azure_table_service.query_entities(@table_name, query)
    @continuation_token = result.continuation_token

    if result and result.length > 0
      @logger.debug("#{result.length} results found.")
      last_good_timestamp = nil
      result.each do |entity|
        event = LogStash::Event.new(entity.properties)
        event.set("type", @table_name)

        # Help pretty print etw files
        if (@etw_pretty_print && !event.get("EventMessage").nil? && !event.get("Message").nil?)
          @logger.debug("event: " + event.to_s)
          eventMessage = event.get("EventMessage").to_s
          message = event.get("Message").to_s
          @logger.debug("EventMessage: " + eventMessage)
          @logger.debug("Message: " + message)
          if (eventMessage.include? "%")
            @logger.debug("starting pretty print")
            # %1, %2, ... placeholders in the format string
            toReplace = eventMessage.scan(/%\d+/)
            # key="value" pairs in the Message field
            payload = message.scan(/(?<!\S)([a-zA-Z]+)=("[^"]*")(?!\S)/)
            payloadHash = Hash[payload]
            toReplace.each do |key|
              begin
                # Convert the %# placeholder to an integer to use as an index
                index = key.scan(/\d+/).join.to_i
                eventMessage[key] = payloadHash[payloadHash.keys[index - 1]]
              rescue => e
                @logger.error("Error when converting EventMessage to pretty print. Error: " + e.to_s)
              end
            end
            @logger.debug("pretty print end. result: " + eventMessage)
            event.set("EventMessage", eventMessage)
          end # if eventMessage includes %
        end # if etw_pretty_print

        decorate(event)
        theTIMESTAMP = event.get("TIMESTAMP")
        last_good_timestamp = theTIMESTAMP.to_s unless theTIMESTAMP.nil?
        output_queue << event
      end # each block
      @idle_delay = 0
      @last_timestamp = last_good_timestamp unless last_good_timestamp.nil?
    else
      @logger.debug("No new results found.")
      @idle_delay = @idle_delay_seconds
      @logger.debug("Processing continuation token...") unless @continuation_token.nil?
    end # if block
  rescue => e
    @logger.error("Oh My, An error occurred.", :exception => e)
    raise
  end # def process

  # Windows Azure Diagnostic's algorithm for determining the partition key based on time is as follows:
  # 1. Take time in UTC without seconds.
  # 2. Convert it into .NET ticks.
  # 3. Add a '0' prefix.
  def partitionkey_from_datetime(time_string)
    collection_time = Time.parse(time_string)
    if collection_time
      @logger.debug("collection time parsed successfully #{collection_time}")
    else
      raise(ArgumentError, "Could not parse the time_string")
    end # if else block

    collection_time -= collection_time.sec
    ticks = to_ticks(collection_time)
    "0#{ticks}"
  end # partitionkey_from_datetime

  # Convert time to .NET ticks (100-nanosecond intervals since 0001-01-01).
  # TICKS_SINCE_EPOCH is negative, so subtracting it shifts the Unix-epoch-based
  # value onto the .NET epoch.
  def to_ticks(time_to_convert)
    @logger.debug("Converting time to ticks")
    time_to_convert.to_i * 10000000 - TICKS_SINCE_EPOCH
  end # to_ticks
end # class LogStash::Inputs::Azuretable
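
# A minimal example pipeline configuration for this input (illustrative only;
# the SAS token, table name, and start time below are placeholders):
#
#   input {
#     azuretable {
#       storage_sas_token => "?sv=2015-04-05&sig=..."
#       table_name => "WADLogsTable"
#       collection_start_time_utc => "2017-01-01T00:00:00Z"
#       data_latency_minutes => 1
#     }
#   }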