Sha256: 0cb64828bb48e34eba98ba965457f4d7f59bdfa963bdfb2a88d9abbd5265df77
Contents?: true
Size: 1.93 KB
Versions: 1
Compression:
Stored size: 1.93 KB
Contents
require 'elasticsearch'

module Forklift
  module Connection
    # Forklift connection that reads from and writes to Elasticsearch through
    # an ::Elasticsearch::Client built from the supplied config.
    class Elasticsearch < Forklift::Base::Connection
      # `client` is called by the methods below; it is declared here so this
      # class does not rely on the superclass to expose @client.
      attr_reader :config, :forklift, :client

      # @param config   options handed unchanged to ::Elasticsearch::Client.new
      # @param forklift object whose `logger` is used for debug output
      def initialize(config, forklift)
        @config = config
        @forklift = forklift
        @client = ::Elasticsearch::Client.new(config)
      end

      # Runs a search against +index+ and returns/yields the `_source` of each
      # hit, with keys symbolized in place.
      #
      # With a block, each page is yielded in turn; when +looping+ is exactly
      # `true`, paging continues until a page with no hits comes back (that
      # final empty page is yielded too). Without a block, only the first page
      # is returned.
      #
      # @param index   index name passed to the search call
      # @param query   [Hash] search body; it is copied, not modified
      # @param looping only a literal `true` requests more than one page
      # @param from    [Integer] offset of the first hit to fetch
      # @param size    [Integer] number of hits requested per page
      # @return [Array<Hash>, nil] the first page when no block is given,
      #   otherwise nil once paging stops
      def read(index, query, looping=true, from=0, size=1000)
        offset = 0

        loop do
          # Shallow copy so the paging keys are not written into the caller's hash.
          prepared_query = query.dup
          prepared_query[:from] = from + offset
          prepared_query[:size] = size

          forklift.logger.debug " ELASTICSEARCH: #{prepared_query}"
          results = client.search({ index: index, body: prepared_query })

          hits = results["hits"]["hits"]
          data = hits.map { |hit| hit["_source"] }
          data.each { |row| row.symbolize_keys! }

          if block_given?
            yield data
          else
            return data
          end

          # Stop after one pass unless looping was requested, or once a page is empty.
          break unless looping == true && !hits.empty?

          offset += size
        end
      end

      # Indexes every row of +data+ into +index+, then refreshes the index.
      #
      # Row keys are symbolized in place before indexing. A document id is sent
      # only when +update+ is exactly `true` and the row has a non-nil value
      # under +primary_key+.
      #
      # @param data        [Array<Hash>] rows to store
      # @param index       index name
      # @param update      only a literal `true` enables sending ids
      # @param type        value passed as `type` to the index call
      # @param primary_key key in each row that holds the document id
      # @return the result of the index refresh call
      def write(data, index, update=false, type='forklift', primary_key=:id)
        data.each { |row| row.symbolize_keys! }

        data.each do |d|
          object = { index: index, body: d, type: type }
          object[:id] = d[primary_key] if !d[primary_key].nil? && update == true

          forklift.logger.debug " ELASTICSEARCH (store): #{object}"
          client.index object
        end

        client.indices.refresh({ index: index })
      end

      # Deletes +index+ when the client reports that it exists.
      #
      # @param index index name
      def delete_index(index)
        forklift.logger.debug " ELASTICSEARCH (delete index): #{index}"
        client.indices.delete({ index: index }) if client.indices.exists({ index: index })
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
forklift_etl-1.1.6 | lib/forklift/transports/elasticsearch.rb |