diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d6f4bf45..193b3de9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -3,10 +3,11 @@ name: Build and test on: [push, pull_request] env: - ISPYB_DATABASE_SCHEMA: 4.8.0 + ISPYB_DATABASE_SCHEMA: 4.11.0 # Installs from GitHub # Versions: https://github.com/DiamondLightSource/ispyb-database/tags # Previous version(s): + # 4.8.0 # 4.2.1 # released 2024-08-19 # 4.1.0 # released 2024-03-26 diff --git a/src/murfey/server/api/session_shared.py b/src/murfey/server/api/session_shared.py index 644b9b35..3a9955fa 100644 --- a/src/murfey/server/api/session_shared.py +++ b/src/murfey/server/api/session_shared.py @@ -1,4 +1,5 @@ import logging +import os from pathlib import Path from typing import Dict, List @@ -136,11 +137,48 @@ def get_foil_hole(session_id: int, fh_name: int, db) -> Dict[str, int]: return {f[1].tag: f[0].id for f in foil_holes} -def find_upstream_visits(session_id: int, db: SQLModelSession): +def find_upstream_visits(session_id: int, db: SQLModelSession, max_depth: int = 2): """ Returns a nested dictionary, in which visits and the full paths to their directories are further grouped by instrument name. 
""" + + def _recursive_search( + dirpath: str | Path, + search_string: str, + partial_match: bool = True, + max_depth: int = 1, + result: dict[str, Path] | None = None, + ): + # If no dictionary was passed in, create a new dictionary + if result is None: + result = {} + # Stop recursing for this route once max depth hits 0 + if max_depth == 0: + return result + + # Walk through the directories + for entry in os.scandir(dirpath): + if entry.is_dir(): + # Update dictionary with match and stop recursing for this route + if ( + search_string in entry.name + if partial_match + else search_string == entry.name + ): + if result is not None: # MyPy needs this 'is not None' check + result[entry.name] = Path(entry.path) + else: + # Continue searching down this route until max depth is reached + result = _recursive_search( + dirpath=entry.path, + search_string=search_string, + partial_match=partial_match, + max_depth=max_depth - 1, + result=result, + ) + return result + murfey_session = db.exec( select(MurfeySession).where(MurfeySession.id == session_id) ).one() @@ -155,12 +193,13 @@ def find_upstream_visits(session_id: int, db: SQLModelSession): upstream_instrument, upstream_data_dir, ) in machine_config.upstream_data_directories.items(): - # Looks for visit name in file path - current_upstream_visits = {} - for visit_path in Path(upstream_data_dir).glob(f"{visit_name.split('-')[0]}-*"): - if visit_path.is_dir(): - current_upstream_visits[visit_path.name] = visit_path - upstream_visits[upstream_instrument] = current_upstream_visits + # Recursively look for matching visit names under current directory + upstream_visits[upstream_instrument] = _recursive_search( + dirpath=upstream_data_dir, + search_string=f"{visit_name.split('-')[0]}-", + partial_match=True, + max_depth=max_depth, + ) return upstream_visits diff --git a/src/murfey/server/feedback.py b/src/murfey/server/feedback.py index 45c9e634..c95f1e1b 100644 --- a/src/murfey/server/feedback.py +++ 
b/src/murfey/server/feedback.py @@ -2160,7 +2160,7 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None: murfey_db=_db, ) if murfey.server._transport_object: - if result.get("success", False): + if result.get("success"): murfey.server._transport_object.transport.ack(header) else: # Send it directly to DLQ without trying to rerun it diff --git a/src/murfey/workflows/register_data_collection.py b/src/murfey/workflows/register_data_collection.py index dbf1f53c..bedaf63d 100644 --- a/src/murfey/workflows/register_data_collection.py +++ b/src/murfey/workflows/register_data_collection.py @@ -1,4 +1,5 @@ import logging +import time import ispyb.sqlalchemy._auto_db_schema as ISPyBDB from sqlmodel import select @@ -37,6 +38,7 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: dcgid = dcg[0].id # flush_data_collections(message["source"], murfey_db) else: + time.sleep(2) logger.warning( "No data collection group ID was found for image directory " f"{sanitise(message['image_directory'])} and source " @@ -82,6 +84,14 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: else "" ), ).get("return_value", None) + if dcid is None: + time.sleep(2) + logger.error( + "Failed to register the following data collection: \n" + f"{message} \n" + "Requeuing message" + ) + return {"success": False, "requeue": True} murfey_dc = MurfeyDB.DataCollection( id=dcid, tag=message.get("tag"), @@ -89,14 +99,5 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: ) murfey_db.add(murfey_dc) murfey_db.commit() - dcid = murfey_dc.id murfey_db.close() - - if dcid is None: - logger.error( - "Failed to register the following data collection: \n" - f"{message} \n" - "Requeueing message" - ) - return {"success": False, "requeue": True} return {"success": True} diff --git a/src/murfey/workflows/register_data_collection_group.py b/src/murfey/workflows/register_data_collection_group.py index d887bfd4..a225936f 100644 --- 
a/src/murfey/workflows/register_data_collection_group.py +++ b/src/murfey/workflows/register_data_collection_group.py @@ -52,6 +52,15 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: "return_value", None ) + if dcgid is None: + time.sleep(2) + logger.error( + "Failed to register the following data collection group: \n" + f"{message} \n" + "Requeuing message" + ) + return {"success": False, "requeue": True} + atlas_record = ISPyBDB.Atlas( dataCollectionGroupId=dcgid, atlasImage=message.get("atlas", ""), @@ -75,15 +84,6 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: murfey_db.commit() murfey_db.close() - if dcgid is None: - time.sleep(2) - logger.error( - "Failed to register the following data collection group: \n" - f"{message} \n" - "Requeuing message" - ) - return {"success": False, "requeue": True} - if dcg_hooks := entry_points(group="murfey.hooks", name="data_collection_group"): try: for hook in dcg_hooks: diff --git a/src/murfey/workflows/register_processing_job.py b/src/murfey/workflows/register_processing_job.py index 4378453c..1bb2d5f5 100644 --- a/src/murfey/workflows/register_processing_job.py +++ b/src/murfey/workflows/register_processing_job.py @@ -63,6 +63,8 @@ def run(message: dict, murfey_db: SQLModelSession): pid = _transport_object.do_create_ispyb_job(record).get( "return_value", None ) + if pid is None: + return {"success": False, "requeue": True} murfey_pj = MurfeyDB.ProcessingJob( id=pid, recipe=message["recipe"], dc_id=_dcid ) @@ -71,9 +73,6 @@ def run(message: dict, murfey_db: SQLModelSession): pid = murfey_pj.id murfey_db.close() - if pid is None: - return {"success": False, "requeue": True} - # Update Prometheus counter for preprocessed movies prom.preprocessed_movies.labels(processing_job=pid) diff --git a/tests/server/api/test_session_shared.py b/tests/server/api/test_session_shared.py index c0f4c109..67db321d 100644 --- a/tests/server/api/test_session_shared.py +++ 
b/tests/server/api/test_session_shared.py @@ -9,10 +9,11 @@ from tests.conftest import ExampleVisit +@pytest.mark.parametrize("recurse", (True, False)) def test_find_upstream_visits( mocker: MockerFixture, tmp_path: Path, - # murfey_db_session, + recurse: bool, ): # Get the visit, instrument name, and session ID visit_name_root = f"{ExampleVisit.proposal_code}{ExampleVisit.proposal_number}" @@ -40,7 +41,10 @@ def test_find_upstream_visits( # Only directories should be picked up upstream_visit.mkdir(parents=True, exist_ok=True) upstream_visits[upstream_instrument] = {upstream_visit.stem: upstream_visit} - upstream_data_dirs[upstream_instrument] = upstream_visit.parent + # Check that the function can cope with recursive searching + upstream_data_dirs[upstream_instrument] = ( + upstream_visit.parent.parent if recurse else upstream_visit.parent + ) else: upstream_visit.parent.mkdir(parents=True, exist_ok=True) upstream_visit.touch(exist_ok=True) diff --git a/tests/workflows/test_register_data_collection.py b/tests/workflows/test_register_data_collection.py index 49be9586..88aec006 100644 --- a/tests/workflows/test_register_data_collection.py +++ b/tests/workflows/test_register_data_collection.py @@ -99,6 +99,6 @@ def test_run( assert result == {"success": False, "requeue": True} else: mock_transport_object.do_insert_data_collection.assert_not_called() - assert result == {"success": False, "requeue": True} + assert result == {"success": True} else: assert result == {"success": True} diff --git a/tests/workflows/test_register_data_collection_group.py b/tests/workflows/test_register_data_collection_group.py index 7447dc22..3324dec9 100644 --- a/tests/workflows/test_register_data_collection_group.py +++ b/tests/workflows/test_register_data_collection_group.py @@ -77,10 +77,10 @@ def test_run( else: if ispyb_session_id is not None: mock_transport_object.do_insert_data_collection_group.assert_called_once() - mock_transport_object.do_insert_atlas.assert_called_once() if 
insert_dcg is not None: + mock_transport_object.do_insert_atlas.assert_called_once() assert result == {"success": True} else: assert result == {"success": False, "requeue": True} else: - assert result == {"success": False, "requeue": True} + assert result == {"success": True} diff --git a/tests/workflows/test_processing_job.py b/tests/workflows/test_register_processing_job.py similarity index 98% rename from tests/workflows/test_processing_job.py rename to tests/workflows/test_register_processing_job.py index 85562d93..350989a9 100644 --- a/tests/workflows/test_processing_job.py +++ b/tests/workflows/test_register_processing_job.py @@ -104,6 +104,6 @@ def test_run( else: assert result == {"success": False, "requeue": True} else: - assert result == {"success": False, "requeue": True} + assert result == {"success": True} else: assert result == {"success": False, "requeue": True}