lib/wcc/contentful/store/postgres_store.rb in wcc-contentful-1.2.1 vs lib/wcc/contentful/store/postgres_store.rb in wcc-contentful-1.3.0

- old
+ new

@@ -21,58 +21,76 @@ super() @schema_ensured = false connection_options ||= { dbname: 'postgres' } pool_options ||= {} @connection_pool = PostgresStore.build_connection_pool(connection_options, pool_options) - @dirty = false + @dirty = Concurrent::AtomicBoolean.new + @mutex = Mutex.new end def set(key, value) ensure_hash value + result = - @connection_pool.with do |conn| - conn.exec_prepared('upsert_entry', [ - key, - value.to_json, - quote_array(extract_links(value)) - ]) + _instrument 'upsert_entry' do + @connection_pool.with do |conn| + conn.exec_prepared('upsert_entry', [ + key, + value.to_json, + quote_array(extract_links(value)) + ]) + end end previous_value = if result.num_tuples == 0 nil else val = result.getvalue(0, 0) JSON.parse(val) if val end - if views_need_update?(value, previous_value) && !mutex.with_read_lock { @dirty } - _instrument 'mark_dirty' - mutex.with_write_lock { @dirty = true } + if views_need_update?(value, previous_value) + # Mark the views as needing to be refreshed, they will be refreshed on the next query. + was_dirty = @dirty.make_true + # Send out an instrumentation event if we are the thread that marked it dirty + # (make_true returns true if the value changed) + _instrument 'mark_dirty' if was_dirty end previous_value end def keys - result = @connection_pool.with { |conn| conn.exec_prepared('select_ids') } + result = + _instrument 'select_ids' do + @connection_pool.with { |conn| conn.exec_prepared('select_ids') } + end + arr = [] result.each { |r| arr << r['id'].strip } arr rescue PG::ConnectionBad [] end def delete(key) - result = @connection_pool.with { |conn| conn.exec_prepared('delete_by_id', [key]) } + result = + _instrument 'delete_by_id', key: key do + @connection_pool.with { |conn| conn.exec_prepared('delete_by_id', [key]) } + end + return if result.num_tuples == 0 JSON.parse(result.getvalue(0, 1)) end def find(key, **_options) - result = @connection_pool.with { |conn| conn.exec_prepared('select_entry', [key]) } + result = + _instrument 'select_entry', key: key do + @connection_pool.with { |conn| conn.exec_prepared('select_entry', [key]) } + end return if result.num_tuples == 0 JSON.parse(result.getvalue(0, 1)) rescue PG::ConnectionBad nil @@ -85,25 +103,24 @@ options: options ) end def exec_query(statement, params = []) - if mutex.with_read_lock { @dirty } - was_dirty = - mutex.with_write_lock do - was_dirty = @dirty - @dirty = false - was_dirty + if @dirty.true? + # Only one thread should call refresh_views_concurrently but all should wait for it to finish. + @mutex.synchronize do + # We have to check again b/c another thread may have gotten here first + if @dirty.true? + _instrument 'refresh_views' do + @connection_pool.with { |conn| conn.exec_prepared('refresh_views_concurrently') } + end + # Mark that the views have been refreshed. + @dirty.make_false end - - if was_dirty - _instrument 'refresh_views' do - @connection_pool.with { |conn| conn.exec_prepared('refresh_views_concurrently') } - end end end - logger&.debug("[PostgresStore] #{statement}\n#{params.inspect}") + logger&.debug("[PostgresStore] #{statement} #{params.inspect}") _instrument 'exec' do @connection_pool.with { |conn| conn.exec(statement, params) } end end