lib/logstash/outputs/analyticdb.rb in logstash-output-analyticdb-5.4.0.10 vs lib/logstash/outputs/analyticdb.rb in logstash-output-analyticdb-5.4.0.11

- old
+ new

@@ -215,12 +215,10 @@ def submit(events) connection = nil statement = nil events_to_retry = [] - insert_sql = "" - sql_len = 0 is_insert_err = false begin connection = @pool.getConnection rescue => e @@ -229,36 +227,61 @@ # We're not counting that towards our retry count. return events, false end begin - events.each do |event| + pos = 0 + insert_sql = "" + @logger.debug("events size: #{events.size}") + while pos < events.size do + event = events[pos] statement = connection.prepareStatement( (@unsafe_statement == true) ? event.sprintf(@statement[0]) : @statement[0] ) begin statement = add_statement_event_params(statement, event) if @statement.length > 1 stmt_str = statement.toString one_sql = stmt_str[stmt_str.index(": ") + 2, stmt_str.length] - if sql_len + one_sql.length >= @commit_size + # on duplicate key start pos + on_duplicate_pos = one_sql.downcase.index(/on(\s+)duplicate/) + if on_duplicate_pos == nil + batch_insert_values_end_pos = one_sql.length + else + batch_insert_values_end_pos = on_duplicate_pos + end + @logger.debug("one_sql: #{one_sql}") + # trigger batch insert + if insert_sql.length + one_sql.length >= @commit_size + if insert_sql.length == 0 + insert_sql = one_sql[0, batch_insert_values_end_pos] + end + if batch_insert_values_end_pos != one_sql.length + insert_sql.concat(one_sql[batch_insert_values_end_pos, one_sql.length - batch_insert_values_end_pos]) + end + @logger.debug("batch 1 insert sql: #{insert_sql}") statement.execute(insert_sql) - sql_len = 0 insert_sql = "" end - if sql_len == 0 - insert_sql = one_sql - sql_len = one_sql.length + if insert_sql.length == 0 + insert_sql = one_sql[0, batch_insert_values_end_pos] else - insert_sql.concat(",").concat(one_sql[@pre_len, one_sql.length]) - sql_len = sql_len + one_sql.length - @pre_len + insert_sql = insert_sql.rstrip + insert_sql.concat(", ").concat(one_sql[@pre_len, batch_insert_values_end_pos - @pre_len]) end + # loop to end + if pos == events.size - 1 + if batch_insert_values_end_pos != one_sql.length + insert_sql.concat(one_sql[batch_insert_values_end_pos, one_sql.length - batch_insert_values_end_pos]) + end + @logger.debug("batch 2 insert sql: #{insert_sql}") + statement.execute(insert_sql) + end + pos += 1 rescue => e retry_exception?(e, event.to_json()) - is_insert_err = true end end - statement.execute(insert_sql) rescue => e @logger.error("Submit data error, sql is #{insert_sql}, error is #{e}") is_insert_err = true ensure statement.close unless statement.nil? @@ -311,12 +334,11 @@ if attempts > @max_flush_exceptions if (@skip_exception) @logger.error("JDBC - max_flush_exceptions has been reached. #{submit_actions.length} events have been unable to be sent to SQL and are being skipped. See previously logged exceptions for details.") break end - raise "JDBC - max_flush_exceptions #{max_flush_exceptions} has been reached. #{submit_actions.length} events have been unable to be sent to SQL and are being dropped. See previously logged exceptions for details." - break + raise "JDBC - max_flush_exceptions #{max_flush_exceptions} has been reached. #{submit_actions.length} events have been unable to be sent to SQL and are crashed. See previously logged exceptions for details." end end # If we're retrying the action sleep for the recommended interval # Double the interval for the next time through to achieve exponential backoff @@ -407,6 +429,6 @@ def next_sleep_interval(current_interval) doubled = current_interval * 2 doubled > @retry_max_interval ? @retry_max_interval : doubled end -end # class LogStash::Outputs::analyticdb +end # class LogStash::Outputs::analyticdb \ No newline at end of file