import csv from helix.constants.file_validation_error import FileValidationError, FileValidationMessage from helix.calculators.coordinates_calculator import CoordinatesCalculator from helix.models.coordinate import Coordinate class CsvInputValidator(object): def __init__(self, user_values): self.user_values = user_values def validate(self, csv_input): try: headers, rows = self.parse_cad_input(csv_input) if len(headers) < 5: return FileValidationError(FileValidationMessage.InvalidHeaders, 0) if len(rows) == 0: return FileValidationError(FileValidationMessage.InvalidRowCount, 0) for row_index, row in enumerate(rows): chain = [ CsvInputValidator.validate_for_csv, CsvInputValidator.validate_for_wind_zones, CsvInputValidator.validate_for_panel_type, ] result = self.run_validation_chain(headers, row, chain) if result: return FileValidationError(result, row_index + 1) file_validation_chain = [ CsvInputValidator.validate_file_for_panel_types, CsvInputValidator.validate_for_spacing, ] result = self.run_validation_chain(headers, rows, file_validation_chain) if result: return FileValidationError(result, None) return None except Exception as inst: return FileValidationError(FileValidationMessage.Generic, None) # Row Validators def validate_for_csv(self, headers, row): if len(row) != len(headers): return FileValidationMessage.Generic return None def validate_for_wind_zones(self, headers, row): valid_wind_zones = self.user_values.system_type().system_constants().wind_zones if row[2] not in valid_wind_zones: return FileValidationMessage.invalid_wind_zones(self.user_values.system_type()) return None def validate_for_panel_type(self, headers, row): valid_panel_types = range(1, 5) if int(row[3]) not in valid_panel_types: return FileValidationMessage.PanelTypeOutOfBounds return None # File Validators def validate_file_for_panel_types(self, headers, rows): panel_type_counts_per_subarray = {} for row in rows: subarray = int(row[4]) panel_type_counts = panel_type_counts_per_subarray.get(subarray) or {} panel_type = int(row[3]) count = panel_type_counts.get(panel_type) or 0 panel_type_counts[panel_type] = count + 1 panel_type_counts_per_subarray[subarray] = panel_type_counts minimum_module_count = self.user_values.system_type().system_constants().minimum_corner_module_count for panel_type_counts in panel_type_counts_per_subarray.values(): if (panel_type_counts.get(1) or 0) < minimum_module_count: return FileValidationMessage.panel_type_too_few_corners(self.user_values.system_type()) return None def validate_for_spacing(self, headers, rows): try: if len(headers) < 13: return None coordinates_calculator = CoordinatesCalculator(self.user_values) scaling_factor = self.user_values.module_system_constants().tolerance + 1 spacing_x, spacing_y = self.user_values.module_system_constants().panel_spacing spacing_coordinates = Coordinate(spacing_x*scaling_factor, spacing_y*scaling_factor,0) spacing_coordinates_scaled = coordinates_calculator.scale(spacing_coordinates) for row_index, row in enumerate(rows): first_panel = Coordinate(float(row[11]), float(row[12]), float(row[13])) first_panel_rotated = coordinates_calculator.rotate(first_panel) first_panel_scaled = coordinates_calculator.scale(first_panel_rotated) for row_index2, row2 in enumerate(rows): if row_index2 <= row_index: continue second_panel = Coordinate(float(row2[11]), float(row2[12]), float(row2[13])) second_panel_rotated = coordinates_calculator.rotate(second_panel) second_panel_scaled = coordinates_calculator.scale(second_panel_rotated) x_diff = round(abs(first_panel_scaled.x - second_panel_scaled.x),3) y_diff = round(abs(first_panel_scaled.y - second_panel_scaled.y),3) if (x_diff < spacing_coordinates_scaled.x) and (y_diff < spacing_coordinates_scaled.y): return FileValidationMessage.PanelsTooClose return None except Exception as inst: print("ERROR !!! ") print(inst) return FileValidationMessage.Generic # Helpers def run_validation_chain(self, headers, data, chain): result = None chain = list(chain) while not result and len(chain) > 0: method = chain.pop() result = method(self, headers, data) return result def parse_cad_input(self, cad_input): reader = csv.reader(cad_input.splitlines(), dialect='excel-tab') headers = next(reader) rows = [row for row in reader] return headers, rows