require 'flydata-core/record/record' module Flydata module PluginSupport module SyncRecordEmittable TABLE_NAME = :table_name # A Flydata JSON tag to specify a table name TYPE = :type SEQ = :seq RESPECT_ORDER = :respect_order SRC_POS = :src_pos TABLE_REV = :table_rev V = :v # FlyData record format version attr_accessor :context # required # Public Interface: Emit sync records to fluent engine # # "records" : A record or records for emitting # Each record needs to be Hash # "options" # type: : (required) type (insert, update, delete) # tag : (optional) tag (default: @context.tag) # timestamp : (optional) timestamp (default: current timestamp) # src_pos : (required) source position (used for sync:repair) # table : (optional) table name # increment_table_rev : (optional) set true when incrementing table revision def emit_sync_records(records, options) return if records.nil? || records.empty? # skip records = [records] unless records.kind_of?(Array) # Check options tag = options[:tag] || @context.tag timestamp = options[:timestamp] || Time.now.to_i type = options[:type] raise "type option must be set" if type.to_s.empty? src_pos = options[:src_pos] raise "src_pos option must be set" if src_pos.to_s.empty? seq = nil if table = options[:table] table_rev = @context.table_revs[table] if options[:increment_table_rev] table_rev = @context.sync_fm.increment_table_rev(table, table_rev) @context.table_revs[table] = table_rev end seq = @context.sync_fm.get_table_position(table) end # Add common information to each record array = records.collect do |r| r[TYPE] = type r[RESPECT_ORDER] = true r[SRC_POS] = src_pos r[V] = FlydataCore::Record::V2 if table seq = @context.sync_fm.increment_table_position(seq) r[SEQ] = seq r[TABLE_NAME] = table r[TABLE_REV] = table_rev end [timestamp, r] end Fluent::Engine.emit_array(tag, array) if table @context.sync_fm.save_table_position(table, seq) if options[:set_infinity_to_table_binlog_pos] @context.set_infinity_to_table_binlog_pos(table) end end end end end end