CREATE OR REPLACE FUNCTION message_store.get_category_messages( category varchar, "position" bigint DEFAULT 1, batch_size bigint DEFAULT 1000, correlation varchar DEFAULT NULL, consumer_group_member bigint DEFAULT NULL, consumer_group_size bigint DEFAULT NULL, condition varchar DEFAULT NULL ) RETURNS SETOF message_store.message AS $$ DECLARE _command text; BEGIN IF NOT is_category(get_category_messages.category) THEN RAISE EXCEPTION 'Must be a category: %', get_category_messages.category; END IF; position := COALESCE(position, 1); batch_size := COALESCE(batch_size, 1000); _command := ' SELECT id::varchar, stream_name::varchar, type::varchar, position::bigint, global_position::bigint, data::varchar, metadata::varchar, time::timestamp FROM messages WHERE category(stream_name) = $1 AND global_position >= $2'; IF get_category_messages.correlation IS NOT NULL THEN IF position('-' IN get_category_messages.correlation) > 0 THEN RAISE EXCEPTION 'Correlation must be a category (Correlation: %)', get_category_messages.correlation; END IF; _command := _command || ' AND category(metadata->>''correlationStreamName'') = $4'; END IF; IF (get_category_messages.consumer_group_member IS NOT NULL AND get_category_messages.consumer_group_size IS NULL) OR (get_category_messages.consumer_group_member IS NULL AND get_category_messages.consumer_group_size IS NOT NULL) THEN RAISE EXCEPTION 'Consumer group member and size must be specified (Consumer Group Member: %, Consumer Group Size: %)', get_category_messages.consumer_group_member, get_category_messages.consumer_group_size; END IF; IF get_category_messages.consumer_group_member IS NOT NULL AND get_category_messages.consumer_group_size IS NOT NULL THEN IF get_category_messages.consumer_group_size < 1 THEN RAISE EXCEPTION 'Consumer group size must not be less than 1 (Consumer Group Member: %, Consumer Group Size: %)', get_category_messages.consumer_group_member, get_category_messages.consumer_group_size; END IF; IF get_category_messages.consumer_group_member < 0 THEN RAISE EXCEPTION 'Consumer group member must not be less than 0 (Consumer Group Member: %, Consumer Group Size: %)', get_category_messages.consumer_group_member, get_category_messages.consumer_group_size; END IF; IF get_category_messages.consumer_group_member >= get_category_messages.consumer_group_size THEN RAISE EXCEPTION 'Consumer group member must be less than the group size (Consumer Group Member: %, Consumer Group Size: %)', get_category_messages.consumer_group_member, get_category_messages.consumer_group_size; END IF; _command := _command || ' AND MOD(@hash_64(cardinal_id(stream_name)), $6) = $5'; END IF; IF get_category_messages.condition IS NOT NULL THEN IF current_setting('message_store.sql_condition', true) IS NULL OR current_setting('message_store.sql_condition', true) = 'off' THEN RAISE EXCEPTION 'Retrieval with SQL condition is not activated'; END IF; _command := _command || ' AND (%s)'; _command := format(_command, get_category_messages.condition); END IF; _command := _command || ' ORDER BY global_position ASC LIMIT $3'; IF current_setting('message_store.debug_get', true) = 'on' OR current_setting('message_store.debug', true) = 'on' THEN RAISE NOTICE 'ยป get_category_messages'; RAISE NOTICE 'category ($1): %', get_category_messages.category; RAISE NOTICE 'position ($2): %', get_category_messages.position; RAISE NOTICE 'batch_size ($3): %', get_category_messages.batch_size; RAISE NOTICE 'correlation ($4): %', get_category_messages.correlation; RAISE NOTICE 'consumer_group_member ($5): %', get_category_messages.consumer_group_member; RAISE NOTICE 'consumer_group_size ($6): %', get_category_messages.consumer_group_size; RAISE NOTICE 'condition: %', get_category_messages.condition; RAISE NOTICE 'Generated Command: %', _command; END IF; RETURN QUERY EXECUTE _command USING get_category_messages.category, get_category_messages.position, get_category_messages.batch_size, get_category_messages.correlation, get_category_messages.consumer_group_member, get_category_messages.consumer_group_size::smallint; END; $$ LANGUAGE plpgsql VOLATILE;