# frozen_string_literal: true # Copyright 2018 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. module Google module Cloud module Bigtable # @private # # RowsMutator # # Read a chunk of data and merge it based on states and build rows and cells # class ChunkProcessor # Row states NEW_ROW = 1 ROW_IN_PROGRESS = 2 CELL_IN_PROGRESS = 3 # Current state attr_accessor :state, :last_key # Current cached row data attr_accessor :chunk, :row # Current cell values attr_accessor :cur_family, :cur_qaul, :cur_ts, :cur_val, :cur_labels # @private # # Create chunk reader object and set row state to new # def initialize reset_to_new_row end ## # Process chunk and build full row with cells # # @param chunk [Google::Bigtable::V2::ReadRowsResponse::CellChunk] # def process chunk self.chunk = chunk raise_if chunk.value_size.positive?, "Commit rows cannot have a non-zero value_size." if chunk.commit_row if state == NEW_ROW process_new_row elsif state == CELL_IN_PROGRESS process_cell_in_progress elsif state == ROW_IN_PROGRESS process_row_in_progress end end ## # Validate row status commit or reset # # @raise [Google::Cloud::Bigtable::InvalidRowStateError] # if chunk has data on reset row state # def validate_reset_row return unless chunk.reset_row value = (!chunk.row_key.empty? || chunk.family_name || chunk.qualifier || !chunk.value.empty? || chunk.timestamp_micros.positive?) raise_if value, "A reset should have no data" end ## # Validate chunk has new row state # # @raise [Google::Cloud::Bigtable::InvalidRowStateError] # If row already has a set key, chunk has an empty row key, chunk # state is reset, new row key is the same as the last-read key, # or if family name or column qualifier are empty # def validate_new_row raise_if row.key, "A new row cannot have existing state" raise_if chunk.row_key.empty?, "A row key must be set" raise_if chunk.reset_row, "A new row cannot be reset" raise_if last_key == chunk.row_key, "A commit happened but the same key followed" raise_if chunk.family_name.nil?, "A family must be set" raise_if chunk.qualifier.nil?, "A column qualifier must be set" end ## # Validate chunk merge is in progress to build new row # # @raise [Google::Cloud::Bigtable::InvalidRowStateError] # If row and chunk row key are not same or chunk row key is empty. # def validate_row_in_progress raise_if !chunk.row_key.empty? && chunk.row_key != row.key, "A commit is required between row keys" raise_if chunk.family_name && chunk.qualifier.nil?, "A qualifier must be specified" validate_reset_row end ## # Process new row by setting values from current chunk. # # @return [Google::Cloud::Bigtable::Row] # def process_new_row validate_new_row return if chunk.family_name.nil? || chunk.qualifier.nil? row.key = chunk.row_key self.cur_family = chunk.family_name.value self.cur_qaul = chunk.qualifier.value self.cur_ts = chunk.timestamp_micros self.cur_labels = chunk.labels next_state! end ## # Process chunk if row state is in progress # # @return [Google::Cloud::Bigtable::Row] # def process_row_in_progress validate_row_in_progress return reset_to_new_row if chunk.reset_row self.cur_family = chunk.family_name.value if chunk.family_name self.cur_qaul = chunk.qualifier.value if chunk.qualifier self.cur_ts = chunk.timestamp_micros self.cur_labels = chunk.labels if chunk.labels next_state! end ## # Process chunk if row cell state is in progress # # @return [Google::Cloud::Bigtable::Row] # def process_cell_in_progress validate_reset_row return reset_to_new_row if chunk.reset_row next_state! end ## # Set next state of row. # # @return [Google::Cloud::Bigtable::Row] # def next_state! if cur_val self.cur_val += chunk.value else self.cur_val = chunk.value end if chunk.value_size.zero? persist_cell self.state = ROW_IN_PROGRESS else self.state = CELL_IN_PROGRESS end return unless chunk.commit_row self.last_key = row.key completed_row = row reset_to_new_row completed_row end # Build cell and append to row. def persist_cell cell = Row::Cell.new cur_family, cur_qaul, cur_ts, cur_val, cur_labels row.cells[cur_family] << cell # Clear cached cell values self.cur_val = nil self.cur_ts = nil self.cur_labels = nil end # Reset read state and cached data def reset_to_new_row self.row = Row.new self.state = NEW_ROW self.cur_family = nil self.cur_qaul = nil self.cur_ts = nil self.cur_val = nil self.cur_labels = nil end ## # Validate last row is completed # # @raise [Google::Cloud::Bigtable::InvalidRowStateError] # If read rows response end without last row completed # def validate_last_row_complete return if row.key.nil? raise_if !chunk.commit_row, "Response ended with pending row without commit" end private ## # Raise error on condition failure # # @raise [Google::Cloud::Bigtable::InvalidRowStateError] # def raise_if condition, message raise InvalidRowStateError.new(message, chunk.to_h) if condition end end end end end