From 86ed2c60539ba805e5f9ffe9a2008429fc16f70b Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 9 Oct 2025 16:56:48 +0100 Subject: [PATCH 01/31] Workflow for picked tomos --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 835d0b5e..7ef7cc84 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -107,6 +107,7 @@ GitHub = "https://github.com/DiamondLightSource/python-murfey" "clem.register_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:run" "pato" = "murfey.workflows.notifications:notification_setup" "picked_particles" = "murfey.workflows.spa.picking:particles_picked" +"picked_tomogram" = "murfey.workflows.tomo.feedback:picked_tomogram" "spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess" [tool.setuptools] From 32ecc10bec84cbc2f2fabafd640b83b561358b1f Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Mon, 13 Oct 2025 14:39:50 +0100 Subject: [PATCH 02/31] Generate extract pj --- src/murfey/client/contexts/tomo.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/murfey/client/contexts/tomo.py b/src/murfey/client/contexts/tomo.py index 7e6e1386..3439759a 100644 --- a/src/murfey/client/contexts/tomo.py +++ b/src/murfey/client/contexts/tomo.py @@ -161,7 +161,13 @@ def register_tomography_data_collections( data=dc_data, ) - for recipe in ("em-tomo-preprocess", "em-tomo-align"): + recipes_to_assign_pjids = [ + "em-tomo-preprocess", + "em-tomo-align", + ] + if not self._tilt_series_with_pjids: + recipes_to_assign_pjids.append("em-tomo-class2d") + for recipe in recipes_to_assign_pjids: capture_post( base_url=str(environment.url.geturl()), router_name="workflow.router", From 7bd8c2ab3c5f410dd147ad35dfff09046df798ee Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Mon, 13 Oct 2025 14:40:29 +0100 Subject: [PATCH 03/31] Tomo class bits for db --- src/murfey/util/db.py | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index ef160613..ebdb3c5d 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -459,12 +459,18 @@ class ProcessingJob(SQLModel, table=True): # type: ignore spa_parameters: List["SPARelionParameters"] = Relationship( back_populates="processing_job", sa_relationship_kwargs={"cascade": "delete"} ) - spa_feedback_parameters: List["SPAFeedbackParameters"] = Relationship( - back_populates="processing_job", sa_relationship_kwargs={"cascade": "delete"} + classification_feedback_parameters: List["ClassificationFeedbackParameters"] = ( + Relationship( + back_populates="processing_job", + sa_relationship_kwargs={"cascade": "delete"}, + ) ) ctf_parameters: List["CtfParameters"] = Relationship( back_populates="processing_job", sa_relationship_kwargs={"cascade": "delete"} ) + tomogram_picks: List["TomogramPicks"] = Relationship( + back_populates="processing_job", sa_relationship_kwargs={"cascade": "delete"} + ) class2d_parameters: List["Class2DParameters"] = Relationship( back_populates="processing_job", sa_relationship_kwargs={"cascade": "delete"} ) @@ -514,6 +520,7 @@ class TomographyProcessingParameters(SQLModel, table=True): # type: ignore frame_count: int tilt_axis: float voltage: int + particle_diameter: Optional[float] = None eer_fractionation_file: Optional[str] = None motion_corr_binning: int = 1 gain_ref: Optional[str] = None @@ -557,8 +564,10 @@ class MurfeyLedger(SQLModel, table=True): # type: ignore refine_parameters: Optional["RefineParameters"] = Relationship( back_populates="murfey_ledger", sa_relationship_kwargs={"cascade": "delete"} ) - spa_feedback_parameters: Optional["SPAFeedbackParameters"] = Relationship( - back_populates="murfey_ledger", sa_relationship_kwargs={"cascade": "delete"} + classification_feedback_parameters: Optional["ClassificationFeedbackParameters"] = ( + Relationship( + back_populates="murfey_ledger", sa_relationship_kwargs={"cascade": "delete"} + ) ) movies: Optional["Movie"] = Relationship( back_populates="murfey_ledger", sa_relationship_kwargs={"cascade": "delete"} @@ -671,6 +680,18 @@ class CtfParameters(SQLModel, table=True): # type: ignore ) +class TomogramPicks(SQLModel, table=True): # type: ignore + id: Optional[int] = Field(default=None, primary_key=True) + pj_id: int = Field(foreign_key="processingjob.id") + tomogram: str + cbox_3d: str + particle_count: int + tomogram_pixel_size: float + processing_job: Optional[ProcessingJob] = Relationship( + back_populates="tomogram_picks" + ) + + class ParticleSizes(SQLModel, table=True): # type: ignore id: Optional[int] = Field(default=None, primary_key=True) pj_id: int = Field(foreign_key="processingjob.id") @@ -700,7 +721,7 @@ class SPARelionParameters(SQLModel, table=True): # type: ignore ) -class SPAFeedbackParameters(SQLModel, table=True): # type: ignore +class ClassificationFeedbackParameters(SQLModel, table=True): # type: ignore pj_id: int = Field(primary_key=True, foreign_key="processingjob.id") estimate_particle_diameter: bool = True hold_class2d: bool = False @@ -714,10 +735,10 @@ class SPAFeedbackParameters(SQLModel, table=True): # type: ignore picker_murfey_id: Optional[int] = Field(default=None, foreign_key="murfeyledger.id") picker_ispyb_id: Optional[int] = None processing_job: Optional[ProcessingJob] = Relationship( - back_populates="spa_feedback_parameters" + back_populates="classification_feedback_parameters" ) murfey_ledger: Optional[MurfeyLedger] = Relationship( - back_populates="spa_feedback_parameters" + back_populates="classification_feedback_parameters" ) From d8afdac198e7ea1ac83bac36a7d26738049d870d Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Tue, 14 Oct 2025 09:08:49 +0100 Subject: [PATCH 04/31] Tomo picking feedback workflow --- src/murfey/util/processing_params.py | 8 ++ src/murfey/workflows/tomo/feedback.py | 173 ++++++++++++++++++++++++++ 2 files changed, 181 insertions(+) create mode 100644 src/murfey/workflows/tomo/feedback.py diff --git a/src/murfey/util/processing_params.py b/src/murfey/util/processing_params.py index 65a51c20..34564683 100644 --- a/src/murfey/util/processing_params.py +++ b/src/murfey/util/processing_params.py @@ -79,3 +79,11 @@ class SPAParameters(BaseModel): default_spa_parameters = SPAParameters() + + +class TomographyParameters(BaseModel): + batch_size_2d: int = 50000 + nr_picks_before_diameter: int = 10000 + + +default_tomo_parameters = TomographyParameters() diff --git a/src/murfey/workflows/tomo/feedback.py b/src/murfey/workflows/tomo/feedback.py new file mode 100644 index 00000000..2726166b --- /dev/null +++ b/src/murfey/workflows/tomo/feedback.py @@ -0,0 +1,173 @@ +from logging import getLogger + +import numpy as np +from sqlalchemy import func +from sqlmodel import Session, select + +from murfey.server import _transport_object +from murfey.server.feedback import _app_id, _murfey_id +from murfey.util.config import get_machine_config +from murfey.util.db import AutoProcProgram, DataCollection, ParticleSizes, ProcessingJob +from murfey.util.db import Session as MurfeySession +from murfey.util.db import TomogramPicks, TomographyProcessingParameters +from murfey.util.processing_params import default_tomo_parameters + +logger = getLogger("murfey.workflows.tomo.feedback") + + +def _pj_id_tomo_classification(app_id: int, recipe: str, _db) -> int: + dcg_id = ( + _db.exec( + select(AutoProcProgram, ProcessingJob, DataCollection) + .where(AutoProcProgram.id == app_id) + .where(AutoProcProgram.pj_id == ProcessingJob.id) + .where(ProcessingJob.dc_id == DataCollection.id) + ) + .one()[2] + .dcg_id + ) + pj_id = ( + _db.exec( + select(ProcessingJob, DataCollection) + .where(DataCollection.dcg_id == dcg_id) + .where(ProcessingJob.dc_id == DataCollection.id) + .where(ProcessingJob.recipe == recipe) + ) + .one()[0] + .id + ) + return pj_id + + +def _register_picked_tomogram_use_diameter(message: dict, _db: Session): + """Received picked particles from the tomogram autopick service""" + # Add this message to the table of seen messages + pj_id = _pj_id_tomo_classification(message["program_id"], "em-tomo-class2d", _db) + + pick_params = TomogramPicks( + pj_id=pj_id, + tomogram=message["tomogram"], + cbox_3d=message["cbox_3d"], + particle_count=message["particle_count"], + tomogram_pixel_size=message["pixel_size"], + ) + _db.add(pick_params) + _db.commit() + _db.close() + + picking_db_len = _db.exec( + select(func.count(ParticleSizes.id)).where(ParticleSizes.pj_id == pj_id) + ).one() + if picking_db_len > default_tomo_parameters.nr_picks_before_diameter: + # If there are enough particles to get a diameter + instrument_name = ( + _db.exec( + select(MurfeySession).where(MurfeySession.id == message["session_id"]) + ) + .one() + .instrument_name + ) + machine_config = get_machine_config(instrument_name=instrument_name)[ + instrument_name + ] + tomo_params = _db.exec( + select(TomographyProcessingParameters).where( + TomographyProcessingParameters.pj_id == pj_id + ) + ).one() + + particle_diameter = tomo_params.particle_diameter + + if not particle_diameter: + # If the diameter has not been calculated then find it + picking_db = _db.exec( + select(ParticleSizes.particle_size).where(ParticleSizes.pj_id == pj_id) + ).all() + particle_diameter = np.quantile(list(picking_db), 0.75) + tomo_params.particle_diameter = particle_diameter + _db.add(tomo_params) + _db.commit() + + tomo_pick_db = _db.exec( + select(TomogramPicks).where(TomogramPicks.pj_id == pj_id) + ).all() + for saved_message in tomo_pick_db: + # Send on all saved messages to extraction + class_uuids = { + str(i + 1): m + for i, m in enumerate( + _murfey_id(_app_id(pj_id, _db), _db, number=50) + ) + } + class2d_grp_uuid = _murfey_id(_app_id(pj_id, _db), _db)[0] + _db.expunge(saved_message) + zocalo_message: dict = { + "parameters": { + "tomogram": saved_message.tomogram, + "cbox_3d": saved_message.cbox_3d, + "pixel_size": saved_message.tomogram_pixel_size, + "particle_diameter": particle_diameter, + "kv": tomo_params.voltage, + "node_creator_queue": machine_config.node_creator_queue, + "session_id": message["session_id"], + "autoproc_program_id": _app_id(pj_id, _db), + "batch_size": default_tomo_parameters.batch_size_2d, + "picker_id": None, + "class2d_grp_uuid": class2d_grp_uuid, + "class_uuids": class_uuids, + }, + "recipes": ["em-spa-extract"], + } + if _transport_object: + zocalo_message["parameters"][ + "feedback_queue" + ] = _transport_object.feedback_queue + _transport_object.send( + "processing_recipe", zocalo_message, new_connection=True + ) + else: + # If the diameter is known then just send the new message + particle_diameter = tomo_params.particle_diameter + class_uuids = { + str(i + 1): m + for i, m in enumerate(_murfey_id(_app_id(pj_id, _db), _db, number=50)) + } + class2d_grp_uuid = _murfey_id(_app_id(pj_id, _db), _db)[0] + zocalo_message = { + "parameters": { + "tomogram": message["tomogram"], + "cbox_3d": message["cbox_3d"], + "pixel_size": message["pixel_size"], + "particle_diameter": particle_diameter, + "kv": tomo_params.voltage, + "node_creator_queue": machine_config.node_creator_queue, + "session_id": message["session_id"], + "autoproc_program_id": _app_id(pj_id, _db), + "batch_size": default_tomo_parameters.batch_size_2d, + "picker_id": None, + "class2d_grp_uuid": class2d_grp_uuid, + "class_uuids": class_uuids, + }, + "recipes": ["em-tomo-class2d"], + } + if _transport_object: + zocalo_message["parameters"][ + "feedback_queue" + ] = _transport_object.feedback_queue + _transport_object.send( + "processing_recipe", zocalo_message, new_connection=True + ) + else: + # If not enough particles then save the new sizes + particle_list = message.get("particle_diameters") + assert isinstance(particle_list, list) + for particle in particle_list: + new_particle = ParticleSizes(pj_id=pj_id, particle_size=particle) + _db.add(new_particle) + _db.commit() + _db.close() + + +def particles_tomogram(message: dict, murfey_db: Session) -> bool: + _register_picked_tomogram_use_diameter(message, murfey_db) + return True From 2bfd0ffed12b9cbcadde1272ff300f0a9b172abd Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Tue, 14 Oct 2025 09:12:51 +0100 Subject: [PATCH 05/31] Renamed table and fix rebase --- src/murfey/cli/inject_spa_processing.py | 6 +-- src/murfey/cli/spa_ispyb_messages.py | 2 +- src/murfey/server/api/session_info.py | 8 +-- src/murfey/server/api/workflow.py | 6 +-- src/murfey/server/demo_api.py | 12 ++--- src/murfey/server/feedback.py | 50 ++++++++++--------- .../workflows/spa/flush_spa_preprocess.py | 6 +-- src/murfey/workflows/spa/picking.py | 15 ++++-- 8 files changed, 56 insertions(+), 49 deletions(-) diff --git a/src/murfey/cli/inject_spa_processing.py b/src/murfey/cli/inject_spa_processing.py index 7a7f66bf..69f350c6 100644 --- a/src/murfey/cli/inject_spa_processing.py +++ b/src/murfey/cli/inject_spa_processing.py @@ -13,12 +13,12 @@ from murfey.util.config import get_machine_config, get_microscope, get_security_config from murfey.util.db import ( AutoProcProgram, + ClassificationFeedbackParameters, ClientEnvironment, DataCollection, DataCollectionGroup, Movie, ProcessingJob, - SPAFeedbackParameters, SPARelionParameters, ) from murfey.util.processing_params import default_spa_parameters @@ -137,9 +137,9 @@ def run(): .where(ProcessingJob.recipe == "em-spa-preprocess") ).one() params = murfey_db.exec( - select(SPARelionParameters, SPAFeedbackParameters) + select(SPARelionParameters, ClassificationFeedbackParameters) .where(SPARelionParameters.pj_id == collected_ids[2].id) - .where(SPAFeedbackParameters.pj_id == SPARelionParameters.pj_id) + .where(ClassificationFeedbackParameters.pj_id == SPARelionParameters.pj_id) ).one() proc_params: dict | None = dict(params[0]) feedback_params = params[1] diff --git a/src/murfey/cli/spa_ispyb_messages.py b/src/murfey/cli/spa_ispyb_messages.py index 640b585f..a183616c 100644 --- a/src/murfey/cli/spa_ispyb_messages.py +++ b/src/murfey/cli/spa_ispyb_messages.py @@ -363,7 +363,7 @@ def run(): small_boxsize=metadata["small_boxsize"], mask_diameter=metadata["mask_diameter"], ) - feedback_params = db.SPAFeedbackParameters( + feedback_params = db.ClassificationFeedbackParameters( pj_id=collected_ids[2].id, estimate_particle_diameter=not bool(metadata["particle_diameter"]), hold_class2d=False, diff --git a/src/murfey/server/api/session_info.py b/src/murfey/server/api/session_info.py index 62f3be1f..fa10faec 100644 --- a/src/murfey/server/api/session_info.py +++ b/src/murfey/server/api/session_info.py @@ -31,6 +31,7 @@ from murfey.util import sanitise from murfey.util.config import MachineConfig, get_machine_config from murfey.util.db import ( + ClassificationFeedbackParameters, ClientEnvironment, DataCollection, DataCollectionGroup, @@ -41,7 +42,6 @@ RsyncInstance, Session, SessionProcessingParameters, - SPAFeedbackParameters, SPARelionParameters, Tilt, TiltSeries, @@ -280,7 +280,7 @@ class ProcessingDetails(BaseModel): data_collections: List[DataCollection] processing_jobs: List[ProcessingJob] relion_params: SPARelionParameters - feedback_params: SPAFeedbackParameters + feedback_params: ClassificationFeedbackParameters @spa_router.get("/sessions/{session_id}/spa_processing_parameters") @@ -293,13 +293,13 @@ def get_spa_proc_param_details( DataCollection, ProcessingJob, SPARelionParameters, - SPAFeedbackParameters, + ClassificationFeedbackParameters, ) .where(DataCollectionGroup.session_id == session_id) .where(DataCollectionGroup.id == DataCollection.dcg_id) .where(DataCollection.id == ProcessingJob.dc_id) .where(SPARelionParameters.pj_id == ProcessingJob.id) - .where(SPAFeedbackParameters.pj_id == ProcessingJob.id) + .where(ClassificationFeedbackParameters.pj_id == ProcessingJob.id) ).all() if not params: return None diff --git a/src/murfey/server/api/workflow.py b/src/murfey/server/api/workflow.py index 8fec45f5..ddd60e09 100644 --- a/src/murfey/server/api/workflow.py +++ b/src/murfey/server/api/workflow.py @@ -44,6 +44,7 @@ from murfey.util.config import get_machine_config from murfey.util.db import ( AutoProcProgram, + ClassificationFeedbackParameters, DataCollection, DataCollectionGroup, FoilHole, @@ -54,7 +55,6 @@ SearchMap, Session, SessionProcessingParameters, - SPAFeedbackParameters, SPARelionParameters, Tilt, TiltSeries, @@ -409,9 +409,9 @@ async def request_spa_preprocessing( .where(ProcessingJob.recipe == "em-spa-preprocess") ).one() params = db.exec( - select(SPARelionParameters, SPAFeedbackParameters) + select(SPARelionParameters, ClassificationFeedbackParameters) .where(SPARelionParameters.pj_id == collected_ids[2].id) - .where(SPAFeedbackParameters.pj_id == SPARelionParameters.pj_id) + .where(ClassificationFeedbackParameters.pj_id == SPARelionParameters.pj_id) ).one() proc_params: Optional[dict] = dict(params[0]) feedback_params = params[1] diff --git a/src/murfey/server/demo_api.py b/src/murfey/server/demo_api.py index c59cde2c..79fab4c0 100644 --- a/src/murfey/server/demo_api.py +++ b/src/murfey/server/demo_api.py @@ -49,6 +49,7 @@ ) from murfey.util.db import ( AutoProcProgram, + ClassificationFeedbackParameters, ClientEnvironment, DataCollection, DataCollectionGroup, @@ -60,7 +61,6 @@ ProcessingJob, RsyncInstance, Session, - SPAFeedbackParameters, SPARelionParameters, Tilt, TiltSeries, @@ -244,7 +244,7 @@ class ProcessingDetails(BaseModel): data_collections: List[DataCollection] processing_jobs: List[ProcessingJob] relion_params: SPARelionParameters - feedback_params: SPAFeedbackParameters + feedback_params: ClassificationFeedbackParameters @router.get("/sessions/{session_id}/spa_processing_parameters") @@ -257,13 +257,13 @@ def get_spa_proc_param_details( DataCollection, ProcessingJob, SPARelionParameters, - SPAFeedbackParameters, + ClassificationFeedbackParameters, ) .where(DataCollectionGroup.session_id == session_id) .where(DataCollectionGroup.id == DataCollection.dcg_id) .where(DataCollection.id == ProcessingJob.dc_id) .where(SPARelionParameters.pj_id == ProcessingJob.id) - .where(SPAFeedbackParameters.pj_id == ProcessingJob.id) + .where(ClassificationFeedbackParameters.pj_id == ProcessingJob.id) ).all() if not params: return None @@ -560,9 +560,9 @@ def flush_spa_processing( .where(ProcessingJob.recipe == "em-spa-preprocess") ).one() params = db.exec( - select(SPARelionParameters, SPAFeedbackParameters) + select(SPARelionParameters, ClassificationFeedbackParameters) .where(SPARelionParameters.pj_id == collected_ids[2].id) - .where(SPAFeedbackParameters.pj_id == SPARelionParameters.pj_id) + .where(ClassificationFeedbackParameters.pj_id == SPARelionParameters.pj_id) ).one() proc_params = dict(params[0]) feedback_params = params[1] diff --git a/src/murfey/server/feedback.py b/src/murfey/server/feedback.py index 288f3926..79313ec9 100644 --- a/src/murfey/server/feedback.py +++ b/src/murfey/server/feedback.py @@ -313,13 +313,15 @@ def _pj_id(app_id: int, _db, recipe: str = "") -> int: def _get_spa_params( app_id: int, _db -) -> Tuple[db.SPARelionParameters, db.SPAFeedbackParameters]: +) -> Tuple[db.SPARelionParameters, db.ClassificationFeedbackParameters]: pj_id = _pj_id(app_id, _db, recipe="em-spa-preprocess") relion_params = _db.exec( select(db.SPARelionParameters).where(db.SPARelionParameters.pj_id == pj_id) ).one() feedback_params = _db.exec( - select(db.SPAFeedbackParameters).where(db.SPAFeedbackParameters.pj_id == pj_id) + select(db.ClassificationFeedbackParameters).where( + db.ClassificationFeedbackParameters.pj_id == pj_id + ) ).one() _db.expunge(relion_params) _db.expunge(feedback_params) @@ -412,8 +414,8 @@ def _release_3d_hold(message: dict, _db): ) ).one() feedback_params = _db.exec( - select(db.SPAFeedbackParameters).where( - db.SPAFeedbackParameters.pj_id == pj_id_params + select(db.ClassificationFeedbackParameters).where( + db.ClassificationFeedbackParameters.pj_id == pj_id_params ) ).one() class3d_params = _db.exec( @@ -490,8 +492,8 @@ def _release_refine_hold(message: dict, _db): ) ).one() feedback_params = _db.exec( - select(db.SPAFeedbackParameters).where( - db.SPAFeedbackParameters.pj_id == pj_id_params + select(db.ClassificationFeedbackParameters).where( + db.ClassificationFeedbackParameters.pj_id == pj_id_params ) ).one() refine_params = _db.exec( @@ -582,8 +584,8 @@ def _register_incomplete_2d_batch(message: dict, _db, demo: bool = False): ) ).one() feedback_params = _db.exec( - select(db.SPAFeedbackParameters).where( - db.SPAFeedbackParameters.pj_id == pj_id_params + select(db.ClassificationFeedbackParameters).where( + db.ClassificationFeedbackParameters.pj_id == pj_id_params ) ).one() if feedback_params.hold_class2d: @@ -706,8 +708,8 @@ def _register_complete_2d_batch(message: dict, _db, demo: bool = False): ) ).one() feedback_params = _db.exec( - select(db.SPAFeedbackParameters).where( - db.SPAFeedbackParameters.pj_id == pj_id_params + select(db.ClassificationFeedbackParameters).where( + db.ClassificationFeedbackParameters.pj_id == pj_id_params ) ).one() _db.expunge(relion_params) @@ -913,7 +915,7 @@ def _flush_class2d( app_id: int, _db, relion_params: db.SPARelionParameters | None = None, - feedback_params: db.SPAFeedbackParameters | None = None, + feedback_params: db.ClassificationFeedbackParameters | None = None, ): instrument_name = ( _db.exec(select(db.Session).where(db.Session.id == session_id)) @@ -934,8 +936,8 @@ def _flush_class2d( _db.expunge(relion_params) if not feedback_params: feedback_params = _db.exec( - select(db.SPAFeedbackParameters).where( - db.SPAFeedbackParameters.pj_id == pj_id_params + select(db.ClassificationFeedbackParameters).where( + db.ClassificationFeedbackParameters.pj_id == pj_id_params ) ).one() _db.expunge(feedback_params) @@ -1012,8 +1014,8 @@ def _register_class_selection(message: dict, _db, demo: bool = False): ).all() # Add the class selection score to the database feedback_params = _db.exec( - select(db.SPAFeedbackParameters).where( - db.SPAFeedbackParameters.pj_id == pj_id_params + select(db.ClassificationFeedbackParameters).where( + db.ClassificationFeedbackParameters.pj_id == pj_id_params ) ).one() _db.expunge(feedback_params) @@ -1233,8 +1235,8 @@ def _register_3d_batch(message: dict, _db, demo: bool = False): ).one() relion_options = dict(relion_params) feedback_params = _db.exec( - select(db.SPAFeedbackParameters).where( - db.SPAFeedbackParameters.pj_id == pj_id_params + select(db.ClassificationFeedbackParameters).where( + db.ClassificationFeedbackParameters.pj_id == pj_id_params ) ).one() other_options = dict(feedback_params) @@ -1411,8 +1413,8 @@ def _register_initial_model(message: dict, _db, demo: bool = False): pj_id_params = _pj_id(message["program_id"], _db, recipe="em-spa-preprocess") # Add the initial model file to the database feedback_params = _db.exec( - select(db.SPAFeedbackParameters).where( - db.SPAFeedbackParameters.pj_id == pj_id_params + select(db.ClassificationFeedbackParameters).where( + db.ClassificationFeedbackParameters.pj_id == pj_id_params ) ).one() feedback_params.initial_model = message.get("initial_model") @@ -1578,8 +1580,8 @@ def _register_refinement(message: dict, _db, demo: bool = False): ).one() relion_options = dict(relion_params) feedback_params = _db.exec( - select(db.SPAFeedbackParameters).where( - db.SPAFeedbackParameters.pj_id == pj_id_params + select(db.ClassificationFeedbackParameters).where( + db.ClassificationFeedbackParameters.pj_id == pj_id_params ) ).one() other_options = dict(feedback_params) @@ -1726,8 +1728,8 @@ def _register_bfactors(message: dict, _db, demo: bool = False): ).one() relion_options = dict(relion_params) feedback_params = _db.exec( - select(db.SPAFeedbackParameters).where( - db.SPAFeedbackParameters.pj_id == pj_id_params + select(db.ClassificationFeedbackParameters).where( + db.ClassificationFeedbackParameters.pj_id == pj_id_params ) ).one() @@ -2289,7 +2291,7 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None: eer_fractionation_file=message["eer_fractionation_file"], symmetry=message["symmetry"], ) - feedback_params = db.SPAFeedbackParameters( + feedback_params = db.ClassificationFeedbackParameters( pj_id=collected_ids[2].id, estimate_particle_diameter=True, hold_class2d=False, diff --git a/src/murfey/workflows/spa/flush_spa_preprocess.py b/src/murfey/workflows/spa/flush_spa_preprocess.py index 98dc8309..606511bb 100644 --- a/src/murfey/workflows/spa/flush_spa_preprocess.py +++ b/src/murfey/workflows/spa/flush_spa_preprocess.py @@ -13,6 +13,7 @@ from murfey.util.config import get_machine_config, get_microscope from murfey.util.db import ( AutoProcProgram, + ClassificationFeedbackParameters, DataCollection, DataCollectionGroup, FoilHole, @@ -21,7 +22,6 @@ PreprocessStash, ProcessingJob, Session as MurfeySession, - SPAFeedbackParameters, SPARelionParameters, ) from murfey.util.models import FoilHoleParameters, GridSquareParameters @@ -338,9 +338,9 @@ def flush_spa_preprocess(message: dict, murfey_db: Session, demo: bool = False) .where(ProcessingJob.recipe == recipe_name) ).one() params = murfey_db.exec( - select(SPARelionParameters, SPAFeedbackParameters) + select(SPARelionParameters, ClassificationFeedbackParameters) .where(SPARelionParameters.pj_id == collected_ids[2].id) - .where(SPAFeedbackParameters.pj_id == SPARelionParameters.pj_id) + .where(ClassificationFeedbackParameters.pj_id == SPARelionParameters.pj_id) ).one() proc_params = params[0] feedback_params = params[1] diff --git a/src/murfey/workflows/spa/picking.py b/src/murfey/workflows/spa/picking.py index 72a1e3ba..894a5f48 100644 --- a/src/murfey/workflows/spa/picking.py +++ b/src/murfey/workflows/spa/picking.py @@ -17,6 +17,7 @@ from murfey.util.config import get_machine_config from murfey.util.db import ( AutoProcProgram, + ClassificationFeedbackParameters, CtfParameters, DataCollection, Movie, @@ -26,7 +27,6 @@ ProcessingJob, SelectionStash, Session as MurfeySession, - SPAFeedbackParameters, SPARelionParameters, ) from murfey.util.processing_params import default_spa_parameters @@ -78,7 +78,9 @@ def _register_picked_particles_use_diameter( ).one() relion_options = dict(relion_params) feedback_params = _db.exec( - select(SPAFeedbackParameters).where(SPAFeedbackParameters.pj_id == pj_id) + select(ClassificationFeedbackParameters).where( + ClassificationFeedbackParameters.pj_id == pj_id + ) ).one() particle_diameter = relion_params.particle_diameter @@ -263,7 +265,9 @@ def _register_picked_particles_use_boxsize(message: dict, _db: Session): select(SPARelionParameters).where(SPARelionParameters.pj_id == pj_id) ).one() feedback_params = _db.exec( - select(SPAFeedbackParameters).where(SPAFeedbackParameters.pj_id == pj_id) + select(ClassificationFeedbackParameters).where( + ClassificationFeedbackParameters.pj_id == pj_id + ) ).one() if feedback_params.picker_ispyb_id is None and _transport_object: @@ -448,8 +452,9 @@ def particles_picked(message: dict, murfey_db: Session) -> bool: murfey_db.add(movie) murfey_db.commit() feedback_params = murfey_db.exec( - select(SPAFeedbackParameters).where( - SPAFeedbackParameters.pj_id == _pj_id(message["program_id"], murfey_db) + select(ClassificationFeedbackParameters).where( + ClassificationFeedbackParameters.pj_id + == _pj_id(message["program_id"], murfey_db) ) ).one() if feedback_params.estimate_particle_diameter: From fe43c02176783adc0ea98edb021afaa341e0da42 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Tue, 14 Oct 2025 17:39:08 +0100 Subject: [PATCH 06/31] Change params available for tomo class2d --- src/murfey/util/processing_params.py | 4 ++-- src/murfey/workflows/tomo/feedback.py | 30 +++++++++++++++++---------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/murfey/util/processing_params.py b/src/murfey/util/processing_params.py index 34564683..07c5a12c 100644 --- a/src/murfey/util/processing_params.py +++ b/src/murfey/util/processing_params.py @@ -82,8 +82,8 @@ class SPAParameters(BaseModel): class TomographyParameters(BaseModel): - batch_size_2d: int = 50000 - nr_picks_before_diameter: int = 10000 + batch_size_2d: int = 10000 + nr_classes_2d: int = 5 default_tomo_parameters = TomographyParameters() diff --git a/src/murfey/workflows/tomo/feedback.py b/src/murfey/workflows/tomo/feedback.py index 2726166b..6e7f6a36 100644 --- a/src/murfey/workflows/tomo/feedback.py +++ b/src/murfey/workflows/tomo/feedback.py @@ -7,9 +7,15 @@ from murfey.server import _transport_object from murfey.server.feedback import _app_id, _murfey_id from murfey.util.config import get_machine_config -from murfey.util.db import AutoProcProgram, DataCollection, ParticleSizes, ProcessingJob -from murfey.util.db import Session as MurfeySession -from murfey.util.db import TomogramPicks, TomographyProcessingParameters +from murfey.util.db import ( + AutoProcProgram, + DataCollection, + ParticleSizes, + ProcessingJob, + Session as MurfeySession, + TomogramPicks, + TomographyProcessingParameters, +) from murfey.util.processing_params import default_tomo_parameters logger = getLogger("murfey.workflows.tomo.feedback") @@ -58,7 +64,7 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): picking_db_len = _db.exec( select(func.count(ParticleSizes.id)).where(ParticleSizes.pj_id == pj_id) ).one() - if picking_db_len > default_tomo_parameters.nr_picks_before_diameter: + if picking_db_len > default_tomo_parameters.batch_size_2d: # If there are enough particles to get a diameter instrument_name = ( _db.exec( @@ -112,16 +118,17 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): "session_id": message["session_id"], "autoproc_program_id": _app_id(pj_id, _db), "batch_size": default_tomo_parameters.batch_size_2d, + "nr_classes": default_tomo_parameters.nr_classes_2d, "picker_id": None, "class2d_grp_uuid": class2d_grp_uuid, "class_uuids": class_uuids, }, - "recipes": ["em-spa-extract"], + "recipes": ["em-tomo-class2d"], } if _transport_object: - zocalo_message["parameters"][ - "feedback_queue" - ] = _transport_object.feedback_queue + zocalo_message["parameters"]["feedback_queue"] = ( + _transport_object.feedback_queue + ) _transport_object.send( "processing_recipe", zocalo_message, new_connection=True ) @@ -144,6 +151,7 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): "session_id": message["session_id"], "autoproc_program_id": _app_id(pj_id, _db), "batch_size": default_tomo_parameters.batch_size_2d, + "nr_classes": default_tomo_parameters.nr_classes_2d, "picker_id": None, "class2d_grp_uuid": class2d_grp_uuid, "class_uuids": class_uuids, @@ -151,9 +159,9 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): "recipes": ["em-tomo-class2d"], } if _transport_object: - zocalo_message["parameters"][ - "feedback_queue" - ] = _transport_object.feedback_queue + zocalo_message["parameters"]["feedback_queue"] = ( + _transport_object.feedback_queue + ) _transport_object.send( "processing_recipe", zocalo_message, new_connection=True ) From 500089e5ebce13df161e64b8d140efd1a4cb2727 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 15 Oct 2025 11:24:51 +0100 Subject: [PATCH 07/31] Rename workflow --- src/murfey/workflows/tomo/{feedback.py => picking.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/murfey/workflows/tomo/{feedback.py => picking.py} (100%) diff --git a/src/murfey/workflows/tomo/feedback.py b/src/murfey/workflows/tomo/picking.py similarity index 100% rename from src/murfey/workflows/tomo/feedback.py rename to src/murfey/workflows/tomo/picking.py From eac3702358ab86e24ef0f548f9dd39d35a911b0d Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 15 Oct 2025 15:02:47 +0100 Subject: [PATCH 08/31] Test for tomo picking feedback --- pyproject.toml | 2 +- tests/workflows/tomo/test_tomo_picking.py | 171 ++++++++++++++++++++++ 2 files changed, 172 insertions(+), 1 deletion(-) create mode 100644 tests/workflows/tomo/test_tomo_picking.py diff --git a/pyproject.toml b/pyproject.toml index 7ef7cc84..ad51e355 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -107,7 +107,7 @@ GitHub = "https://github.com/DiamondLightSource/python-murfey" "clem.register_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:run" "pato" = "murfey.workflows.notifications:notification_setup" "picked_particles" = "murfey.workflows.spa.picking:particles_picked" -"picked_tomogram" = "murfey.workflows.tomo.feedback:picked_tomogram" +"picked_tomogram" = "murfey.workflows.tomo.picking:picked_tomogram" "spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess" [tool.setuptools] diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py new file mode 100644 index 00000000..b553cb5d --- /dev/null +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -0,0 +1,171 @@ +from unittest import mock + +from sqlmodel import Session, select + +from murfey.util.db import ( + DataCollection, + DataCollectionGroup, + ParticleSizes, + ProcessingJob, + TomogramPicks, + TomographyProcessingParameters, +) +from murfey.workflows.tomo import picking +from tests.conftest import ExampleVisit, get_or_create_db_entry + + +@mock.patch("murfey.workflows.tomo.picking._pj_id_tomo_classification") +def test_picked_tomogram_not_run_class2d( + mock_pjid, murfey_db_session: Session, tmp_path +): + """Run the picker feedback with less particles than needed for classification""" + mock_pjid.return_value = 1 + + # Insert table dependencies + dcg_entry: DataCollectionGroup = get_or_create_db_entry( + murfey_db_session, + DataCollectionGroup, + lookup_kwargs={ + "id": 0, + "session_id": ExampleVisit.murfey_session_id, + "tag": "test_dcg", + }, + ) + dc_entry: DataCollection = get_or_create_db_entry( + murfey_db_session, + DataCollection, + lookup_kwargs={ + "id": 0, + "tag": "test_dc", + "dcg_id": dcg_entry.id, + }, + ) + get_or_create_db_entry( + murfey_db_session, + ProcessingJob, + lookup_kwargs={ + "id": 0, + "recipe": "test_recipe", + "dc_id": dc_entry.id, + }, + ) + + message = { + "program_id": 0, + "cbox_3d": f"{tmp_path}/AutoPick/job007/CBOX_3d/sample.cbox", + "particle_count": 2, + "particle_diameters": [10.1, 20.2], + "pixel_size": 5.3, + "register": "picked_tomogram", + "tomogram": f"{tmp_path}/Tomograms/job006/tomograms/sample.mrc", + } + + picking._register_picked_tomogram_use_diameter(message, murfey_db_session) + + mock_pjid.assert_called_once_with(0, "em-tomo-class2d", murfey_db_session) + + tomograms_db = murfey_db_session.exec( + select(TomogramPicks).where(TomogramPicks.pj_id == 1) + ).one() + assert tomograms_db.tomogram == message["tomogram"] + assert tomograms_db.cbox_3d == message["cbox_3d"] + assert tomograms_db.particle_count == 2 + assert tomograms_db.pixel_size == 5.3 + + added_picks = murfey_db_session.exec( + select(ParticleSizes).where(ParticleSizes.pj_id == 1) + ).all() + assert len(added_picks) == 2 + assert added_picks[0].particle_size == 10.1 + assert added_picks[1].particle_size == 20.2 + + +@mock.patch("murfey.workflows.tomo.picking._transport_object") +@mock.patch("murfey.workflows.tomo.picking._pj_id_tomo_classification") +def test_picked_tomogram_run_class2d( + mock_pjid, mock_transport, murfey_db_session: Session, tmp_path +): + """Run the picker feedback with less particles than needed for classification""" + mock_pjid.return_value = 1 + + # Insert table dependencies + dcg_entry: DataCollectionGroup = get_or_create_db_entry( + murfey_db_session, + DataCollectionGroup, + lookup_kwargs={ + "id": 0, + "session_id": ExampleVisit.murfey_session_id, + "tag": "test_dcg", + }, + ) + dc_entry: DataCollection = get_or_create_db_entry( + murfey_db_session, + DataCollection, + lookup_kwargs={ + "id": 0, + "tag": "test_dc", + "dcg_id": dcg_entry.id, + }, + ) + processing_job_entry: ProcessingJob = get_or_create_db_entry( + murfey_db_session, + ProcessingJob, + lookup_kwargs={ + "id": 0, + "recipe": "test_recipe", + "dc_id": dc_entry.id, + }, + ) + get_or_create_db_entry( + murfey_db_session, + TomographyProcessingParameters, + lookup_kwargs={ + "dcg_id": dcg_entry.id, + "pixel_size": 1.34, + "dose_per_frame": 1, + "frame_count": 5, + "tilt_axis": 0, + "voltage": 300, + "particle_diameter": 200, + }, + ) + for particle in range(10001): + get_or_create_db_entry( + murfey_db_session, + ParticleSizes, + lookup_kwargs={"pj_id": processing_job_entry.id, "particle_size": 100}, + ) + + message = { + "program_id": 0, + "cbox_3d": f"{tmp_path}/AutoPick/job007/CBOX_3d/sample.cbox", + "particle_count": 2, + "particle_diameters": [10.1, 20.2], + "pixel_size": 5.3, + "register": "picked_tomogram", + "tomogram": f"{tmp_path}/Tomograms/job006/tomograms/sample.mrc", + } + + # Create a data collection group for lookups + grid_square = DataCollectionGroup( + id=1, + session_id=ExampleVisit.murfey_session_id, + tag="session_tag", + atlas_id=90, + ) + murfey_db_session.add(grid_square) + murfey_db_session.commit() + + picking._register_picked_tomogram_use_diameter(message, murfey_db_session) + + mock_pjid.assert_called_once_with(0, "em-tomo-class2d", murfey_db_session) + + tomograms_db = murfey_db_session.exec( + select(TomogramPicks).where(TomogramPicks.pj_id == 1) + ).one() + assert tomograms_db.tomogram == message["tomogram"] + assert tomograms_db.cbox_3d == message["cbox_3d"] + assert tomograms_db.particle_count == 2 + assert tomograms_db.pixel_size == 5.3 + + mock_transport.assert_called_once() From 0ab610232d7f325f20e91be2662c1b2ab6228cde Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 15 Oct 2025 15:09:02 +0100 Subject: [PATCH 09/31] Improve tests --- tests/workflows/tomo/test_tomo_picking.py | 29 ++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index b553cb5d..5cbb8e47 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -44,7 +44,7 @@ def test_picked_tomogram_not_run_class2d( murfey_db_session, ProcessingJob, lookup_kwargs={ - "id": 0, + "id": 1, "recipe": "test_recipe", "dc_id": dc_entry.id, }, @@ -111,7 +111,7 @@ def test_picked_tomogram_run_class2d( murfey_db_session, ProcessingJob, lookup_kwargs={ - "id": 0, + "id": 1, "recipe": "test_recipe", "dc_id": dc_entry.id, }, @@ -137,6 +137,7 @@ def test_picked_tomogram_run_class2d( ) message = { + "session_id": 11, "program_id": 0, "cbox_3d": f"{tmp_path}/AutoPick/job007/CBOX_3d/sample.cbox", "particle_count": 2, @@ -168,4 +169,26 @@ def test_picked_tomogram_run_class2d( assert tomograms_db.particle_count == 2 assert tomograms_db.pixel_size == 5.3 - mock_transport.assert_called_once() + mock_transport.send.assert_called_once_with( + "processing_recipe", + { + "parameters": { + "tomogram": message["tomogram"], + "cbox_3d": message["cbox_3d"], + "pixel_size": message["pixel_size"], + "particle_diameter": 100, + "kv": 300, + "node_creator_queue": "node_creator", + "session_id": message["session_id"], + "autoproc_program_id": 0, + "batch_size": 10000, + "nr_classes": 10, + "picker_id": None, + "class2d_grp_uuid": 0, + "class_uuids": {str(i): i for i in range(10)}, + "feedback_queue": "murfey_feedback", + }, + "recipes": ["em-tomo-class2d"], + }, + new_connection=True, + ) From 5e2267729660ed258bca54b7f996a3d46500343d Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 15 Oct 2025 15:12:36 +0100 Subject: [PATCH 10/31] Mis-named item --- tests/workflows/tomo/test_tomo_picking.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index 5cbb8e47..7fe6a552 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -70,7 +70,7 @@ def test_picked_tomogram_not_run_class2d( assert tomograms_db.tomogram == message["tomogram"] assert tomograms_db.cbox_3d == message["cbox_3d"] assert tomograms_db.particle_count == 2 - assert tomograms_db.pixel_size == 5.3 + assert tomograms_db.tomogram_pixel_size == 5.3 added_picks = murfey_db_session.exec( select(ParticleSizes).where(ParticleSizes.pj_id == 1) @@ -167,7 +167,7 @@ def test_picked_tomogram_run_class2d( assert tomograms_db.tomogram == message["tomogram"] assert tomograms_db.cbox_3d == message["cbox_3d"] assert tomograms_db.particle_count == 2 - assert tomograms_db.pixel_size == 5.3 + assert tomograms_db.tomogram_pixel_size == 5.3 mock_transport.send.assert_called_once_with( "processing_recipe", From 8478ec3348cbc8dd06391de0ac7803c629d2e988 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 15 Oct 2025 15:21:13 +0100 Subject: [PATCH 11/31] Needs to keep adding --- tests/workflows/tomo/test_tomo_picking.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index 7fe6a552..bfafe662 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -14,9 +14,10 @@ from tests.conftest import ExampleVisit, get_or_create_db_entry +@mock.patch("murfey.workflows.tomo.picking._transport_object") @mock.patch("murfey.workflows.tomo.picking._pj_id_tomo_classification") def test_picked_tomogram_not_run_class2d( - mock_pjid, murfey_db_session: Session, tmp_path + mock_pjid, mock_transport, murfey_db_session: Session, tmp_path ): """Run the picker feedback with less particles than needed for classification""" mock_pjid.return_value = 1 @@ -79,6 +80,8 @@ def test_picked_tomogram_not_run_class2d( assert added_picks[0].particle_size == 10.1 assert added_picks[1].particle_size == 20.2 + mock_transport.send.assert_not_called() + @mock.patch("murfey.workflows.tomo.picking._transport_object") @mock.patch("murfey.workflows.tomo.picking._pj_id_tomo_classification") @@ -133,7 +136,11 @@ def test_picked_tomogram_run_class2d( get_or_create_db_entry( murfey_db_session, ParticleSizes, - lookup_kwargs={"pj_id": processing_job_entry.id, "particle_size": 100}, + lookup_kwargs={ + "id": particle, + "pj_id": processing_job_entry.id, + "particle_size": 100, + }, ) message = { @@ -176,7 +183,7 @@ def test_picked_tomogram_run_class2d( "tomogram": message["tomogram"], "cbox_3d": message["cbox_3d"], "pixel_size": message["pixel_size"], - "particle_diameter": 100, + "particle_diameter": 200, "kv": 300, "node_creator_queue": "node_creator", "session_id": message["session_id"], From 1afadc3b5844f172aa43b49d19d241a15a1a9e67 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 15 Oct 2025 16:13:26 +0100 Subject: [PATCH 12/31] Fix session id --- tests/workflows/tomo/test_tomo_picking.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index bfafe662..97bb1b5d 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -144,7 +144,7 @@ def test_picked_tomogram_run_class2d( ) message = { - "session_id": 11, + "session_id": 1, "program_id": 0, "cbox_3d": f"{tmp_path}/AutoPick/job007/CBOX_3d/sample.cbox", "particle_count": 2, From 9ab0fdd02f3e7694880b1fd77d1c7e7993f413bb Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 15 Oct 2025 16:21:26 +0100 Subject: [PATCH 13/31] Tomo params depend on dcg_id --- src/murfey/workflows/tomo/picking.py | 11 +++++++---- tests/workflows/tomo/test_tomo_picking.py | 4 ++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/murfey/workflows/tomo/picking.py b/src/murfey/workflows/tomo/picking.py index 6e7f6a36..dc59382e 100644 --- a/src/murfey/workflows/tomo/picking.py +++ b/src/murfey/workflows/tomo/picking.py @@ -1,4 +1,5 @@ from logging import getLogger +from typing import Tuple import numpy as np from sqlalchemy import func @@ -21,7 +22,7 @@ logger = getLogger("murfey.workflows.tomo.feedback") -def _pj_id_tomo_classification(app_id: int, recipe: str, _db) -> int: +def _ids_tomo_classification(app_id: int, recipe: str, _db) -> Tuple[int, int]: dcg_id = ( _db.exec( select(AutoProcProgram, ProcessingJob, DataCollection) @@ -42,13 +43,15 @@ def _pj_id_tomo_classification(app_id: int, recipe: str, _db) -> int: .one()[0] .id ) - return pj_id + return dcg_id, pj_id def _register_picked_tomogram_use_diameter(message: dict, _db: Session): """Received picked particles from the tomogram autopick service""" # Add this message to the table of seen messages - pj_id = _pj_id_tomo_classification(message["program_id"], "em-tomo-class2d", _db) + dcg_id, pj_id = _ids_tomo_classification( + message["program_id"], "em-tomo-class2d", _db + ) pick_params = TomogramPicks( pj_id=pj_id, @@ -78,7 +81,7 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): ] tomo_params = _db.exec( select(TomographyProcessingParameters).where( - TomographyProcessingParameters.pj_id == pj_id + TomographyProcessingParameters.dcg_id == dcg_id ) ).one() diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index 97bb1b5d..2edb0631 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -20,7 +20,7 @@ def test_picked_tomogram_not_run_class2d( mock_pjid, mock_transport, murfey_db_session: Session, tmp_path ): """Run the picker feedback with less particles than needed for classification""" - mock_pjid.return_value = 1 + mock_pjid.return_value = [2, 1] # Insert table dependencies dcg_entry: DataCollectionGroup = get_or_create_db_entry( @@ -89,7 +89,7 @@ def test_picked_tomogram_run_class2d( mock_pjid, mock_transport, murfey_db_session: Session, tmp_path ): """Run the picker feedback with less particles than needed for classification""" - mock_pjid.return_value = 1 + mock_pjid.return_value = [2, 1] # Insert table dependencies dcg_entry: DataCollectionGroup = get_or_create_db_entry( From 67493866327442cfb610e4090820158fef68eae4 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 15 Oct 2025 16:26:19 +0100 Subject: [PATCH 14/31] Function got renamed --- tests/workflows/tomo/test_tomo_picking.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index 2edb0631..40c42108 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -15,12 +15,12 @@ @mock.patch("murfey.workflows.tomo.picking._transport_object") -@mock.patch("murfey.workflows.tomo.picking._pj_id_tomo_classification") +@mock.patch("murfey.workflows.tomo.picking._ids_tomo_classification") def test_picked_tomogram_not_run_class2d( - mock_pjid, mock_transport, murfey_db_session: Session, tmp_path + mock_ids, mock_transport, murfey_db_session: Session, tmp_path ): """Run the picker feedback with less particles than needed for classification""" - mock_pjid.return_value = [2, 1] + mock_ids.return_value = [2, 1] # Insert table dependencies dcg_entry: DataCollectionGroup = get_or_create_db_entry( @@ -63,7 +63,7 @@ def test_picked_tomogram_not_run_class2d( picking._register_picked_tomogram_use_diameter(message, murfey_db_session) - mock_pjid.assert_called_once_with(0, "em-tomo-class2d", murfey_db_session) + mock_ids.assert_called_once_with(0, "em-tomo-class2d", murfey_db_session) tomograms_db = murfey_db_session.exec( select(TomogramPicks).where(TomogramPicks.pj_id == 1) @@ -84,12 +84,12 @@ def test_picked_tomogram_not_run_class2d( @mock.patch("murfey.workflows.tomo.picking._transport_object") -@mock.patch("murfey.workflows.tomo.picking._pj_id_tomo_classification") +@mock.patch("murfey.workflows.tomo.picking._ids_tomo_classification") def test_picked_tomogram_run_class2d( - mock_pjid, mock_transport, murfey_db_session: Session, tmp_path + mock_ids, mock_transport, murfey_db_session: Session, tmp_path ): """Run the picker feedback with less particles than needed for classification""" - mock_pjid.return_value = [2, 1] + mock_ids.return_value = [2, 1] # Insert table dependencies dcg_entry: DataCollectionGroup = get_or_create_db_entry( @@ -166,7 +166,7 @@ def test_picked_tomogram_run_class2d( picking._register_picked_tomogram_use_diameter(message, murfey_db_session) - mock_pjid.assert_called_once_with(0, "em-tomo-class2d", murfey_db_session) + mock_ids.assert_called_once_with(0, "em-tomo-class2d", murfey_db_session) tomograms_db = murfey_db_session.exec( select(TomogramPicks).where(TomogramPicks.pj_id == 1) From e4855758a9919af0d87dd7f9e68e8cbd55a1addd Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 15 Oct 2025 16:31:16 +0100 Subject: [PATCH 15/31] wrong dcgid --- tests/workflows/tomo/test_tomo_picking.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index 40c42108..c3b7da6a 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -89,8 +89,6 @@ def test_picked_tomogram_run_class2d( mock_ids, mock_transport, murfey_db_session: Session, tmp_path ): """Run the picker feedback with less particles than needed for classification""" - mock_ids.return_value = [2, 1] - # Insert table dependencies dcg_entry: DataCollectionGroup = get_or_create_db_entry( murfey_db_session, @@ -143,6 +141,8 @@ def test_picked_tomogram_run_class2d( }, ) + mock_ids.return_value = [dcg_entry.id, 1] + message = { "session_id": 1, "program_id": 0, From 2fc442833729fa87188a852aaefa475298f5bebd Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 15 Oct 2025 16:42:09 +0100 Subject: [PATCH 16/31] Fix murfey id counts and autoproc id in test --- src/murfey/server/feedback.py | 24 +++++++++++++++++++---- src/murfey/workflows/tomo/picking.py | 8 +++++++- tests/workflows/tomo/test_tomo_picking.py | 9 +++++++++ 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/src/murfey/server/feedback.py b/src/murfey/server/feedback.py index 79313ec9..45d4928b 100644 --- a/src/murfey/server/feedback.py +++ b/src/murfey/server/feedback.py @@ -625,7 +625,9 @@ def _register_incomplete_2d_batch(message: dict, _db, demo: bool = False): ) _db.add(class2d_params) _db.commit() - murfey_ids = _murfey_id(message["program_id"], _db, number=50) + murfey_ids = _murfey_id( + message["program_id"], _db, number=default_spa_parameters.nr_classes_2d + ) _murfey_class2ds( murfey_ids, class2d_message["particles_file"], message["program_id"], _db ) @@ -749,7 +751,9 @@ def _register_complete_2d_batch(message: dict, _db, demo: bool = False): _db.add(class2d_params) _db.commit() _db.close() - murfey_ids = _murfey_id(_app_id(pj_id, _db), _db, number=50) + murfey_ids = _murfey_id( + _app_id(pj_id, _db), _db, number=default_spa_parameters.nr_classes_2d + ) _murfey_class2ds( murfey_ids, class2d_message["particles_file"], _app_id(pj_id, _db), _db ) @@ -798,7 +802,13 @@ def _register_complete_2d_batch(message: dict, _db, demo: bool = False): else: class_uuids = { str(i + 1): m - for i, m in enumerate(_murfey_id(_app_id(pj_id, _db), _db, number=50)) + for i, m in enumerate( + _murfey_id( + _app_id(pj_id, _db), + _db, + number=default_spa_parameters.nr_classes_2d, + ) + ) } class2d_grp_uuid = _murfey_id(_app_id(pj_id, _db), _db)[0] zocalo_message: dict = { @@ -867,7 +877,13 @@ def _register_complete_2d_batch(message: dict, _db, demo: bool = False): else: class_uuids = { str(i + 1): m - for i, m in enumerate(_murfey_id(_app_id(pj_id, _db), _db, number=50)) + for i, m in enumerate( + _murfey_id( + _app_id(pj_id, _db), + _db, + number=default_spa_parameters.nr_classes_2d, + ) + ) } class2d_grp_uuid = _murfey_id(_app_id(pj_id, _db), _db)[0] zocalo_message = { diff --git a/src/murfey/workflows/tomo/picking.py b/src/murfey/workflows/tomo/picking.py index dc59382e..11d0799e 100644 --- a/src/murfey/workflows/tomo/picking.py +++ b/src/murfey/workflows/tomo/picking.py @@ -140,7 +140,13 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): particle_diameter = tomo_params.particle_diameter class_uuids = { str(i + 1): m - for i, m in enumerate(_murfey_id(_app_id(pj_id, _db), _db, number=50)) + for i, m in enumerate( + _murfey_id( + _app_id(pj_id, _db), + _db, + number=default_tomo_parameters.nr_classes_2d, + ) + ) } class2d_grp_uuid = _murfey_id(_app_id(pj_id, _db), _db)[0] zocalo_message = { diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index c3b7da6a..219f209d 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -3,6 +3,7 @@ from sqlmodel import Session, select from murfey.util.db import ( + AutoProcProgram, DataCollection, DataCollectionGroup, ParticleSizes, @@ -117,6 +118,14 @@ def test_picked_tomogram_run_class2d( "dc_id": dc_entry.id, }, ) + get_or_create_db_entry( + murfey_db_session, + AutoProcProgram, + lookup_kwargs={ + "id": 0, + "pj_id": processing_job_entry.id, + }, + ) get_or_create_db_entry( murfey_db_session, TomographyProcessingParameters, From 7208088f851252f678e28df129901f0ba432c719 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 15 Oct 2025 16:47:01 +0100 Subject: [PATCH 17/31] Only 5 classes --- tests/workflows/tomo/test_tomo_picking.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index 219f209d..ce759f6a 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -90,6 +90,8 @@ def test_picked_tomogram_run_class2d( mock_ids, mock_transport, murfey_db_session: Session, tmp_path ): """Run the picker feedback with less particles than needed for classification""" + mock_transport.feedback_queue = "murfey_feedback" + # Insert table dependencies dcg_entry: DataCollectionGroup = get_or_create_db_entry( murfey_db_session, @@ -192,16 +194,16 @@ def test_picked_tomogram_run_class2d( "tomogram": message["tomogram"], "cbox_3d": message["cbox_3d"], "pixel_size": message["pixel_size"], - "particle_diameter": 200, + "particle_diameter": 200.0, "kv": 300, "node_creator_queue": "node_creator", "session_id": message["session_id"], "autoproc_program_id": 0, "batch_size": 10000, - "nr_classes": 10, + "nr_classes": 5, "picker_id": None, - "class2d_grp_uuid": 0, - "class_uuids": {str(i): i for i in range(10)}, + "class2d_grp_uuid": 6, + "class_uuids": {str(i): i for i in range(1, 6)}, "feedback_queue": "murfey_feedback", }, "recipes": ["em-tomo-class2d"], From 7ed3fb7c207c461866ef188d5e38d209a21e4548 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Mon, 20 Oct 2025 10:02:56 +0100 Subject: [PATCH 18/31] Add feedback params for tomo --- src/murfey/server/feedback.py | 11 +++++++++++ src/murfey/workflows/tomo/picking.py | 15 +++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/src/murfey/server/feedback.py b/src/murfey/server/feedback.py index 45d4928b..2f0712ae 100644 --- a/src/murfey/server/feedback.py +++ b/src/murfey/server/feedback.py @@ -2364,7 +2364,18 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None: gain_ref=message["gain_ref"], eer_fractionation_file=message["eer_fractionation_file"], ) + feedback_params = db.ClassificationFeedbackParameters( + pj_id=collected_ids[2].id, + estimate_particle_diameter=True, + hold_class2d=False, + hold_class3d=False, + class_selection_score=0, + star_combination_job=0, + initial_model="", + next_job=0, + ) _db.add(params) + _db.add(feedback_params) _db.commit() _db.close() if murfey.server._transport_object: diff --git a/src/murfey/workflows/tomo/picking.py b/src/murfey/workflows/tomo/picking.py index 11d0799e..2442c06e 100644 --- a/src/murfey/workflows/tomo/picking.py +++ b/src/murfey/workflows/tomo/picking.py @@ -10,6 +10,7 @@ from murfey.util.config import get_machine_config from murfey.util.db import ( AutoProcProgram, + ClassificationFeedbackParameters, DataCollection, ParticleSizes, ProcessingJob, @@ -87,6 +88,14 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): particle_diameter = tomo_params.particle_diameter + feedback_params = _db.exec( + select(ClassificationFeedbackParameters).where( + ClassificationFeedbackParameters.pj_id == pj_id + ) + ).one() + if not feedback_params.next_job: + feedback_params.next_job = 9 + if not particle_diameter: # If the diameter has not been calculated then find it picking_db = _db.exec( @@ -125,6 +134,7 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): "picker_id": None, "class2d_grp_uuid": class2d_grp_uuid, "class_uuids": class_uuids, + "next_job": feedback_params.next_job, }, "recipes": ["em-tomo-class2d"], } @@ -135,6 +145,7 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): _transport_object.send( "processing_recipe", zocalo_message, new_connection=True ) + feedback_params.next_job += 2 else: # If the diameter is known then just send the new message particle_diameter = tomo_params.particle_diameter @@ -164,6 +175,7 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): "picker_id": None, "class2d_grp_uuid": class2d_grp_uuid, "class_uuids": class_uuids, + "next_job": feedback_params.next_job, }, "recipes": ["em-tomo-class2d"], } @@ -174,6 +186,9 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): _transport_object.send( "processing_recipe", zocalo_message, new_connection=True ) + feedback_params.next_job += 2 + _db.add(feedback_params) + _db.commit() else: # If not enough particles then save the new sizes particle_list = message.get("particle_diameters") From 7469abd7a6289c6c4dd33dd7dd751c1e6ef22355 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Mon, 20 Oct 2025 10:29:53 +0100 Subject: [PATCH 19/31] Test the route without diameter supplied --- src/murfey/workflows/tomo/picking.py | 6 +- tests/workflows/tomo/test_tomo_picking.py | 222 ++++++++++++++++------ 2 files changed, 165 insertions(+), 63 deletions(-) diff --git a/src/murfey/workflows/tomo/picking.py b/src/murfey/workflows/tomo/picking.py index 2442c06e..3f229673 100644 --- a/src/murfey/workflows/tomo/picking.py +++ b/src/murfey/workflows/tomo/picking.py @@ -114,7 +114,11 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): class_uuids = { str(i + 1): m for i, m in enumerate( - _murfey_id(_app_id(pj_id, _db), _db, number=50) + _murfey_id( + _app_id(pj_id, _db), + _db, + number=default_tomo_parameters.nr_classes_2d, + ) ) } class2d_grp_uuid = _murfey_id(_app_id(pj_id, _db), _db)[0] diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index ce759f6a..e5bf09db 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -4,6 +4,7 @@ from murfey.util.db import ( AutoProcProgram, + ClassificationFeedbackParameters, DataCollection, DataCollectionGroup, ParticleSizes, @@ -15,15 +16,8 @@ from tests.conftest import ExampleVisit, get_or_create_db_entry -@mock.patch("murfey.workflows.tomo.picking._transport_object") -@mock.patch("murfey.workflows.tomo.picking._ids_tomo_classification") -def test_picked_tomogram_not_run_class2d( - mock_ids, mock_transport, murfey_db_session: Session, tmp_path -): - """Run the picker feedback with less particles than needed for classification""" - mock_ids.return_value = [2, 1] - - # Insert table dependencies +def set_up_picking_db(murfey_db_session): + # Insert common elements needed in all picking tests dcg_entry: DataCollectionGroup = get_or_create_db_entry( murfey_db_session, DataCollectionGroup, @@ -42,7 +36,7 @@ def test_picked_tomogram_not_run_class2d( "dcg_id": dcg_entry.id, }, ) - get_or_create_db_entry( + processing_job_entry: ProcessingJob = get_or_create_db_entry( murfey_db_session, ProcessingJob, lookup_kwargs={ @@ -51,6 +45,41 @@ def test_picked_tomogram_not_run_class2d( "dc_id": dc_entry.id, }, ) + get_or_create_db_entry( + murfey_db_session, + AutoProcProgram, + lookup_kwargs={ + "id": 0, + "pj_id": processing_job_entry.id, + }, + ) + get_or_create_db_entry( + murfey_db_session, + ClassificationFeedbackParameters, + lookup_kwargs={ + "pj_id": processing_job_entry.id, + "estimate_particle_diameter": True, + "hold_class2d": False, + "hold_class3d": False, + "class_selection_score": 0, + "star_combination_job": 0, + "initial_model": "", + "next_job": 0, + }, + ) + return dcg_entry.id, dc_entry.id, processing_job_entry.id + + +@mock.patch("murfey.workflows.tomo.picking._transport_object") +@mock.patch("murfey.workflows.tomo.picking._ids_tomo_classification") +def test_picked_tomogram_not_run_class2d( + mock_ids, mock_transport, murfey_db_session: Session, tmp_path +): + """Run the picker feedback with less particles than needed for classification""" + mock_ids.return_value = [2, 1] + + # Insert table dependencies + set_up_picking_db(murfey_db_session) message = { "program_id": 0, @@ -61,7 +90,6 @@ def test_picked_tomogram_not_run_class2d( "register": "picked_tomogram", "tomogram": f"{tmp_path}/Tomograms/job006/tomograms/sample.mrc", } - picking._register_picked_tomogram_use_diameter(message, murfey_db_session) mock_ids.assert_called_once_with(0, "em-tomo-class2d", murfey_db_session) @@ -86,53 +114,19 @@ def test_picked_tomogram_not_run_class2d( @mock.patch("murfey.workflows.tomo.picking._transport_object") @mock.patch("murfey.workflows.tomo.picking._ids_tomo_classification") -def test_picked_tomogram_run_class2d( +def test_picked_tomogram_run_class2d_with_diameter( mock_ids, mock_transport, murfey_db_session: Session, tmp_path ): - """Run the picker feedback with less particles than needed for classification""" + """Run the picker feedback with a pre-determined particle diameter""" mock_transport.feedback_queue = "murfey_feedback" # Insert table dependencies - dcg_entry: DataCollectionGroup = get_or_create_db_entry( - murfey_db_session, - DataCollectionGroup, - lookup_kwargs={ - "id": 0, - "session_id": ExampleVisit.murfey_session_id, - "tag": "test_dcg", - }, - ) - dc_entry: DataCollection = get_or_create_db_entry( - murfey_db_session, - DataCollection, - lookup_kwargs={ - "id": 0, - "tag": "test_dc", - "dcg_id": dcg_entry.id, - }, - ) - processing_job_entry: ProcessingJob = get_or_create_db_entry( - murfey_db_session, - ProcessingJob, - lookup_kwargs={ - "id": 1, - "recipe": "test_recipe", - "dc_id": dc_entry.id, - }, - ) - get_or_create_db_entry( - murfey_db_session, - AutoProcProgram, - lookup_kwargs={ - "id": 0, - "pj_id": processing_job_entry.id, - }, - ) + dcg_id, dc_id, pj_id = set_up_picking_db(murfey_db_session) get_or_create_db_entry( murfey_db_session, TomographyProcessingParameters, lookup_kwargs={ - "dcg_id": dcg_entry.id, + "dcg_id": dcg_id, "pixel_size": 1.34, "dose_per_frame": 1, "frame_count": 5, @@ -147,12 +141,12 @@ def test_picked_tomogram_run_class2d( ParticleSizes, lookup_kwargs={ "id": particle, - "pj_id": processing_job_entry.id, + "pj_id": pj_id, "particle_size": 100, }, ) - mock_ids.return_value = [dcg_entry.id, 1] + mock_ids.return_value = [dcg_id, 1] message = { "session_id": 1, @@ -164,17 +158,6 @@ def test_picked_tomogram_run_class2d( "register": "picked_tomogram", "tomogram": f"{tmp_path}/Tomograms/job006/tomograms/sample.mrc", } - - # Create a data collection group for lookups - grid_square = DataCollectionGroup( - id=1, - session_id=ExampleVisit.murfey_session_id, - tag="session_tag", - atlas_id=90, - ) - murfey_db_session.add(grid_square) - murfey_db_session.commit() - picking._register_picked_tomogram_use_diameter(message, murfey_db_session) mock_ids.assert_called_once_with(0, "em-tomo-class2d", murfey_db_session) @@ -204,6 +187,121 @@ def test_picked_tomogram_run_class2d( "picker_id": None, "class2d_grp_uuid": 6, "class_uuids": {str(i): i for i in range(1, 6)}, + "next_job": 9, + "feedback_queue": "murfey_feedback", + }, + "recipes": ["em-tomo-class2d"], + }, + new_connection=True, + ) + + +@mock.patch("murfey.workflows.tomo.picking._transport_object") +@mock.patch("murfey.workflows.tomo.picking._ids_tomo_classification") +def test_picked_tomogram_run_class2d_estimate_diameter( + mock_ids, mock_transport, murfey_db_session: Session, tmp_path +): + """Run the picker feedback for Class2D, including diameter estimation""" + mock_transport.feedback_queue = "murfey_feedback" + + # Insert table dependencies + dcg_id, dc_id, pj_id = set_up_picking_db(murfey_db_session) + get_or_create_db_entry( + murfey_db_session, + TomographyProcessingParameters, + lookup_kwargs={ + "dcg_id": dcg_id, + "pixel_size": 1.34, + "dose_per_frame": 1, + "frame_count": 5, + "tilt_axis": 0, + "voltage": 300, + "particle_diameter": None, + }, + ) + for particle in range(10001): + get_or_create_db_entry( + murfey_db_session, + ParticleSizes, + lookup_kwargs={ + "id": particle, + "pj_id": pj_id, + "particle_size": 100, + }, + ) + # Insert one existing tomogram which should get flushed out + get_or_create_db_entry( + murfey_db_session, + TomogramPicks, + lookup_kwargs={ + "pj_id": pj_id, + "tomogram": f"{tmp_path}/Tomograms/job006/tomograms/tomogram1.mrc", + "cbox_3d": f"{tmp_path}/AutoPick/job007/CBOX_3d/tomogram1_picks.cbox", + "particle_count": 10, + "tomogram_pixel_size": 5.3, + }, + ) + + mock_ids.return_value = [dcg_id, 1] + + message = { + "session_id": 1, + "program_id": 0, + "cbox_3d": f"{tmp_path}/AutoPick/job007/CBOX_3d/sample.cbox", + "particle_count": 2, + "particle_diameters": [10.1, 20.2], + "pixel_size": 5.3, + "register": "picked_tomogram", + "tomogram": f"{tmp_path}/Tomograms/job006/tomograms/sample.mrc", + } + picking._register_picked_tomogram_use_diameter(message, murfey_db_session) + + mock_ids.assert_called_once_with(0, "em-tomo-class2d", murfey_db_session) + + # Two mock calls - one flushed tomogram and one new + assert mock_transport.send.call_count == 2 + mock_transport.send.assert_any_call( + "processing_recipe", + { + "parameters": { + "tomogram": f"{tmp_path}/Tomograms/job006/tomograms/tomogram1.mrc", + "cbox_3d": f"{tmp_path}/AutoPick/job007/CBOX_3d/tomogram1_picks.cbox", + "pixel_size": 5.3, + "particle_diameter": 100.0, + "kv": 300, + "node_creator_queue": "node_creator", + "session_id": message["session_id"], + "autoproc_program_id": 0, + "batch_size": 10000, + "nr_classes": 5, + "picker_id": None, + "class2d_grp_uuid": 6, + "class_uuids": {str(i): i for i in range(1, 6)}, + "next_job": 9, + "feedback_queue": "murfey_feedback", + }, + "recipes": ["em-tomo-class2d"], + }, + new_connection=True, + ) + mock_transport.send.assert_any_call( + "processing_recipe", + { + "parameters": { + "tomogram": message["tomogram"], + "cbox_3d": message["cbox_3d"], + "pixel_size": message["pixel_size"], + "particle_diameter": 100.0, + "kv": 300, + "node_creator_queue": "node_creator", + "session_id": message["session_id"], + "autoproc_program_id": 0, + "batch_size": 10000, + "nr_classes": 5, + "picker_id": None, + "class2d_grp_uuid": 12, + "class_uuids": {str(i): i for i in range(7, 12)}, + "next_job": 11, "feedback_queue": "murfey_feedback", }, "recipes": ["em-tomo-class2d"], From 27c50c79751a920456080e736a77bd3943d57826 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Mon, 20 Oct 2025 10:38:45 +0100 Subject: [PATCH 20/31] Test pjids --- tests/workflows/tomo/test_tomo_picking.py | 40 ++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index e5bf09db..b5a3c763 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -16,7 +16,7 @@ from tests.conftest import ExampleVisit, get_or_create_db_entry -def set_up_picking_db(murfey_db_session): +def set_up_picking_db(murfey_db_session: Session): # Insert common elements needed in all picking tests dcg_entry: DataCollectionGroup = get_or_create_db_entry( murfey_db_session, @@ -70,6 +70,44 @@ def set_up_picking_db(murfey_db_session): return dcg_entry.id, dc_entry.id, processing_job_entry.id +def test_ids_tomo_classification(murfey_db_session: Session): + dcg_id, first_dc, first_pj = set_up_picking_db(murfey_db_session) + + # Insert a second data collection, processing job and autoproc program + second_dc: DataCollection = get_or_create_db_entry( + murfey_db_session, + DataCollection, + lookup_kwargs={ + "id": 1, + "tag": "second_dc", + "dcg_id": dcg_id, + }, + ) + second_pj: ProcessingJob = get_or_create_db_entry( + murfey_db_session, + ProcessingJob, + lookup_kwargs={ + "id": 10, + "recipe": "second_recipe", + "dc_id": second_dc.id, + }, + ) + get_or_create_db_entry( + murfey_db_session, + AutoProcProgram, + lookup_kwargs={ + "id": 11, + "pj_id": second_pj.id, + }, + ) + + returned_ids = picking._ids_tomo_classification( + 11, "test_recipe", murfey_db_session + ) + assert returned_ids[0] == dcg_id + assert returned_ids[1] == first_pj.id + + @mock.patch("murfey.workflows.tomo.picking._transport_object") @mock.patch("murfey.workflows.tomo.picking._ids_tomo_classification") def test_picked_tomogram_not_run_class2d( From fad7f2a175ff7be8edfd1b88ef84f105ebdb3a87 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Mon, 20 Oct 2025 10:46:32 +0100 Subject: [PATCH 21/31] Fix error in test --- tests/workflows/tomo/test_tomo_picking.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index b5a3c763..b234692b 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -71,7 +71,7 @@ def set_up_picking_db(murfey_db_session: Session): def test_ids_tomo_classification(murfey_db_session: Session): - dcg_id, first_dc, first_pj = set_up_picking_db(murfey_db_session) + dcg_id, first_dc_id, first_pj_id = set_up_picking_db(murfey_db_session) # Insert a second data collection, processing job and autoproc program second_dc: DataCollection = get_or_create_db_entry( @@ -105,7 +105,7 @@ def test_ids_tomo_classification(murfey_db_session: Session): 11, "test_recipe", murfey_db_session ) assert returned_ids[0] == dcg_id - assert returned_ids[1] == first_pj.id + assert returned_ids[1] == first_pj_id @mock.patch("murfey.workflows.tomo.picking._transport_object") From 2a902a183db4cd3abe01ff27752fbb8054468813 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Mon, 20 Oct 2025 10:54:56 +0100 Subject: [PATCH 22/31] Change primary key in tomogram picks --- src/murfey/util/db.py | 3 +-- src/murfey/workflows/tomo/picking.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/murfey/util/db.py b/src/murfey/util/db.py index ebdb3c5d..065df4c5 100644 --- a/src/murfey/util/db.py +++ b/src/murfey/util/db.py @@ -681,9 +681,8 @@ class CtfParameters(SQLModel, table=True): # type: ignore class TomogramPicks(SQLModel, table=True): # type: ignore - id: Optional[int] = Field(default=None, primary_key=True) + tomogram: str = Field(primary_key=True) pj_id: int = Field(foreign_key="processingjob.id") - tomogram: str cbox_3d: str particle_count: int tomogram_pixel_size: float diff --git a/src/murfey/workflows/tomo/picking.py b/src/murfey/workflows/tomo/picking.py index 3f229673..db8910d7 100644 --- a/src/murfey/workflows/tomo/picking.py +++ b/src/murfey/workflows/tomo/picking.py @@ -63,7 +63,6 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): ) _db.add(pick_params) _db.commit() - _db.close() picking_db_len = _db.exec( select(func.count(ParticleSizes.id)).where(ParticleSizes.pj_id == pj_id) @@ -150,6 +149,7 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): "processing_recipe", zocalo_message, new_connection=True ) feedback_params.next_job += 2 + _db.delete(saved_message) else: # If the diameter is known then just send the new message particle_diameter = tomo_params.particle_diameter From f6e7d02e25954faff345bce7483c20b6cab27534 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Mon, 20 Oct 2025 11:06:34 +0100 Subject: [PATCH 23/31] Skip expunging? --- src/murfey/workflows/tomo/picking.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/murfey/workflows/tomo/picking.py b/src/murfey/workflows/tomo/picking.py index db8910d7..10699632 100644 --- a/src/murfey/workflows/tomo/picking.py +++ b/src/murfey/workflows/tomo/picking.py @@ -121,7 +121,6 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): ) } class2d_grp_uuid = _murfey_id(_app_id(pj_id, _db), _db)[0] - _db.expunge(saved_message) zocalo_message: dict = { "parameters": { "tomogram": saved_message.tomogram, From 17d4759fc2619c0a2baa85cd4ecddd858b38a209 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Mon, 20 Oct 2025 11:12:50 +0100 Subject: [PATCH 24/31] Ids are bigger --- tests/workflows/tomo/test_tomo_picking.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index b234692b..f506864d 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -313,8 +313,8 @@ def test_picked_tomogram_run_class2d_estimate_diameter( "batch_size": 10000, "nr_classes": 5, "picker_id": None, - "class2d_grp_uuid": 6, - "class_uuids": {str(i): i for i in range(1, 6)}, + "class2d_grp_uuid": 12, + "class_uuids": {str(i): i for i in range(7, 12)}, "next_job": 9, "feedback_queue": "murfey_feedback", }, @@ -337,8 +337,8 @@ def test_picked_tomogram_run_class2d_estimate_diameter( "batch_size": 10000, "nr_classes": 5, "picker_id": None, - "class2d_grp_uuid": 12, - "class_uuids": {str(i): i for i in range(7, 12)}, + "class2d_grp_uuid": 18, + "class_uuids": {str(i): i for i in range(13, 18)}, "next_job": 11, "feedback_queue": "murfey_feedback", }, From 15df9a7e3c420f5cfc02c1c132fd7f0bfb113955 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Mon, 20 Oct 2025 11:18:31 +0100 Subject: [PATCH 25/31] Ids are not that --- tests/workflows/tomo/test_tomo_picking.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index f506864d..093d1b14 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -298,6 +298,7 @@ def test_picked_tomogram_run_class2d_estimate_diameter( # Two mock calls - one flushed tomogram and one new assert mock_transport.send.call_count == 2 + print(mock_transport.send.mock_calls) mock_transport.send.assert_any_call( "processing_recipe", { @@ -314,7 +315,7 @@ def test_picked_tomogram_run_class2d_estimate_diameter( "nr_classes": 5, "picker_id": None, "class2d_grp_uuid": 12, - "class_uuids": {str(i): i for i in range(7, 12)}, + "class_uuids": {str(i): i + 6 for i in range(1, 6)}, "next_job": 9, "feedback_queue": "murfey_feedback", }, @@ -338,7 +339,7 @@ def test_picked_tomogram_run_class2d_estimate_diameter( "nr_classes": 5, "picker_id": None, "class2d_grp_uuid": 18, - "class_uuids": {str(i): i for i in range(13, 18)}, + "class_uuids": {str(i): i + 12 for i in range(1, 6)}, "next_job": 11, "feedback_queue": "murfey_feedback", }, From 569c670734100885c318a30cd2a9ae17b732a969 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Mon, 20 Oct 2025 11:28:02 +0100 Subject: [PATCH 26/31] Try mocking out the count for faster tests --- tests/workflows/tomo/test_tomo_picking.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index 093d1b14..d3a3085b 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -152,11 +152,13 @@ def test_picked_tomogram_not_run_class2d( @mock.patch("murfey.workflows.tomo.picking._transport_object") @mock.patch("murfey.workflows.tomo.picking._ids_tomo_classification") +@mock.patch("murfey.workflows.tomo.picking.sqlalchemy.func") def test_picked_tomogram_run_class2d_with_diameter( - mock_ids, mock_transport, murfey_db_session: Session, tmp_path + mock_func, mock_ids, mock_transport, murfey_db_session: Session, tmp_path ): """Run the picker feedback with a pre-determined particle diameter""" mock_transport.feedback_queue = "murfey_feedback" + mock_func.count.return_value = 10001 # Insert table dependencies dcg_id, dc_id, pj_id = set_up_picking_db(murfey_db_session) @@ -173,7 +175,7 @@ def test_picked_tomogram_run_class2d_with_diameter( "particle_diameter": 200, }, ) - for particle in range(10001): + for particle in range(10): get_or_create_db_entry( murfey_db_session, ParticleSizes, @@ -199,6 +201,7 @@ def test_picked_tomogram_run_class2d_with_diameter( picking._register_picked_tomogram_use_diameter(message, murfey_db_session) mock_ids.assert_called_once_with(0, "em-tomo-class2d", murfey_db_session) + mock_func.count.assert_called_once() tomograms_db = murfey_db_session.exec( select(TomogramPicks).where(TomogramPicks.pj_id == 1) @@ -236,11 +239,13 @@ def test_picked_tomogram_run_class2d_with_diameter( @mock.patch("murfey.workflows.tomo.picking._transport_object") @mock.patch("murfey.workflows.tomo.picking._ids_tomo_classification") +@mock.patch("murfey.workflows.tomo.picking.sqlalchemy.func") def test_picked_tomogram_run_class2d_estimate_diameter( - mock_ids, mock_transport, murfey_db_session: Session, tmp_path + mock_func, mock_ids, mock_transport, murfey_db_session: Session, tmp_path ): """Run the picker feedback for Class2D, including diameter estimation""" mock_transport.feedback_queue = "murfey_feedback" + mock_func.count.return_value = 10001 # Insert table dependencies dcg_id, dc_id, pj_id = set_up_picking_db(murfey_db_session) @@ -257,7 +262,7 @@ def test_picked_tomogram_run_class2d_estimate_diameter( "particle_diameter": None, }, ) - for particle in range(10001): + for particle in range(10): get_or_create_db_entry( murfey_db_session, ParticleSizes, @@ -295,10 +300,10 @@ def test_picked_tomogram_run_class2d_estimate_diameter( picking._register_picked_tomogram_use_diameter(message, murfey_db_session) mock_ids.assert_called_once_with(0, "em-tomo-class2d", murfey_db_session) + mock_func.count.assert_called_once() # Two mock calls - one flushed tomogram and one new assert mock_transport.send.call_count == 2 - print(mock_transport.send.mock_calls) mock_transport.send.assert_any_call( "processing_recipe", { From d6797715318ff5094a9d5f15206a9293b0c70bec Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Mon, 20 Oct 2025 11:31:45 +0100 Subject: [PATCH 27/31] Mocked wrong thing --- tests/workflows/tomo/test_tomo_picking.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index d3a3085b..f623fc33 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -152,7 +152,7 @@ def test_picked_tomogram_not_run_class2d( @mock.patch("murfey.workflows.tomo.picking._transport_object") @mock.patch("murfey.workflows.tomo.picking._ids_tomo_classification") -@mock.patch("murfey.workflows.tomo.picking.sqlalchemy.func") +@mock.patch("murfey.workflows.tomo.picking.func") def test_picked_tomogram_run_class2d_with_diameter( mock_func, mock_ids, mock_transport, murfey_db_session: Session, tmp_path ): @@ -239,7 +239,7 @@ def test_picked_tomogram_run_class2d_with_diameter( @mock.patch("murfey.workflows.tomo.picking._transport_object") @mock.patch("murfey.workflows.tomo.picking._ids_tomo_classification") -@mock.patch("murfey.workflows.tomo.picking.sqlalchemy.func") +@mock.patch("murfey.workflows.tomo.picking.func") def test_picked_tomogram_run_class2d_estimate_diameter( mock_func, mock_ids, mock_transport, murfey_db_session: Session, tmp_path ): From 9b29882b2d9c13fb0095f5abca53e189e609d442 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Mon, 20 Oct 2025 11:36:55 +0100 Subject: [PATCH 28/31] Mocking count didn't work --- tests/workflows/tomo/test_tomo_picking.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index f623fc33..a2748012 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -152,13 +152,11 @@ def test_picked_tomogram_not_run_class2d( @mock.patch("murfey.workflows.tomo.picking._transport_object") @mock.patch("murfey.workflows.tomo.picking._ids_tomo_classification") -@mock.patch("murfey.workflows.tomo.picking.func") def test_picked_tomogram_run_class2d_with_diameter( - mock_func, mock_ids, mock_transport, murfey_db_session: Session, tmp_path + mock_ids, mock_transport, murfey_db_session: Session, tmp_path ): """Run the picker feedback with a pre-determined particle diameter""" mock_transport.feedback_queue = "murfey_feedback" - mock_func.count.return_value = 10001 # Insert table dependencies dcg_id, dc_id, pj_id = set_up_picking_db(murfey_db_session) @@ -175,7 +173,7 @@ def test_picked_tomogram_run_class2d_with_diameter( "particle_diameter": 200, }, ) - for particle in range(10): + for particle in range(10001): get_or_create_db_entry( murfey_db_session, ParticleSizes, @@ -201,7 +199,6 @@ def test_picked_tomogram_run_class2d_with_diameter( picking._register_picked_tomogram_use_diameter(message, murfey_db_session) mock_ids.assert_called_once_with(0, "em-tomo-class2d", murfey_db_session) - mock_func.count.assert_called_once() tomograms_db = murfey_db_session.exec( select(TomogramPicks).where(TomogramPicks.pj_id == 1) @@ -239,13 +236,11 @@ def test_picked_tomogram_run_class2d_with_diameter( @mock.patch("murfey.workflows.tomo.picking._transport_object") @mock.patch("murfey.workflows.tomo.picking._ids_tomo_classification") -@mock.patch("murfey.workflows.tomo.picking.func") def test_picked_tomogram_run_class2d_estimate_diameter( - mock_func, mock_ids, mock_transport, murfey_db_session: Session, tmp_path + mock_ids, mock_transport, murfey_db_session: Session, tmp_path ): """Run the picker feedback for Class2D, including diameter estimation""" mock_transport.feedback_queue = "murfey_feedback" - mock_func.count.return_value = 10001 # Insert table dependencies dcg_id, dc_id, pj_id = set_up_picking_db(murfey_db_session) @@ -262,7 +257,7 @@ def test_picked_tomogram_run_class2d_estimate_diameter( "particle_diameter": None, }, ) - for particle in range(10): + for particle in range(10001): get_or_create_db_entry( murfey_db_session, ParticleSizes, @@ -300,7 +295,6 @@ def test_picked_tomogram_run_class2d_estimate_diameter( picking._register_picked_tomogram_use_diameter(message, murfey_db_session) mock_ids.assert_called_once_with(0, "em-tomo-class2d", murfey_db_session) - mock_func.count.assert_called_once() # Two mock calls - one flushed tomogram and one new assert mock_transport.send.call_count == 2 From 8cb58303fd1fc1f6eb7e83f55b31810e624212fa Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Mon, 20 Oct 2025 12:50:42 +0100 Subject: [PATCH 29/31] Name things consistently --- src/murfey/workflows/tomo/picking.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/workflows/tomo/picking.py b/src/murfey/workflows/tomo/picking.py index 10699632..414283fc 100644 --- a/src/murfey/workflows/tomo/picking.py +++ b/src/murfey/workflows/tomo/picking.py @@ -203,6 +203,6 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): _db.close() -def particles_tomogram(message: dict, murfey_db: Session) -> bool: +def picked_tomogram(message: dict, murfey_db: Session) -> bool: _register_picked_tomogram_use_diameter(message, murfey_db) return True From 47463441a986dec3e9b57204beff3b10cf6a3219 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Tue, 28 Oct 2025 10:51:23 +0000 Subject: [PATCH 30/31] Rename _db to murfey_db --- src/murfey/workflows/tomo/picking.py | 60 ++++++++++++++-------------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/src/murfey/workflows/tomo/picking.py b/src/murfey/workflows/tomo/picking.py index 414283fc..9d1fafab 100644 --- a/src/murfey/workflows/tomo/picking.py +++ b/src/murfey/workflows/tomo/picking.py @@ -23,9 +23,11 @@ logger = getLogger("murfey.workflows.tomo.feedback") -def _ids_tomo_classification(app_id: int, recipe: str, _db) -> Tuple[int, int]: +def _ids_tomo_classification( + app_id: int, recipe: str, murfey_db: Session +) -> Tuple[int, int]: dcg_id = ( - _db.exec( + murfey_db.exec( select(AutoProcProgram, ProcessingJob, DataCollection) .where(AutoProcProgram.id == app_id) .where(AutoProcProgram.pj_id == ProcessingJob.id) @@ -35,7 +37,7 @@ def _ids_tomo_classification(app_id: int, recipe: str, _db) -> Tuple[int, int]: .dcg_id ) pj_id = ( - _db.exec( + murfey_db.exec( select(ProcessingJob, DataCollection) .where(DataCollection.dcg_id == dcg_id) .where(ProcessingJob.dc_id == DataCollection.id) @@ -47,11 +49,11 @@ def _ids_tomo_classification(app_id: int, recipe: str, _db) -> Tuple[int, int]: return dcg_id, pj_id -def _register_picked_tomogram_use_diameter(message: dict, _db: Session): +def _register_picked_tomogram_use_diameter(message: dict, murfey_db: Session): """Received picked particles from the tomogram autopick service""" # Add this message to the table of seen messages dcg_id, pj_id = _ids_tomo_classification( - message["program_id"], "em-tomo-class2d", _db + message["program_id"], "em-tomo-class2d", murfey_db ) pick_params = TomogramPicks( @@ -61,16 +63,16 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): particle_count=message["particle_count"], tomogram_pixel_size=message["pixel_size"], ) - _db.add(pick_params) - _db.commit() + murfey_db.add(pick_params) + murfey_db.commit() - picking_db_len = _db.exec( + picking_db_len = murfey_db.exec( select(func.count(ParticleSizes.id)).where(ParticleSizes.pj_id == pj_id) ).one() if picking_db_len > default_tomo_parameters.batch_size_2d: # If there are enough particles to get a diameter instrument_name = ( - _db.exec( + murfey_db.exec( select(MurfeySession).where(MurfeySession.id == message["session_id"]) ) .one() @@ -79,7 +81,7 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] - tomo_params = _db.exec( + tomo_params = murfey_db.exec( select(TomographyProcessingParameters).where( TomographyProcessingParameters.dcg_id == dcg_id ) @@ -87,7 +89,7 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): particle_diameter = tomo_params.particle_diameter - feedback_params = _db.exec( + feedback_params = murfey_db.exec( select(ClassificationFeedbackParameters).where( ClassificationFeedbackParameters.pj_id == pj_id ) @@ -97,15 +99,15 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): if not particle_diameter: # If the diameter has not been calculated then find it - picking_db = _db.exec( + picking_db = murfey_db.exec( select(ParticleSizes.particle_size).where(ParticleSizes.pj_id == pj_id) ).all() particle_diameter = np.quantile(list(picking_db), 0.75) tomo_params.particle_diameter = particle_diameter - _db.add(tomo_params) - _db.commit() + murfey_db.add(tomo_params) + murfey_db.commit() - tomo_pick_db = _db.exec( + tomo_pick_db = murfey_db.exec( select(TomogramPicks).where(TomogramPicks.pj_id == pj_id) ).all() for saved_message in tomo_pick_db: @@ -114,13 +116,13 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): str(i + 1): m for i, m in enumerate( _murfey_id( - _app_id(pj_id, _db), - _db, + _app_id(pj_id, murfey_db), + murfey_db, number=default_tomo_parameters.nr_classes_2d, ) ) } - class2d_grp_uuid = _murfey_id(_app_id(pj_id, _db), _db)[0] + class2d_grp_uuid = _murfey_id(_app_id(pj_id, murfey_db), murfey_db)[0] zocalo_message: dict = { "parameters": { "tomogram": saved_message.tomogram, @@ -130,7 +132,7 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): "kv": tomo_params.voltage, "node_creator_queue": machine_config.node_creator_queue, "session_id": message["session_id"], - "autoproc_program_id": _app_id(pj_id, _db), + "autoproc_program_id": _app_id(pj_id, murfey_db), "batch_size": default_tomo_parameters.batch_size_2d, "nr_classes": default_tomo_parameters.nr_classes_2d, "picker_id": None, @@ -148,7 +150,7 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): "processing_recipe", zocalo_message, new_connection=True ) feedback_params.next_job += 2 - _db.delete(saved_message) + murfey_db.delete(saved_message) else: # If the diameter is known then just send the new message particle_diameter = tomo_params.particle_diameter @@ -156,13 +158,13 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): str(i + 1): m for i, m in enumerate( _murfey_id( - _app_id(pj_id, _db), - _db, + _app_id(pj_id, murfey_db), + murfey_db, number=default_tomo_parameters.nr_classes_2d, ) ) } - class2d_grp_uuid = _murfey_id(_app_id(pj_id, _db), _db)[0] + class2d_grp_uuid = _murfey_id(_app_id(pj_id, murfey_db), murfey_db)[0] zocalo_message = { "parameters": { "tomogram": message["tomogram"], @@ -172,7 +174,7 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): "kv": tomo_params.voltage, "node_creator_queue": machine_config.node_creator_queue, "session_id": message["session_id"], - "autoproc_program_id": _app_id(pj_id, _db), + "autoproc_program_id": _app_id(pj_id, murfey_db), "batch_size": default_tomo_parameters.batch_size_2d, "nr_classes": default_tomo_parameters.nr_classes_2d, "picker_id": None, @@ -190,17 +192,17 @@ def _register_picked_tomogram_use_diameter(message: dict, _db: Session): "processing_recipe", zocalo_message, new_connection=True ) feedback_params.next_job += 2 - _db.add(feedback_params) - _db.commit() + murfey_db.add(feedback_params) + murfey_db.commit() else: # If not enough particles then save the new sizes particle_list = message.get("particle_diameters") assert isinstance(particle_list, list) for particle in particle_list: new_particle = ParticleSizes(pj_id=pj_id, particle_size=particle) - _db.add(new_particle) - _db.commit() - _db.close() + murfey_db.add(new_particle) + murfey_db.commit() + murfey_db.close() def picked_tomogram(message: dict, murfey_db: Session) -> bool: From 8a9f8f9b0adf2a6fc99f83a63e770590c6c078f4 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Tue, 28 Oct 2025 10:55:46 +0000 Subject: [PATCH 31/31] Turn down the particle counts --- src/murfey/util/processing_params.py | 2 +- tests/workflows/tomo/test_tomo_picking.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/murfey/util/processing_params.py b/src/murfey/util/processing_params.py index 07c5a12c..cdb23991 100644 --- a/src/murfey/util/processing_params.py +++ b/src/murfey/util/processing_params.py @@ -82,7 +82,7 @@ class SPAParameters(BaseModel): class TomographyParameters(BaseModel): - batch_size_2d: int = 10000 + batch_size_2d: int = 5000 nr_classes_2d: int = 5 diff --git a/tests/workflows/tomo/test_tomo_picking.py b/tests/workflows/tomo/test_tomo_picking.py index a2748012..ce7cb582 100644 --- a/tests/workflows/tomo/test_tomo_picking.py +++ b/tests/workflows/tomo/test_tomo_picking.py @@ -173,7 +173,7 @@ def test_picked_tomogram_run_class2d_with_diameter( "particle_diameter": 200, }, ) - for particle in range(10001): + for particle in range(5001): get_or_create_db_entry( murfey_db_session, ParticleSizes, @@ -220,7 +220,7 @@ def test_picked_tomogram_run_class2d_with_diameter( "node_creator_queue": "node_creator", "session_id": message["session_id"], "autoproc_program_id": 0, - "batch_size": 10000, + "batch_size": 5000, "nr_classes": 5, "picker_id": None, "class2d_grp_uuid": 6, @@ -257,7 +257,7 @@ def test_picked_tomogram_run_class2d_estimate_diameter( "particle_diameter": None, }, ) - for particle in range(10001): + for particle in range(5001): get_or_create_db_entry( murfey_db_session, ParticleSizes, @@ -310,7 +310,7 @@ def test_picked_tomogram_run_class2d_estimate_diameter( "node_creator_queue": "node_creator", "session_id": message["session_id"], "autoproc_program_id": 0, - "batch_size": 10000, + "batch_size": 5000, "nr_classes": 5, "picker_id": None, "class2d_grp_uuid": 12, @@ -334,7 +334,7 @@ def test_picked_tomogram_run_class2d_estimate_diameter( "node_creator_queue": "node_creator", "session_id": message["session_id"], "autoproc_program_id": 0, - "batch_size": 10000, + "batch_size": 5000, "nr_classes": 5, "picker_id": None, "class2d_grp_uuid": 18,