Sha256: 0cb64828bb48e34eba98ba965457f4d7f59bdfa963bdfb2a88d9abbd5265df77

Contents?: true

Size: 1.93 KB

Versions: 1

Compression:

Stored size: 1.93 KB

Contents

require 'elasticsearch'

module Forklift
  module Connection
    class Elasticsearch < Forklift::Base::Connection

      def initialize(config, forklift)
        @config = config
        @forklift = forklift
        @client = ::Elasticsearch::Client.new(config)
      end

      def config
        @config
      end

      def forklift
        @forklift
      end

      def read(index, query, looping=true, from=0, size=1000)
        offset = 0
        loop_count = 0

        while (looping == true || loop_count == 0)
          data = []
          prepared_query = query
          prepared_query[:from] = from + offset
          prepared_query[:size] = size

          forklift.logger.debug "    ELASTICSEARCH: #{query}"
          results = client.search( { index: index, body: prepared_query } )
          results["hits"]["hits"].each do |hit|
            data << hit["_source"]
          end

          data.map{|l| l.symbolize_keys! }

          if block_given?
            yield data
          else
            return data
          end

          looping = false if results["hits"]["hits"].length == 0
          offset = offset + size
          loop_count = loop_count + 1
        end
      end

      def write(data, index, update=false, type='forklift', primary_key=:id)
        data.map{|l| l.symbolize_keys! }

        data.each do |d|
          object = {
            index:  index,
            body:   d,
            type:   type,
          }
          object[:id] = d[primary_key] if ( !d[primary_key].nil? && update == true )

          forklift.logger.debug "    ELASTICSEARCH (store): #{object}"
          client.index object
        end
        client.indices.refresh({ index: index })
      end

      def delete_index(index)
        forklift.logger.debug "    ELASTICSEARCH (delete index): #{index}"
        client.indices.delete({ index: index }) if client.indices.exists({ index: index })
      end

      private

      #/private

    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
forklift_etl-1.1.6 lib/forklift/transports/elasticsearch.rb