Sha256: ddf84874af5015f0c48bf2d8745833f7719251a2220d47b806f40933b818d29a

Contents?: true

Size: 1.81 KB

Versions: 1

Compression:

Stored size: 1.81 KB

Contents

# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require 'java'
require 'logstash-output-iothub_jars.rb'

java_import "com.microsoft.azure.sdk.iot.device.DeviceClient"
java_import "com.microsoft.azure.sdk.iot.device.IotHubClientProtocol"
java_import "com.microsoft.azure.sdk.iot.device.Message"
java_import "com.microsoft.azure.sdk.iot.device.IotHubEventCallback"

$wait_queue = {}

class EventCallback
  include IotHubEventCallback
  def execute(status, context)
    $wait_queue.delete(context) if $wait_queue[context]
# puts("Callback #{context} : #{status.to_s}")
  end
end

class LogStash::Outputs::Iothub < LogStash::Outputs::Base
  config_name "iothub"

  config :connection_string, :validate => :string, :required => true

  # Todo: support for AMQPS.
  # config :sas_token_expiry_time_sec, :validte => :number, :default => 2400

  public
  def register
    # Todo: Get hang in an open with AMQPS.
    #protocol = IotHubClientProtocol::AMQPS

    protocol = IotHubClientProtocol::MQTT

    @client = DeviceClient.new(@connection_string, protocol)

    # Todo: use params for AMQPS protocol.
    # @client.setOption(
    #   "SetCertificatePath",
    #   "/Users/tac/Desktop/logstash-output-iothub/cert.crt")

    # @client.setOption(
    #   "SetSASTokenExpiryTime",
    #   @sas_token_expiry_time_sec)

    @client.open()
  end # def register


  public
  def receive(event)
    m = event.to_json
    $wait_queue[m] = true
    msg = Message.new(m)
    msg.setExpiryTime(3000)
    @client.sendEventAsync(
      msg,
      EventCallback.new, m)

    return "Event received"
  end # def receive

  public
  def close()
    begin
      # waiting callbacks for all sent messages.
      sleep 1 unless $wait_queue.empty?

      @client.close() if @client
    rescue => e
    end
  end
end # class LogStash::Outputs::Iothub

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
logstash-output-iothub-1.1.0 lib/logstash/outputs/iothub.rb