Sha256: dc176e2984a389857d89b1a14e5f569c90358f149b18a760d0e035ac03cdad1b

Contents?: true

Size: 1.97 KB

Versions: 4

Compression:

Stored size: 1.97 KB

Contents

require 'elasticsearch'

module Forklift
  module Connection
    # Elasticsearch transport: reads batches of documents out of an index and
    # writes hashes into an index through an ::Elasticsearch::Client.
    #
    # NOTE(review): `client` is called throughout this class but is not defined
    # here; presumably Forklift::Base::Connection exposes @client -- confirm.
    class Elasticsearch < Forklift::Base::Connection

      # config   - the value given to #initialize (passed to the ES client)
      # forklift - the owning object given to #initialize
      attr_reader :config, :forklift

      # config   - options handed unchanged to ::Elasticsearch::Client.new
      # forklift - owning object; this class uses its `logger` and `config`
      def initialize(config, forklift)
        @config = config
        @forklift = forklift
        @client = ::Elasticsearch::Client.new(config)
      end

      # Runs `query` against `index`, paging through results with from/size.
      #
      # index   - index name handed to client.search
      # query   - Hash used as the search body. It is not modified: the paging
      #           keys are merged into a copy for every request.
      # looping - paging continues past the first batch only when this is
      #           exactly `true`
      # from    - offset of the first hit to fetch
      # size    - number of hits per request
      #           (defaults to forklift.config[:batch_size])
      #
      # With a block: yields each batch (an Array of the hits' "_source"
      # hashes, keys symbolized in place). The empty batch that ends the loop
      # is yielded too. Returns nil once the loop finishes.
      # Without a block: returns the first batch only, whatever `looping` is.
      def read(index, query, looping=true, from=0, size=forklift.config[:batch_size])
        offset = 0
        first_pass = true

        # Strict `== true` is kept so non-boolean truthy values still mean
        # "fetch a single batch", as before.
        while looping == true || first_pass
          # Merge into a fresh hash so the caller's query is left untouched.
          prepared_query = query.merge(from: from + offset, size: size)

          forklift.logger.debug "    ELASTICSEARCH: #{prepared_query.to_json}"
          results = client.search({ index: index, body: prepared_query })
          hits = results["hits"]["hits"]

          data = hits.map { |hit| hit["_source"] }
          data.each(&:symbolize_keys!)

          return data unless block_given?

          yield data

          # An empty page means there is nothing left to fetch.
          looping = false if hits.empty?
          offset += size
          first_pass = false
        end
      end

      # Indexes every hash in `data` into `index`, one request per row, then
      # refreshes the index.
      #
      # data        - collection of hashes; their keys are symbolized in place
      # index       - target index name
      # update      - the document id is taken from the row only when this is
      #               exactly `true`
      # type        - value sent as :type with each document
      # primary_key - key whose value becomes the document id (used only when
      #               `update` is true and the row's value is not nil)
      #
      # Returns whatever client.indices.refresh returns.
      def write(data, index, update=false, type='forklift', primary_key=:id)
        data.each(&:symbolize_keys!)

        data.each do |d|
          object = { index: index, body: d, type: type }
          # Without an explicit :id the request is sent with no id at all.
          object[:id] = d[primary_key] if update == true && !d[primary_key].nil?

          forklift.logger.debug "    ELASTICSEARCH (store): #{object.to_json}"
          client.index object
        end

        client.indices.refresh({ index: index })
      end

      # Deletes `index` if client.indices.exists reports it present; otherwise
      # does nothing. The debug line is logged in both cases.
      def delete_index(index)
        forklift.logger.debug "    ELASTICSEARCH (delete index): #{index}"
        client.indices.delete({ index: index }) if client.indices.exists({ index: index })
      end

    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
forklift_etl-1.1.12 lib/forklift/transports/elasticsearch.rb
forklift_etl-1.1.11 lib/forklift/transports/elasticsearch.rb
forklift_etl-1.1.10 lib/forklift/transports/elasticsearch.rb
forklift_etl-1.1.9 lib/forklift/transports/elasticsearch.rb