Sha256: 873aafe70d0dc02db59056024d65aed79222a017cdde564bcbb498bf549f5142

Contents?: true

Size: 1.96 KB

Versions: 2

Compression:

Stored size: 1.96 KB

Contents

module AgnosticBackend
  module Elasticsearch
    class Indexer < AgnosticBackend::Indexer
      include AgnosticBackend::Utilities

      def publish(document)
        client.send_request(:put,
                            path: "/#{index.index_name}/#{index.type}/#{document["id"]}",
                            body: document)
      end

      def publish_all(documents)
        return if documents.empty?
        response = client.send_request(:post,
                                       path: "/#{index.index_name}/#{index.type}/_bulk",
                                       body: convert_to_bulk_upload_string(documents))
        body = ActiveSupport::JSON.decode(response.body) rescue {}
        # if at least one indexing attempt fails, raise the red flag
        raise IndexingError.new, "Error in bulk upload" if body["errors"]
        response
      end

      private

      def client
        index.client
      end

      def prepare(document)
        raise IndexingError.new, "Document does not have an ID field" unless document["id"].present?
        document
      end

      def transform(document)
        return {} if document.empty?

        document = flatten document
        document = reject_blank_values_from document
        document = format_dates_in document
        document
      end

      def format_dates_in(document)
        document.each do |k, v|
          if v.is_a?(Time)
            document[k] = v.utc.strftime("%Y-%m-%dT%H:%M:%SZ")
          elsif v.is_a?(Array) && v.all?{|e| e.is_a?(Time)}
            document[k] = v.map{|e| e.utc.strftime("%Y-%m-%dT%H:%M:%SZ")}
          end
        end
      end

      def convert_to_bulk_upload_string(documents)
        documents.map do |document|
          next if document.empty?
          header = { "index" => {"_id" => document["id"]}}.to_json
          document = ActiveSupport::JSON.encode transform(prepare(document))
          "#{header}\n#{document}\n"
        end.compact.join("\n")
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
agnostic_backend-1.0.4 lib/agnostic_backend/elasticsearch/indexer.rb
agnostic_backend-1.0.3 lib/agnostic_backend/elasticsearch/indexer.rb