Skip to content

task

numerous.tasks.task

Task management API - Functional Interface.

create_task_definition(func, name=None, app_id=None)

Create a task definition from a function.

Source code in numerous/tasks/task.py
76
77
78
79
80
81
82
83
84
85
86
87
def create_task_definition(
    func: Callable[..., Any],
    name: Optional[str] = None,
    app_id: Optional[str] = None,
) -> TaskDefinition:
    """
    Create a task definition from a function.

    The definition is only constructed here; this function does not register it.

    Args:
        func: The callable the task wraps.
        name: Task name. When omitted (or empty), the callable's ``__name__``
            is used; callables without one fall back to their class name.
        app_id: Optional application identifier stored on the definition.

    Returns:
        A new ``TaskDefinition`` with a freshly generated UUID4 string as ``id``.
    """
    # functools.partial objects and instances that define __call__ have no
    # __name__, so a plain attribute access would raise AttributeError here.
    resolved_name = name or getattr(func, "__name__", None) or type(func).__name__
    return TaskDefinition(
        id=str(uuid.uuid4()),
        name=resolved_name,
        func=func,
        app_id=app_id,
    )

create_task_instance(task_def, inputs, workload=TaskWorkload.LOCAL)

Create a new task instance state.

Source code in numerous/tasks/task.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def create_task_instance(
    task_def: TaskDefinition,
    inputs: dict[str, Any],
    workload: TaskWorkload = TaskWorkload.LOCAL,
) -> TaskInstanceState:
    """
    Build the initial state object for one run of ``task_def``.

    The new state gets a fresh UUID4 string id, references the definition via
    ``task_def.id``, starts as ``TaskStatus.PENDING`` with progress ``0.0``,
    and is stamped with the current timezone-aware local time.
    """
    instance_id = str(uuid.uuid4())
    created = datetime.now().astimezone()
    return TaskInstanceState(
        id=instance_id,
        task_id=task_def.id,
        status=TaskStatus.PENDING,
        progress=0.0,
        inputs=inputs,
        workload=workload,
        created_at=created,
    )

get_task_definition(task_id)

Get a task definition by ID.

Source code in numerous/tasks/task.py
149
150
151
def get_task_definition(task_id: str) -> Optional[TaskDefinition]:
    """
    Look up a task definition by its ID.

    Delegates to the module-level store and returns its answer unchanged.
    """
    found = _store.get_task_definition(task_id)
    return found

get_task_instance(instance_id)

Get a task instance by ID.

Source code in numerous/tasks/task.py
164
165
166
def get_task_instance(instance_id: str) -> Optional[TaskInstanceState]:
    """
    Look up a task instance state by its ID.

    Delegates to the module-level store and returns its answer unchanged.
    """
    found = _store.get_task_instance(instance_id)
    return found

list_task_definitions()

List all registered task definitions.

Source code in numerous/tasks/task.py
154
155
156
def list_task_definitions() -> list[TaskDefinition]:
    """
    Return every task definition known to the module-level store.

    The result is whatever the store's ``list_task_definitions`` returns.
    """
    definitions = _store.list_task_definitions()
    return definitions

list_task_instances(task_id=None)

List all task instances, optionally filtered by task_id.

Source code in numerous/tasks/task.py
169
170
171
def list_task_instances(task_id: Optional[str] = None) -> list[TaskInstanceState]:
    """
    Return task instances from the module-level store.

    ``task_id`` is forwarded to the store as the optional filter; ``None``
    (the default) is passed through as-is.
    """
    instances = _store.list_task_instances(task_id)
    return instances

register_instance(state)

Register a task instance.

Source code in numerous/tasks/task.py
159
160
161
def register_instance(state: TaskInstanceState) -> TaskInstanceState:
    """
    Record a task instance in the module-level store.

    Returns whatever the store's ``register_instance`` returns for ``state``.
    """
    registered = _store.register_instance(state)
    return registered

register_task(task_def)

Register a task definition globally.

Source code in numerous/tasks/task.py
144
145
146
def register_task(task_def: TaskDefinition) -> TaskDefinition:
    """
    Record a task definition in the module-level store.

    Returns whatever the store's ``register_task`` returns for ``task_def``.
    """
    registered = _store.register_task(task_def)
    return registered

request_stop(state)

Request the task to stop.

Source code in numerous/tasks/task.py
121
122
123
124
125
def request_stop(state: TaskInstanceState) -> TaskInstanceState:
    """
    Ask the task behind ``state`` to stop.

    The request is forwarded to ``state.controller`` when one is attached;
    without a controller nothing happens. ``state`` is returned either way.
    """
    controller = state.controller
    if not controller:
        return state
    controller.request_stop()
    return state

run_task(task_def, inputs, block=False, workload=TaskWorkload.LOCAL)

High-level function to create and execute a task.

This composes the lower-level functions into a useful operation.

Source code in numerous/tasks/task.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def run_task(
    task_def: TaskDefinition,
    inputs: dict[str, Any],
    block: bool = False,  # noqa: FBT001,FBT002
    workload: TaskWorkload = TaskWorkload.LOCAL,
) -> TaskInstanceState:
    """
    Create, register and submit a task instance in one call.

    Steps, in order: build a new instance state for ``task_def``, register
    it, obtain the executor, and submit the instance for execution. When
    ``block`` is true the call waits on the returned future first, so
    whatever ``future.result()`` raises propagates to the caller.

    Returns the instance state (not the task's result).
    """
    state = create_task_instance(task_def, inputs, workload)
    register_instance(state)

    future = submit_task_execution(task_def, state, _get_executor())
    if block:
        # Only wait here; the result value itself is discarded.
        future.result()

    return state

