lib/idempo.rb in idempo-0.1.3 vs lib/idempo.rb in idempo-0.2.0

- old
+ new

@@ -6,76 +6,71 @@ require 'digest' require 'json' require 'measurometer' require_relative "idempo/version" +require_relative "idempo/request_fingerprint" require_relative "idempo/memory_backend" require_relative "idempo/redis_backend" require_relative "idempo/active_record_backend" +require_relative "idempo/malformed_key_error_app" +require_relative "idempo/concurrent_request_error_app" class Idempo - DEFAULT_TTL = 30 + DEFAULT_TTL_SECONDS = 30 SAVED_RESPONSE_BODY_SIZE_LIMIT = 4 * 1024 * 1024 class Error < StandardError; end class ConcurrentRequest < Error; end class MalformedIdempotencyKey < Error; end - def initialize(app, backend: MemoryBackend.new) + def initialize(app, backend: MemoryBackend.new, malformed_key_error_app: MalformedKeyErrorApp, compute_fingerprint_via: RequestFingerprint, concurrent_request_error_app: ConcurrentRequestErrorApp, persist_for_seconds: DEFAULT_TTL_SECONDS) @backend = backend @app = app + @concurrent_request_error_app = concurrent_request_error_app + @malformed_key_error_app = malformed_key_error_app + @fingerprint_calculator = compute_fingerprint_via + @persist_for_seconds = persist_for_seconds.to_i end def call(env) req = Rack::Request.new(env) return @app.call(env) if request_verb_idempotent?(req) return @app.call(env) unless idempotency_key_header = extract_idempotency_key_from(env) # The RFC requires that the Idempotency-Key header value is enclosed in quotes - idempotency_key_header = unquote(idempotency_key_header) - raise MalformedIdempotencyKey if idempotency_key_header == '' + idempotency_key_header_value = unquote(idempotency_key_header) + raise MalformedIdempotencyKey if idempotency_key_header_value == '' - fingerprint = compute_request_fingerprint(req) - request_key = "#{idempotency_key_header}_#{fingerprint}" + request_key = @fingerprint_calculator.call(idempotency_key_header_value, req) @backend.with_idempotency_key(request_key) do |store| if stored_response = store.lookup Measurometer.increment_counter('idempo.responses_served_from', 1, from: 'store') return from_persisted_response(stored_response) end status, headers, body = @app.call(env) + expires_in_seconds = (headers.delete('X-Idempo-Persist-For-Seconds') || @persist_for_seconds).to_i if response_may_be_persisted?(status, headers, body) - expires_in_seconds = (headers.delete('X-Idempo-Persist-For-Seconds') || DEFAULT_TTL).to_i # Body is replaced with a cached version since a Rack response body is not rewindable marshaled_response, body = serialize_response(status, headers, body) store.store(data: marshaled_response, ttl: expires_in_seconds) end Measurometer.increment_counter('idempo.responses_served_from', 1, from: 'freshly-generated') [status, headers, body] end rescue MalformedIdempotencyKey - res = { - ok: false, - error: { - message: "The Idempotency-Key header provided was empty" - } - } - [400, {'Content-Type' => 'application/json'}, [JSON.pretty_generate(res)]] + Measurometer.increment_counter('idempo.responses_served_from', 1, from: 'malformed-idempotency-key') + @malformed_key_error_app.call(env) rescue ConcurrentRequest Measurometer.increment_counter('idempo.responses_served_from', 1, from: 'conflict-concurrent-request') - res = { - ok: false, - error: { - message: "Another request with this idempotency key is still in progress, please try again later" - } - } - [429, {'Retry-After' => '2', 'Content-Type' => 'application/json'}, [JSON.pretty_generate(res)]] + @concurrent_request_error_app.call(env) end private def from_persisted_response(marshaled_response) @@ -92,11 +87,11 @@ rack_response_body.each { |chunk| body_chunks << chunk.dup } rack_response_body.close if rack_response_body.respond_to?(:close) # Only keep headers which are strings stringified_headers = headers.each_with_object({}) do |(header, value), filtered| - filtered[header] = value if value.is_a?(String) + filtered[header] = value if !header.start_with?('rack.') && value.is_a?(String) end message_packed_str = MessagePack.pack([status, stringified_headers, body_chunks]) deflated_message_packed_str = Zlib.deflate(message_packed_str) + ":1" Measurometer.increment_counter('idempo.response_total_generated_bytes', deflated_message_packed_str.bytesize) @@ -118,12 +113,11 @@ def body_size_within_limit?(response_headers, body) return response_headers['Content-Length'].to_i <= SAVED_RESPONSE_BODY_SIZE_LIMIT if response_headers['Content-Length'] return false unless body.is_a?(Array) # Arbitrary iterable of unknown size - precomputed_body_size = body.inject(0) { |sum, chunk| sum + chunk.bytesize } - precomputed_body_size <= SAVED_RESPONSE_BODY_SIZE_LIMIT + sum_of_string_bytesizes(body) <= SAVED_RESPONSE_BODY_SIZE_LIMIT end def status_may_be_persisted?(status) case status when 200..400 @@ -135,32 +129,23 @@ else false end end - def compute_request_fingerprint(req) - d = Digest::SHA256.new - d << req.url << "\n" - d << req.request_method << "\n" - d << req.get_header('HTTP_AUTHORIZATION').to_s << "\n" - while chunk = req.env['rack.input'].read(1024 * 65) - d << chunk - end - Base64.strict_encode64(d.digest) - ensure - req.env['rack.input'].rewind - end - def extract_idempotency_key_from(env) env['HTTP_IDEMPOTENCY_KEY'] || env['HTTP_X_IDEMPOTENCY_KEY'] end def request_verb_idempotent?(request) request.get? || request.head? || request.options? end + def sum_of_string_bytesizes(in_array) + in_array.inject(0) { |sum, chunk| sum + chunk.bytesize } + end + def unquote(str) - # Do not use regular expressions so that we don't have to thing about a catastrophic lookahead + # Do not use regular expressions so that we don't have to think about a catastrophic lookahead double_quote = '"' if str.start_with?(double_quote) && str.end_with?(double_quote) str[1..-2] else str