import csv from helix.constants.file_validation_error import FileValidationError, FileValidationMessage class CsvInputValidator(object): def __init__(self, user_values): self.user_values = user_values def validate(self, csv_input): try: headers, rows = self.parse_cad_input(csv_input) if len(headers) < 5: return FileValidationError(FileValidationMessage.InvalidHeaders, 0) if len(rows) == 0: return FileValidationError(FileValidationMessage.InvalidRowCount, 0) for row_index, row in enumerate(rows): chain = [ CsvInputValidator.validate_for_csv, CsvInputValidator.validate_for_wind_zones, CsvInputValidator.validate_for_panel_type, ] result = self.run_validation_chain(headers, row, chain) if result: return FileValidationError(result, row_index + 1) file_validation_chain = [ CsvInputValidator.validate_file_for_panel_types, CsvInputValidator.validate_for_spacing, ] result = self.run_validation_chain(headers, rows, file_validation_chain) if result: return FileValidationError(result, None) return None except: return FileValidationError(FileValidationMessage.Generic, None) # Row Validators def validate_for_csv(self, headers, row): if len(row) != len(headers): return FileValidationMessage.Generic return None def validate_for_wind_zones(self, headers, row): valid_wind_zones = self.user_values.system_type().system_constants().wind_zones if row[2] not in valid_wind_zones: return FileValidationMessage.invalid_wind_zones(self.user_values.system_type()) return None def validate_for_panel_type(self, headers, row): valid_panel_types = range(1, 5) if int(row[3]) not in valid_panel_types: return FileValidationMessage.PanelTypeOutOfBounds return None # File Validators def validate_file_for_panel_types(self, headers, rows): panel_type_counts_per_subarray = {} for row in rows: subarray = int(row[4]) panel_type_counts = panel_type_counts_per_subarray.get(subarray) or {} panel_type = int(row[3]) count = panel_type_counts.get(panel_type) or 0 panel_type_counts[panel_type] = count + 1 panel_type_counts_per_subarray[subarray] = panel_type_counts minimum_module_count = self.user_values.system_type().system_constants().minimum_corner_module_count for panel_type_counts in panel_type_counts_per_subarray.values(): if (panel_type_counts.get(1) or 0) < minimum_module_count: return FileValidationMessage.panel_type_too_few_corners(self.user_values.system_type()) return None def validate_for_spacing(self, headers, rows): try: min_spacing = self.user_values.module_system_constants().panel_spacing scaling_factor = self.user_values.module_system_constants().tolerance + 1 min_spacing = (min_spacing[0]*scaling_factor, min_spacing[1]*scaling_factor) for row_index, row in enumerate(rows): for row_index2, row2 in enumerate(rows): if row_index2 <= row_index: continue x1_pos = float(row[11]) y1_pos = float(row[12]) x2_pos = float(row2[11]) y2_pos = float(row2[12]) x_diff = abs(x1_pos-x2_pos) y_diff = abs(y1_pos-y2_pos) if (x_diff < min_spacing[0]) and (y_diff < min_spacing[1]): return FileValidationMessage.PanelsTooClose except: return FileValidationMessage.Generic # Helpers def run_validation_chain(self, headers, data, chain): result = None chain = list(chain) while not result and len(chain) > 0: method = chain.pop() result = method(self, headers, data) return result def parse_cad_input(self, cad_input): reader = csv.reader(cad_input.splitlines(), dialect='excel-tab') headers = next(reader) rows = [row for row in reader] return headers, rows