Sha256: fe3f36ccb4362a51e3d95c23f3fa9da036dea0a4d11a5b463279822188c0810c

Contents?: true

Size: 1.9 KB

Versions: 9

Compression:

Stored size: 1.9 KB

Contents

# frozen_string_literal: true

require 'pub_sub_model_sync/payload'
module PubSubModelSync
  class ServiceBase < PubSubModelSync::Base
    SERVICE_KEY = 'service_model_sync'

    def listen_messages
      raise NoMethodError, 'method :listen_messages must be defined in service'
    end

    # @param _payload (Payload)
    def publish(_payload)
      raise NoMethodError, 'method :publish must be defined in service'
    end

    def stop
      raise NoMethodError, 'method :stop must be defined in service'
    end

    private

    # @param payload (Payload)
    # @return (String): Json Format
    def encode_payload(payload)
      data = payload.to_h
      not_important_keys = %i[forced_ordering_key cache]
      reduce_payload_size = !config.debug
      data[:headers].except!(*not_important_keys) if reduce_payload_size
      data.to_json
    end

    # @param (String: Payload in json format)
    def process_message(payload_info)
      payload = decode_payload(payload_info)
      return unless payload
      return if same_app_message?(payload) || !target_app_message?(payload)

      payload.process
    end

    # @return [Payload,Nil]
    def decode_payload(payload_info)
      payload = ::PubSubModelSync::Payload.from_payload_data(JSON.parse(payload_info))
      log("Received message: #{[payload]}") if config.debug
      payload
    rescue => e
      error_payload = [payload_info, e.message, e.backtrace]
      log("Error while parsing payload: #{error_payload}", :error)
      nil
    end

    # @param payload (Payload)
    def same_app_message?(payload)
      key = payload.headers[:app_key]
      res = key && key == config.subscription_key
      log("Skipping message from same origin: #{[payload]}") if res && config.debug
      res
    end

    def target_app_message?(payload)
      key = payload.headers[:target_app_key].to_s
      !key.present? || key.split(',').include?(config.subscription_key)
    end
  end
end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
pub_sub_model_sync-1.7.2 lib/pub_sub_model_sync/service_base.rb
pub_sub_model_sync-1.7.1 lib/pub_sub_model_sync/service_base.rb
pub_sub_model_sync-1.7.0 lib/pub_sub_model_sync/service_base.rb
pub_sub_model_sync-1.6.4 lib/pub_sub_model_sync/service_base.rb
pub_sub_model_sync-1.6.3 lib/pub_sub_model_sync/service_base.rb
pub_sub_model_sync-1.6.2 lib/pub_sub_model_sync/service_base.rb
pub_sub_model_sync-1.6.1 lib/pub_sub_model_sync/service_base.rb
pub_sub_model_sync-1.6.1pre lib/pub_sub_model_sync/service_base.rb
pub_sub_model_sync-1.6.0 lib/pub_sub_model_sync/service_base.rb