Sha256: 11c608c7359f3950eed37455b6f944b144729aa0896b5c0ef0616a3746c29f21

Contents?: true

Size: 1.52 KB

Versions: 1

Compression:

Stored size: 1.52 KB

Contents

module Embulk
  # Embulk input plugin that runs a single SQL query through Presto::Client
  # and adds every returned row to the page builder.
  class InputPresto < InputPlugin
    require 'presto-client'

    Plugin.register_input("presto", self)

    # Reads the plugin configuration, builds the task hash and the output
    # columns, then hands over to +resume+ with a task count of 1.
    #
    # Every option except "query" has a default value.
    #
    # @param config the plugin configuration object (read via +config.param+)
    # @param control block forwarded to +resume+
    # @return [Hash] the config diff produced by +resume+
    def self.transaction(config, &control)
      task = {
        "host" => config.param("host", :string, default: "localhost"),
        "port" => config.param("port", :integer, default: 8080),
        "schema" => config.param("schema", :string, default: "default"),
        "catalog" => config.param("catalog", :string, default: "native"),
        "query" => config.param("query", :string),
        "user" => config.param("user", :string, default: "embulk"),
        "columns" => config.param("columns", :array, default: [])
      }

      # Each configured column entry is expected to carry "name" and "type";
      # its position in the list becomes the column index.
      columns = task['columns'].each_with_index.map do |c, i|
        Column.new(i, c["name"], c["type"].to_sym)
      end

      resume(task, columns, 1, &control)
    end

    # Yields the task, columns and task count to the given block.
    #
    # The value returned by the block is not used; the config diff is
    # always empty.
    #
    # @return [Hash] an empty config diff
    def self.resume(task, columns, count, &control)
      yield(task, columns, count)

      {}
    end

    # Creates the Presto client from the task settings and logs the query
    # that +run+ will execute.
    def init
      @client = Presto::Client.new(
        server: "#{task['host']}:#{task['port']}",
        catalog: task['catalog'],
        user: task['user'],
        schema: task['schema']
      )
      @query = task["query"]

      Embulk.logger.info "SQL: #{@query}"
    end

    # Executes the query, adds each row to the page builder and finishes it.
    #
    # @return [Hash] task report of the form <tt>{ size: <rows added> }</tt>
    def run
      size = 0
      @client.query(@query) do |q|
        q.each_row do |row|
          page_builder.add(row)
          # Count rows while streaming. Asking the query object for its rows
          # after each_row has already iterated the result set does not
          # reliably give the total and would build an extra array.
          size += 1
        end
      end

      page_builder.finish

      { size: size }
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
embulk-input-presto-0.1.1 lib/embulk/input/presto.rb