Source code for ska_ser_skallop.mvp_control.subarray.job_scheduling

from concurrent.futures import ThreadPoolExecutor
from typing import Any


class Job:
    """Run a single callable in a background thread and collect its result.

    The callable and its arguments are captured at construction time. Nothing
    runs until :meth:`schedule_for_execution` is called; the outcome is then
    retrieved with :meth:`wait_until_job_is_finished`.
    """

    def __init__(self, func, *args: Any, **kwargs: Any) -> None:
        """Store the callable and the arguments it will be invoked with.

        :param func: the callable to execute in the background.
        :param args: positional arguments forwarded to ``func``.
        :param kwargs: keyword arguments forwarded to ``func``.
        """
        self._func = func
        # Future of the most recent submission; None until the job is scheduled.
        self._task = None
        self._args = args
        self._kwargs = kwargs
        # One worker only, so repeated submissions from the same Job are
        # executed one after the other rather than concurrently.
        # NOTE(review): the executor is never shut down explicitly in this
        # class - confirm whether callers need a way to release the worker.
        self._pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix="job")

    def schedule_for_execution(self) -> None:
        """Submit the stored callable to the worker thread.

        Returns immediately. Calling this again submits the callable once
        more and makes the new submission the one that is waited upon.
        """
        self._task = self._pool.submit(self._func, *self._args, **self._kwargs)

    def wait_until_job_is_finished(self, timeout=5) -> Any:
        """Block until the scheduled job completes and return its result.

        :param timeout: maximum time in seconds to wait for the job; passed
            straight to ``Future.result``.
        :return: whatever the callable returned, or ``None`` when the job was
            never scheduled.
        :raises concurrent.futures.TimeoutError: if the job has not finished
            within ``timeout``.
        :raises Exception: any exception raised by the callable is re-raised
            here by ``Future.result``.
        """
        # Explicit identity check: "not scheduled yet" is exactly the None case.
        if self._task is None:
            return None
        return self._task.result(timeout)