Skip to content

tasks

numerous.tasks

Task management API.

TaskController

Bases: Protocol

Controller interface injected into task functions.

Source code in numerous/tasks/controller.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class TaskController(Protocol):
    """
    Controller interface injected into task functions.

    This is a ``typing.Protocol``: every method body below is a ``...`` stub
    and the class only describes the methods a controller object must offer.
    ``get_task_controller()`` is annotated as returning an object of this type.
    """

    def should_stop(self) -> bool:
        """
        Check if the task should stop.

        Returns:
            A boolean; per the summary above, true when the task should stop.
        """
        ...

    def request_stop(self) -> None:
        """
        Request the task to stop.

        ``TaskInstanceState.stop`` calls this on its controller when one is set.
        """
        ...

    def set_progress(self, progress: float) -> None:
        """
        Set the task progress (0.0 to 1.0).

        Args:
            progress: Progress value; the documented range is 0.0 to 1.0.
                The stub itself performs no validation.
        """
        ...

    def set_status(self, status: Union[str, TaskStatus]) -> None:
        """
        Set the task status.

        Args:
            status: Either a plain string or a ``TaskStatus`` member.
        """
        ...

    def set_output(self, output: dict[str, Any]) -> None:
        """
        Set the task output.

        Args:
            output: Dictionary with string keys and arbitrary values.
        """
        ...

request_stop()

Request the task to stop.

Source code in numerous/tasks/controller.py
22
23
24
def request_stop(self) -> None:
    """
    Request the task to stop.

    Interface stub: the body is ``...``, so the behaviour comes from whichever
    concrete controller implements this method.
    """
    ...

set_output(output)

Set the task output.

Source code in numerous/tasks/controller.py
34
35
36
def set_output(self, output: dict[str, Any]) -> None:
    """
    Set the task output.

    Interface stub: the body is ``...``.

    Args:
        output: Dictionary with string keys and arbitrary values.
    """
    ...

set_progress(progress)

Set the task progress (0.0 to 1.0).

Source code in numerous/tasks/controller.py
26
27
28
def set_progress(self, progress: float) -> None:
    """
    Set the task progress (0.0 to 1.0).

    Interface stub: the body is ``...`` and performs no range validation.

    Args:
        progress: Progress value; the documented range is 0.0 to 1.0.
    """
    ...

set_status(status)

Set the task status.

Source code in numerous/tasks/controller.py
30
31
32
def set_status(self, status: Union[str, TaskStatus]) -> None:
    """
    Set the task status.

    Interface stub: the body is ``...``.

    Args:
        status: Either a plain string or a ``TaskStatus`` member.
    """
    ...

should_stop()

Check if the task should stop.

Source code in numerous/tasks/controller.py
18
19
20
def should_stop(self) -> bool:
    """
    Check if the task should stop.

    Interface stub: the body is ``...``.

    Returns:
        A boolean; per the summary above, true when the task should stop.
    """
    ...

TaskDefinition dataclass

Immutable task definition.

Source code in numerous/tasks/types.py
43
44
45
46
47
48
49
50
51
52
@dataclass(frozen=True)
class TaskDefinition:
    """
    Immutable task definition.

    ``frozen=True`` makes attribute assignment on an instance raise
    ``dataclasses.FrozenInstanceError``. The ``command`` list is still a
    mutable object, so freezing does not protect its contents.
    """

    # Identifier of the definition.
    # NOTE(review): presumably the ID get_task_definition() looks up -- confirm.
    id: str
    # Name of the task.
    name: str
    # Callable attached to the definition; the annotation allows any
    # arguments and any return value.
    func: Callable[..., Any]
    # The remaining fields are optional and default to None.
    # NOTE(review): presumably only relevant for platform execution -- confirm.
    app_id: Optional[str] = None
    app_version_id: Optional[str] = None
    command: Optional[list[str]] = None

TaskInstanceNotFoundError

Bases: Exception

Raised when a task instance cannot be found on the platform.

Source code in numerous/tasks/types.py
18
19
20
21
22
23
class TaskInstanceNotFoundError(Exception):
    """
    Error signalling that a task instance cannot be found on the platform.

    The identifier that was looked up is kept on ``instance_id`` and is also
    embedded in the exception message.
    """

    def __init__(self, instance_id: str) -> None:
        message = f"Task instance not found: {instance_id}"
        super().__init__(message)
        self.instance_id = instance_id

TaskInstanceState dataclass

Mutable task instance state.

This is the only mutable structure, containing runtime state. All state changes go through pure functions.

