Skip to content

controller

numerous.tasks.controller

Task controller implementations.

LocalTaskController

Local implementation of TaskController for in-process execution.

Updates only the local TaskInstanceState without any backend synchronization.

Source code in numerous/tasks/controller.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class LocalTaskController:
    """
    Local implementation of TaskController for in-process execution.

    Updates only the local TaskInstanceState without any backend synchronization.
    """

    def __init__(self, instance_state: TaskInstanceState) -> None:
        self._state = instance_state
        self._should_stop = False

    def should_stop(self) -> bool:
        """Check if the task should stop."""
        return self._should_stop

    def request_stop(self) -> None:
        """Request the task to stop."""
        self._should_stop = True

    def set_progress(self, progress: float) -> None:
        """Set the task progress."""
        if not 0 <= progress <= 1:
            msg = "Progress must be between 0 and 1"
            raise ValueError(msg)
        with self._state.lock:
            self._state.progress = progress

    def set_status(self, status: Union[str, TaskStatus]) -> None:
        """Set the task status (useful for local debugging)."""
        if isinstance(status, str):
            with self._state.lock:
                self._state.status = TaskStatus(status)
        else:
            with self._state.lock:
                self._state.status = status

    def set_output(self, output: dict[str, Any]) -> None:
        """Set the task output."""
        with self._state.lock:
            self._state.output = output
            self._state.result = output.get("result")

request_stop()

Request the task to stop.

Source code in numerous/tasks/controller.py
54
55
56
def request_stop(self) -> None:
    """Request the task to stop."""
    self._should_stop = True

set_output(output)

Set the task output.

Source code in numerous/tasks/controller.py
75
76
77
78
79
def set_output(self, output: dict[str, Any]) -> None:
    """Set the task output."""
    with self._state.lock:
        self._state.output = output
        self._state.result = output.get("result")

set_progress(progress)

Set the task progress.

Source code in numerous/tasks/controller.py
58
59
60
61
62
63
64
def set_progress(self, progress: float) -> None:
    """Set the task progress."""
    if not 0 <= progress <= 1:
        msg = "Progress must be between 0 and 1"
        raise ValueError(msg)
    with self._state.lock:
        self._state.progress = progress

set_status(status)

Set the task status (useful for local debugging).

Source code in numerous/tasks/controller.py
66
67
68
69
70
71
72
73
def set_status(self, status: Union[str, TaskStatus]) -> None:
    """Set the task status (useful for local debugging)."""
    if isinstance(status, str):
        with self._state.lock:
            self._state.status = TaskStatus(status)
    else:
        with self._state.lock:
            self._state.status = status

should_stop()

Check if the task should stop.

Source code in numerous/tasks/controller.py
50
51
52
def should_stop(self) -> bool:
    """Check if the task should stop."""
    return self._should_stop

PlatformTaskController

Platform implementation of TaskController.

Syncs task state changes to the backend via GraphQL mutations.

Source code in numerous/tasks/controller.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class PlatformTaskController:
    """
    Platform implementation of TaskController.

    Syncs task state changes to the backend via GraphQL mutations.
    """

    def __init__(
        self,
        instance_state: TaskInstanceState,
        client: Client,
    ) -> None:
        self._state = instance_state
        self._client = client
        self._should_stop = False

    def should_stop(self) -> bool:
        """Check if the task should stop."""
        return self._should_stop

    def request_stop(self) -> None:
        """Request the task to stop via backend."""
        self._should_stop = True
        self._client.task_stop(task_instance_id=self._state.id)

    def set_progress(self, progress: float) -> None:
        """Report task progress to backend."""
        if not 0 <= progress <= 1:
            msg = "Progress must be between 0 and 1"
            raise ValueError(msg)

        progress_percent = progress * 100

        self._client.task_instance_update_progress(
            task_instance_id=self._state.id,
            value=progress_percent,
            message=None,
        )

    def set_status(self, _: Union[str, TaskStatus]) -> None:
        """Status is managed by the backend and cannot be set directly."""
        msg = (
            "Cannot set status in platform mode. "
            "Status is determined by the backend based on task execution state."
        )
        raise NotImplementedError(msg)

    def set_output(self, output: dict[str, Any]) -> None:
        """Set task output in backend."""
        output_json = serialize_task_output(output)
        self._client.task_instance_set_output(
            task_instance_id=self._state.id,
            value=output_json,
        )

        with self._state.lock:
            self._state.output = output
            self._state.result = output.get("result")

