lib/logstash/outputs/analyticdb.rb in logstash-output-analyticdb-5.4.0.10 vs lib/logstash/outputs/analyticdb.rb in logstash-output-analyticdb-5.4.0.11
- old
+ new
@@ -215,12 +215,10 @@
  def submit(events)
    connection = nil
    statement = nil
    events_to_retry = []
-     insert_sql = ""
-     sql_len = 0
    is_insert_err = false
    begin
      connection = @pool.getConnection
    rescue => e
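Note on this hunk: 5.4.0.11 drops the method-level insert_sql / sql_len accumulators; the batch string now lives next to the loop that builds it, and its own length replaces the separate counter. The surrounding contract is worth spelling out: failing to get a connection returns the whole batch together with false, so the caller retries it without spending an attempt. A minimal sketch of that contract with a hypothetical stub (not the plugin's actual retrying_submit):

# Hypothetical stub illustrating submit's return contract:
# [events_to_retry, count_as_attempt]; a connection failure returns
# the full batch with count_as_attempt = false ("free" retry).
def submit_stub(events, connection_ok)
  return events, false unless connection_ok  # retry all, don't count the attempt
  [[], true]                                 # pretend every insert succeeded
end

events = %w[e1 e2 e3]
attempts = 1
passes = 0
until events.empty?
  passes += 1
  events, count_as_attempt = submit_stub(events, passes > 1)
  attempts += 1 if count_as_attempt
end
p passes    # => 2 calls to submit
p attempts  # => 2: the failed connection pass did not consume an attempt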
@@ -229,36 +227,61 @@
      # We're not counting that towards our retry count.
      return events, false
    end
    begin
-       events.each do |event|
+       pos = 0
+       insert_sql = ""
+       @logger.debug("events size: #{events.size}")
+       while pos < events.size do
+         event = events[pos]
        statement = connection.prepareStatement(
          (@unsafe_statement == true) ? event.sprintf(@statement[0]) : @statement[0]
        )
        begin
          statement = add_statement_event_params(statement, event) if @statement.length > 1
          stmt_str = statement.toString
          one_sql = stmt_str[stmt_str.index(": ") + 2, stmt_str.length]
-           if sql_len + one_sql.length >= @commit_size
+           # on duplicate key start pos
+           on_duplicate_pos = one_sql.downcase.index(/on(\s+)duplicate/)
+           if on_duplicate_pos == nil
+             batch_insert_values_end_pos = one_sql.length
+           else
+             batch_insert_values_end_pos = on_duplicate_pos
+           end
+           @logger.debug("one_sql: #{one_sql}")
+           # trigger batch insert
+           if insert_sql.length + one_sql.length >= @commit_size
+             if insert_sql.length == 0
+               insert_sql = one_sql[0, batch_insert_values_end_pos]
+             end
+             if batch_insert_values_end_pos != one_sql.length
+               insert_sql.concat(one_sql[batch_insert_values_end_pos, one_sql.length - batch_insert_values_end_pos])
+             end
+             @logger.debug("batch 1 insert sql: #{insert_sql}")
            statement.execute(insert_sql)
-             sql_len = 0
            insert_sql = ""
          end
-           if sql_len == 0
-             insert_sql = one_sql
-             sql_len = one_sql.length
+           if insert_sql.length == 0
+             insert_sql = one_sql[0, batch_insert_values_end_pos]
          else
-             insert_sql.concat(",").concat(one_sql[@pre_len, one_sql.length])
-             sql_len = sql_len + one_sql.length - @pre_len
+             insert_sql = insert_sql.rstrip
+             insert_sql.concat(", ").concat(one_sql[@pre_len, batch_insert_values_end_pos - @pre_len])
          end
+           # loop to end
+           if pos == events.size - 1
+             if batch_insert_values_end_pos != one_sql.length
+               insert_sql.concat(one_sql[batch_insert_values_end_pos, one_sql.length - batch_insert_values_end_pos])
+             end
+             @logger.debug("batch 2 insert sql: #{insert_sql}")
+             statement.execute(insert_sql)
+           end
+           pos += 1
        rescue => e
          retry_exception?(e, event.to_json())
-           is_insert_err = true
        end
      end
-     statement.execute(insert_sql)
    rescue => e
      @logger.error("Submit data error, sql is #{insert_sql}, error is #{e}")
      is_insert_err = true
    ensure
      statement.close unless statement.nil?
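The heart of the change: each event's rendered statement may carry an ON DUPLICATE KEY UPDATE tail, which must not be repeated per row when VALUES tuples are concatenated into one batch INSERT. The new code locates the tail, accumulates only the value tuples (skipping the @pre_len-character "INSERT ... VALUES " prefix after the first row), and re-appends the tail once before executing, flushing whenever the accumulated SQL would reach @commit_size. A standalone sketch of that splice, assuming every row's tail is identical (which the plugin implicitly relies on); batch_sql and pre_len are illustrative names, not the plugin's API:

# Illustrative re-implementation of the VALUES-splicing above; not the
# plugin's actual API. Assumes rendered statements of the shape
#   "INSERT INTO t (a, b) VALUES (1, 2) ON DUPLICATE KEY UPDATE b = 2"
# and that pre_len is the length of the "INSERT ... VALUES " prefix,
# mirroring @pre_len.
def batch_sql(rendered_statements, pre_len, commit_size)
  batches = []
  insert_sql = ""
  tail = ""
  rendered_statements.each do |one_sql|
    dup_pos = one_sql.downcase.index(/on(\s+)duplicate/)
    values_end = dup_pos.nil? ? one_sql.length : dup_pos
    tail = dup_pos.nil? ? "" : one_sql[dup_pos..-1]
    # Flush the pending batch before it would exceed commit_size,
    # re-appending the ON DUPLICATE tail exactly once.
    if !insert_sql.empty? && insert_sql.length + one_sql.length >= commit_size
      batches << "#{insert_sql.rstrip} #{tail}".rstrip
      insert_sql = ""
    end
    if insert_sql.empty?
      insert_sql = one_sql[0, values_end]
    else
      # Keep only the "(v1, v2, ...)" tuple from subsequent rows.
      insert_sql = "#{insert_sql.rstrip}, #{one_sql[pre_len, values_end - pre_len]}"
    end
  end
  batches << "#{insert_sql.rstrip} #{tail}".rstrip unless insert_sql.empty?
  batches
end

rows = [
  "INSERT INTO t (a, b) VALUES (1, 2) ON DUPLICATE KEY UPDATE b = 2",
  "INSERT INTO t (a, b) VALUES (3, 4) ON DUPLICATE KEY UPDATE b = 4",
]
puts batch_sql(rows, "INSERT INTO t (a, b) VALUES ".length, 1024)
# => INSERT INTO t (a, b) VALUES (1, 2), (3, 4) ON DUPLICATE KEY UPDATE b = 4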
@@ -311,12 +334,11 @@
        if attempts > @max_flush_exceptions
          if (@skip_exception)
            @logger.error("JDBC - max_flush_exceptions has been reached. #{submit_actions.length} events have been unable to be sent to SQL and are being skipped. See previously logged exceptions for details.")
            break
          end
-           raise "JDBC - max_flush_exceptions #{max_flush_exceptions} has been reached. #{submit_actions.length} events have been unable to be sent to SQL and are being dropped. See previously logged exceptions for details."
-           break
+           raise "JDBC - max_flush_exceptions #{max_flush_exceptions} has been reached. #{submit_actions.length} events have been unable to be sent to SQL; the pipeline will crash. See previously logged exceptions for details."
        end
      end
      # If we're retrying the action sleep for the recommended interval
      # Double the interval for the next time through to achieve exponential backoff
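The behavioral fix in this hunk: the old code raised and then had an unreachable break; now exceeding the attempt budget without skip_exception plainly aborts the pipeline, so events are never silently dropped, while skip_exception = true logs and skips them. A condensed, hypothetical sketch of the surrounding loop (a simplification of what retrying_submit appears to do, not verbatim plugin code):

# Hypothetical, condensed retry loop: exponential backoff between
# attempts; on budget exhaustion either skip (log + drop) or crash.
def drain(events, max_flush_exceptions:, skip_exception:, retry_max_interval: 128)
  attempts = 1
  sleep_interval = 2
  until events.empty?
    events, count_as_attempt = yield(events)  # stand-in for submit
    break if events.empty?
    if attempts > max_flush_exceptions
      if skip_exception
        warn "skipping #{events.length} events"  # logged and dropped
        break
      end
      raise "max_flush_exceptions #{max_flush_exceptions} reached"  # crashes the pipeline
    end
    sleep(sleep_interval)
    sleep_interval = [sleep_interval * 2, retry_max_interval].min
    attempts += 1 if count_as_attempt
  end
end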
@@ -407,6 +429,6 @@
  def next_sleep_interval(current_interval)
    doubled = current_interval * 2
    doubled > @retry_max_interval ? @retry_max_interval : doubled
  end
- end # class LogStash::Outputs::analyticdb
+ end # class LogStash::Outputs::analyticdb
\ No newline at end of file
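For a sense of the cadence next_sleep_interval produces: the delay doubles on each retry and is clamped at @retry_max_interval. A quick worked example, assuming a 128-second cap purely for illustration:

# Worked example of the doubling-with-cap backoff; the 128-second
# cap is an assumed stand-in for @retry_max_interval.
retry_max_interval = 128
interval = 2
intervals = 8.times.map do
  current = interval
  interval = [interval * 2, retry_max_interval].min
  current
end
p intervals  # => [2, 4, 8, 16, 32, 64, 128, 128]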