lib/wcc/contentful/store/postgres_store.rb in wcc-contentful-0.4.0.pre.rc vs lib/wcc/contentful/store/postgres_store.rb in wcc-contentful-1.0.0.pre.rc1
- old
+ new
@@ -2,37 +2,69 @@
gem 'pg', '~> 1.0'
gem 'connection_pool', '~> 2.2'
require 'pg'
require 'connection_pool'
+require_relative 'instrumentation'
module WCC::Contentful::Store
+ # Implements the store interface where all Contentful entries are stored in a
+ # JSONB table.
class PostgresStore < Base
+ include WCC::Contentful::Instrumentation
+
+ delegate :each, to: :to_enum
+
attr_reader :connection_pool
+ attr_accessor :logger
def initialize(_config = nil, connection_options = nil, pool_options = nil)
super()
@schema_ensured = false
connection_options ||= { dbname: 'postgres' }
pool_options ||= {}
- @connection_pool = build_connection_pool(connection_options, pool_options)
+ @connection_pool = PostgresStore.build_connection_pool(connection_options, pool_options)
+ @dirty = false
end
def set(key, value)
ensure_hash value
- result = @connection_pool.with { |conn| conn.exec_prepared('upsert_entry', [key, value.to_json]) }
- return if result.num_tuples == 0
+ result =
+ @connection_pool.with do |conn|
+ conn.exec_prepared('upsert_entry', [
+ key,
+ value.to_json,
+ quote_array(extract_links(value))
+ ])
+ end
- val = result.getvalue(0, 0)
- JSON.parse(val) if val
+ previous_value =
+ if result.num_tuples == 0
+ nil
+ else
+ val = result.getvalue(0, 0)
+ JSON.parse(val) if val
+ end
+
+ if views_need_update?(value, previous_value)
+ # mark dirty - we need to refresh the materialized view
+ unless mutex.with_read_lock { @dirty }
+ _instrument 'mark_dirty'
+ mutex.with_write_lock { @dirty = true }
+ end
+ end
+
+ previous_value
end
def keys
result = @connection_pool.with { |conn| conn.exec_prepared('select_ids') }
arr = []
result.each { |r| arr << r['id'].strip }
arr
+ rescue PG::ConnectionBad
+ []
end
def delete(key)
result = @connection_pool.with { |conn| conn.exec_prepared('delete_by_id', [key]) }
return if result.num_tuples == 0
@@ -43,156 +75,270 @@
def find(key, **_options)
result = @connection_pool.with { |conn| conn.exec_prepared('select_entry', [key]) }
return if result.num_tuples == 0
JSON.parse(result.getvalue(0, 1))
+ rescue PG::ConnectionBad
+ nil
end
def find_all(content_type:, options: nil)
- statement = "WHERE data->'sys'->'contentType'->'sys'->>'id' = $1"
Query.new(
self,
- @connection_pool,
- statement,
- [content_type],
- options
+ content_type: content_type,
+ options: options
)
end
- class Query < Base::Query
- def initialize(store, connection_pool, statement = nil, params = nil, options = nil)
- super(store)
- @connection_pool = connection_pool
- @statement = statement ||
- "WHERE data->'sys'->>'id' IS NOT NULL"
- @params = params || []
- @options = options || {}
+ def exec_query(statement, params = [])
+ if mutex.with_read_lock { @dirty }
+ was_dirty =
+ mutex.with_write_lock do
+ was_dirty = @dirty
+ @dirty = false
+ was_dirty
+ end
+
+ if was_dirty
+ _instrument 'refresh_views' do
+ @connection_pool.with { |conn| conn.exec_prepared('refresh_views_concurrently') }
+ end
+ end
end
- def eq(field, expected, context = nil)
- locale = context[:locale] if context.present?
- locale ||= 'en-US'
+ logger&.debug('[PostgresStore] ' + statement + "\n" + params.inspect)
+ _instrument 'exec' do
+ @connection_pool.with { |conn| conn.exec(statement, params) }
+ end
+ end
- params = @params.dup
+ private
- statement = @statement + " AND data->'fields'->$#{push_param(field, params)}" \
- "->$#{push_param(locale, params)} ? $#{push_param(expected, params)}"
+ def extract_links(entry)
+ return [] unless fields = entry && entry['fields']
- Query.new(
- @store,
- @connection_pool,
- statement,
- params,
- @options
- )
- end
+ links =
+ fields.flat_map do |_f, locale_hash|
+ locale_hash&.flat_map do |_locale, value|
+ if value.is_a? Array
+ value.map { |val| val.dig('sys', 'id') if link?(val) }
+ elsif link?(value)
+ value.dig('sys', 'id')
+ end
+ end
+ end
+ links.compact
+ end
+
+ def link?(value)
+ value.is_a?(Hash) && value.dig('sys', 'type') == 'Link'
+ end
+
+ def quote_array(arr)
+ return unless arr
+
+ encoder = PG::TextEncoder::Array.new
+ encoder.encode(arr)
+ end
+
+ def views_need_update?(value, previous_value)
+ # contentful_raw_includes_ids_jointable needs update if any links change
+ return true if extract_links(value) != extract_links(previous_value)
+ end
+
+ class Query < WCC::Contentful::Store::Query
def count
return @count if @count
- statement = 'SELECT count(*) FROM contentful_raw ' + @statement
- result = @connection_pool.with { |conn| conn.exec(statement, @params) }
+ statement, params = finalize_statement('SELECT count(*)')
+ result = store.exec_query(statement, params)
@count = result.getvalue(0, 0).to_i
end
def first
return @first if @first
- statement = 'SELECT * FROM contentful_raw ' + @statement + ' LIMIT 1'
- result = @connection_pool.with { |conn| conn.exec(statement, @params) }
+ statement, params = finalize_statement('SELECT t.*', ' LIMIT 1', depth: @options[:include])
+ result = store.exec_query(statement, params)
return if result.num_tuples == 0
- resolve_includes(
- JSON.parse(result.getvalue(0, 1)),
- @options[:include]
- )
- end
+ row = result.first
+ entry = JSON.parse(row['data'])
- def map
- arr = []
- resolve.each do |row|
- arr << yield(
- resolve_includes(
- JSON.parse(row['data']),
- @options[:include]
- )
- )
+ if @options[:include] && @options[:include] > 0
+ includes = decode_includes(row['includes'])
+ entry = resolve_includes([entry, includes], @options[:include])
end
- arr
+ entry
end
- def result
- arr = []
- resolve.each do |row|
- arr <<
- resolve_includes(
- JSON.parse(row['data']),
- @options[:include]
- )
- end
- arr
- end
+ def result_set
+ return @result_set if @result_set
- # TODO: override resolve_includes to make it more efficient
+ statement, params = finalize_statement('SELECT t.*', depth: @options[:include])
+ @result_set =
+ store.exec_query(statement, params)
+ .lazy.map do |row|
+ entry = JSON.parse(row['data'])
+ includes =
+ (decode_includes(row['includes']) if @options[:include] && @options[:include] > 0)
+ [entry, includes]
+ end
+ rescue PG::ConnectionBad
+ []
+ end
+
private
- def resolve
- return @resolved if @resolved
+ def decode_includes(includes)
+ return {} unless includes
- statement = 'SELECT * FROM contentful_raw ' + @statement
- @resolved = @connection_pool.with { |conn| conn.exec(statement, @params) }
+ decoder = PG::TextDecoder::Array.new
+ decoder.decode(includes)
+ .map { |e| JSON.parse(e) }
+ .each_with_object({}) do |entry, h|
+ h[entry.dig('sys', 'id')] = entry
+ end
end
+ def finalize_statement(select_statement, limit_statement = nil, depth: nil)
+ statement =
+ if content_type == 'Asset'
+ "WHERE t.data->'sys'->>'type' = $1"
+ else
+ "WHERE t.data->'sys'->'contentType'->'sys'->>'id' = $1"
+ end
+ params = [content_type]
+ joins = []
+
+ statement =
+ conditions.reduce(statement) do |memo, condition|
+ raise ArgumentError, "Operator #{condition.op} not supported" unless condition.op == :eq
+
+ if condition.path_tuples.length == 1
+ memo + _eq(condition.path, condition.expected, params)
+ else
+ join_path, expectation_path = condition.path_tuples
+ memo + _join(join_path, expectation_path, condition.expected, params, joins)
+ end
+ end
+
+ table = 'contentful_raw'
+ if depth && depth > 0
+ table = 'contentful_raw_includes'
+ select_statement += ', t.includes'
+ end
+
+ statement =
+ select_statement +
+ " FROM #{table} AS t \n" +
+ joins.join("\n") + "\n" +
+ statement +
+ (limit_statement || '')
+
+ [statement, params]
+ end
+
+ def _eq(path, expected, params)
+ return " AND t.id = $#{push_param(expected, params)}" if path == %w[sys id]
+
+ if path[3] == 'sys'
+ # the path can be either an array or a singular json obj, and we have to dig
+ # into it to detect whether it contains `{ "sys": { "id" => expected } }`
+ expected = { 'sys' => { path[4] => expected } }.to_json
+ return ' AND fn_contentful_jsonb_any_to_jsonb_array(t.data->' \
+ "#{quote_parameter_path(path.take(3))}) @> " \
+ "jsonb_build_array($#{push_param(expected, params)}::jsonb)"
+ end
+
+ " AND t.data->#{quote_parameter_path(path)}" \
+ " ? $#{push_param(expected, params)}::text"
+ end
+
def push_param(param, params)
params << param
params.length
end
- end
- def self.ensure_schema(conn)
- conn.exec(<<~HEREDOC
- CREATE TABLE IF NOT EXISTS contentful_raw (
- id varchar PRIMARY KEY,
- data jsonb
- );
- CREATE INDEX IF NOT EXISTS contentful_raw_value_type ON contentful_raw ((data->'sys'->>'type'));
- CREATE INDEX IF NOT EXISTS contentful_raw_value_content_type ON contentful_raw ((data->'sys'->'contentType'->'sys'->>'id'));
+ def quote_parameter_path(path)
+ path.map { |p| "'#{p}'" }.join('->')
+ end
- DROP FUNCTION IF EXISTS "upsert_entry"(_id varchar, _data jsonb);
- CREATE FUNCTION "upsert_entry"(_id varchar, _data jsonb) RETURNS jsonb AS $$
- DECLARE
- prev jsonb;
- BEGIN
- SELECT data FROM contentful_raw WHERE id = _id INTO prev;
- INSERT INTO contentful_raw (id, data) values (_id, _data)
- ON CONFLICT (id) DO
- UPDATE
- SET data = _data;
- RETURN prev;
- END;
- $$ LANGUAGE 'plpgsql';
- HEREDOC
- )
- end
+ def _join(join_path, expectation_path, expected, params, joins)
+ # join back to the table using the links column (join_table_alias becomes s0, s1, s2)
+ # this is faster because of the index
+ join_table_alias = push_join(join_path, joins)
- def self.prepare_statements(conn)
- conn.prepare('upsert_entry', 'SELECT * FROM upsert_entry($1,$2)')
- conn.prepare('select_entry', 'SELECT * FROM contentful_raw WHERE id = $1')
- conn.prepare('select_ids', 'SELECT id FROM contentful_raw')
- conn.prepare('delete_by_id', 'DELETE FROM contentful_raw WHERE id = $1 RETURNING *')
+ # then apply the where clauses:
+ # 1. that the joined entry has the data at the appropriate path
+ # 2. that the entry joining to the other entry actually links at this path and not another
+ <<~WHERE_CLAUSE
+ AND #{join_table_alias}.data->#{quote_parameter_path(expectation_path)} ? $#{push_param(expected, params)}::text
+ AND exists (select 1 from jsonb_array_elements(fn_contentful_jsonb_any_to_jsonb_array(t.data->#{quote_parameter_path(join_path)})) as link where link->'sys'->'id' ? #{join_table_alias}.id)
+ WHERE_CLAUSE
+ end
+
+ def push_join(_path, joins)
+ table_alias = "s#{joins.length}"
+ joins << "JOIN contentful_raw AS #{table_alias} ON " \
+ "#{table_alias}.id=ANY(t.links)"
+ table_alias
+ end
end
- private
+ EXPECTED_VERSION = 2
- def build_connection_pool(connection_options, pool_options)
- ConnectionPool.new(pool_options) do
- PG.connect(connection_options).tap do |conn|
- unless @schema_ensured
- PostgresStore.ensure_schema(conn)
- @schema_ensured = true
+ class << self
+ def prepare_statements(conn)
+ conn.prepare('upsert_entry', 'SELECT * FROM fn_contentful_upsert_entry($1,$2,$3)')
+ conn.prepare('select_entry', 'SELECT * FROM contentful_raw WHERE id = $1')
+ conn.prepare('select_ids', 'SELECT id FROM contentful_raw')
+ conn.prepare('delete_by_id', 'DELETE FROM contentful_raw WHERE id = $1 RETURNING *')
+ conn.prepare('refresh_views_concurrently',
+ 'REFRESH MATERIALIZED VIEW CONCURRENTLY contentful_raw_includes_ids_jointable')
+ end
+
+ # This is intentionally a class var so that all subclasses share the same mutex
+ @@schema_mutex = Mutex.new # rubocop:disable Style/ClassVars
+
+ def build_connection_pool(connection_options, pool_options)
+ ConnectionPool.new(pool_options) do
+ PG.connect(connection_options).tap do |conn|
+ unless schema_ensured?(conn)
+ @@schema_mutex.synchronize do
+ ensure_schema(conn) unless schema_ensured?(conn)
+ end
+ end
+ prepare_statements(conn)
end
- PostgresStore.prepare_statements(conn)
+ end
+ end
+
+ def schema_ensured?(conn)
+ result = conn.exec('SELECT version FROM wcc_contentful_schema_version' \
+ ' ORDER BY version DESC LIMIT 1')
+ return false if result.num_tuples == 0
+
+ result[0]['version'].to_i >= EXPECTED_VERSION
+ rescue PG::UndefinedTable
+ # need to run v1 schema migration
+ false
+ end
+
+ def ensure_schema(conn)
+ result =
+ begin
+ conn.exec('SELECT version FROM wcc_contentful_schema_version' \
+ ' ORDER BY version DESC')
+ rescue PG::UndefinedTable
+ []
+ end
+ 1.upto(EXPECTED_VERSION).each do |version_num|
+ next if result.find { |row| row['version'].to_s == version_num.to_s }
+
+ conn.exec(File.read(File.join(__dir__, "postgres_store/schema_#{version_num}.sql")))
end
end
end
end
end