Namespace: activity
This library provides tools for authoring activities.
Import this module from Activity code only; it must not be used in Workflows.
Any function can be used as an Activity as long as its parameters and return value are serializable using a DataConverter.
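As a minimal sketch of the statement above, the following plain async function is the kind of function that could serve as an Activity. The names `OrderLine`, `OrderSummary`, and `summarizeOrder` are hypothetical, and the sketch assumes the configured DataConverter can handle plain JSON-style objects.

```typescript
// Hypothetical input type: plain data only (strings and numbers), so it is
// assumed to be serializable by a JSON-capable DataConverter.
interface OrderLine {
  sku: string;
  quantity: number;
  unitPriceCents: number;
}

// Hypothetical return type: also plain data.
interface OrderSummary {
  lineCount: number;
  totalCents: number;
}

// An ordinary async function; nothing Temporal-specific is needed in its body.
export async function summarizeOrder(lines: OrderLine[]): Promise<OrderSummary> {
  const totalCents = lines.reduce(
    (sum, line) => sum + line.quantity * line.unitPriceCents,
    0,
  );
  return { lineCount: lines.length, totalCents };
}
```

Values such as functions, class instances with methods, or open file handles would not fit this pattern, because they do not survive serialization as plain data.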
Cancellation
Activities may be cancelled only if they emit heartbeats.
There are two ways to handle Activity cancellation:
- Await Context.current().cancelled
- Pass the context's abort signal at Context.current().cancellationSignal to a library that supports it
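The two approaches can be sketched without a running Worker by using a stand-in for the context. Everything named here (`CancellationHooks`, `makeHooks`, `raceAgainstCancellation`, `abortableDelay`) is hypothetical and not part of the SDK; the sketch also assumes that the `cancelled` promise rejects when cancellation is requested and that `cancellationSignal` is a standard `AbortSignal`.

```typescript
// Stand-in for the two cancellation hooks (hypothetical, not the SDK's Context).
interface CancellationHooks {
  cancelled: Promise<never>; // assumed to reject when cancellation is requested
  cancellationSignal: AbortSignal; // assumed to abort when cancellation is requested
}

// Builds the stand-in hooks plus a `cancel` function that triggers both of them.
function makeHooks(): { hooks: CancellationHooks; cancel: (reason: string) => void } {
  const controller = new AbortController();
  let reject!: (err: Error) => void;
  const cancelled = new Promise<never>((_, rej) => {
    reject = rej;
  });
  // Avoid an unhandled-rejection error when nobody is awaiting `cancelled`.
  cancelled.catch(() => undefined);
  return {
    hooks: { cancelled, cancellationSignal: controller.signal },
    cancel: (reason) => {
      controller.abort();
      reject(new Error(reason));
    },
  };
}

// Way 1: race the real work against the `cancelled` promise.
export function raceAgainstCancellation<T>(work: Promise<T>, hooks: CancellationHooks): Promise<T> {
  return Promise.race([work, hooks.cancelled]);
}

// Way 2: hand the abort signal to code that knows how to observe it.
export function abortableDelay(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(new Error('aborted'));
      return;
    }
    const timer = setTimeout(resolve, ms);
    signal.addEventListener(
      'abort',
      () => {
        clearTimeout(timer); // stop the pending work before reporting the abort
        reject(new Error('aborted'));
      },
      { once: true },
    );
  });
}
```

Racing is convenient when the underlying work cannot be interrupted, although that work keeps running in the background after the race settles. Passing the signal lets the library stop the work itself, which is why it is preferable when the library supports it.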
Examples
An Activity that fakes progress and can be cancelled
import { Context } from '@temporalio/activity';
import { CancelledFailure } from '@temporalio/common';

export async function fakeProgress(sleepIntervalMs = 1000): Promise<void> {
  try {
    // allow for resuming from heartbeat
    const startingPoint = Context.current().info.heartbeatDetails || 1;
    console.log('Starting activity at progress:', startingPoint);
    for (let progress = startingPoint; progress <= 100; ++progress) {
      // simple utility to sleep in activity for given interval or throw if Activity is cancelled
      // don't confuse with Workflow.sleep which is only used in Workflow functions!
      console.log('Progress:', progress);
      await Context.current().sleep(sleepIntervalMs);
      Context.current().heartbeat(progress);
    }
  } catch (err) {
    if (err instanceof CancelledFailure) {
      console.log('Fake progress activity cancelled');
      // Cleanup
    }
    throw err;
  }
}
An Activity that makes a cancellable HTTP request
import fetch from 'node-fetch';
import { Context } from '@temporalio/activity';

export async function cancellableFetch(url: string): Promise<Uint8Array> {
  const response = await fetch(url, { signal: Context.current().cancellationSignal });
  const contentLengthHeader = response.headers.get('Content-Length');
  if (contentLengthHeader === null) {
    throw new Error('expected Content-Length header to be set');
  }
  const contentLength = parseInt(contentLengthHeader);
  let bytesRead = 0;
  const chunks: Buffer[] = [];

  for await (const chunk of response.body) {
    if (!(chunk instanceof Buffer)) {
      throw new TypeError('Expected Buffer');
    }
    bytesRead += chunk.length;
    chunks.push(chunk);
    Context.current().heartbeat(bytesRead / contentLength);
  }
  return Buffer.concat(chunks);
}
Classes
- CompleteAsyncError
- Context
- IllegalStateError
- PayloadConverterError
- ValueError
- WorkflowExecutionAlreadyStartedError
- WorkflowNotFoundError
Interfaces
References
ActivityFunction
Re-exports ActivityFunction
ActivityInterface
Re-exports ActivityInterface
CancelledFailure
Re-exports CancelledFailure
Functions
errorCode
▸ errorCode(error): string | undefined
Get the error code from an Error, or return undefined.
Parameters
Name | Type |
---|---|
error | unknown |
Returns
string | undefined
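To illustrate the contract described for errorCode, here is a rough sketch of a helper with the same shape. `getErrorCode` is a hypothetical name and this is not the library's implementation; it assumes the code lives in a string-valued `code` property on the Error, as is common for Node.js system errors.

```typescript
// Hypothetical stand-in with the same signature shape as errorCode.
// Assumption: the error code is a string stored in a `code` property.
export function getErrorCode(error: unknown): string | undefined {
  if (error instanceof Error && 'code' in error) {
    const code = (error as { code?: unknown }).code;
    return typeof code === 'string' ? code : undefined;
  }
  return undefined;
}

// Usage: branch on the code without narrowing the caught value by hand.
export function isMissingFile(err: unknown): boolean {
  return getErrorCode(err) === 'ENOENT';
}
```

Accepting `unknown` is what makes a helper like this useful inside a `catch` block, where the caught value has no guaranteed type.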
errorMessage
▸ errorMessage(err): string | undefined
Get the error message from an Error or a string, or return undefined.
Parameters
Name | Type |
---|---|
err | unknown |
Returns
string | undefined
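The behaviour described for errorMessage can be sketched in the same way. `getErrorMessage` is a hypothetical name and the body is an assumption based only on the description above (an Error yields its message, a string is returned as-is, anything else yields undefined); it is not the library's code.

```typescript
// Hypothetical stand-in with the same signature shape as errorMessage.
export function getErrorMessage(err: unknown): string | undefined {
  if (typeof err === 'string') {
    return err; // a thrown string is already the message
  }
  if (err instanceof Error) {
    return err.message;
  }
  return undefined; // numbers, plain objects, null, and so on
}

// Usage: produce a log line from whatever value was thrown.
export function describeFailure(err: unknown): string {
  return `Activity failed: ${getErrorMessage(err) ?? 'unknown error'}`;
}
```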