lib/wcc/contentful/store/postgres_store.rb in wcc-contentful-1.2.1 vs lib/wcc/contentful/store/postgres_store.rb in wcc-contentful-1.3.0
- old
+ new
@@ -21,58 +21,76 @@
super()
@schema_ensured = false
connection_options ||= { dbname: 'postgres' }
pool_options ||= {}
@connection_pool = PostgresStore.build_connection_pool(connection_options, pool_options)
- @dirty = false
+ @dirty = Concurrent::AtomicBoolean.new
+ @mutex = Mutex.new
end
def set(key, value)
ensure_hash value
+
result =
- @connection_pool.with do |conn|
- conn.exec_prepared('upsert_entry', [
- key,
- value.to_json,
- quote_array(extract_links(value))
- ])
+ _instrument 'upsert_entry' do
+ @connection_pool.with do |conn|
+ conn.exec_prepared('upsert_entry', [
+ key,
+ value.to_json,
+ quote_array(extract_links(value))
+ ])
+ end
end
previous_value =
if result.num_tuples == 0
nil
else
val = result.getvalue(0, 0)
JSON.parse(val) if val
end
- if views_need_update?(value, previous_value) && !mutex.with_read_lock { @dirty }
- _instrument 'mark_dirty'
- mutex.with_write_lock { @dirty = true }
+ if views_need_update?(value, previous_value)
+ # Mark the views as needing to be refreshed, they will be refreshed on the next query.
+ was_dirty = @dirty.make_true
+ # Send out an instrumentation event if we are the thread that marked it dirty
+ # (make_true returns true if the value changed)
+ _instrument 'mark_dirty' if was_dirty
end
previous_value
end
def keys
- result = @connection_pool.with { |conn| conn.exec_prepared('select_ids') }
+ result =
+ _instrument 'select_ids' do
+ @connection_pool.with { |conn| conn.exec_prepared('select_ids') }
+ end
+
arr = []
result.each { |r| arr << r['id'].strip }
arr
rescue PG::ConnectionBad
[]
end
def delete(key)
- result = @connection_pool.with { |conn| conn.exec_prepared('delete_by_id', [key]) }
+ result =
+ _instrument 'delete_by_id', key: key do
+ @connection_pool.with { |conn| conn.exec_prepared('delete_by_id', [key]) }
+ end
+
return if result.num_tuples == 0
JSON.parse(result.getvalue(0, 1))
end
def find(key, **_options)
- result = @connection_pool.with { |conn| conn.exec_prepared('select_entry', [key]) }
+ result =
+ _instrument 'select_entry', key: key do
+ @connection_pool.with { |conn| conn.exec_prepared('select_entry', [key]) }
+ end
return if result.num_tuples == 0
JSON.parse(result.getvalue(0, 1))
rescue PG::ConnectionBad
nil
@@ -85,25 +103,24 @@
options: options
)
end
def exec_query(statement, params = [])
- if mutex.with_read_lock { @dirty }
- was_dirty =
- mutex.with_write_lock do
- was_dirty = @dirty
- @dirty = false
- was_dirty
+ if @dirty.true?
+ # Only one thread should call refresh_views_concurrently but all should wait for it to finish.
+ @mutex.synchronize do
+ # We have to check again b/c another thread may have gotten here first
+ if @dirty.true?
+ _instrument 'refresh_views' do
+ @connection_pool.with { |conn| conn.exec_prepared('refresh_views_concurrently') }
+ end
+ # Mark that the views have been refreshed.
+ @dirty.make_false
end
-
- if was_dirty
- _instrument 'refresh_views' do
- @connection_pool.with { |conn| conn.exec_prepared('refresh_views_concurrently') }
- end
end
end
- logger&.debug("[PostgresStore] #{statement}\n#{params.inspect}")
+ logger&.debug("[PostgresStore] #{statement} #{params.inspect}")
_instrument 'exec' do
@connection_pool.with { |conn| conn.exec(statement, params) }
end
end