Source code in numerous/tasks/types.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
@dataclass
class TaskInstanceState:
    """
    Runtime state of a single task instance.

    Unlike the frozen ``TaskDefinition``, this structure is mutable; the
    project convention is that state changes go through pure functions.

    The ``get_*`` accessors read their attribute while holding ``lock``.
    ``is_done`` and ``stop`` do not take the lock. ``lock`` is created per
    instance and is left out of the generated ``repr``.
    """

    id: str
    task_id: str
    status: TaskStatus
    progress: float
    inputs: dict[str, Any]
    workload: TaskWorkload
    created_at: datetime
    output: Optional[dict[str, Any]] = None
    result: Any = None
    error: Optional[Exception] = None
    future: Optional[Future[Any]] = None
    controller: Optional[TaskController] = None
    lock: threading.Lock = field(default_factory=threading.Lock, repr=False)

    def is_done(self) -> bool:
        """Report whether the status is COMPLETED, FAILED or CANCELLED."""
        finished_states = (
            TaskStatus.COMPLETED,
            TaskStatus.FAILED,
            TaskStatus.CANCELLED,
        )
        return self.status in finished_states

    def stop(self) -> None:
        """Forward a stop request to the controller; no-op without one."""
        if not self.controller:
            return
        self.controller.request_stop()

    def get_progress(self) -> float:
        """Read ``progress`` while holding the instance lock."""
        with self.lock:
            current = self.progress
        return current

    def get_status(self) -> TaskStatus:
        """Read ``status`` while holding the instance lock."""
        with self.lock:
            current = self.status
        return current

    def get_output(self) -> Optional[dict[str, Any]]:
        """Read ``output`` (possibly ``None``) while holding the instance lock."""
        with self.lock:
            current = self.output
        return current

get_output()

Get the task output.

Source code in numerous/tasks/types.py
101
102
103
104
def get_output(self) -> Optional[dict[str, Any]]:
    """Read ``output`` (possibly ``None``) while holding ``self.lock``."""
    with self.lock:
        current = self.output
    return current

get_progress()

Get the task progress.

Source code in numerous/tasks/types.py
91
92
93
94
def get_progress(self) -> float:
    """Read ``progress`` while holding ``self.lock``."""
    with self.lock:
        current = self.progress
    return current

get_status()

Get the task status.

Source code in numerous/tasks/types.py
96
97
98
99
def get_status(self) -> TaskStatus:
    """Read ``status`` while holding ``self.lock``."""
    with self.lock:
        current = self.status
    return current

is_done()

Check if task is complete.

Source code in numerous/tasks/types.py
78
79
80
81
82
83
84
def is_done(self) -> bool:
    """Report whether the status is COMPLETED, FAILED or CANCELLED."""
    finished_states = (
        TaskStatus.COMPLETED,
        TaskStatus.FAILED,
        TaskStatus.CANCELLED,
    )
    return self.status in finished_states

stop()

Stop the task.

Source code in numerous/tasks/types.py
86
87
88
89
def stop(self) -> None:
    """Forward a stop request to the controller; no-op without one."""
    if not self.controller:
        return
    self.controller.request_stop()

TaskStatus

Bases: Enum

Task instance status.

Source code in numerous/tasks/types.py
26
27
28
29
30
31
32
33
class TaskStatus(Enum):
    """
    Task instance status.

    Each member's value is its own name as a lowercase string.
    ``TaskInstanceState.is_done`` counts COMPLETED, FAILED and CANCELLED as
    done; PENDING and RUNNING are not in that set.
    """

    PENDING = "pending"
    RUNNING = "running"
    # The three members below are the ones is_done() treats as finished.
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

get_task_controller()

Get the task controller for the current task execution.

Source code in numerous/tasks/context.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def get_task_controller() -> TaskController:
    """
    Return the controller for the current task execution.

    The value held by ``_current_controller`` is used when it is not ``None``.
    Otherwise a platform controller is created from the
    ``NUMEROUS_TASK_INSTANCE_ID`` environment variable, stored in
    ``_current_controller`` and returned.

    Raises:
        RuntimeError: If no controller is set and the environment variable
            is missing.
    """
    active = _current_controller.get()
    if active is None:
        instance_id = os.getenv("NUMEROUS_TASK_INSTANCE_ID")
        if instance_id is None:
            msg = (
                "No task controller available. "
                "Must be called within a task execution context, "
                "or NUMEROUS_TASK_INSTANCE_ID must be set (platform mode)."
            )
            raise RuntimeError(msg)

        # Store the freshly built controller so later calls reuse it.
        active = _create_platform_controller(instance_id)
        _current_controller.set(active)
    return active

get_task_definition(task_id)

Get a task definition by ID.

Source code in numerous/tasks/task.py
149
150
151
def get_task_definition(task_id: str) -> Optional[TaskDefinition]:
    """
    Get a task definition by ID.

    Thin wrapper: returns the result of ``_store.get_task_definition``
    unchanged.

    Args:
        task_id: ID of the task definition to look up.

    Returns:
        The matching definition, or ``None``.
        NOTE(review): presumably ``None`` means nothing is registered under
        that ID -- confirm against the store.
    """
    return _store.get_task_definition(task_id)

