Sha256: 878328feb78128b826d49579b86c658b7e6f2a85b987ae9c87d21a0b03cb705d
Contents?: true
Size: 1.61 KB
Versions: 4
Compression:
Stored size: 1.61 KB
Contents
require 'google/cloud/bigquery'
require 'arel/visitors/bigquery'
require 'activerecord_bigquery_adapter/information_schema'

module ActiveRecordBigqueryAdapter
  # Thin wrapper around a BigQuery dataset handle.
  #
  # Class-level caches (datasets, async inserters, information schemas) are
  # keyed by "project/dataset" so that repeated connections built from the
  # same configuration reuse the same underlying objects.
  class Connection
    # Guards the dataset cache. Created eagerly at class-definition time:
    # a lazily created mutex (`@mutex ||= Mutex.new`) can itself be raced,
    # letting two threads lock two different mutexes.
    CACHE_MUTEX = Mutex.new
    private_constant :CACHE_MUTEX

    attr_reader :dataset, :config

    class << self
      # Returns the cached dataset handle for the given configuration,
      # creating it on first use.
      #
      # @param config [Hash] connection settings; reads :project and :dataset
      #   (string or symbol keys)
      # @return [Object] whatever `Google::Cloud::Bigquery#dataset` returns
      def datasets(config)
        config = config.symbolize_keys
        key = database_path(config)

        CACHE_MUTEX.synchronize do
          @datasets ||= {}
          @datasets[key] ||=
            Google::Cloud::Bigquery.new(project_id: config[:project]).dataset(config[:dataset])
        end
      end

      # Returns the cached async inserter for a table, creating it on first
      # use. The inserter's callback logs each batch result through
      # `Google::Apis.logger`.
      #
      # @param config [Hash] connection settings (see {#datasets})
      # @param table_name [String, Symbol] table to insert into
      # @return [Object] whatever `Table#insert_async` returns
      def async_inserter(config, table_name)
        @async_inserter ||= {}
        @async_inserter["#{database_path(config)}/#{table_name}"] ||=
          datasets(config).table(table_name).insert_async do |result|
            if result.error?
              Google::Apis.logger.error(result.error)
            else
              Google::Apis.logger.info("inserted #{result.insert_count} rows with #{result.error_count} errors")
            end
          end
      end

      # Returns the cached InformationSchema for the configured dataset,
      # creating it on first use.
      #
      # @param config [Hash] connection settings (see {#datasets})
      # @return [ActiveRecordBigqueryAdapter::InformationSchema]
      def information_schema(config)
        @information_schemas ||= {}
        @information_schemas[database_path(config)] ||=
          ActiveRecordBigqueryAdapter::InformationSchema.new(datasets(config))
      end

      # Builds the "project/dataset" cache key for a configuration.
      #
      # Keys are normalized here so that string-keyed and symbol-keyed
      # configs map to the same cache entry; without this, a string-keyed
      # config produced the key "/" for every project and dataset.
      #
      # @param config [Hash] connection settings
      # @return [String]
      def database_path(config)
        config = config.symbolize_keys
        "#{config[:project]}/#{config[:dataset]}"
      end
    end

    # @param config [Hash] connection settings; reads :project and :dataset
    def initialize(config)
      @dataset = self.class.datasets(config)
      @config = config
    end

    # Queues one or more rows for insertion through the shared async
    # inserter for the table.
    #
    # @param table_name [String, Symbol] destination table
    # @param values [Object, Array] a single row or an array of rows
    def insert_async(table_name, values)
      self.class.async_inserter(config, table_name).insert(Array.wrap(values))
    end

    # Runs a query against the dataset.
    #
    # @param sql [String] query text
    # @param params [Array, Hash, nil] query parameters, forwarded as-is
    # @param types [Array, Hash, nil] parameter types, forwarded as-is
    # @return [Object] whatever `dataset.query` returns
    def query(sql, params: nil, types: nil)
      dataset.query(sql, params: params, types: types)
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems