Namespace: activity
This package's main export is Context. Get the current Activity's context with Context.current():
import { Context } from '@temporalio/activity';

export async function myActivity() {
  const context = Context.current();
}
Any function can be used as an Activity as long as its parameters and return value are serializable using a DataConverter.
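As a rough illustration of the serializability requirement, the sketch below defines an ordinary async function whose input and output are plain data. The names `GreetInput`, `GreetResult`, `greet`, and `jsonRoundTrip` are hypothetical, and the JSON round trip is only a stand-in for what a JSON-based DataConverter might preserve, not the SDK's actual conversion path.

```typescript
// Hypothetical input type: plain data only, no functions or class instances.
interface GreetInput {
  name: string;
  excited: boolean;
}

// Hypothetical result type, also plain data.
interface GreetResult {
  message: string;
  length: number;
}

// An ordinary async function; nothing Temporal-specific is needed to define it.
async function greet(input: GreetInput): Promise<GreetResult> {
  const message = `Hello, ${input.name}${input.excited ? '!' : '.'}`;
  return { message, length: message.length };
}

// Assumption: serializing to JSON and back approximates a JSON-based converter.
// Values such as functions or Maps would not survive this trip unchanged.
function jsonRoundTrip<T>(value: T): T {
  return JSON.parse(JSON.stringify(value)) as T;
}
```

If a value comes back from `jsonRoundTrip` unchanged, it is probably safe to use as an Activity parameter or return value with a JSON-based converter.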
Cancellation
Activity Cancellation:
- lets the Activity know it doesn't need to keep doing work, and
- gives the Activity time to clean up any resources it has created.
Activities can only receive Cancellation if they emit heartbeats or are Local Activities (which can't heartbeat but receive Cancellation anyway).
An Activity may receive Cancellation if:
- The Workflow scope containing the Activity call was requested to be Cancelled and ActivityOptions.cancellationType was not set to ActivityCancellationType.ABANDON. The scope can be cancelled in either of the following ways:
  - The entire Workflow was Cancelled (via WorkflowHandle.cancel).
  - CancellationScope.cancel() was called from inside a Workflow.
- The Worker has started to shut down. Shutdown is initiated by either:
  - One of the RuntimeOptions.shutdownSignals was sent to the process.
  - Worker.shutdown() was called.
- The Activity was considered failed by the Server because one of the Activity timeouts was triggered (for example, the Server didn't receive a heartbeat within the ActivityOptions.heartbeatTimeout). The CancelledFailure will have message: 'TIMED_OUT'.
- An Activity sends a heartbeat with Context.current().heartbeat() and the heartbeat details can't be converted by the Worker's configured DataConverter.
- The Workflow Run reached a Closed state, in which case the CancelledFailure will have message: 'NOT_FOUND'.
The reason for the Cancellation is available at CancelledFailure.message or Context.cancellationSignal.reason.
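To show how the reason might be inspected, here is a minimal sketch that maps a cancellation message to a description. The helper name `describeCancellation` is hypothetical. Only the two messages named on this page ('TIMED_OUT' and 'NOT_FOUND') are handled explicitly, and any other value falls through to a generic branch.

```typescript
// Hypothetical helper: turns a cancellation message into a log-friendly description.
// Only the messages documented on this page are recognized by name.
function describeCancellation(message: string | undefined): string {
  switch (message) {
    case 'TIMED_OUT':
      return 'an Activity timeout (for example the heartbeat timeout) was triggered';
    case 'NOT_FOUND':
      return 'the Workflow Run reached a Closed state';
    default:
      // Other reasons are not enumerated here, so report the raw message.
      return `cancelled for another reason: ${message ?? 'unknown'}`;
  }
}
```

Inside an Activity, this would typically be called from a `catch` block with the `message` of a caught CancelledFailure.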
Activity implementations should opt in and subscribe to cancellation using one of the following methods:
- await on Context.current().cancelled or Context.current().sleep(), which each throw a CancelledFailure.
- Pass the context's AbortSignal at Context.current().cancellationSignal to a library that supports it.
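The second method relies on code that honors an AbortSignal. The sketch below shows one way such code could look, using only the standard `AbortSignal` and timer APIs. The helper `sleepUnlessAborted` is hypothetical and is not part of the SDK. It rejects with a plain `Error`, not a CancelledFailure.

```typescript
// Hypothetical helper: resolves after `ms`, or rejects as soon as `signal` aborts.
function sleepUnlessAborted(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    // Fail fast if cancellation was requested before we started.
    if (signal.aborted) {
      reject(new Error('aborted before start'));
      return;
    }
    const timer = setTimeout(() => {
      // Normal completion: stop listening so the handler does not linger.
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    function onAbort(): void {
      // Cancellation: release the timer (the "clean up resources" step) and reject.
      clearTimeout(timer);
      reject(new Error('aborted while sleeping'));
    }
    signal.addEventListener('abort', onAbort, { once: true });
  });
}
```

Inside an Activity, the `signal` argument would presumably be `Context.current().cancellationSignal`. In the sketch it is an ordinary parameter, so the helper can be exercised with a plain `AbortController`.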
Examples
An Activity that sends progress heartbeats and can be Cancelled
import { CancelledFailure, Context } from '@temporalio/activity';

export async function fakeProgress(sleepIntervalMs = 1000): Promise<void> {
  const { log, info, sleep, heartbeat } = Context.current();
  try {
    // allow for resuming from heartbeat
    const startingPoint = info.heartbeatDetails || 1;
    log.info('Starting activity at progress', { startingPoint });
    for (let progress = startingPoint; progress <= 100; ++progress) {
      log.info('Progress', { progress });
      // simple utility to sleep in activity for given interval or throw if Activity is cancelled
      // don't confuse with Workflow.sleep which is only used in Workflow functions!
      await sleep(sleepIntervalMs);
      heartbeat(progress);
    }
  } catch (err) {
    if (err instanceof CancelledFailure) {
      log.warn('Fake progress activity cancelled', { message: err.message });
      // Cleanup
    }
    throw err;
  }
}
An Activity that makes a cancellable HTTP request
It passes the AbortSignal to fetch: fetch(url, { signal: Context.current().cancellationSignal }).
import fetch from 'node-fetch';
import { Context } from '@temporalio/activity';
import type { AbortSignal as FetchAbortSignal } from 'node-fetch/externals';

export async function cancellableFetch(url: string): Promise<Uint8Array> {
  // The request is aborted if the Activity is cancelled
  const response = await fetch(url, { signal: Context.current().cancellationSignal as FetchAbortSignal });
  const contentLengthHeader = response.headers.get('Content-Length');
  if (contentLengthHeader === null) {
    throw new Error('expected Content-Length header to be set');
  }
  const contentLength = parseInt(contentLengthHeader, 10);
  let bytesRead = 0;
  const chunks: Buffer[] = [];
  for await (const chunk of response.body) {
    if (!(chunk instanceof Buffer)) {
      throw new TypeError('Expected Buffer');
    }
    bytesRead += chunk.length;
    chunks.push(chunk);
    // Report the fraction of the body downloaded so far
    Context.current().heartbeat(bytesRead / contentLength);
  }
  return Buffer.concat(chunks);
}
Classes
Interfaces
References
ActivityFunction
Re-exports ActivityFunction
ActivityInterface
Re-exports ActivityInterface
ApplicationFailure
Re-exports ApplicationFailure
CancelledFailure
Re-exports CancelledFailure
UntypedActivities
Re-exports UntypedActivities
Variables
asyncLocalStorage
• Const asyncLocalStorage: AsyncLocalStorage<Context>
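For readers unfamiliar with Node's `AsyncLocalStorage`, the sketch below shows the general pattern by which a static `current()` accessor can be backed by such a store. `DemoContext`, `demoStorage`, `demoActivity`, and `runWithContext` are hypothetical names. This is an illustration of the pattern under that assumption, not the SDK's implementation.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

// Hypothetical store, playing the role of the exported asyncLocalStorage variable.
const demoStorage = new AsyncLocalStorage<DemoContext>();

// Hypothetical stand-in for the SDK's Context class.
class DemoContext {
  readonly activityType: string;

  constructor(activityType: string) {
    this.activityType = activityType;
  }

  // Reads the context bound to the current async call chain, or fails loudly.
  static current(): DemoContext {
    const store = demoStorage.getStore();
    if (store === undefined) {
      throw new Error('current() called outside of a demoStorage.run() scope');
    }
    return store;
  }
}

// Code deep in the call chain can reach its context without it being passed in.
async function demoActivity(): Promise<string> {
  await new Promise<void>((resolve) => setTimeout(resolve, 1));
  return DemoContext.current().activityType;
}

// Binds `context` to everything `fn` does, including work after an await.
function runWithContext<T>(context: DemoContext, fn: () => Promise<T>): Promise<T> {
  return demoStorage.run(context, fn);
}
```

Two concurrent `runWithContext` calls each see their own `DemoContext`, which is the property that lets a single static accessor serve many concurrently running functions.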