request_stop()

Request the task to stop via backend.

Source code in numerous/tasks/controller.py
102
103
104
105
def request_stop(self) -> None:
    """Request the task to stop via backend."""
    self._should_stop = True
    self._client.task_stop(task_instance_id=self._state.id)

set_output(output)

Set task output in backend.

Source code in numerous/tasks/controller.py
129
130
131
132
133
134
135
136
137
138
139
def set_output(self, output: dict[str, Any]) -> None:
    """Set task output in backend."""
    output_json = serialize_task_output(output)
    self._client.task_instance_set_output(
        task_instance_id=self._state.id,
        value=output_json,
    )

    with self._state.lock:
        self._state.output = output
        self._state.result = output.get("result")

set_progress(progress)

Report task progress to backend.

Source code in numerous/tasks/controller.py
107
108
109
110
111
112
113
114
115
116
117
118
119
def set_progress(self, progress: float) -> None:
    """Report task progress to backend."""
    if not 0 <= progress <= 1:
        msg = "Progress must be between 0 and 1"
        raise ValueError(msg)

    progress_percent = progress * 100

    self._client.task_instance_update_progress(
        task_instance_id=self._state.id,
        value=progress_percent,
        message=None,
    )

set_status(_)

Status is managed by the backend and cannot be set directly.

Source code in numerous/tasks/controller.py
121
122
123
124
125
126
127
def set_status(self, _: Union[str, TaskStatus]) -> None:
    """Status is managed by the backend and cannot be set directly."""
    msg = (
        "Cannot set status in platform mode. "
        "Status is determined by the backend based on task execution state."
    )
    raise NotImplementedError(msg)

should_stop()

Check if the task should stop.

Source code in numerous/tasks/controller.py
 98
 99
100
def should_stop(self) -> bool:
    """Check if the task should stop."""
    return self._should_stop

TaskController

Bases: Protocol

Controller interface injected into task functions.

Source code in numerous/tasks/controller.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class TaskController(Protocol):
    """Controller interface injected into task functions."""

    def should_stop(self) -> bool:
        """Check if the task should stop."""
        ...

    def request_stop(self) -> None:
        """Request the task to stop."""
        ...

    def set_progress(self, progress: float) -> None:
        """Set the task progress (0.0 to 1.0)."""
        ...

    def set_status(self, status: Union[str, TaskStatus]) -> None:
        """Set the task status."""
        ...

    def set_output(self, output: dict[str, Any]) -> None:
        """Set the task output."""
        ...

request_stop()

Request the task to stop.

Source code in numerous/tasks/controller.py
22
23
24
def request_stop(self) -> None:
    """Request the task to stop."""
    ...

set_output(output)

Set the task output.

Source code in numerous/tasks/controller.py
34
35
36
def set_output(self, output: dict[str, Any]) -> None:
    """Set the task output."""
    ...

set_progress(progress)

Set the task progress (0.0 to 1.0).

Source code in numerous/tasks/controller.py
26
27
28
def set_progress(self, progress: float) -> None:
    """Set the task progress (0.0 to 1.0)."""
    ...

set_status(status)

Set the task status.

Source code in numerous/tasks/controller.py
30
31
32
def set_status(self, status: Union[str, TaskStatus]) -> None:
    """Set the task status."""
    ...

should_stop()

Check if the task should stop.

Source code in numerous/tasks/controller.py
18
19
20
def should_stop(self) -> bool:
    """Check if the task should stop."""
    ...