require 'fluent/plugin/input'
require 'bigdecimal'

# NOTE(review): Aws::DynamoDBStreams is used below but no aws-sdk gem is
# required in this file; presumably it is loaded elsewhere — confirm.
module Fluent::Plugin
  # Fluentd input plugin that polls a DynamoDB Stream every +fetch_interval+
  # and emits each stream record as an event tagged with +tag+.
  #
  # The last processed sequence number of each shard is stored in
  # "<pos_file>.<shard_id>" when +pos_file+ is set; otherwise it is kept in
  # memory only and lost when the process stops.
  class DynamoDBStreamsInput < Input
    Fluent::Plugin.register_input('dynamodb_streams', self)

    helpers :timer

    config_param :tag, :string
    config_param :aws_key_id, :string, :default => nil, :secret => true
    config_param :aws_sec_key, :string, :default => nil, :secret => true
    config_param :aws_region, :string, :default => "ap-northeast-1"
    config_param :stream_arn, :string
    config_param :fetch_interval, :time, :default => 1
    config_param :fetch_size, :integer, :default => 1
    config_param :pos_file, :string, :default => nil

    # Resolves the streams endpoint from +aws_region+ and warns when no
    # +pos_file+ is configured.
    #
    # The special region "ddblocal" targets DynamoDB Local on localhost:8000
    # and uses a dummy region value.
    def configure(conf)
      super

      if @aws_region == "ddblocal"
        @aws_region = "ap-northeast-1" # dummy settings
        @stream_endpoint = "http://localhost:8000"
      else
        @stream_endpoint = "https://streams.dynamodb.#{@aws_region}.amazonaws.com"
      end

      unless @pos_file
        log.warn "dynamodb-streams: 'pos_file PATH' parameter is not set to a 'dynamodb-streams' source."
        log.warn "dynamodb-streams: this parameter is highly recommended to save the position to resume."
      end
    end

    # Builds the DynamoDB Streams client and schedules #run every
    # +fetch_interval+ seconds.
    def start
      super

      @pos_memory = {} unless @pos_file

      options = {}
      options[:region] = @aws_region if @aws_region
      options[:credentials] = Aws::Credentials.new(@aws_key_id, @aws_sec_key) if @aws_key_id && @aws_sec_key
      options[:endpoint] = @stream_endpoint
      @client = Aws::DynamoDBStreams::Client.new(options)

      # shard_id => shard iterator to use on the next get_records call.
      @iterator = {}

      timer_execute(:in_dynamodb_streams_timer, @fetch_interval, &method(:run))
    end

    # Timer callback: reads one batch from every open shard of the stream.
    #
    # Shards that have an ending sequence number (closed shards) are skipped,
    # and their saved position and cached iterator are discarded.
    def run
      get_shards.each do |shard|
        shard_id = shard.shard_id

        if shard.sequence_number_range.ending_sequence_number
          remove_sequence(shard_id)
          @iterator.delete(shard_id)
          next
        end

        fetch_shard(shard_id)
      end
    end

    # Reads up to +fetch_size+ records from one shard, emits them, and saves
    # the position after each record.
    #
    # An error raised by #emit is only logged; the position still advances
    # past that record.
    def fetch_shard(shard_id)
      set_iterator(shard_id) unless @iterator.key?(shard_id)

      resp = @client.get_records(
        shard_iterator: @iterator[shard_id],
        limit: @fetch_size,
      )

      resp.records.each do |r|
        begin
          emit(r)
        rescue => e
          log.error "dynamodb-streams: error has occoured.", error: e.message, error_class: e.class
        end
        save_sequence(shard_id, r.dynamodb.sequence_number)
      end

      if resp.next_shard_iterator
        @iterator[shard_id] = resp.next_shard_iterator
      else
        @iterator.delete(shard_id)
      end
    rescue Aws::DynamoDBStreams::Errors::ExpiredIteratorException
      # Shard iterators are only valid for a limited time. Drop the stale one
      # so the next run re-creates it from the saved position instead of
      # retrying a dead iterator forever.
      @iterator.delete(shard_id)
    end

    # Returns every shard of the stream, following describe_stream pagination
    # via +last_evaluated_shard_id+.
    def get_shards
      shards = []
      last_shard_id = nil

      loop do
        description = @client.describe_stream(
          stream_arn: @stream_arn,
          exclusive_start_shard_id: last_shard_id,
        ).stream_description
        shards.concat(description.shards)

        next_shard_id = description.last_evaluated_shard_id
        # Stop on the last page. Also stop if the cursor did not move, which
        # guards against looping forever.
        break if next_shard_id.nil? || next_shard_id == last_shard_id

        last_shard_id = next_shard_id
      end

      shards
    end

    # Obtains a fresh shard iterator for +shard_id+ and caches it.
    #
    # Resumes after the saved sequence number when there is one; otherwise it
    # starts at TRIM_HORIZON.
    def set_iterator(shard_id)
      sequence = load_sequence(shard_id)
      params = { stream_arn: @stream_arn, shard_id: shard_id }

      if sequence
        params[:shard_iterator_type] = "AFTER_SEQUENCE_NUMBER"
        params[:sequence_number] = sequence
      else
        params[:shard_iterator_type] = "TRIM_HORIZON"
      end

      @iterator[shard_id] = @client.get_shard_iterator(params).shard_iterator
    end

    # Returns the saved sequence number for +shard_id+, or nil when there is
    # none. An empty pos file (e.g. left by an interrupted write) counts as
    # none.
    def load_sequence(shard_id)
      return @pos_memory[shard_id] unless @pos_file

      path = pos_file_path(shard_id)
      return nil unless File.exist?(path)

      sequence = File.read(path).strip
      sequence.empty? ? nil : sequence
    end

    # Persists +sequence+ as the position of +shard_id+ and returns it.
    #
    # The file is written to a temp path and then renamed, so readers never
    # see a partially written value.
    def save_sequence(shard_id, sequence)
      if @pos_file
        path = pos_file_path(shard_id)
        tmp_path = "#{path}.tmp"
        File.write(tmp_path, sequence)
        File.rename(tmp_path, path)
      else
        @pos_memory[shard_id] = sequence
      end
      sequence
    end

    # Forgets the saved position of +shard_id+ (file or in-memory entry).
    def remove_sequence(shard_id)
      if @pos_file
        path = pos_file_path(shard_id)
        File.unlink(path) if File.exist?(path)
      else
        @pos_memory.delete(shard_id)
      end
    end

    # Path of the position file for +shard_id+.
    def pos_file_path(shard_id)
      "#{@pos_file}.#{shard_id}"
    end

    # Converts one stream record into a plain hash and emits it with +tag+,
    # stamped with the current Fluentd time.
    #
    # keys / old_image / new_image are included only when present on the
    # record.
    def emit(r)
      dynamodb = {
        "stream_view_type" => r.dynamodb.stream_view_type,
        "sequence_number" => r.dynamodb.sequence_number,
        "size_bytes" => r.dynamodb.size_bytes,
      }
      dynamodb["keys"] = dynamodb_to_hash(r.dynamodb.keys) if r.dynamodb.keys
      dynamodb["old_image"] = dynamodb_to_hash(r.dynamodb.old_image) if r.dynamodb.old_image
      dynamodb["new_image"] = dynamodb_to_hash(r.dynamodb.new_image) if r.dynamodb.new_image

      record = {
        "aws_region" => r.aws_region,
        "event_source" => r.event_source,
        "event_version" => r.event_version,
        "event_id" => r.event_id,
        "event_name" => r.event_name,
        "dynamodb" => dynamodb,
      }

      router.emit(@tag, Fluent::Engine.now, record)
    end

    # Converts a map of attribute name => AttributeValue into a plain hash.
    #
    # Binary attributes (B / BS) are dropped. Returns a new hash; the input
    # is not modified.
    def dynamodb_to_hash(hash)
      hash.each_with_object({}) do |(key, value), result|
        next if value.b || value.bs

        result[key] = format_attribute_value(value)
      end
    end

    # Converts a single AttributeValue into a plain Ruby value.
    #
    # M becomes a Hash and L an Array (both recursively). N and NS become
    # Integer when integral and Float otherwise. SS and S are returned
    # unchanged. NULL becomes nil, and BOOL becomes true/false. Anything else
    # logs a warning and yields nil.
    def format_attribute_value(v)
      if v.m
        dynamodb_to_hash(v.m)
      elsif v.l
        v.l.map { |i| format_attribute_value(i) }
      elsif v.ns
        v.ns.map { |i| format_number(i) }
      elsif v.ss
        v.ss
      elsif v.null
        nil
      elsif !v.bool.nil?
        # Explicit nil check: a plain truthiness test would drop `false`.
        v.bool
      elsif v.n
        format_number(v.n)
      elsif v.s
        v.s
      else
        log.warn "dynamodb-streams: unknown attribute value."
        nil
      end
    end

    # Parses a DynamoDB number string. Returns an Integer when the value has
    # no fractional part, otherwise a Float, so decimals are not truncated.
    def format_number(str)
      number = BigDecimal(str)
      number.frac.zero? ? number.to_i : number.to_f
    end
  end
end