Sha256: 16678e075fac27fa0be76e4e792a0782094c385f0e1d8c46ae904c88b9faf27b

Contents?: true

Size: 1.54 KB

Versions: 6

Compression:

Stored size: 1.54 KB

Contents

require 'json'

require 'deep_merge'


module Franz
  module Output

    # STDOUT output for Franz.
    class Device

      # Start a new output in the background. We'll consume from the input queue
      # and ship events to STDOUT.
      #
      # @param [Hash] opts options for the output
      # @option opts [Queue] :input ([]) "input" queue
      def initialize opts={}
        opts = {
          logger: Logger.new(STDOUT),
          tags: [],
          input: [],
          output: '/dev/stdout'
        }.deep_merge!(opts)

        @statz = opts[:statz] || Franz::Stats.new
        @statz.create :num_output, 0

        @device = File.open(opts[:output], 'w')
        @logger = opts[:logger]

        @stop = false
        @foreground = opts[:foreground]

        @thread = Thread.new do
          until @stop
            event = opts[:input].shift

            log.trace \
              event: 'publish',
              raw: event

            @device.puts JSON::generate(event)

            @statz.inc :num_output
          end
        end

        log.info event: 'output started'

        @thread.join if @foreground
      end

      # Join the Output thread. Effectively only once.
      def join
        return if @foreground
        @foreground = true
        @thread.join
      end

      # Stop the Output thread. Effectively only once.
      def stop
        return if @foreground
        @foreground = true
        @thread.kill
        @device.close
        log.info event: 'output stopped'
      end

    private
      def log ; @logger end
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
franz-2.1.11 lib/franz/output/device.rb
franz-2.1.10 lib/franz/output/device.rb
franz-2.1.9 lib/franz/output/device.rb
franz-2.1.8 lib/franz/output/device.rb
franz-2.1.7 lib/franz/output/device.rb
franz-2.1.6 lib/franz/output/device.rb