lib/jetel/loaders/pg/pg.rb in jetel-0.0.6 vs lib/jetel/loaders/pg/pg.rb in jetel-0.0.7

- old
+ new

@@ -1,8 +1,12 @@ require_relative '../loader' +require_relative '../../helpers/helpers' + require 'pg' +require 'csv2psql/convert/convert' +require 'csv2psql/analyzer/analyzer' module Jetel module Loaders class Pg < Loader def initialize(uri) @@ -28,9 +32,86 @@ :user => user, :password => password } @client = PG.connect(opts) + end + + def load(modul, source, file, opts) + super + + convert_opts = { + :l => 1_000, + :skip => 0, + :header => true + } + + schema_list = Csv2Psql::Convert.generate_schema([file], convert_opts) + _file_name, schema = schema_list.first + + return nil if schema.nil? + + analyzer = Csv2Psql::Analyzer.new + column_types = (opts['column_type'] && opts['column_type'].split(/[;,]/)) || [] + column_types.each do |ct| + name, type = ct.split('=') + + columns = schema[:columns] || [] + column = columns.find do |k, v| + k.downcase == name + end + + analyzer_type = analyzer.analyzers.find do |spec| + spec[:class].name.split('::').last.downcase == type.downcase + end + + type_val = analyzer_type ? analyzer_type[:class].const_get(:TYPE) : type + + if column + columns[column[0]] = { + type: type_val.to_sym, + null: true + } + end + end + + ctx = { + :ctx => { + :table => Helper.sanitize(source[:name]).downcase, + :columns => schema[:columns], + :source => source, + :module => modul, + :file => File.absolute_path(modul.transformed_file(source, opts)) + } + } + + sql = Helper.erb_template(File.expand_path('../sql/schema.sql.erb', __FILE__), ctx) + sql.gsub!("\n\n", "\n") + puts sql + @client.exec(sql) + + sql = Helper.erb_template(File.expand_path('../sql/copy.sql.erb', __FILE__), ctx) + sql.gsub!("\n\n", "\n") + puts sql + @client.exec(sql) + + file = File.open(ctx[:ctx][:file], 'r') + while !file.eof? + # Add row to copy data + @client.put_copy_data(file.readline) + end + + # We are done adding copy data + @client.put_copy_end + + # Display any error messages + while res = @client.get_result + if e_message = res.error_message + p e_message + end + end + + sql end end end end