Sha256: 62285d6acb1ac3f272ea8543bf852383f5f5c0f84d1c3cbc19abdff810c0d0ed

Contents?: true

Size: 1.09 KB

Versions: 2

Compression:

Stored size: 1.09 KB

Contents

module Kcl
  # Dispatches an action hash to the record processor based on its 'action' key.
  class ActionHandler
    def initialize record_processor, checkpointer, io_handler
      @record_processor = record_processor
      @checkpointer = checkpointer
      @io_handler = io_handler
    end

    # rubocop:disable Metrics/MethodLength,Metrics/AbcSize
    def handle action
      case action.fetch('action')
      when 'initialize'
        record_processor.init action.fetch('shardId')
      when 'processRecords'
        record_processor.process_records action.fetch('records'), checkpointer
      when 'shutdown'
        record_processor.shutdown checkpointer, action.fetch('reason')
      else
        fail MalformedActionError,
             "Received an action which couldn't be understood. Action was #{action}"
      end
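    rescue KeyError => key_error
      # A required key was missing from the action hash.
      raise MalformedActionError,
            "Action #{action} was expected to have key: #{key_error.message}"
    rescue => error
      # Any other StandardError is reported through the IO handler, not re-raised.
      io_handler.write_error error.backtrace.join "\n"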
    end
    # rubocop:enable Metrics/MethodLength,Metrics/AbcSize

    private

    attr_reader :record_processor, :io_handler, :checkpointer
  end
end
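
The dispatch above can be exercised with stub collaborators. The following is a minimal sketch, not part of the gem: RecordingProcessor and RecordingIO are hypothetical stand-ins, the handler is repeated in condensed form so the snippet runs on its own, and MalformedActionError is assumed to descend from StandardError because its definition is not in this listing. Under that assumption an unknown action is caught by the trailing rescue and reported through write_error, while a missing key is re-raised to the caller.

```ruby
module Kcl
  # Assumption: the real definition is not shown in this listing.
  class MalformedActionError < StandardError; end

  # Condensed copy of the handler above, repeated so this sketch is self-contained.
  class ActionHandler
    def initialize record_processor, checkpointer, io_handler
      @record_processor = record_processor
      @checkpointer = checkpointer
      @io_handler = io_handler
    end

    def handle action
      case action.fetch('action')
      when 'initialize'
        @record_processor.init action.fetch('shardId')
      when 'processRecords'
        @record_processor.process_records action.fetch('records'), @checkpointer
      when 'shutdown'
        @record_processor.shutdown @checkpointer, action.fetch('reason')
      else
        fail MalformedActionError, "Action was #{action}"
      end
    rescue KeyError => key_error
      raise MalformedActionError,
            "Action #{action} was expected to have key: #{key_error.message}"
    rescue => error
      @io_handler.write_error error.backtrace.join "\n"
    end
  end
end

# Hypothetical stub: records each call it receives.
class RecordingProcessor
  attr_reader :calls

  def initialize
    @calls = []
  end

  def init shard_id
    @calls << [:init, shard_id]
  end

  def process_records records, _checkpointer
    @calls << [:process_records, records]
  end

  def shutdown _checkpointer, reason
    @calls << [:shutdown, reason]
  end
end

# Hypothetical stub: collects error text instead of writing it anywhere.
class RecordingIO
  attr_reader :errors

  def initialize
    @errors = []
  end

  def write_error message
    @errors << message
  end
end

processor = RecordingProcessor.new
io = RecordingIO.new
handler = Kcl::ActionHandler.new processor, :checkpointer, io

# Well-formed actions are forwarded to the processor.
handler.handle({ 'action' => 'initialize', 'shardId' => 'shard-000' })
handler.handle({ 'action' => 'processRecords', 'records' => [] })
handler.handle({ 'action' => 'shutdown', 'reason' => 'TERMINATE' })

# A missing key surfaces to the caller as MalformedActionError.
missing_key_message = nil
begin
  handler.handle({ 'action' => 'initialize' })
rescue Kcl::MalformedActionError => e
  missing_key_message = e.message
end

# An unknown action is caught by the trailing rescue and only reported.
handler.handle({ 'action' => 'bogus' })
```

The sketch shows that the two malformed cases take different paths. If both were meant to reach the caller, the trailing rescue would need to re-raise MalformedActionError; whether that is the intended behaviour cannot be told from this listing.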

Version data entries

2 entries across 2 versions and 1 rubygem

Version Path
amazon-kinesis-client-ruby-0.0.3 lib/kcl/action_handler.rb
amazon-kinesis-client-ruby-0.0.1 lib/kcl/action_handler.rb