Sha256: f7dbe55a3ab799ef80683fefc1bced250d12dd3eedcab420b31bd4c42e8040a5

Contents?: true

Size: 1.05 KB

Versions: 9

Compression:

Stored size: 1.05 KB

Contents

require "em-jack"
require "logstash/inputs/base"
require "logstash/namespace"

class LogStash::Inputs::Beanstalk < LogStash::Inputs::Base
  public
  def initialize(url, type, config={}, &block)
    super

    if @url.path == "" or @url.path == "/"
      raise "must specify a tube for beanstalk output"
    end
  end # def initialize

  public
  def register
    tube = @url.path[1..-1] # Skip leading '/'
    port = @url.port || 11300
    @beanstalk = EMJack::Connection.new(:host => @url.host,
                                        :port => port,
                                        :tube => tube)
    @beanstalk.each_job do |job|
      begin
        event = LogStash::Event.from_json(job.body)
      rescue => e
        @logger.warn(["Trouble parsing beanstalk job",
                     {:error => e.message, :body => job.body,
                      :backtrace => e.backtrace}])
        @beanstalk.bury(job, 0)
      end

      receive(event)
      @beanstalk.delete(job)
    end # @beanstalk.each_job
  end # def register
end # class LogStash::Inputs::Beanstalk

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
logstash-lite-0.2.20110505142231 lib/logstash/inputs/beanstalk.rb
logstash-lite-0.2.20110422152244 lib/logstash/inputs/beanstalk.rb
logstash-lite-0.2.20110405105201 lib/logstash/inputs/beanstalk.rb
logstash-lite-0.2.20110331121236 lib/logstash/inputs/beanstalk.rb
logstash-lite-0.2.20110329105411 lib/logstash/inputs/beanstalk.rb
logstash-lite-0.2.20110206003603 lib/logstash/inputs/beanstalk.rb
logstash-lite-0.2.20110203130400 lib/logstash/inputs/beanstalk.rb
logstash-lite-0.2.20110122143801 lib/logstash/inputs/beanstalk.rb
logstash-lite-0.2.20110112115019 lib/logstash/inputs/beanstalk.rb