Skip to content

Script for Processing Datasets

You can use the following script to easily interact with the QC service and process entire WSI datasets at once:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import asyncio
from pathlib import Path
from typing import Any

from aiohttp import ClientSession, ClientTimeout


# --- Client-side throttling -------------------------------------------------
# Upper bound on the number of requests this script keeps in flight at once.
REQUEST_LIMIT = 4
# Total time budget for a single request, in seconds (30 minutes).
REQUEST_TIMEOUT = 30 * 60

# --- Retry policy for HTTP 500 responses ------------------------------------
MAX_REQUEST_RETRY_ATTEMPTS = 5
# Base of the exponential back-off: attempt k waits BACKOFF_BASE**k seconds.
BACKOFF_BASE = 2

# --- Service endpoint -------------------------------------------------------
# Must end with "/" because endpoint names are appended to it directly.
URL = "http://rayservice-qc-serve-svc.rationai-jobs-ns.svc.cluster.local:8000/"

# --- Output locations (adjust to your environment) --------------------------
OUTPUT_DIR = "/mnt/data/trash-bin/"  # Change to your desired output directory
REPORT_SAVE_PATH = "/mnt/data/trash-bin/report.html"  # Change to your desired report save path and name

# --- Input slides (adjust to your environment) ------------------------------
# Change to list of your slides. Only the first 10 matches are kept; the order
# is whatever the directory listing yields.
SLIDES: list[Path] = [
    *Path("/mnt/data/Projects/FNBrno/2k_prostates_2024_tiff_packed/").glob("*.tiff")
][:10]


# Shared by every request coroutine to enforce REQUEST_LIMIT.
# NOTE(review): this is created at import time, before asyncio.run() starts its
# loop. On Python < 3.10 asyncio primitives bind to the loop that is current at
# construction, so the script presumably requires Python 3.10+ — confirm.
semaphore = asyncio.Semaphore(REQUEST_LIMIT)


async def put_request(
    session: ClientSession, url: str, data: dict[str, Any]
) -> tuple[int, str]:
    """Send a single PUT request with a JSON body, throttled by the semaphore.

    The module-level ``semaphore`` is acquired before the request is issued,
    so at most ``REQUEST_LIMIT`` requests from this script run concurrently.

    Args:
        session: Open aiohttp session used to send the request.
        url: Target URL of the request.
        data: JSON-serialisable payload. It must contain the key
            ``"wsi_path"``, which is used in the timeout message.

    Returns:
        ``(status, text)`` with the HTTP status code and the response body,
        or the sentinel ``(-1, "Timeout")`` when the request timed out.
    """
    timeout = ClientTimeout(total=REQUEST_TIMEOUT)

    try:
        async with semaphore, session.put(url, json=data, timeout=timeout) as response:
            text = await response.text()

            return response.status, text
    # aiohttp signals timeouts with asyncio.TimeoutError. Before Python 3.11
    # that class is distinct from the builtin TimeoutError, so the asyncio
    # name is the one that works on every supported version.
    except asyncio.TimeoutError:
        print(
            f"Failed to process {data['wsi_path']}:\n\tTimeout after {REQUEST_TIMEOUT} seconds\n"
        )

        return -1, "Timeout"


