diff --git a/specifyweb/backend/stored_queries/batch_edit.py b/specifyweb/backend/stored_queries/batch_edit.py index ce073f28b42..a952cbd7dd7 100644 --- a/specifyweb/backend/stored_queries/batch_edit.py +++ b/specifyweb/backend/stored_queries/batch_edit.py @@ -1,5 +1,3 @@ - - # type: ignore # ^^ The above is because we etensively use recursive typedefs of named tuple in this file not supported on our MyPy 0.97 version. @@ -10,14 +8,25 @@ from typing import ( Any, NamedTuple, - TypedDict, ) from collections.abc import Callable from specifyweb.backend.permissions.permissions import has_target_permission -from specifyweb.specify.api.filter_by_col import CONCRETE_HIERARCHY +from specifyweb.backend.stored_queries.batch_edit_helper_classes import ( + BATCH_EDIT_NULL_RECORD_DESCRIPTION, + BATCH_EDIT_READONLY_TABLES, + BATCH_EDIT_SHARED_READONLY_FIELDS, + BATCH_EDIT_SHARED_READONLY_RELATIONSHIPS, + EMPTY_FIELD, + EMPTY_PACK, + FLOAT_FIELDS, + BatchEditFieldPack, + BatchEditMetaTables, + BatchEditPack, + BatchEditProps, +) from specifyweb.specify.models import datamodel -from specifyweb.specify.models_utils.load_datamodel import Field, Relationship, Table +from specifyweb.specify.models_utils.load_datamodel import Relationship, Table from specifyweb.backend.trees.views import TREE_INFORMATION, get_all_tree_information from specifyweb.backend.trees.utils import SPECIFY_TREES from specifyweb.specify.datamodel import is_tree_table @@ -30,7 +39,7 @@ ) from specifyweb.backend.workbench.models import Spdataset from specifyweb.backend.workbench.permissions import BatchEditDataSetPT -from specifyweb.backend.workbench.upload.treerecord import TreeRecord, TreeRankRecord, RANK_KEY_DELIMITER +from specifyweb.backend.workbench.upload.treerecord import TreeRecord, TreeRankRecord from specifyweb.backend.workbench.upload.upload_plan_schema import parse_column_options from specifyweb.backend.workbench.upload.upload_table import UploadTable from specifyweb.backend.workbench.upload.uploadable import NULL_RECORD, Uploadable @@ -45,36 +54,9 @@ from django.db import transaction -MaybeField = Callable[[QueryFieldSpec], Field | None] - -# TODO: -# Investigate if any/some/most of the logic for making an upload plan could be moved to frontend and reused. -# - does generation of upload plan in the backend bc upload plan is not known (we don't know count of to-many). -# - seemed complicated to merge upload plan from the frontend -# - need to place id markers at correct level, so need to follow upload plan anyways. -# REFACTOR: Break this file into smaller pieaces - -# TODO: Play-around with localizing -BATCH_EDIT_NULL_RECORD_DESCRIPTION = "" - -# TODO: add backend support for making system tables readonly -BATCH_EDIT_READONLY_TABLES = [*CONCRETE_HIERARCHY] - -BATCH_EDIT_SHARED_READONLY_FIELDS = [ - "timestampcreated", - "timestampmodified", - "version", - "nodenumber", - "highestchildnodenumber", - "rankid", - "fullname", - "age", -] - -BATCH_EDIT_SHARED_READONLY_RELATIONSHIPS = ["createdbyagent", "modifiedbyagent"] - -BATCH_EDIT_REQUIRED_TREE_FIELDS = ["name"] +import logging +logger = logging.getLogger(__name__) def get_readonly_fields(table: Table): fields = [*BATCH_EDIT_SHARED_READONLY_FIELDS, table.idFieldName.lower()] @@ -100,10 +82,6 @@ def get_readonly_fields(table: Table): return fields, [*BATCH_EDIT_SHARED_READONLY_RELATIONSHIPS, *relationships] - -FLOAT_FIELDS = ["java.lang.Float", "java.lang.Double", "java.math.BigDecimal"] - - def parse(value: Any | None, query_field: QueryField) -> Any: field = query_field.fieldspec.get_field() if field is None or value is None: @@ -117,168 +95,6 @@ def parse(value: Any | None, query_field: QueryField) -> Any: 'longitude2')): return float(value) return value - - -def _get_nested_order(field_spec: QueryFieldSpec): - # don't care about ordernumber if it ain't nested - # won't affect logic, just data being saved. - if len(field_spec.join_path) == 0: - return None - return field_spec.table.get_field("ordernumber") - - -batch_edit_fields: dict[str, tuple[MaybeField, int]] = { - # technically, if version updates are correct, this is useless beyond base tables - # and to-manys. TODO: Do just that. remove it. sorts asc. using sort, the optimized - # dataset construction takes place. - "id": (lambda field_spec: field_spec.table.idField, 1), - # version control gets added here. no sort. - "version": (lambda field_spec: field_spec.table.get_field("version"), 0), - # ordernumber. no sort (actually adding a sort here is useless) - "order": (_get_nested_order, 1), -} - - -class BatchEditFieldPack(NamedTuple): - field: QueryField | None = None - idx: int | None = None # default value not there, for type safety - value: Any = None # stricten this? - - -class BatchEditPack(NamedTuple): - id: BatchEditFieldPack - order: BatchEditFieldPack - version: BatchEditFieldPack - - # extends a path to contain the last field + for a defined fields - @staticmethod - def from_field_spec(field_spec: QueryFieldSpec) -> "BatchEditPack": - # don't care about which way. bad things will happen if not sorted. - # not using assert () since it can be optimised out. - if batch_edit_fields["id"][1] == 0 or batch_edit_fields["order"][1] == 0: - raise Exception("the ID field should always be sorted!") - - def extend_callback(sort_type): - def _callback(field): - return BatchEditPack._query_field( - field_spec._replace( - join_path=(*field_spec.join_path, field), date_part=None - ), - sort_type, - ) - - return _callback - - new_field_specs = { - key: BatchEditFieldPack( - idx=None, - field=Func.maybe(callback(field_spec), extend_callback(sort_type)), - value=None, - ) - for key, (callback, sort_type) in batch_edit_fields.items() - } - return BatchEditPack(**new_field_specs) - - def merge(self, other: "BatchEditPack") -> "BatchEditPack": - return BatchEditPack( - id=self.id if self.id.field is not None else other.id, - version=self.version if self.version.field is not None else other.version, - order=self.order if self.order.field is not None else other.order, - ) - - # a basic query field spec to field - @staticmethod - def _query_field(field_spec: QueryFieldSpec, sort_type: int): - return QueryField( - fieldspec=field_spec, - op_num=8, - value=None, - negate=False, - display=True, - format_name=None, - sort_type=sort_type, - strict=False, - ) - - def _index( - self, - start_idx: int, - current: tuple[dict[str, BatchEditFieldPack], list[QueryField]], - next: tuple[int, tuple[str, tuple[MaybeField, int]]], - ): - current_dict, fields = current - field_idx, (field_name, _) = next - value: BatchEditFieldPack = getattr(self, field_name) - new_dict = { - **current_dict, - field_name: value._replace( - field=None, idx=((field_idx + start_idx) if value.field else None) - ), - } - new_fields = fields if value.field is None else [*fields, value.field] - return new_dict, new_fields - - def index_plan(self, start_index=0) -> tuple["BatchEditPack", list[QueryField]]: - init: tuple[dict[str, BatchEditFieldPack], list[QueryField]] = ( - {}, - [], - ) - _dict, fields = reduce( - lambda accum, next: self._index( - start_idx=start_index, current=accum, next=next - ), - enumerate(batch_edit_fields.items()), - init, - ) - return BatchEditPack(**_dict), fields - - def bind(self, row: tuple[Any]): - return BatchEditPack( - id=self.id._replace( - value=row[self.id.idx] if self.id.idx is not None else None, - ), - order=self.order._replace( - value=row[self.order.idx] if self.order.idx is not None else None - ), - version=self.version._replace( - value=row[self.version.idx] if self.version.idx is not None else None - ), - ) - - def to_json(self) -> dict[str, Any]: - return { - "id": self.id.value, - "ordernumber": self.order.value, - "version": self.version.value, - } - - # we not only care that it is part of tree, but also care that there is rank to tree - def is_part_of_tree(self, query_fields: list[QueryField]) -> bool: - if self.id.idx is None: - return False - id_field = self.id.idx - field = query_fields[id_field - 1] - join_path = field.fieldspec.join_path - if len(join_path) < 2: - return False - return isinstance(join_path[-2], TreeRankQuery) - - @staticmethod - def replace_tree_rank(fieldspec: QueryFieldSpec, tree_rank: TreeRankQuery) -> QueryFieldSpec: - return fieldspec._replace( - join_path=tuple( - [ - tree_rank if isinstance(node, TreeRankQuery) else node - for node in fieldspec.join_path - ] - ) - ) - - def readjust_tree_rank(self, tree_rank: TreeRankQuery): - id_field = self.id._replace(field=self.id.field._replace(fieldspec=BatchEditPack.replace_tree_rank(self.id.field.fieldspec, tree_rank))) if self.id.field is not None else self.id - order_field = self.order._replace(field=self.order.field._replace(fieldspec=BatchEditPack.replace_tree_rank(self.order.field.fieldspec, tree_rank))) if self.order.field is not None else self.order - version_field = self.version._replace(field=self.version.field._replace(fieldspec=BatchEditPack.replace_tree_rank(self.version.field.fieldspec, tree_rank))) if self.version.field is not None else self.version - return BatchEditPack(id=id_field, order=order_field, version=version_field) def get_tree_rank_record(key) -> TreeRankRecord: from specifyweb.backend.workbench.upload.treerecord import RANK_KEY_DELIMITER @@ -286,12 +102,6 @@ def get_tree_rank_record(key) -> TreeRankRecord: tree_name, rank_name, tree_def_id = tuple(key.split(RANK_KEY_DELIMITER)) return TreeRankRecord(RANK_KEY_DELIMITER.join([tree_name, rank_name]), int(tree_def_id)) - -# These constants are purely for memory optimization, no code depends and/or cares if this is constant. -EMPTY_FIELD = BatchEditFieldPack() -EMPTY_PACK = BatchEditPack(id=EMPTY_FIELD, order=EMPTY_FIELD, version=EMPTY_FIELD) - - # FUTURE: this already supports nested-to-many for most part # wb plan, but contains query fields along with indexes to look-up in a result row. # TODO: see if it can be moved + combined with front-end logic. I kept all parsing on backend, but there might be possible beneft in doing this @@ -606,7 +416,6 @@ def rewrite( # RowPlanMap is just a map, this stores actual data (to many is a dict of list, rather than just a dict) # maybe unify that with RowPlanMap? - class RowPlanCanonical(NamedTuple): batch_edit_pack: BatchEditPack columns: list[BatchEditFieldPack] = [] @@ -887,8 +696,9 @@ def _flatten(_: str, _self: "RowPlanCanonical"): def to_upload_plan( self, base_table: Table, - localization_dump: dict[str, dict[str, str]], + batch_edit_meta_tables: BatchEditMetaTables, query_fields: list[QueryField], + query_field_caption_lookup: dict[QueryField, str], fields_added: dict[str, int], get_column_id: Callable[[str], int], omit_relationships: bool, @@ -908,14 +718,23 @@ def _lookup_in_fields(_id: int | None, readonly_fields: list[str]): field = query_fields[ _id - 1 ] # Need to go off by 1, bc we added 1 to account for id fields - table_name, field_name = _get_table_and_field(field) - field_labels = localization_dump.get(table_name, {}) # It could happen that the field we saw doesn't exist. # Plus, the default options get chosen in the cases of - if field_name not in field_labels or field.fieldspec.contains_tree_rank(): + table_name, field_name = _get_table_and_field(field) + field_caption = query_field_caption_lookup.get(field, None) + table_field_labels = batch_edit_meta_tables.get_table_field_labels(table_name) + if ( + table_field_labels is None + or not table_field_labels.has_field_label(field_name) + or field.fieldspec.contains_tree_rank() + ): localized_label = naive_field_format(field.fieldspec) else: - localized_label = field_labels[field_name] + field_label = table_field_labels.use_field_label(field_name, field_caption) + localized_label = ( + field_label.caption if field_label is not None else naive_field_format(field.fieldspec) + ) + string_id = field.fieldspec.to_stringid() fields_added[localized_label] = fields_added.get(localized_label, 0) + 1 _count = fields_added[localized_label] @@ -958,8 +777,9 @@ def _to_upload_plan(rel_name: str, _self: "RowPlanCanonical"): ) return _self.to_upload_plan( related_model, - localization_dump, + batch_edit_meta_tables, query_fields, + query_field_caption_lookup, fields_added, get_column_id, omit_relationships, @@ -1071,19 +891,6 @@ def run_batch_edit(collection, user, spquery, agent): visual_order=visual_order, ) - -class BatchEditProps(TypedDict): - collection: Any - user: Any - contexttableid: int - captions: Any - limit: int | None - recordsetid: int | None - session_maker: Any - fields: list[QueryField] - omit_relationships: bool | None - treedefsfilter: Any - def _get_table_and_field(field: QueryField): table_name = field.fieldspec.table.name field_name = None if field.fieldspec.get_field() is None else field.fieldspec.get_field().name @@ -1149,8 +956,15 @@ def run_batch_edit_query(props: BatchEditProps): recordsetid = props["recordsetid"] fields = props["fields"] - visible_fields = [field for field in fields if field.display] + + field_caption_pairs: list[tuple[QueryField, str]] = [] + query_field_caption_lookup: dict[QueryField, str] = {} + if captions is not None: + field_caption_pairs = list(zip(visible_fields, captions)) + query_field_caption_lookup = dict(field_caption_pairs) + if len(fields) != len(set(fields)): + logger.info("Key Collision: query field appears more than once.") treedefsfilter = props["treedefsfilter"] @@ -1158,14 +972,23 @@ def run_batch_edit_query(props: BatchEditProps): len(visible_fields) == len(captions) ), "Got misaligned captions!" - localization_dump = {} + field_caption_pairs: list[tuple[QueryField, str]] = [] if captions: - for (field, caption) in zip(visible_fields, captions): - table_name, field_name = _get_table_and_field(field) - field_labels = localization_dump.get(table_name, {}) - field_labels[field_name] = caption - localization_dump[table_name] = field_labels + field_caption_pairs = list(zip(visible_fields, captions)) + + query_field_caption_lookup: dict[QueryField, str] = { + field: caption for field, caption in field_caption_pairs + } + + localization_dump: dict[str, list[tuple[str, str, bool]]] = {} + for field, caption in field_caption_pairs: + table_name, field_name = _get_table_and_field(field) + field_labels = localization_dump.get(table_name, []) + new_field_label = (field_name, caption, False) + field_labels.append(new_field_label) + localization_dump[table_name] = field_labels + batch_edit_meta_tables = BatchEditMetaTables(localization_dump) naive_row_plan = RowPlanMap.get_row_plan(visible_fields) all_tree_info = get_all_tree_information(props["collection"], props["user"].id) base_table = datamodel.get_table_by_id_strict(tableid, strict=True) @@ -1259,8 +1082,9 @@ def _get_orig_column(string_id: str): # The keys are lookups into original query field (not modified by us). Used to get ids in the original one. key_and_headers, upload_plan = extend_row.to_upload_plan( base_table, - localization_dump, + batch_edit_meta_tables, query_fields, + query_field_caption_lookup, {}, _get_orig_column, omit_relationships, @@ -1330,4 +1154,4 @@ def filter_tree_info(filters: dict[str, list[int]], all_tree_info: dict[str, lis tree_filter = set(filters[tablename]) all_tree_info[treetable_key] = list(filter(lambda tree_info : tree_info['definition']['id'] in tree_filter, all_tree_info[treetable_key])) - return all_tree_info \ No newline at end of file + return all_tree_info diff --git a/specifyweb/backend/stored_queries/batch_edit_helper_classes.py b/specifyweb/backend/stored_queries/batch_edit_helper_classes.py new file mode 100644 index 00000000000..b7ebe05f69c --- /dev/null +++ b/specifyweb/backend/stored_queries/batch_edit_helper_classes.py @@ -0,0 +1,345 @@ +# type: ignore + +# ^^ The above is because we etensively use recursive typedefs of named tuple in this file not supported on our MyPy 0.97 version. +# When typechecked in MyPy 1.11 (supports recursive typedefs), there is no type issue in the file. +# However, using 1.11 makes things slower in other files. + +from functools import reduce +from typing import ( + Any, + NamedTuple, + TypedDict, +) +from collections.abc import Callable + +from specifyweb.specify.api.filter_by_col import CONCRETE_HIERARCHY +from specifyweb.specify.models_utils.load_datamodel import Field +from specifyweb.backend.stored_queries.queryfield import QueryField +from specifyweb.backend.stored_queries.queryfieldspec import QueryFieldSpec, TreeRankQuery +from specifyweb.specify.utils.func import Func + +MaybeField = Callable[[QueryFieldSpec], Field | None] + +# TODO: +# Investigate if any/some/most of the logic for making an upload plan could be moved to frontend and reused. +# - does generation of upload plan in the backend bc upload plan is not known (we don't know count of to-many). +# - seemed complicated to merge upload plan from the frontend +# - need to place id markers at correct level, so need to follow upload plan anyways. +# REFACTOR: Break this file into smaller pieaces + +# TODO: Play-around with localizing +BATCH_EDIT_NULL_RECORD_DESCRIPTION = "" + +# TODO: add backend support for making system tables readonly +BATCH_EDIT_READONLY_TABLES = [*CONCRETE_HIERARCHY] + +BATCH_EDIT_SHARED_READONLY_FIELDS = [ + "timestampcreated", + "timestampmodified", + "version", + "nodenumber", + "highestchildnodenumber", + "rankid", + "fullname", + "age", +] + +BATCH_EDIT_SHARED_READONLY_RELATIONSHIPS = ["createdbyagent", "modifiedbyagent"] + +BATCH_EDIT_REQUIRED_TREE_FIELDS = ["name"] + +FLOAT_FIELDS = ["java.lang.Float", "java.lang.Double", "java.math.BigDecimal"] + +class BatchEditFieldPack(NamedTuple): + field: QueryField | None = None + idx: int | None = None # default value not there, for type safety + value: Any = None # stricten this? + +class BatchEditPack(NamedTuple): + id: BatchEditFieldPack + order: BatchEditFieldPack + version: BatchEditFieldPack + + # extends a path to contain the last field + for a defined fields + @staticmethod + def from_field_spec(field_spec: QueryFieldSpec) -> "BatchEditPack": + # don't care about which way. bad things will happen if not sorted. + # not using assert () since it can be optimised out. + if batch_edit_fields["id"][1] == 0 or batch_edit_fields["order"][1] == 0: + raise Exception("the ID field should always be sorted!") + + def extend_callback(sort_type): + def _callback(field): + return BatchEditPack._query_field( + field_spec._replace( + join_path=(*field_spec.join_path, field), date_part=None + ), + sort_type, + ) + + return _callback + + new_field_specs = { + key: BatchEditFieldPack( + idx=None, + field=Func.maybe(callback(field_spec), extend_callback(sort_type)), + value=None, + ) + for key, (callback, sort_type) in batch_edit_fields.items() + } + return BatchEditPack(**new_field_specs) + + def merge(self, other: "BatchEditPack") -> "BatchEditPack": + return BatchEditPack( + id=self.id if self.id.field is not None else other.id, + version=self.version if self.version.field is not None else other.version, + order=self.order if self.order.field is not None else other.order, + ) + + # a basic query field spec to field + @staticmethod + def _query_field(field_spec: QueryFieldSpec, sort_type: int): + return QueryField( + fieldspec=field_spec, + op_num=8, + value=None, + negate=False, + display=True, + format_name=None, + sort_type=sort_type, + strict=False, + ) + + def _index( + self, + start_idx: int, + current: tuple[dict[str, BatchEditFieldPack], list[QueryField]], + next: tuple[int, tuple[str, tuple[MaybeField, int]]], + ): + current_dict, fields = current + field_idx, (field_name, _) = next + value: BatchEditFieldPack = getattr(self, field_name) + new_dict = { + **current_dict, + field_name: value._replace( + field=None, idx=((field_idx + start_idx) if value.field else None) + ), + } + new_fields = fields if value.field is None else [*fields, value.field] + return new_dict, new_fields + + def index_plan(self, start_index=0) -> tuple["BatchEditPack", list[QueryField]]: + init: tuple[dict[str, BatchEditFieldPack], list[QueryField]] = ( + {}, + [], + ) + _dict, fields = reduce( + lambda accum, next: self._index( + start_idx=start_index, current=accum, next=next + ), + enumerate(batch_edit_fields.items()), + init, + ) + return BatchEditPack(**_dict), fields + + def bind(self, row: tuple[Any]): + return BatchEditPack( + id=self.id._replace( + value=row[self.id.idx] if self.id.idx is not None else None, + ), + order=self.order._replace( + value=row[self.order.idx] if self.order.idx is not None else None + ), + version=self.version._replace( + value=row[self.version.idx] if self.version.idx is not None else None + ), + ) + + def to_json(self) -> dict[str, Any]: + return { + "id": self.id.value, + "ordernumber": self.order.value, + "version": self.version.value, + } + + # we not only care that it is part of tree, but also care that there is rank to tree + def is_part_of_tree(self, query_fields: list[QueryField]) -> bool: + if self.id.idx is None: + return False + id_field = self.id.idx + field = query_fields[id_field - 1] + join_path = field.fieldspec.join_path + if len(join_path) < 2: + return False + return isinstance(join_path[-2], TreeRankQuery) + + @staticmethod + def replace_tree_rank(fieldspec: QueryFieldSpec, tree_rank: TreeRankQuery) -> QueryFieldSpec: + return fieldspec._replace( + join_path=tuple( + [ + tree_rank if isinstance(node, TreeRankQuery) else node + for node in fieldspec.join_path + ] + ) + ) + + def readjust_tree_rank(self, tree_rank: TreeRankQuery): + id_field = self.id._replace(field=self.id.field._replace(fieldspec=BatchEditPack.replace_tree_rank(self.id.field.fieldspec, tree_rank))) if self.id.field is not None else self.id + order_field = self.order._replace(field=self.order.field._replace(fieldspec=BatchEditPack.replace_tree_rank(self.order.field.fieldspec, tree_rank))) if self.order.field is not None else self.order + version_field = self.version._replace(field=self.version.field._replace(fieldspec=BatchEditPack.replace_tree_rank(self.version.field.fieldspec, tree_rank))) if self.version.field is not None else self.version + return BatchEditPack(id=id_field, order=order_field, version=version_field) + +def _get_nested_order(field_spec: QueryFieldSpec): + # don't care about ordernumber if it ain't nested + # won't affect logic, just data being saved. + if len(field_spec.join_path) == 0: + return None + return field_spec.table.get_field("ordernumber") + +batch_edit_fields: dict[str, tuple[MaybeField, int]] = { + # technically, if version updates are correct, this is useless beyond base tables + # and to-manys. TODO: Do just that. remove it. sorts asc. using sort, the optimized + # dataset construction takes place. + "id": (lambda field_spec: field_spec.table.idField, 1), + # version control gets added here. no sort. + "version": (lambda field_spec: field_spec.table.get_field("version"), 0), + # ordernumber. no sort (actually adding a sort here is useless) + "order": (_get_nested_order, 1), +} + +# These constants are purely for memory optimization, no code depends and/or cares if this is constant. +EMPTY_FIELD = BatchEditFieldPack() +EMPTY_PACK = BatchEditPack(id=EMPTY_FIELD, order=EMPTY_FIELD, version=EMPTY_FIELD) + +class FieldLabel(NamedTuple): + field_name: str + caption: str + is_used: bool = False + + def __str__(self) -> str: + return ( + "FieldLabel(" + f"field_name={self.field_name!r}, " + f"caption={self.caption!r}, " + f"is_used={self.is_used}" + ")" + ) + + __repr__ = __str__ + +class TableFieldLabels: + table_name: str + field_labels: list[FieldLabel] + + def __init__(self, table_name: str): + self.table_name = table_name + self.field_labels = [] + + def __iter__(self): + return iter(self.field_labels) + + def add_field_label(self, field_label: FieldLabel): + self.field_labels.append(field_label) + + def get_field_label_names(self) -> list[str]: + return [field.field_name for field in self.field_labels] + + def has_field_label(self, field_name: str) -> bool: + return any(field.field_name == field_name for field in self.field_labels) + + def use_field_label( + self, + field_name: str, + expected_caption: str | None = None, + ) -> FieldLabel | None: + last_match_idx: int | None = None + for idx, field_label in enumerate(self.field_labels): + if field_label.field_name != field_name: + continue + caption_matches = ( + expected_caption is None + or field_label.caption.lower() == expected_caption.lower() + ) + last_match_idx = idx + if not field_label.is_used and caption_matches: + updated = field_label._replace(is_used=True) + self.field_labels[idx] = updated + return updated + if last_match_idx is not None: + updated = self.field_labels[last_match_idx]._replace(is_used=True) + self.field_labels[last_match_idx] = updated + return updated + return None + + def __str__(self) -> str: + indent_str = " " * indent + inner_indent = " " * (indent + 2) + if not self.field_labels: + return ( + f"{indent_str}TableFieldLabels(" + f"table_name={self.table_name!r}, field_labels=[]" + ")" + ) + lines = [ + f"{indent_str}TableFieldLabels(", + f"{indent_str} table_name={self.table_name!r},", + f"{indent_str} field_labels=[", + ] + lines.extend(f"{inner_indent}{label}" for label in self.field_labels) + lines.append(f"{indent_str} ]") + lines.append(f"{indent_str})") + return "\n".join(lines) + + __repr__ = __str__ + +class BatchEditMetaTables: + table_labels: dict[str, TableFieldLabels] = {} + + def __init__(self): + self.table_labels = {} + + def __init__(self, localization_dump: dict[str, list[tuple[str, str, bool]]]): + self.table_labels = {} + for table_name, field_labels in localization_dump.items(): + table_field_labels = TableFieldLabels(table_name) + for field_label in field_labels: + table_field_labels.add_field_label(FieldLabel(field_label[0], field_label[1], field_label[2])) + self.table_labels[table_name] = table_field_labels + + def add_field_label(self, table_name: str, field_label: FieldLabel): + if table_name not in self.table_labels: + self.table_labels[table_name] = TableFieldLabels(table_name) + self.table_labels[table_name].add_field_label(field_label) + + def get_table_field_labels(self, table_name: str) -> TableFieldLabels | None: + return self.table_labels.get(table_name, None) + + def get_all_table_names(self) -> list[str]: + return list(self.table_labels.keys()) + + def __str__(self) -> str: + indent_str = " " * indent + if not self.table_labels: + return f"{indent_str}BatchEditMetaTables(table_labels={{}})" + lines = [f"{indent_str}BatchEditMetaTables("] + for table_name in sorted(self.table_labels.keys()): + table_lines = self.table_labels[table_name].to_pretty_string(indent + 4) + lines.append(f"{indent_str} {table_name!r}:") + lines.append(table_lines) + lines.append(f"{indent_str})") + return "\n".join(lines) + + __repr__ = __str__ + +class BatchEditProps(TypedDict): + collection: Any + user: Any + contexttableid: int + captions: Any + limit: int | None + recordsetid: int | None + session_maker: Any + fields: list[QueryField] + omit_relationships: bool | None + treedefsfilter: Any diff --git a/specifyweb/backend/stored_queries/tests/test_batch_edit.py b/specifyweb/backend/stored_queries/tests/test_batch_edit.py index 2b9c954ce3c..6372521c6f3 100644 --- a/specifyweb/backend/stored_queries/tests/test_batch_edit.py +++ b/specifyweb/backend/stored_queries/tests/test_batch_edit.py @@ -2310,3 +2310,36 @@ def test_to_one_stalls_to_many(self): ] self.assertEqual(correct_packs, packs) + + def test_column_key_collision(self): + # Build QB Query + base_table = "collectionobject" + expected_captions = ['Cataloger - GUID', 'Determiner - GUID'] + query_paths = [ + ["cataloger", "GUID"], + ["determinations", "Determiner", "GUID"], + ] + + added = [(base_table, *path) for path in query_paths] + + query_fields = [ + BatchEditPack._query_field(QueryFieldSpec.from_path(path), 0) + for path in added + ] + + props = BatchEditProps( + collection=self.collection, + user=self.specifyuser, + contexttableid=datamodel.get_table_strict(base_table).tableId, + fields=query_fields, + session_maker=QueryConstructionTests.test_session_context, + captions=expected_captions, + limit=None, + recordsetid=None, + omit_relationships=False, + treedefsfilter=None + ) + (headers, rows, packs, plan, order) = run_batch_edit_query(props) + + self.assertEqual(headers, expected_captions) +