lib/idempo.rb in idempo-0.1.3 vs lib/idempo.rb in idempo-0.2.0
- old
+ new
@@ -6,76 +6,71 @@
require 'digest'
require 'json'
require 'measurometer'
require_relative "idempo/version"
+require_relative "idempo/request_fingerprint"
require_relative "idempo/memory_backend"
require_relative "idempo/redis_backend"
require_relative "idempo/active_record_backend"
+require_relative "idempo/malformed_key_error_app"
+require_relative "idempo/concurrent_request_error_app"
class Idempo
- DEFAULT_TTL = 30
+ DEFAULT_TTL_SECONDS = 30
SAVED_RESPONSE_BODY_SIZE_LIMIT = 4 * 1024 * 1024
class Error < StandardError; end
class ConcurrentRequest < Error; end
class MalformedIdempotencyKey < Error; end
- def initialize(app, backend: MemoryBackend.new)
+ def initialize(app, backend: MemoryBackend.new, malformed_key_error_app: MalformedKeyErrorApp, compute_fingerprint_via: RequestFingerprint, concurrent_request_error_app: ConcurrentRequestErrorApp, persist_for_seconds: DEFAULT_TTL_SECONDS)
@backend = backend
@app = app
+ @concurrent_request_error_app = concurrent_request_error_app
+ @malformed_key_error_app = malformed_key_error_app
+ @fingerprint_calculator = compute_fingerprint_via
+ @persist_for_seconds = persist_for_seconds.to_i
end
def call(env)
req = Rack::Request.new(env)
return @app.call(env) if request_verb_idempotent?(req)
return @app.call(env) unless idempotency_key_header = extract_idempotency_key_from(env)
# The RFC requires that the Idempotency-Key header value is enclosed in quotes
- idempotency_key_header = unquote(idempotency_key_header)
- raise MalformedIdempotencyKey if idempotency_key_header == ''
+ idempotency_key_header_value = unquote(idempotency_key_header)
+ raise MalformedIdempotencyKey if idempotency_key_header_value == ''
- fingerprint = compute_request_fingerprint(req)
- request_key = "#{idempotency_key_header}_#{fingerprint}"
+ request_key = @fingerprint_calculator.call(idempotency_key_header_value, req)
@backend.with_idempotency_key(request_key) do |store|
if stored_response = store.lookup
Measurometer.increment_counter('idempo.responses_served_from', 1, from: 'store')
return from_persisted_response(stored_response)
end
status, headers, body = @app.call(env)
+ expires_in_seconds = (headers.delete('X-Idempo-Persist-For-Seconds') || @persist_for_seconds).to_i
if response_may_be_persisted?(status, headers, body)
- expires_in_seconds = (headers.delete('X-Idempo-Persist-For-Seconds') || DEFAULT_TTL).to_i
# Body is replaced with a cached version since a Rack response body is not rewindable
marshaled_response, body = serialize_response(status, headers, body)
store.store(data: marshaled_response, ttl: expires_in_seconds)
end
Measurometer.increment_counter('idempo.responses_served_from', 1, from: 'freshly-generated')
[status, headers, body]
end
rescue MalformedIdempotencyKey
- res = {
- ok: false,
- error: {
- message: "The Idempotency-Key header provided was empty"
- }
- }
- [400, {'Content-Type' => 'application/json'}, [JSON.pretty_generate(res)]]
+ Measurometer.increment_counter('idempo.responses_served_from', 1, from: 'malformed-idempotency-key')
+ @malformed_key_error_app.call(env)
rescue ConcurrentRequest
Measurometer.increment_counter('idempo.responses_served_from', 1, from: 'conflict-concurrent-request')
- res = {
- ok: false,
- error: {
- message: "Another request with this idempotency key is still in progress, please try again later"
- }
- }
- [429, {'Retry-After' => '2', 'Content-Type' => 'application/json'}, [JSON.pretty_generate(res)]]
+ @concurrent_request_error_app.call(env)
end
private
def from_persisted_response(marshaled_response)
@@ -92,11 +87,11 @@
rack_response_body.each { |chunk| body_chunks << chunk.dup }
rack_response_body.close if rack_response_body.respond_to?(:close)
# Only keep headers which are strings
stringified_headers = headers.each_with_object({}) do |(header, value), filtered|
- filtered[header] = value if value.is_a?(String)
+ filtered[header] = value if !header.start_with?('rack.') && value.is_a?(String)
end
message_packed_str = MessagePack.pack([status, stringified_headers, body_chunks])
deflated_message_packed_str = Zlib.deflate(message_packed_str) + ":1"
Measurometer.increment_counter('idempo.response_total_generated_bytes', deflated_message_packed_str.bytesize)
@@ -118,12 +113,11 @@
def body_size_within_limit?(response_headers, body)
return response_headers['Content-Length'].to_i <= SAVED_RESPONSE_BODY_SIZE_LIMIT if response_headers['Content-Length']
return false unless body.is_a?(Array) # Arbitrary iterable of unknown size
- precomputed_body_size = body.inject(0) { |sum, chunk| sum + chunk.bytesize }
- precomputed_body_size <= SAVED_RESPONSE_BODY_SIZE_LIMIT
+ sum_of_string_bytesizes(body) <= SAVED_RESPONSE_BODY_SIZE_LIMIT
end
def status_may_be_persisted?(status)
case status
when 200..400
@@ -135,32 +129,23 @@
else
false
end
end
- def compute_request_fingerprint(req)
- d = Digest::SHA256.new
- d << req.url << "\n"
- d << req.request_method << "\n"
- d << req.get_header('HTTP_AUTHORIZATION').to_s << "\n"
- while chunk = req.env['rack.input'].read(1024 * 65)
- d << chunk
- end
- Base64.strict_encode64(d.digest)
- ensure
- req.env['rack.input'].rewind
- end
-
def extract_idempotency_key_from(env)
env['HTTP_IDEMPOTENCY_KEY'] || env['HTTP_X_IDEMPOTENCY_KEY']
end
def request_verb_idempotent?(request)
request.get? || request.head? || request.options?
end
+ def sum_of_string_bytesizes(in_array)
+ in_array.inject(0) { |sum, chunk| sum + chunk.bytesize }
+ end
+
def unquote(str)
- # Do not use regular expressions so that we don't have to thing about a catastrophic lookahead
+ # Do not use regular expressions so that we don't have to think about a catastrophic lookahead
double_quote = '"'
if str.start_with?(double_quote) && str.end_with?(double_quote)
str[1..-2]
else
str