Class: Longleaf::SequelIndexDriver
- Inherits: Object
  - Object
    - Longleaf::SequelIndexDriver
- Includes: Logging
- Defined in: lib/longleaf/indexing/sequel_index_driver.rb
Overview
Driver for interacting with an RDBMS-based metadata index using the Sequel ORM gem. If the database application requires databases to be created explicitly (i.e., anything other than SQLite), users must create the database and the credentials for connecting to it in advance. The default database name is 'longleaf_metadata_index', but it may be overridden.
See the Sequel documentation for details about accepted connection parameters: github.com/jeremyevans/sequel/blob/master/doc/opening_databases.rdoc
Instance Method Summary
- #add_path_restrictions(dataset, file_selector) ⇒ Object (private)
- #clear_index(older_than = nil) ⇒ Object: Remove all entries from the index, or only those last updated before older_than.
- #convert_iso8601_to_timestamp(iso8601) ⇒ Object (private)
- #db_conn ⇒ Object (private)
- #delay_until_timestamp(md_rec) ⇒ Object: Returns the first failure timestamp for any service, or nil if there were none.
- #each_registered_path(file_selector, older_than: nil, &block) ⇒ Object: Calls the provided block once for each registered file path.
- #first_service_execution_timestamp(expected_services, md_rec) ⇒ Object: Find the earliest service execution time for any services expected to be run for the specified file.
- #index(file_rec) ⇒ Object: Index the provided file_rec and its metadata.
- #initialize(app_config, adapter, conn_details, page_size: nil) ⇒ SequelIndexDriver (constructor): Initialize the index driver.
- #is_stale? ⇒ Boolean: Returns true if the application configuration does not match the configuration used for the last reindex.
- #minimum_timestamp ⇒ Object (private)
- #paths_with_stale_services(file_selector, stale_datetime) ⇒ Array: Retrieves a page of file paths which have one or more services that need to run.
- #preserve_tbl ⇒ Object (private)
- #registered_dataset ⇒ Object (private)
- #registered_paths(file_selector) ⇒ Array: Retrieves a page of paths for registered files.
- #remove(remove_me) ⇒ Object: Remove an entry from the index.
- #setup_index ⇒ Object: Initialize the index's database using the provided configuration.
- #update_index_state ⇒ Object: Updates the state information for the index to indicate that the index has been refreshed or is in sync with the application's configuration.
Methods included from Logging
#initialize_logger, initialize_logger, #logger, logger
Constructor Details
#initialize(app_config, adapter, conn_details, page_size: nil) ⇒ SequelIndexDriver
Initialize the index driver
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 33

    def initialize(app_config, adapter, conn_details, page_size: nil)
      Sequel.default_timezone = :utc
      @app_config = app_config
      @adapter = adapter
      @conn_details = conn_details
      # Digest of the app config file so we can tell if it changes
      @config_md5 = app_config.config_md5
      @page_size = page_size.nil? || page_size <= 0 ? DEFAULT_PAGE_SIZE : page_size

      if @conn_details.is_a?(Hash)
        # Add in the adapter name
        @conn_details['adapter'] = adapter unless @conn_details.key?('adapter')
        # Add in default database name if none was specified
        @conn_details['database'] = INDEX_DB_NAME unless @conn_details.key?('database')
      end
    end
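The constructor only normalizes its inputs; no connection is opened yet. The following stdlib-only sketch isolates that normalization. The helper names are hypothetical, INDEX_DB_NAME uses the default name given in the overview, and the DEFAULT_PAGE_SIZE value of 1000 is an assumption, since the real constant's value is not shown on this page.

```ruby
# Default database name from the overview; the page size value is assumed.
INDEX_DB_NAME = 'longleaf_metadata_index'
DEFAULT_PAGE_SIZE = 1000

# Fall back to the default when the page size is missing or non-positive.
def normalize_page_size(page_size)
  page_size.nil? || page_size <= 0 ? DEFAULT_PAGE_SIZE : page_size
end

# Fill in 'adapter' and 'database' only when the caller did not supply them.
# Non-hash connection details (e.g. a connection URL string) pass through unchanged.
def normalize_conn_details(adapter, conn_details)
  return conn_details unless conn_details.is_a?(Hash)

  details = conn_details.dup # unlike the original, avoid mutating the caller's hash
  details['adapter'] = adapter unless details.key?('adapter')
  details['database'] = INDEX_DB_NAME unless details.key?('database')
  details
end

p normalize_page_size(nil) # => 1000
puts normalize_conn_details(:postgres, { 'host' => 'localhost' })['database']
# prints longleaf_metadata_index
```

Explicitly supplied 'adapter' or 'database' keys always win over the defaults.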
Instance Method Details
#add_path_restrictions(dataset, file_selector) ⇒ Object (private)
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 292

    def add_path_restrictions(dataset, file_selector)
      if file_selector.specificity == FileSelector::SPECIFICITY_STORAGE_LOCATION
        dataset.where(storage_location: file_selector.storage_locations)
      else
        # Reformat all selected paths into LIKE partial string matches
        path_conds = file_selector.target_paths.map { |path| path.end_with?('/') ? path + '%' : path }
        dataset.where(Sequel.like(:file_path, *path_conds))
      end
    end
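Directory paths (ending in '/') become prefix patterns, while any other path must match exactly; when Sequel.like receives several patterns it combines them with OR. The sketch below reproduces the pattern construction and emulates LIKE matching in plain Ruby to show which file paths a selection covers. The helper names are hypothetical, and the emulation is case-sensitive, whereas SQLite's default LIKE is case-insensitive for ASCII.

```ruby
# Directories (trailing '/') become prefix matches; anything else must match exactly.
def like_patterns(target_paths)
  target_paths.map { |path| path.end_with?('/') ? path + '%' : path }
end

# Rough emulation of SQL LIKE: '%' matches any run of characters,
# '_' matches exactly one character, everything else is literal.
def like_match?(value, pattern)
  regex = pattern.each_char.map do |c|
    case c
    when '%' then '.*'
    when '_' then '.'
    else Regexp.escape(c)
    end
  end.join
  value.match?(/\A#{regex}\z/m)
end

# Multiple patterns are ORed together, as with Sequel.like(:file_path, *patterns).
def path_selected?(file_path, target_paths)
  like_patterns(target_paths).any? { |pat| like_match?(file_path, pat) }
end

p like_patterns(['/data/coll/', '/other/file.txt']) # => ["/data/coll/%", "/other/file.txt"]
```

Because the selected paths are not escaped, a literal '_' or '%' in a path also acts as a wildcard (e.g. '/a/my_file' would match '/a/myXfile').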
#clear_index(older_than = nil) ⇒ Object
Remove all entries from the index. If older_than is given, only entries last updated before that time are removed.
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 171

    def clear_index(older_than = nil)
      if older_than.nil?
        preserve_tbl.delete
      else
        = older_than.utc.strftime(TIMESTAMP_FORMAT)
        preserve_tbl.where { updated < }.delete
      end
    end
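The older_than branch formats the cutoff in UTC and deletes rows whose updated column is strictly earlier. This in-memory sketch mirrors that logic on an array of hashes. The TIMESTAMP_FORMAT value is an assumption (the real constant is not shown), and clear_rows is a hypothetical helper. With a zero-padded, fixed-width format, plain string comparison matches chronological order.

```ruby
require 'time'

# Assumed format; the real TIMESTAMP_FORMAT constant is not shown on this page.
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S.%L'

# Delete every row when older_than is nil, otherwise only rows updated
# strictly before the cutoff. Returns the number of deleted rows.
def clear_rows(rows, older_than = nil)
  if older_than.nil?
    count = rows.size
    rows.clear
    return count
  end

  # getutc returns a copy; Time#utc would convert the caller's object in place.
  cutoff = older_than.getutc.strftime(TIMESTAMP_FORMAT)
  before = rows.size
  rows.reject! { |row| row[:updated] < cutoff }
  before - rows.size
end

rows = [
  { file_path: '/a', updated: '2024-01-01 00:00:00.000' },
  { file_path: '/b', updated: '2024-06-01 00:00:00.000' }
]
clear_rows(rows, Time.utc(2024, 3, 1))
p rows.map { |r| r[:file_path] } # => ["/b"]
```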
#convert_iso8601_to_timestamp(iso8601) ⇒ Object (private)
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 302

    def convert_iso8601_to_timestamp(iso8601)
      return nil if iso8601.nil?
      Time.iso8601(iso8601).strftime(TIMESTAMP_FORMAT)
    end
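This conversion parses an ISO 8601 string and re-emits it in the database timestamp format, passing nil through so a missing time can be stored as NULL. The sketch below assumes the same hypothetical TIMESTAMP_FORMAT as elsewhere on this page and a hypothetical helper name.

```ruby
require 'time'

# Assumed format; the real TIMESTAMP_FORMAT constant is not shown on this page.
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S.%L'

# Parse ISO 8601 and render in the database timestamp format; nil stays nil.
def iso8601_to_db_timestamp(iso8601)
  return nil if iso8601.nil?

  Time.iso8601(iso8601).strftime(TIMESTAMP_FORMAT)
end

p iso8601_to_db_timestamp('2020-01-02T03:04:05.123Z') # => "2020-01-02 03:04:05.123"
p iso8601_to_db_timestamp(nil)                        # => nil
```

strftime renders the parsed time in its own offset, so this relies on inputs already being UTC, as they are when produced with Time.now.utc.iso8601(3). Inserting .utc before strftime would normalize inputs carrying other offsets.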
#db_conn ⇒ Object (private)
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 282

    def db_conn
      @connection = Sequel.connect(@conn_details) if @connection.nil?
      @connection
    end
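db_conn opens the connection lazily on first use and reuses it afterwards. The sketch below isolates that memoization; the LazyConnection class is hypothetical, and its connector block stands in for Sequel.connect(@conn_details).

```ruby
# Lazy, memoized connection: the connector runs only on the first call.
class LazyConnection
  attr_reader :connect_calls

  def initialize(&connector)
    @connector = connector
    @connect_calls = 0
  end

  def db_conn
    if @connection.nil?
      @connect_calls += 1
      @connection = @connector.call
    end
    @connection
  end
end

holder = LazyConnection.new { Object.new } # stand-in for a database handle
first = holder.db_conn
p first.equal?(holder.db_conn) # => true
p holder.connect_calls         # => 1
```

In idiomatic Ruby the same pattern is usually written as @connection ||= Sequel.connect(@conn_details).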
#delay_until_timestamp(md_rec) ⇒ Object
Returns the first failure timestamp for any service, or nil if there were none.
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 144

    def delay_until_timestamp(md_rec)
      md_rec.list_services.each do |service_name|
        service_rec = md_rec.service(service_name)
        return service_rec. unless service_rec..nil?
      end
      # return lowest possible date
      return
    end
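The loop returns the failure time of the first service, in listing order, that has one. The sketch below reproduces that behavior with hypothetical Struct stand-ins. The attribute holding the failure time is elided in the listing above, so failure_timestamp is an assumed name, and the epoch fallback is an assumption based on the "return lowest possible date" comment.

```ruby
require 'time'

# Hypothetical stand-ins; `failure_timestamp` is an assumed attribute name.
ServiceRecord = Struct.new(:failure_timestamp)
MetadataRecord = Struct.new(:services) do
  def list_services
    services.keys
  end

  def service(name)
    services[name]
  end
end

# Assumed fallback: the Unix epoch as an ISO 8601 string.
LOWEST_TIMESTAMP = Time.at(0).utc.iso8601(3)

# Return the failure timestamp of the first listed service that has one.
def delay_until(md_rec)
  md_rec.list_services.each do |name|
    failed_at = md_rec.service(name).failure_timestamp
    return failed_at unless failed_at.nil?
  end
  LOWEST_TIMESTAMP
end

rec = MetadataRecord.new({
  'fixity' => ServiceRecord.new(nil),
  'replicate' => ServiceRecord.new('2024-05-01T12:00:00.000Z')
})
p delay_until(rec) # => "2024-05-01T12:00:00.000Z"
```

Note that "first" means first in listing order, not necessarily the earliest failure.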
#each_registered_path(file_selector, older_than: nil, &block) ⇒ Object
Calls the provided block once for each registered file path matching the file selector, optionally limited to entries last updated before older_than. A block must be given.
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 268

    def each_registered_path(file_selector, older_than: nil, &block)
      dataset = add_path_restrictions(registered_dataset, file_selector)
          .select(:file_path)
      if !older_than.nil?
        = older_than.utc.strftime(TIMESTAMP_FORMAT)
        dataset = dataset.where { updated < }
      end
      # Yield to the provided block once per row
      return dataset.paged_each(:rows_per_fetch => @page_size) do |row|
        block.call(row[:file_path])
      end
    end
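Sequel's Dataset#paged_each fetches rows in batches of :rows_per_fetch rather than loading the whole result set. Per the Sequel documentation, it requires an ordered dataset (registered_dataset orders by service_time) and runs inside a transaction. The sketch below imitates offset-based paging in plain Ruby. The fetch_page lambda is a hypothetical stand-in for running the ordered query with LIMIT/OFFSET.

```ruby
# Yield each file path while holding only one page of rows at a time.
def each_path_paged(page_size, fetch_page)
  offset = 0
  loop do
    page = fetch_page.call(offset, page_size)
    page.each { |row| yield row[:file_path] }
    break if page.size < page_size # a short page means no rows remain
    offset += page_size
  end
end

all_rows = (1..5).map { |i| { file_path: "/data/f#{i}" } }
fetch = ->(offset, limit) { all_rows[offset, limit] || [] }

seen = []
each_path_paged(2, fetch) { |path| seen << path }
p seen # => ["/data/f1", "/data/f2", "/data/f3", "/data/f4", "/data/f5"]
```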
#first_service_execution_timestamp(expected_services, md_rec) ⇒ Object
Find the earliest service execution time for any services expected to be run for the specified file.
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 105

    def first_service_execution_timestamp(expected_services, md_rec)
      current_time = Time.now.utc.iso8601(3)
      if md_rec.deregistered?
        return nil
      end

      service_times = Array.new
      present_services = md_rec.list_services

      expected_services.each do |service_def|
        service_name = service_def.name

        # Service has never run, set execution time to now
        if !present_services.include?(service_name)
          service_times << current_time
          next
        end

        service_rec = md_rec.service(service_name)
        # Service either needs a run or has no timestamp, so execution time of now
        if service_rec.run_needed || service_rec..nil?
          service_times << current_time
          next
        end

        # Calculate the next time this service should run based on frequency
        frequency = service_def.frequency
        unless frequency.nil?
          = service_rec.
          service_times << ServiceDateHelper.(, frequency)
          next
        end
      end

      # Return the lowest service execution time
      service_times.min
    end
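Each expected service contributes either "now" (never run, flagged for a run, or missing a timestamp) or its last run time plus its frequency; the earliest candidate wins. In this simplified sketch, frequencies are plain numbers of seconds, and ServiceDef, ServiceRun and next_execution are hypothetical names. Longleaf's real frequency format and its ServiceDateHelper are not reproduced, and the deregistered check is omitted.

```ruby
require 'time'

# Hypothetical, simplified stand-ins (frequency in seconds).
ServiceDef = Struct.new(:name, :frequency_seconds)
ServiceRun = Struct.new(:run_needed, :timestamp)

# Earliest time any expected service should next run, as a UTC ISO 8601 string.
def next_execution(expected_services, runs, now = Time.now.utc)
  now_stamp = now.iso8601(3)
  times = expected_services.filter_map do |svc|
    run = runs[svc.name]
    # Never run, flagged for a run, or no timestamp: due now.
    next now_stamp if run.nil? || run.run_needed || run.timestamp.nil?
    # No frequency: this service contributes no time.
    next nil if svc.frequency_seconds.nil?

    (Time.iso8601(run.timestamp) + svc.frequency_seconds).utc.iso8601(3)
  end
  # Fixed-precision UTC ISO 8601 strings compare chronologically; [] yields nil.
  times.min
end

now = Time.utc(2024, 1, 10)
defs = [ServiceDef.new('fixity', 86_400 * 7), ServiceDef.new('replicate', nil)]
runs = {
  'fixity' => ServiceRun.new(false, '2024-01-05T00:00:00.000Z'),
  'replicate' => ServiceRun.new(false, '2024-01-01T00:00:00.000Z')
}
p next_execution(defs, runs, now) # => "2024-01-12T00:00:00.000Z"
```

As in the original, a service with a recorded run but no frequency adds nothing, so if no service contributes a time the result is nil.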
#index(file_rec) ⇒ Object
Index the provided file_rec and its metadata
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 59

    def index(file_rec)
      file_path = file_rec.path
      md_rec = file_rec.
      storage_loc = file_rec.storage_location
      service_manager = @app_config.service_manager

      # Produce a list of service definitions which should apply to the file
      expected_services = service_manager.list_service_definitions(
          location: storage_loc.name)

      = (expected_services, md_rec)
      = (md_rec)
      = ()
      = ()
      now_stamp = Time.now.utc.strftime(TIMESTAMP_FORMAT)

      if @adapter == :mysql || @adapter == :mysql2
        preserve_tbl.on_duplicate_key_update
            .insert(file_path: file_path,
                storage_location: storage_loc.name,
                service_time: ,
                delay_until_time: ,
                updated: now_stamp)
      else
        preserve_tbl.insert_conflict(target: :file_path,
            update: {
              storage_location: storage_loc.name,
              service_time: ,
              delay_until_time: ,
              updated: now_stamp } )
            .insert(file_path: file_path,
                storage_location: storage_loc.name,
                service_time: ,
                delay_until_time: ,
                updated: now_stamp)
      end
    end
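Both branches perform an upsert keyed on file_path: on MySQL via INSERT ... ON DUPLICATE KEY UPDATE, elsewhere via INSERT ... ON CONFLICT (file_path) DO UPDATE. The in-memory sketch below shows the resulting semantics: one row per path, inserted if new, otherwise overwritten. The upsert helper and the sample values are hypothetical.

```ruby
# One row per file_path: insert if absent, otherwise overwrite non-key columns.
def upsert(table, file_path:, storage_location:, service_time:, delay_until_time:, updated:)
  row = table[file_path] ||= { file_path: file_path }
  row.merge!(storage_location: storage_location, service_time: service_time,
             delay_until_time: delay_until_time, updated: updated)
  row
end

table = {}
upsert(table, file_path: '/data/a.txt', storage_location: 'loc1',
       service_time: '2024-01-12 00:00:00.000', delay_until_time: nil,
       updated: '2024-01-10 00:00:00.000')
upsert(table, file_path: '/data/a.txt', storage_location: 'loc1',
       service_time: '2024-02-12 00:00:00.000', delay_until_time: nil,
       updated: '2024-02-10 00:00:00.000')
p table.size                          # => 1
p table['/data/a.txt'][:service_time] # => "2024-02-12 00:00:00.000"
```

Re-indexing a file is therefore idempotent with respect to row count; only the computed times and the updated stamp change.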
#is_stale? ⇒ Boolean
Returns true if the application configuration does not match the configuration used for the last reindex.
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 52

    def is_stale?
      db_conn[INDEX_STATE_TBL].where(config_md5: @config_md5).count == 0
    end
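Staleness is detected by comparing a digest of the current configuration with the digest recorded at the last reindex. This sketch assumes the digest is an MD5 hex string of the configuration text; how app_config.config_md5 is actually computed is not shown here, and both helper names are hypothetical.

```ruby
require 'digest/md5'

# Assumed: the digest is an MD5 hex string of the raw configuration text.
def config_md5(config_text)
  Digest::MD5.hexdigest(config_text)
end

# Stale when no recorded index-state row carries the current digest.
def stale?(state_rows, current_md5)
  state_rows.none? { |row| row[:config_md5] == current_md5 }
end

old_md5 = config_md5("locations:\n  loc1: {}\n")
state = [{ config_md5: old_md5 }]
p stale?(state, old_md5)                                # => false
p stale?(state, config_md5("locations:\n  loc2: {}\n")) # => true
```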
#minimum_timestamp ⇒ Object (private)
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 307

    def minimum_timestamp
      if @min_timestamp.nil?
        @min_timestamp = ServiceDateHelper.(Time.at(0).utc)
      end
      @min_timestamp
    end
#paths_with_stale_services(file_selector, stale_datetime) ⇒ Array
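Retrieves a page of file paths which have one or more services that need to run, i.e. whose service_time is at or before stale_datetime and whose delay_until_time is earlier than stale_datetime.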
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 238

    def paths_with_stale_services(file_selector, stale_datetime)
      if @preserve_dataset.nil?
        @preserve_dataset = db_conn
            .from(PRESERVE_TBL)
            .exclude(service_time: nil)
            .limit(@page_size)
            .order(Sequel.asc(:service_time))
      end

      # retrieve and return a page of results
      ds = add_path_restrictions(@preserve_dataset, file_selector)
          .where { service_time <= stale_datetime }
          .where { delay_until_time < stale_datetime }
          .select_map(:file_path)
    end
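The query keeps rows that have a service time, are due by stale_datetime and are not delayed past it, orders them by service time and returns one page of paths. Caching the base dataset is safe because Sequel datasets are immutable: each where call returns a new dataset. The in-memory sketch below mirrors the filter, order and limit. The stale_paths helper and the sample rows are hypothetical, and timestamps are compared as fixed-format strings.

```ruby
# Filter, order and limit as the SQL query does; nil delay values are
# excluded, matching SQL's treatment of NULL in comparisons.
def stale_paths(rows, stale_time, page_size)
  rows
    .reject { |r| r[:service_time].nil? }
    .select { |r| r[:service_time] <= stale_time }
    .select { |r| r[:delay_until_time] && r[:delay_until_time] < stale_time }
    .sort_by { |r| r[:service_time] }
    .first(page_size)
    .map { |r| r[:file_path] }
end

epoch = '1970-01-01 00:00:00.000'
rows = [
  { file_path: '/a', service_time: '2024-01-03 00:00:00.000', delay_until_time: epoch },
  { file_path: '/b', service_time: '2024-01-01 00:00:00.000', delay_until_time: epoch },
  { file_path: '/c', service_time: '2024-01-02 00:00:00.000', delay_until_time: '2024-02-01 00:00:00.000' },
  { file_path: '/d', service_time: nil, delay_until_time: epoch },
  { file_path: '/e', service_time: '2024-03-01 00:00:00.000', delay_until_time: epoch }
]
p stale_paths(rows, '2024-01-15 00:00:00.000', 10) # => ["/b", "/a"]
```

Here '/c' is due but still delayed, '/d' has no service time, and '/e' is not yet due.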
#preserve_tbl ⇒ Object (private)
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 287

    def preserve_tbl
      @preserve_tbl = db_conn[PRESERVE_TBL] if @preserve_tbl.nil?
      @preserve_tbl
    end
#registered_dataset ⇒ Object (private)
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 314

    def registered_dataset
      if @registered_dataset.nil?
        @registered_dataset = db_conn
            .from(PRESERVE_TBL)
            .limit(@page_size)
            .order(Sequel.asc(:service_time))
      end
      @registered_dataset
    end
#registered_paths(file_selector) ⇒ Array
Retrieves a page of paths for registered files.
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 257

    def registered_paths(file_selector)
      # retrieve and return a page of results
      add_path_restrictions(registered_dataset, file_selector)
          .select_map(:file_path)
    end
#remove(remove_me) ⇒ Object
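Remove an entry from the index. remove_me may be a FileRecord or a file path; a warning is logged if the path was not present in the index.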
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 155

    def remove(remove_me)
      if remove_me.is_a?(FileRecord)
        path = remove_me.path
      else
        path = remove_me
      end

      result = preserve_tbl.where(file_path: path).delete
      if result == 0
        logger.warn("Could not remove #{path} from the index, path was not present.")
      end
    end
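remove accepts either a record or a bare path and relies on the deleted-row count to detect a missing entry. The sketch below mirrors that flow against a Hash using Ruby's standard Logger. FileRecord here is a hypothetical stand-in exposing only #path, and remove_entry is a hypothetical helper.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in for Longleaf's FileRecord; only #path is used.
FileRecord = Struct.new(:path)

# Accept a record or a plain path; warn when nothing was removed.
def remove_entry(table, remove_me, logger)
  path = remove_me.is_a?(FileRecord) ? remove_me.path : remove_me
  removed = table.key?(path) ? 1 : 0
  table.delete(path)
  if removed.zero?
    logger.warn("Could not remove #{path} from the index, path was not present.")
  end
  removed
end

log_io = StringIO.new
logger = Logger.new(log_io)
table = { '/data/a.txt' => {} }
p remove_entry(table, FileRecord.new('/data/a.txt'), logger) # => 1
p remove_entry(table, '/data/a.txt', logger)                 # => 0
```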
#setup_index ⇒ Object
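Initialize the index's database using the provided configuration. The tables are created with Sequel's create_table!, which drops any existing table of the same name first, so existing index data is discarded.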
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 181

    def setup_index
      # Create the table for tracking when files will need preservation services run on them.
      case @adapter
      when :mysql, :mysql2
        # mysql does not support 'text' fields as primary keys
        db_conn.create_table!(PRESERVE_TBL) do
          String :file_path, primary_key: true, size: 768
          column :storage_location, 'varchar(128)'
          column :service_time, 'timestamp(3)', { :null => true }
          column :delay_until_time, 'timestamp(3)'
          column :updated, 'timestamp(3)'
        end
      else
        db_conn.create_table!(PRESERVE_TBL) do
          String :file_path, primary_key: true, text: true
          column :storage_location, 'varchar(128)'
          column :service_time, 'timestamp(3)', { :null => true }
          column :delay_until_time, 'timestamp(3)'
          column :updated, 'timestamp(3)'
        end
      end

      # Setup database indexes
      case @adapter
      when :postgres
        db_conn.run("CREATE INDEX service_times_file_path_text_index ON preserve_service_times (file_path text_pattern_ops)")
      when :sqlite, :amalgalite
        db_conn.run("CREATE INDEX service_times_file_path_text_index ON preserve_service_times (file_path collate nocase)")
      end
      db_conn.run("CREATE INDEX service_times_storage_location_index ON preserve_service_times (storage_location)")

      # Create table for tracking the state of the index
      db_conn.create_table!(INDEX_STATE_TBL) do
        String :config_md5
        DateTime :last_reindexed
        String :longleaf_version
      end

      # Prepopulate the index state information
      update_index_state
    end
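The adapter-specific file_path indexes support the prefix LIKE queries built by add_path_restrictions. On PostgreSQL, the text_pattern_ops operator class lets a B-tree index serve left-anchored LIKE patterns when the database does not use the C locale. On SQLite, the LIKE optimization requires a NOCASE index while LIKE is case-insensitive (the default). On MySQL, the varchar(768) primary key already provides a B-tree on file_path; 768 characters presumably keeps the key within InnoDB's 3072-byte limit under utf8mb4 (768 × 4 bytes). The hypothetical helper below isolates that choice, with the SQL copied from the method above.

```ruby
# Adapter-specific file_path index statement; nil means no extra index is created.
def file_path_index_sql(adapter)
  case adapter
  when :postgres
    'CREATE INDEX service_times_file_path_text_index ON preserve_service_times (file_path text_pattern_ops)'
  when :sqlite, :amalgalite
    'CREATE INDEX service_times_file_path_text_index ON preserve_service_times (file_path collate nocase)'
  end
end

p file_path_index_sql(:mysql2) # => nil
```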
#update_index_state ⇒ Object
Updates the state information for the index to indicate that the index has been refreshed or is in sync with the application's configuration.
    # File 'lib/longleaf/indexing/sequel_index_driver.rb', line 225

    def update_index_state
      index_state_tbl = db_conn[INDEX_STATE_TBL]
      index_state_tbl.delete
      index_state_tbl.insert(
          config_md5: @config_md5,
          last_reindexed: Time.now.utc,
          longleaf_version: Longleaf::VERSION)
    end