async def repeatable_put_request(
    session: ClientSession, url: str, data: dict[str, Any], num_repeats: int
) -> None:
    """Send a PUT request, retrying with exponential back-off on HTTP 500.

    Only a response with status 500 and the exact body
    ``"Internal Server Error"`` triggers a retry. A timed-out request is not
    retried, and any other response is reported as processed. The outcome is
    printed; nothing is returned.

    Args:
        session: Open aiohttp session used to send the request.
        url: Target URL of the request.
        data: JSON-serialisable payload; must contain the key ``"wsi_path"``,
            which identifies the slide in the printed messages.
        num_repeats: Maximum number of attempts. With a value below 1 no
            request is sent and the failure message is printed immediately.
    """
    for attempt in range(1, num_repeats + 1):
        status, text = await put_request(session, url, data)

        # put_request has already reported the timeout; do not retry it.
        if status == -1 and text == "Timeout":
            return

        if status == 500 and text == "Internal Server Error":
            # Report progress against the limit actually in effect for this
            # call rather than the module-wide default.
            att_count = f"attempt {attempt}/{num_repeats}"
            print(
                f"Unexpected status 500 received for {data['wsi_path']} ({att_count}):\n\tResponse: {text}\n"
            )

            # Back off only when another attempt follows; waiting after the
            # final attempt would just delay the failure message.
            if attempt < num_repeats:
                await asyncio.sleep(BACKOFF_BASE**attempt)

            continue

        print(
            f"Processed {data['wsi_path']}:\n\tStatus: {status} \n\tResponse: {text}\n"
        )

        return

    print(f"Failed to process {data['wsi_path']}:\n\tAll retry attempts failed\n")


async def generate_report(
    session: ClientSession, slides: list[Path], output_dir: str, save_location: str
) -> None:
    """Request a report for the given slides from the ``report`` endpoint.

    The outcome (status and response body, or a timeout notice) is printed;
    nothing is returned.

    Args:
        session: Open aiohttp session used to send the request.
        slides: Slides to include; their resolved paths are sent as
            ``"backgrounds"``.
        output_dir: Directory sent to the service as ``"mask_dir"``.
        save_location: Path sent to the service as ``"save_location"``.
    """
    # URL is expected to end with "/" so the endpoint name can be appended.
    url = URL + "report"

    data = {
        "backgrounds": [str(slide.resolve()) for slide in slides],
        "mask_dir": output_dir,
        "save_location": save_location,
        "compute_metrics": True,
    }

    # Apply the same explicit time budget as the slide requests. Without it
    # the session default applies, which is independent of REQUEST_TIMEOUT.
    timeout = ClientTimeout(total=REQUEST_TIMEOUT)

    try:
        async with semaphore, session.put(url, json=data, timeout=timeout) as response:
            result = await response.text()

            print(
                f"Report generation:\n\tStatus: {response.status} \n\tResponse: {result}\n"
            )
    # Report a timeout the same way slide processing does instead of letting
    # the exception abort the script with a traceback.
    except asyncio.TimeoutError:
        print(
            f"Report generation:\n\tTimeout after {REQUEST_TIMEOUT} seconds\n"
        )


async def main() -> None:
    """Process every slide in ``SLIDES`` concurrently, then request a report.

    All slide requests share one HTTP session. The report is requested only
    after every slide request has finished.
    """
    async with ClientSession() as session:
        pending = []

        for slide in SLIDES:
            # Per-slide request body; only the slide path varies.
            payload = {
                "wsi_path": str(slide.resolve()),
                "output_path": OUTPUT_DIR,
                "mask_level": 3,
                "sample_level": 1,
                "check_residual": True,
                "check_folding": True,
                "check_focus": True,
                "wb_correction": True,
            }
            pending.append(
                repeatable_put_request(
                    session=session,
                    url=URL,
                    data=payload,
                    num_repeats=MAX_REQUEST_RETRY_ATTEMPTS,
                )
            )

        # Run all slide requests; concurrency is capped inside put_request.
        await asyncio.gather(*pending)

        # Build the report once all slides have been handled.
        await generate_report(
            session=session,
            slides=SLIDES,
            output_dir=OUTPUT_DIR,
            save_location=REPORT_SAVE_PATH,
        )


# Run the workflow only when the file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

Info

Please keep the REQUEST_LIMIT at 4 or fewer to ensure that multiple users can submit requests concurrently without overwhelming the service.

Tip

The REQUEST_TIMEOUT constant can be set to None to completely disable the timeout. However, this is not recommended, as it may occasionally result in requests that are stuck forever. If your data requires a longer processing time, we recommend increasing the timeout instead.