require 'flydata-core/thread_context'
require 'flydata-core/string_utils'

module FlydataCore
  # Numeric codes stored in FlydataError#error_content[:err_code].
  # Error 12xx are derived from AWS.
  # Error 100xx are FlyData errors.
  module ErrorCode
    VALUE_OUT_OF_RANGE_ERROR = 1204
    INVALID_DATE_FORMAT_ERROR = 1205
    INVALID_TIMESTAMP_FORMAT_ERROR = 1206
    INVALID_NUMBER_ERROR = 1207
    INVALID_FLOAT_ERROR = 1208
    INVALID_DECIMAL_ERROR = 1209
    INVALID_BOOLEAN_ERROR = 1210
    NOT_NULL_VIOLATION_ERROR = 1213
    INVALID_VARCHAR_ERROR = 1214
    INVALID_CHAR_ERROR = 1215
    INVALID_UTF8_CHARACTER_ERROR = 1220

    INVALID_DATA_FORMAT_ERROR = 1201
    MORE_COLUMNS_ERROR = 1202
    LESS_COLUMNS_ERROR = 1203
    NO_DATA_ERROR = 1211
    RESERVED_TABLE_NAME_ERROR = 10001
    RESERVED_COLUMN_NAME_ERROR = 10002
    TABLE_MISSING_ERROR = 10003

    AUTHENTICATION_ERROR = 10004
    RETRY_LIMIT_EXCEED_ERROR = 10005
    INFORMATION_SCHEMA_ERROR = 10006
    TABLE_REVISION_ERROR = 10007
    S3_ACCESS_ERROR = 10008
    REDSHIFT_ACCESS_ERROR = 10009
    BREAKING_ALTER_TABLE_ERROR = 10010
    BREAKING_INFORMATION_SCHEMA_ERROR = 10011

    INTERNAL_ERROR = 1200
  end

  # Error Level
  #  0 Emergency: system is unusable
  #  1 Alert: action must be taken immediately
  #  2 Critical: critical conditions
  #  3 Error: error conditions
  #  4 Warning: warning conditions
  #  5 Notice: normal but significant condition
  #  6 Informational: informational messages
  #  7 Debug: debug-level messages
  module ErrorLevel
    ERROR = 3 # failed to load a record
    WARN = 4  # loaded a record but a record is modified
  end

  # RetryableError
  # Carries the exception that triggered it together with an optional retry
  # alert limit.
  class RetryableError < StandardError
    # Not referenced elsewhere in this file; presumably passed by callers as
    # retry_alert_limit -- TODO confirm.
    NO_RETRY_LIMIT = :no_retry_limit

    attr_reader :original_exception
    attr_reader :retry_alert_limit

    # @param original_exception the exception being wrapped
    # @param retry_alert_limit optional limit value, nil when not given
    def initialize(original_exception, retry_alert_limit = nil)
      @original_exception = original_exception
      @retry_alert_limit = retry_alert_limit
    end
  end

  # UserMaintenanceError
  class UserMaintenanceError < StandardError
  end

  # Marker module: errors are tagged by including it
  # (see BreakingAlterTableError and BreakingInformationSchemaError).
  module BreakingSyncError
  end

  # An error that indicates that a sync record was received for a table whose
  # sync is broken.
  # These records cannot be retried and should be ignored.
  class BrokenTableSyncError < StandardError
  end

  # FlydataError
  #  data_node_id, data_entry_id, chunk_identifier, table_name, err_code, err_reason, err_level
  #
  # Base class for errors that carry a hash of details (error_content).
  # Concrete subclasses must implement #err_code: #initialize calls it, so
  # instantiating a class without an #err_code implementation raises
  # NoMethodError.
  class FlydataError < StandardError
    # Maximum number of characters (String#length) kept for each String value
    # in error_content.
    MAX_CONTENT_SIZE = 1024 # 1kb

    attr_reader :error_content

    # Builds error_content by merging, lowest precedence first:
    # default_params, context_params, the caller-supplied content and
    # overwrite_params. String values are then sanitized by #filter_content
    # and :err_code is set from #err_code.
    #
    # @param error_reason [String, Hash] a reason string (stored under
    #   :err_reason) or a hash of error content
    # @param error_content [Hash] extra content; wins over error_reason
    # @raise [ArgumentError] if error_reason is neither a String nor a Hash
    # @raise [NoMethodError] if the class does not implement #err_code
    def initialize(error_reason, error_content = {})
      error_content =
        case error_reason
        when String
          { err_reason: error_reason }.merge(error_content)
        when Hash
          error_reason.merge(error_content)
        else
          raise ArgumentError,
                "Invalid argument. First parameter must be string or hash object."
        end

      @error_content = default_params.
                         merge(context_params).
                         merge(error_content).
                         merge(overwrite_params)
      filter_content(@error_content)
      @error_content[:err_code] = err_code
    end

    # Defaults taken from FlydataCore::ThreadContext.parameters.
    # Returns an empty hash when no parameters are set.
    # NOTE(review): in that case :err_level is not set either -- confirm
    # that this is intended.
    def default_params
      params = FlydataCore::ThreadContext.parameters
      return {} unless params

      {
        data_node_id: params[:node_id],
        data_entry_id: params[:data_entry_id],
        table_name: params[:table_name],
        chunk_identifier: params[:chunk_identifier],
        err_level: ErrorLevel::ERROR
      }
    end

    # Parameters registered in DataDeliveryErrorThreadContext, or {} if none.
    def context_params
      DataDeliveryErrorThreadContext.context_params || {}
    end

    # Values merged last, i.e. they win over everything else.
    # Empty here; subclasses may override.
    def overwrite_params
      {}
    end

    # Error code of this error (see ErrorCode). Must be implemented by
    # concrete subclasses.
    #
    # @raise [NoMethodError] always, in this base implementation
    def err_code
      # The message used to name a non-existent "default_parameters" method;
      # the method that is actually missing is err_code.
      raise NoMethodError, "err_code is not implemented for #{self.class}"
    end

    # True when the error level is ErrorLevel::WARN.
    def warn?
      err_level == ErrorLevel::WARN
    end

    def err_level
      @error_content[:err_level]
    end

    # Returns :err_reason when present, otherwise the default exception text.
    def to_s
      @error_content[:err_reason] || super
    end

    # Sanitizes every String value of the given hash in place: values longer
    # than MAX_CONTENT_SIZE characters are cut to that size, then passed
    # through FlydataCore::StringUtils.replace_invalid_utf8_char.
    # Non-String values are left untouched.
    #
    # @param error_content [Hash] hash to sanitize (modified in place)
    # @return [Hash] the same hash
    def filter_content(error_content = @error_content)
      error_content.each do |key, value|
        next unless value.kind_of?(String)

        value = value[0..MAX_CONTENT_SIZE - 1] if value.length > MAX_CONTENT_SIZE
        # Only existing keys are reassigned, which is safe while iterating.
        error_content[key] = FlydataCore::StringUtils.replace_invalid_utf8_char(value)
      end
    end
  end

  # DataDeliveryError
  #   An error which results in data clog or loss of information.
  #   This type of error gets reported to customer as data delivery error.
  #  data_node_id, data_entry_id, chunk_identifier, table_name, err_code, err_reason, err_level
  class DataDeliveryError < FlydataError
  end

  # + RecordDeliveryError
  #  data_node_id, data_entry_id, chunk_identifier, table_name, err_code, err_reason, err_level, record_no, raw_record
  class RecordDeliveryError < DataDeliveryError
    # Appends the raw record to the base message.
    def to_s
      "#{super} raw_record=(#{@error_content[:raw_record]})"
    end
  end

  class InvalidDataFormatError < RecordDeliveryError
    def err_code; ErrorCode::INVALID_DATA_FORMAT_ERROR; end
  end

  class MoreColumnsError < RecordDeliveryError
    def err_code; ErrorCode::MORE_COLUMNS_ERROR; end
  end

  class LessColumnsError < RecordDeliveryError
    def err_code; ErrorCode::LESS_COLUMNS_ERROR; end
  end

  class NoDataError < RecordDeliveryError
    def err_code; ErrorCode::NO_DATA_ERROR; end
  end

  class ReservedTableNameError < RecordDeliveryError
    def err_code; ErrorCode::RESERVED_TABLE_NAME_ERROR; end
  end

  class ReservedColumnNameError < RecordDeliveryError
    def err_code; ErrorCode::RESERVED_COLUMN_NAME_ERROR; end
  end

  class TableMissingError < RecordDeliveryError
    def err_code; ErrorCode::TABLE_MISSING_ERROR; end
  end

  class RecordDeliveryInternalError < RecordDeliveryError
    def err_code; ErrorCode::INTERNAL_ERROR; end
  end

  # information schema, especially flydata_ctl_columns table, is broken
  class InformationSchemaError < RecordDeliveryError
    def err_code; ErrorCode::INFORMATION_SCHEMA_ERROR; end
  end

  # No table information found in the database
  class MissingTableInformationError < InformationSchemaError
    def err_code; ErrorCode::INFORMATION_SCHEMA_ERROR; end
  end

  # Table's revision does not match the record's revision.
  class TableRevisionError < RecordDeliveryError
    def err_code; ErrorCode::TABLE_REVISION_ERROR; end
  end

  # Table's revision is lower than the record's revision.  Should catch up soon.
  # Note: derives from FlydataError directly, not from RecordDeliveryError.
  class StaleTableRevisionError < FlydataError
    def err_code; ErrorCode::TABLE_REVISION_ERROR; end
  end

  # Unsupported ALTER TABLE which breaks the table's sync consistency.
  class BreakingAlterTableError < RecordDeliveryError
    include BreakingSyncError
    def err_code; ErrorCode::BREAKING_ALTER_TABLE_ERROR; end
  end

  # Unrecoverable information schema error which requires re-synchronizing a table
  class BreakingInformationSchemaError < InformationSchemaError
    include BreakingSyncError

    # Same arguments as FlydataError#initialize. After the regular setup,
    # :err_reason is wrapped in a fixed "Sync is broken" message.
    # NOTE(review): the wrapping happens after filter_content has run, so the
    # final reason can exceed MAX_CONTENT_SIZE -- confirm this is acceptable.
    def initialize(*args)
      super
      @error_content[:err_reason] = "Sync is broken. The table needs to be re-synchronized. (#{@error_content[:err_reason]})"
    end

    def err_code; ErrorCode::BREAKING_INFORMATION_SCHEMA_ERROR; end
  end

  # + BadValueError
  #  data_node_id, data_entry_id, chunk_identifier, table_name, err_code, err_reason, err_level, record_no, raw_record, colname, type, raw_value, modified_value
  class BadValueError < RecordDeliveryError
    def modified_value
      @error_content[:modified_value]
    end

    # True when a :modified_value key exists, even if its value is nil.
    def has_modified_value?
      @error_content.key?(:modified_value)
    end

    # Appends column name and raw value to the record-level message.
    # NOTE(review): this reads :col_name while the field list above says
    # "colname" -- verify which key callers actually provide.
    def to_s
      "#{super} column=(#{@error_content[:col_name]}) value=(#{@error_content[:raw_value]})"
    end
  end

  class ValueOutOfRangeError < BadValueError
    def err_code; ErrorCode::VALUE_OUT_OF_RANGE_ERROR; end
  end

  class InvalidDateFormatError < BadValueError
    def err_code; ErrorCode::INVALID_DATE_FORMAT_ERROR; end
  end

  class InvalidTimestampFormatError < BadValueError
    def err_code; ErrorCode::INVALID_TIMESTAMP_FORMAT_ERROR; end
  end

  class InvalidNumberError < BadValueError
    def err_code; ErrorCode::INVALID_NUMBER_ERROR; end
  end

  class InvalidFloatError < BadValueError
    def err_code; ErrorCode::INVALID_FLOAT_ERROR; end
  end

  class InvalidDecimalError < BadValueError
    def err_code; ErrorCode::INVALID_DECIMAL_ERROR; end
  end

  class InvalidBooleanError < BadValueError
    def err_code; ErrorCode::INVALID_BOOLEAN_ERROR; end
  end

  class NotNullViolationError < BadValueError
    def err_code; ErrorCode::NOT_NULL_VIOLATION_ERROR; end
  end

  class InvalidVarcharError < BadValueError
    def err_code; ErrorCode::INVALID_VARCHAR_ERROR; end
  end

  class InvalidCharError < BadValueError
    def err_code; ErrorCode::INVALID_CHAR_ERROR; end
  end

  class InvalidUtf8CharacterError < BadValueError
    def err_code; ErrorCode::INVALID_UTF8_CHARACTER_ERROR; end
  end

  # + ChunkDeliveryError
  #  data_node_id, data_entry_id, chunk_identifier, table_name, err_code, err_reason, err_level
  class ChunkDeliveryError < DataDeliveryError
  end

  class AuthenticationError < ChunkDeliveryError
    # end_point_url
    def err_code; ErrorCode::AUTHENTICATION_ERROR; end
  end

  class RetryLimitExceedError < ChunkDeliveryError
    def err_code; ErrorCode::RETRY_LIMIT_EXCEED_ERROR; end
  end

  class ChunkDeliveryInternalError < ChunkDeliveryError
    def err_code; ErrorCode::INTERNAL_ERROR; end
  end

  # Base class for S3AccessError and RedshiftAccessError.
  # Adds a UTC timestamp to the defaults and always forces :table_name to nil.
  class UserResourceError < ChunkDeliveryError
    def default_params
      super.merge(timestamp: Time.now.utc)
    end

    def overwrite_params
      super.merge(table_name: nil)
    end
  end

  class S3AccessError < UserResourceError
    def err_code; ErrorCode::S3_ACCESS_ERROR; end
  end

  class RedshiftAccessError < UserResourceError
    def err_code; ErrorCode::REDSHIFT_ACCESS_ERROR; end
  end

  # error_content[:errors] has a list of errors
  class MultipleErrors < DataDeliveryError
    def err_code; -1; end
  end

  ## Deprecated errors

  # The record is valid but not supported.
  class UnsupportedRecordFormat < StandardError; end

  # Table Def Error
  # Wraps a hash describing the error; err_hash[:error] is used as the
  # exception message.
  class TableDefError < StandardError
    attr_reader :err_hash

    # @param err_hash [Hash] error details; must respond to [] with :error
    def initialize(err_hash)
      super(err_hash[:error])
      @err_hash = err_hash
    end
  end

  ## Compatibility check error

  class CompatibilityError < StandardError
  end

  class AgentCompatibilityError < CompatibilityError
  end

  class MysqlCompatibilityError < CompatibilityError
  end

  class PostgresqlCompatibilityError < CompatibilityError
  end

  ## Error container
  # Collects errors and context parameters in Thread.current[...] storage.
  # NOTE(review): per the Ruby documentation Thread#[] values are fiber-local,
  # not strictly thread-local -- confirm this matches how callers use it.
  class DataDeliveryErrorThreadContext
    THREAD_LOCAL_KEY_ERROR_LIST = :flydata_data_delivery_error_list
    THREAD_LOCAL_KEY_RECORD_LIMIT = "#{THREAD_LOCAL_KEY_ERROR_LIST}_limit".to_sym
    DEFAULT_RECORD_LIMIT = 10
    THREAD_LOCAL_KEY_CONTEXT_PARAMS = "#{THREAD_LOCAL_KEY_ERROR_LIST}_context_params".to_sym

    # Clears all stored state, then stores the given record limit.
    # (This is a class-level method that happens to be named `initialize`;
    # it is not the instance constructor.)
    def self.initialize(record_limit = DEFAULT_RECORD_LIMIT)
      reset
      Thread.current[THREAD_LOCAL_KEY_RECORD_LIMIT] = record_limit
    end

    # Appends the error unless an equal one is already stored or the record
    # limit has been reached; errors over the limit are dropped silently.
    def self.add_error(error)
      return if error_list.include?(error)

      error_list << error if error_list.count < record_limit
      #TODO: write warning log if record limit
    end

    # Drops the stored error list, record limit and context parameters.
    def self.reset
      Thread.current[THREAD_LOCAL_KEY_ERROR_LIST] = nil
      Thread.current[THREAD_LOCAL_KEY_RECORD_LIMIT] = nil
      Thread.current[THREAD_LOCAL_KEY_CONTEXT_PARAMS] = nil
    end

    # Stored record limit, or DEFAULT_RECORD_LIMIT when none is set.
    def self.record_limit
      Thread.current[THREAD_LOCAL_KEY_RECORD_LIMIT] || DEFAULT_RECORD_LIMIT
    end

    # Stored error list; created (empty) on first access.
    def self.error_list
      Thread.current[THREAD_LOCAL_KEY_ERROR_LIST] ||= []
    end

    # Replaces the stored context parameters.
    def self.set_context_params(params)
      Thread.current[THREAD_LOCAL_KEY_CONTEXT_PARAMS] = params
    end

    # Merges params into the stored context parameters (new values win).
    def self.add_context_params(params)
      current = Thread.current[THREAD_LOCAL_KEY_CONTEXT_PARAMS] || {}
      Thread.current[THREAD_LOCAL_KEY_CONTEXT_PARAMS] = current.merge(params)
    end

    # Stored context parameters, or nil when none are set.
    def self.context_params
      Thread.current[THREAD_LOCAL_KEY_CONTEXT_PARAMS]
    end
  end
end