Skip to content

executor

numerous.tasks.executor

Task executor implementations.

LocalThreadTaskExecutor

Executor that runs tasks in a thread pool.

Source code in numerous/tasks/executor.py
85
86
87
88
89
90
91
92
93
class LocalThreadTaskExecutor:
    """Executor that runs tasks in a thread pool."""

    def __init__(self, max_workers: int = 4) -> None:
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, task_def: TaskDefinition, state: TaskInstanceState) -> Future[Any]:
        """Submit a task for execution."""
        return self._executor.submit(_execute_local_task, task_def, state)

submit(task_def, state)

Submit a task for execution.

Source code in numerous/tasks/executor.py
91
92
93
def submit(self, task_def: TaskDefinition, state: TaskInstanceState) -> Future[Any]:
    """Submit a task for execution."""
    return self._executor.submit(_execute_local_task, task_def, state)

PlatformExecutor

Executor that starts tasks on the Numerous platform via GraphQL.

Source code in numerous/tasks/executor.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class PlatformExecutor:
    """Executor that starts tasks on the Numerous platform via GraphQL."""

    def __init__(
        self,
        client: Client,
        organization_slug: str,
        deploy_id: str,
        store: PlatformTaskStore,
        poll_interval: float = 5.0,
    ) -> None:
        self._client = client
        self._org_slug = organization_slug
        self._deploy_id = deploy_id
        self._store = store
        self._poll_interval = poll_interval

    def submit(self, task_def: TaskDefinition, state: TaskInstanceState) -> Future[Any]:
        """Start a task on the Numerous platform."""
        input_json = None
        if state.inputs:
            input_json = serialize_task_inputs(state.inputs)

        result = self._client.task_start(
            organization_slug=self._org_slug,
            deploy_id=self._deploy_id,
            task_name=task_def.name,
            input_data=input_json,
        )

        state.id = result.id

        future: Future[Any] = Future()

        poll_thread = threading.Thread(
            target=self._poll_completion,
            args=(state.id, future, state),
            daemon=True,
            name=f"TaskPoll-{state.id[:8]}",
        )
        poll_thread.start()

        return future

    def _poll_completion(
        self, instance_id: str, future: Future[Any], state: TaskInstanceState
    ) -> None:
        while True:
            instance = self._store.get_task_instance(instance_id)

            if instance is None:
                future.set_exception(TaskInstanceNotFoundError(instance_id))
                return

            with state.lock:
                state.status = instance.status
                state.progress = instance.progress
                state.result = instance.result
                state.error = instance.error
                state.output = instance.output

            if instance.is_done():
                if instance.error:
                    future.set_exception(instance.error)
                else:
                    future.set_result(instance.result)
                return

            time.sleep(self._poll_interval)

submit(task_def, state)

Start a task on the Numerous platform.

Source code in numerous/tasks/executor.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def submit(self, task_def: TaskDefinition, state: TaskInstanceState) -> Future[Any]:
    """Start a task on the Numerous platform."""
    input_json = None
    if state.inputs:
        input_json = serialize_task_inputs(state.inputs)

    result = self._client.task_start(
        organization_slug=self._org_slug,
        deploy_id=self._deploy_id,
        task_name=task_def.name,
        input_data=input_json,
    )

    state.id = result.id

    future: Future[Any] = Future()

    poll_thread = threading.Thread(
        target=self._poll_completion,
        args=(state.id, future, state),
        daemon=True,
        name=f"TaskPoll-{state.id[:8]}",
    )
    poll_thread.start()

    return future

TaskExecutor

Bases: Protocol

Protocol for task execution.

Source code in numerous/tasks/executor.py
27
28
29
30
31
32
class TaskExecutor(Protocol):
    """Protocol for task execution."""

    def submit(self, task_def: TaskDefinition, state: TaskInstanceState) -> Future[Any]:
        """Submit a task for execution."""
        ...

submit(task_def, state)

Submit a task for execution.

Source code in numerous/tasks/executor.py
30
31
32
def submit(self, task_def: TaskDefinition, state: TaskInstanceState) -> Future[Any]:
    """Submit a task for execution."""
    ...