Sha256: ea7faf4d8e5b17be2ca046d432b3fea1db85ff9f504c01c5e3f8a1443ae769f1

Contents?: true

Size: 1.48 KB

Versions: 1

Compression:

Stored size: 1.48 KB

Contents

require 'socket'
require 'json'

module WhirledPeas
  module Frame
    class Producer
      LOGGER_ID = 'PRODUCER'

      def self.start(logger: NullLogger.new, host:, port:, &block)
        server = TCPServer.new(host, port)
        client = server.accept
        logger.info(LOGGER_ID) { "Connected to #{host}:#{port}" }
        producer = new(client, logger)
        yield producer
        logger.info(LOGGER_ID) { 'Exited normally' }
      rescue => e
        producer.terminate
        logger.warn(LOGGER_ID) { 'Exited with error' }
        logger.error(LOGGER_ID) { e }
        raise
      ensure
        if client
          logger.info(LOGGER_ID) { 'Closing connection'}
          client.close
        end
      end

      def initialize(client, logger=NullLogger.new)
        @client = client
        @logger = logger
        @queue = Queue.new
      end

      def send(name, duration: nil, args: {})
        client.puts(JSON.generate('name' => name, 'duration' => duration, **args))
        logger.debug(LOGGER_ID) { "Sending frame: #{name}" }
      end

      def enqueue(name, duration: nil, args: {})
        queue.push([name, duration, args])
      end

      def flush
        while !queue.empty?
          name, duration, args = queue.pop
          send(name, duration: duration, args: args)
        end
      end

      def stop
        send(Frame::EOF)
      end

      def terminate
        send(Frame::TERMINATE)
      end

      private

      attr_reader :client, :logger, :queue
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
whirled_peas-0.2.0 lib/whirled_peas/frame/producer.rb