set_progress(state, progress)

Set task progress, delegating to controller if available.

Source code in numerous/tasks/task.py
107
108
109
110
111
def set_progress(state: TaskInstanceState, progress: float) -> TaskInstanceState:
    """
    Report task progress through the state's controller.

    ``progress`` is forwarded to ``state.controller.set_progress`` when a
    controller is attached; otherwise it is dropped. ``state`` is returned
    either way.
    """
    controller = state.controller
    if not controller:
        return state
    controller.set_progress(progress)
    return state

set_status(state, status)

Set task status, delegating to controller if available.

Source code in numerous/tasks/task.py
114
115
116
117
118
def set_status(state: TaskInstanceState, status: TaskStatus) -> TaskInstanceState:
    """
    Report a task status through the state's controller.

    ``status`` is forwarded to ``state.controller.set_status`` when a
    controller is attached; otherwise it is dropped. ``state`` is returned
    either way.
    """
    controller = state.controller
    if not controller:
        return state
    controller.set_status(status)
    return state

stop_task_instance(instance_id)

Stop a running task instance.

Source code in numerous/tasks/task.py
207
208
209
210
211
212
def stop_task_instance(instance_id: str) -> Optional[TaskInstanceState]:
    """
    Request a stop for the task instance with the given ID.

    Returns the instance state after forwarding the stop request, or
    ``None`` when no instance is found for ``instance_id``.
    """
    state = get_task_instance(instance_id)
    return request_stop(state) if state else None

submit_task_execution(task_def, state, executor)

Submit task for async execution and return future.

Source code in numerous/tasks/task.py
128
129
130
131
132
133
134
135
136
def submit_task_execution(
    task_def: TaskDefinition,
    state: TaskInstanceState,
    executor: TaskExecutor,
) -> Future[Any]:
    """
    Hand a task instance to ``executor`` and return the resulting future.

    The future returned by ``executor.submit(task_def, state)`` is also
    stored on ``state.future`` so it can be retrieved from the state later.
    """
    pending = executor.submit(task_def, state)
    state.future = pending
    return pending

task(func=None, *, name=None, app_id=None)

task(
    func: Callable[..., R],
    *,
    name: Optional[str] = None,
    app_id: Optional[str] = None
) -> Callable[..., TaskInstanceState]
task(
    func: None = None,
    *,
    name: Optional[str] = None,
    app_id: Optional[str] = None
) -> Callable[
    [Callable[..., R]], Callable[..., TaskInstanceState]
]

Convert a function into a task.

Returns a new function that when called, creates and runs a task instance.

Usage

@task
def my_task(x: int) -> int:
    return x + 1

@task(name="custom_name")
def another_task(x: int) -> int:
    return x * 2

# Calling the decorated function runs the task
instance = my_task(5)

Source code in numerous/tasks/task.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def task(
    func: Optional[Callable[..., R]] = None,
    *,
    name: Optional[str] = None,
    app_id: Optional[str] = None,
) -> Union[
    Callable[..., TaskInstanceState],
    Callable[[Callable[..., R]], Callable[..., TaskInstanceState]],
]:
    """
    Turn a function into a task.

    Works both bare and with keyword options. The returned wrapper creates
    and runs a task instance each time it is called, except inside a
    platform task instance, where it calls the original function directly.

    Usage:
        @task
        def my_task(x: int) -> int:
            return x + 1

        @task(name="custom_name")
        def another_task(x: int) -> int:
            return x * 2

        # Calling the decorated function runs the task
        instance = my_task(5)

    Outside a platform task instance the wrapper also carries two extra
    attributes: ``task_def`` (the registered definition) and
    ``list_instances`` (a callable listing instances of that definition).
    """

    def decorator(f: Callable[..., R]) -> Callable[..., TaskInstanceState]:
        # A definition is created and registered only outside a platform
        # task instance; inside one there is nothing to register.
        task_def: Optional[TaskDefinition] = None
        if not _is_platform_task_instance():
            task_def = create_task_definition(f, name=name, app_id=app_id)
            register_task(task_def)

        @wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> TaskInstanceState:  # noqa: ANN401
            # Inside a platform task instance the function body is the task.
            if _is_platform_task_instance():
                return f(*args, **kwargs)  # type: ignore[return-value]

            # Pair positional arguments with parameter names in declaration
            # order (zip stops at the shorter side), then let keyword
            # arguments add to or override those entries.
            by_position = zip(inspect.signature(f).parameters, args)
            inputs = {**dict(by_position), **kwargs}

            # Decorated inside a platform task instance but called outside one.
            if task_def is None:
                msg = "task_def is not available"
                raise RuntimeError(msg)
            return run_task(task_def, inputs)

        if task_def is not None:
            wrapper.task_def = task_def  # type: ignore[attr-defined]
            wrapper.list_instances = lambda: list_task_instances(task_def.id)  # type: ignore[attr-defined]

        return wrapper

    # func is None  -> used as @task(name="..."): hand back the decorator.
    # func is given -> used as bare @task: decorate immediately.
    return decorator if func is None else decorator(func)

wait_for_completion(state)

Wait for task completion and return result.

Source code in numerous/tasks/task.py
215
216
217
218
219
def wait_for_completion(state: TaskInstanceState) -> Any:  # noqa: ANN401
    """
    Block until the task behind ``state`` finishes and return its result.

    When a future is attached to the state, its ``result()`` is returned
    (and anything it raises propagates). Without a future, the value
    already stored in ``state.result`` is returned.
    """
    pending = state.future
    if not pending:
        return state.result
    return pending.result()