get_task_inputs()

Get the inputs for the current task execution.

Source code in numerous/tasks/context.py
48
49
50
51
52
53
54
55
56
57
58
def get_task_inputs() -> dict[str, Any]:
    """
    Return the inputs for the current task execution.

    The value held by ``_current_inputs`` is used when it is not ``None``.
    Otherwise the ``TASK_DATA_INPUT`` environment variable is deserialized
    (an unset or empty variable yields ``{}``) and the result is stored in
    ``_current_inputs`` before being returned.
    """
    resolved = _current_inputs.get()
    if resolved is None:
        raw = os.getenv("TASK_DATA_INPUT")
        if raw:
            resolved = deserialize_task_inputs(raw)
        else:
            resolved = {}
        _current_inputs.set(resolved)
    return resolved

get_task_instance(instance_id)

Get a task instance by ID.

Source code in numerous/tasks/task.py
164
165
166
def get_task_instance(instance_id: str) -> Optional[TaskInstanceState]:
    """
    Get a task instance by ID.

    Thin wrapper: returns the result of ``_store.get_task_instance``
    unchanged.

    Args:
        instance_id: ID of the task instance to look up.

    Returns:
        The matching instance state, or ``None``.
        NOTE(review): presumably ``None`` means no instance with that ID is
        known to the store -- confirm.
    """
    return _store.get_task_instance(instance_id)

list_task_definitions()

List all registered task definitions.

Source code in numerous/tasks/task.py
154
155
156
def list_task_definitions() -> list[TaskDefinition]:
    """
    List all registered task definitions.

    Thin wrapper: returns the result of ``_store.list_task_definitions``
    unchanged.
    """
    return _store.list_task_definitions()

list_task_instances(task_id=None)

List all task instances, optionally filtered by task_id.

Source code in numerous/tasks/task.py
169
170
171
def list_task_instances(task_id: Optional[str] = None) -> list[TaskInstanceState]:
    """
    List all task instances, optionally filtered by task_id.

    Thin wrapper: ``task_id`` is passed straight to
    ``_store.list_task_instances`` and its result is returned unchanged.

    Args:
        task_id: Optional task ID to filter by; defaults to ``None``
            (no filter, per the summary above).
    """
    return _store.list_task_instances(task_id)

register_task(task_def)

Register a task definition globally.

Source code in numerous/tasks/task.py
144
145
146
def register_task(task_def: TaskDefinition) -> TaskDefinition:
    """
    Register a task definition globally.

    Thin wrapper: hands ``task_def`` to the module-level ``_store`` and
    returns whatever ``_store.register_task`` returns.

    Args:
        task_def: The definition to register.
    """
    return _store.register_task(task_def)

run_task(task_def, inputs, block=False, workload=TaskWorkload.LOCAL)

High-level function to create and execute a task.

This composes the lower-level functions into a useful operation.

Source code in numerous/tasks/task.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def run_task(
    task_def: TaskDefinition,
    inputs: dict[str, Any],
    block: bool = False,  # noqa: FBT001,FBT002
    workload: TaskWorkload = TaskWorkload.LOCAL,
) -> TaskInstanceState:
    """
    Create a task instance, register it and submit it for execution.

    This is the high-level entry point composed from the lower-level
    helpers.

    Args:
        task_def: Definition of the task to run.
        inputs: Inputs handed to ``create_task_instance``.
        block: When true, wait on the submitted future before returning;
            anything ``future.result()`` raises propagates to the caller.
        workload: Workload passed to ``create_task_instance``; defaults to
            ``TaskWorkload.LOCAL``.

    Returns:
        The state object created for this run.
    """
    instance = create_task_instance(task_def, inputs, workload)
    register_instance(instance)

    # The executor is fetched only after the instance has been registered.
    pending = submit_task_execution(task_def, instance, _get_executor())

    if block:
        pending.result()

    return instance

stop_task_instance(instance_id)

Stop a running task instance.

Source code in numerous/tasks/task.py
207
208
209
210
211
212
def stop_task_instance(instance_id: str) -> Optional[TaskInstanceState]:
    """
    Request a stop for the task instance with the given ID.

    Returns:
        The result of ``request_stop(state)`` when ``get_task_instance``
        yields a state, otherwise ``None``.
    """
    found = get_task_instance(instance_id)
    if not found:
        return None
    return request_stop(found)

wait_for_completion(state)

Wait for task completion and return result.

Source code in numerous/tasks/task.py
215
216
217
218
219
def wait_for_completion(state: TaskInstanceState) -> Any:  # noqa: ANN401
    """
    Wait for the task to finish and return its result.

    When the state carries a future, its ``result()`` is returned (and
    anything it raises propagates). Without a future, the ``result``
    attribute stored on the state is returned as is.
    """
    pending = state.future
    if not pending:
        return state.result
    return pending.result()