Namespace: activity
This package's main export is Context. Get the current Activity's context with Context.current():
import { Context } from '@temporalio/activity';

export async function myActivity() {
  const context = Context.current();
}
Any function can be used as an Activity as long as its parameters and return value are serializable using a DataConverter.
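To illustrate the serializability requirement, the sketch below round-trips an Activity's input and a non-plain value through JSON. It assumes a JSON-based converter for plain values. `GreetInput`, `GreetOutput`, `greet`, and `jsonRoundTrip` are hypothetical names used only for illustration, and the round trip is a stand-in for what a converter might do, not the SDK's implementation.

```typescript
// Hypothetical input/output shapes: plain data only, so they survive JSON encoding.
interface GreetInput {
  name: string;
  visits: number;
}

interface GreetOutput {
  message: string;
}

// An ordinary async function; its parameter and return value are plain objects.
export async function greet(input: GreetInput): Promise<GreetOutput> {
  return { message: `Hello, ${input.name}! Visit #${input.visits}.` };
}

// Stand-in for a JSON-based converter (an assumption made for illustration).
export function jsonRoundTrip<T>(value: T): T {
  return JSON.parse(JSON.stringify(value)) as T;
}

// Plain data keeps its shape; values JSON cannot represent faithfully do not.
const plain = jsonRoundTrip<GreetInput>({ name: 'Ada', visits: 3 });
const lossy = jsonRoundTrip({ when: new Date(0), seen: new Set([1, 2]) });

console.log(plain); // same fields and values as the input
console.log(lossy.when instanceof Date); // false: the Date came back as a string
```

Under that assumption, parameters and return values are safest as plain objects, arrays, strings, numbers, and booleans. Richer types such as Date or Set would need to be converted explicitly, for example to an ISO string or an array.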
Cancellation
Activities may be cancelled only if they emit heartbeats.
There are two ways to handle Activity cancellation:
- await on Context.current().cancelled or Context.current().sleep(), which each throw a CancelledFailure.
- Pass the context's AbortSignal at Context.current().cancellationSignal to a library that supports it.
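The first approach can be sketched as a race between the work and a promise that rejects on cancellation. The helper below takes that promise as a parameter, so it runs without a Worker. `raceWithCancellation`, `fakeCancelled`, and `slowWork` are hypothetical names, and the sketch assumes the cancellation promise never resolves and only rejects.

```typescript
// Resolves with the work's result, or rejects as soon as `cancelled` rejects.
// Assumption: `cancelled` never resolves; it only rejects when cancellation is requested.
export async function raceWithCancellation<T>(
  cancelled: Promise<never>,
  work: Promise<T>,
  cleanup: () => void = () => {}
): Promise<T> {
  try {
    return await Promise.race([cancelled, work]);
  } catch (err) {
    cleanup(); // runs on cancellation and also when the work itself fails
    throw err; // rethrow so the failure is reported rather than swallowed
  }
}

// Simulated usage: a stand-in promise that rejects after 10 ms,
// racing against work that would take 50 ms to finish.
const fakeCancelled = new Promise<never>((_resolve, reject) =>
  setTimeout(() => reject(new Error('cancelled')), 10)
);
const slowWork = new Promise<string>((resolve) =>
  setTimeout(() => resolve('done'), 50)
);

raceWithCancellation(fakeCancelled, slowWork, () => console.log('cleaning up'))
  .catch((err) => console.log('stopped waiting:', (err as Error).message));
```

Inside an Activity, the first argument would be Context.current().cancelled. Racing only stops waiting for the work; it does not stop the work itself. For work that must actually be aborted, the AbortSignal approach is the better fit.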
Examples
An Activity that sends progress heartbeats and can be cancelled
import { CancelledFailure, Context } from '@temporalio/activity';

export async function fakeProgress(sleepIntervalMs = 1000): Promise<void> {
  try {
    // allow for resuming from heartbeat
    const startingPoint = Context.current().info.heartbeatDetails || 1;
    console.log('Starting activity at progress:', startingPoint);
    for (let progress = startingPoint; progress <= 100; ++progress) {
      console.log('Progress:', progress);
      // simple utility to sleep in activity for given interval or throw if Activity is cancelled
      // don't confuse with Workflow.sleep which is only used in Workflow functions!
      await Context.current().sleep(sleepIntervalMs);
      Context.current().heartbeat(progress);
    }
  } catch (err) {
    if (err instanceof CancelledFailure) {
      console.log('Fake progress activity cancelled');
      // Cleanup
    }
    throw err;
  }
}
An Activity that makes a cancellable HTTP request
It passes the AbortSignal to fetch: fetch(url, { signal: Context.current().cancellationSignal }).
import fetch from 'node-fetch';
import { Context } from '@temporalio/activity';

export async function cancellableFetch(url: string): Promise<Uint8Array> {
  const response = await fetch(url, { signal: Context.current().cancellationSignal });
  const contentLengthHeader = response.headers.get('Content-Length');
  if (contentLengthHeader === null) {
    throw new Error('expected Content-Length header to be set');
  }
  const contentLength = parseInt(contentLengthHeader);
  let bytesRead = 0;
  const chunks: Buffer[] = [];
  for await (const chunk of response.body) {
    if (!(chunk instanceof Buffer)) {
      throw new TypeError('Expected Buffer');
    }
    bytesRead += chunk.length;
    chunks.push(chunk);
    Context.current().heartbeat(bytesRead / contentLength);
  }
  return Buffer.concat(chunks);
}
Classes
Interfaces
References
ActivityFunction
Re-exports ActivityFunction
ActivityInterface
Re-exports ActivityInterface
ApplicationFailure
Re-exports ApplicationFailure
CancelledFailure
Re-exports CancelledFailure
UntypedActivities
Re-exports UntypedActivities
errorCode
Re-exports errorCode
errorMessage
Re-exports errorMessage