Skip to main content

Namespace: worker

The temporal worker connects to the service and runs workflows and activities.

Usage

import { NativeConnection, Worker } from '@temporalio/worker';
import * as activities from './activities';

async function run() {
// Step 1: Establish a connection with Temporal server.
//
// Worker code uses `@temporalio/worker.NativeConnection`.
// (But in your application code it's `@temporalio/client.Connection`.)
const connection = await NativeConnection.connect({
address: 'localhost:7233',
// TLS and gRPC metadata configuration goes here.
});
try {
// Step 2: Register Workflows and Activities with the Worker.
const worker = await Worker.create({
connection,
namespace: 'default',
taskQueue: 'hello-world',
// Workflows are registered using a path as they run in a separate JS context.
workflowsPath: require.resolve('./workflows'),
activities,
});

// Step 3: Start accepting tasks on the `hello-world` queue
//
// The worker runs until it encounters an unexpected error or the process receives a shutdown signal registered on
// the SDK Runtime object.
//
// By default, worker logs are written via the Runtime logger to STDERR at INFO level.
//
// See https://typescript.temporal.io/api/classes/worker.Runtime#install to customize these defaults.
await worker.run();
} finally {
// Close the connection once the worker has stopped
await connection.close();
}
}

run().catch((err) => {
console.error(err);
process.exit(1);
});

Classes

Interfaces

References

DataConverter

Re-exports DataConverter


Headers

Re-exports Headers


IllegalStateError

Re-exports IllegalStateError


LogLevel

Re-exports LogLevel


LogMetadata

Re-exports LogMetadata


Logger

Re-exports Logger


LoggerSinks

Re-exports LoggerSinks


Next

Re-exports Next


defaultPayloadConverter

Re-exports defaultPayloadConverter

Type Aliases

ActivityInterceptorsFactory

Ƭ ActivityInterceptorsFactory: (ctx: Context) => ActivityInterceptors

A function that takes Activity Context and returns an interceptor

Type declaration

▸ (ctx): ActivityInterceptors

Parameters
NameType
ctxContext
Returns

ActivityInterceptors


CompiledWorkerInterceptors

Ƭ CompiledWorkerInterceptors: Required<Pick<WorkerInterceptors, "activity" | "workflowModules">>


GetLogAttributesInput

Ƭ GetLogAttributesInput: Record<string, unknown>

Input for ActivityOutboundCallsInterceptor.getLogAttributes


History

Ƭ History: IHistory


InjectedSink

Ƭ InjectedSink<T>: { [P in keyof T]: InjectedSinkFunction<T[P]> }

Converts a Sink from a mapping of name to function to a mapping of name to InjectedSinkFunction

Type parameters

NameType
Textends Sink

InjectedSinks

Ƭ InjectedSinks<T>: { [P in keyof T]: InjectedSink<T[P]> }

Converts a Sinks interface from a mapping of name to Sink to a mapping of name to InjectedSink.

Used for type checking Sink injection against supplied type param T.

Type parameters

NameType
Textends Sinks

MetricsExporter

Ƭ MetricsExporter: { temporality?: "cumulative" | "delta" } & PrometheusMetricsExporter | OtelCollectorExporter

Metrics exporters supported by Core

temporality is the type of aggregation temporality for metric export. Applies to both Prometheus and OpenTelemetry exporters.

See the OpenTelemetry specification for more information.


ReplayHistoriesIterable

Ƭ ReplayHistoriesIterable: AsyncIterable<HistoryAndWorkflowId> | Iterable<HistoryAndWorkflowId>

An iterable on workflow histories and their IDs, used for batch replaying.


RequiredNativeConnectionOptions

Ƭ RequiredNativeConnectionOptions: Omit<Required<NativeConnectionOptions>, "tls" | "proxy" | "metadata" | "apiKey"> & { apiKey?: NativeConnectionOptions["apiKey"] ; metadata?: NativeConnectionOptions["metadata"] ; proxy?: NativeConnectionOptions["proxy"] ; sdkVersion: string ; tls?: NativeConnectionOptions["tls"] }


SlotInfo

Ƭ SlotInfo: WorkflowSlotInfo | ActivitySlotInfo | LocalActivitySlotInfo


SlotSupplier

Ƭ SlotSupplier: ResourceBasedSlotsForType | FixedSizeSlotSupplier | CustomSlotSupplier<any>

Controls how slots are handed out for a specific task type.

For now, only ResourceBasedSlotOptions and FixedSizeSlotSupplier are supported, but we may add support for custom tuners in the future.


State

Ƭ State: "INITIALIZED" | "RUNNING" | "STOPPED" | "STOPPING" | "DRAINING" | "DRAINED" | "FAILED"

The worker's possible states

  • INITIALIZED - The initial state of the Worker after calling Worker.create and successful connection to the server
  • RUNNING - Worker.run was called, polling task queues
  • STOPPING - Worker.shutdown was called or received shutdown signal, worker will forcefully shutdown in shutdownGraceTime
  • DRAINING - Core has indicated that shutdown is complete and all Workflow tasks have been drained, waiting for activities and cached workflows eviction
  • DRAINED - All activities and workflows have completed, ready to shutdown
  • STOPPED - Shutdown complete, Worker.run resolves
  • FAILED - Worker encountered an unrecoverable error, Worker.run should reject with the error

TLSConfig

Ƭ TLSConfig: TLSConfig


TelemLogger

Ƭ TelemLogger: ConsoleLogger | ForwardLogger

Logger types supported by Core


WorkerTuner

Ƭ WorkerTuner: ResourceBasedTuner | TunerHolder

Controls how slots for different task types will be handed out.


WorkflowBundleOption

Ƭ WorkflowBundleOption: WorkflowBundle | WorkflowBundleWithSourceMap | WorkflowBundlePath | WorkflowBundlePathWithSourceMap

Variables

LogTimestamp

Const LogTimestamp: unique symbol


WorkflowInboundLogInterceptor

Const WorkflowInboundLogInterceptor: typeof WorkflowLogInterceptor

Deprecated

WorkflowInboundLogInterceptor is deprecated. Workflow lifecycle events are now automatically logged by the SDK. To customize workflow log attributes, simply register a custom WorkflowInterceptors that intercepts the outbound.getLogAttributes() method.


defaultWorkflowInterceptorModules

Const defaultWorkflowInterceptorModules: never[]

Deprecated

Including defaultWorkflowInterceptorModules in BundlerOptions.workflowInterceptorModules is no longer required.


errors

Const errors: Object

Deprecated

Import error classes directly

Type declaration

NameType
GracefulShutdownPeriodExpiredErrortypeof GracefulShutdownPeriodExpiredError
IllegalStateErrortypeof IllegalStateError
ShutdownErrortypeof ShutdownError
TransportErrortypeof TransportError
UnexpectedErrortypeof UnexpectedError

Functions

activityLogAttributes

activityLogAttributes(info): Record<string, unknown>

Returns a map of attributes to be set on log messages for a given Activity

Parameters

NameType
infoInfo

Returns

Record<string, unknown>


appendDefaultInterceptors

appendDefaultInterceptors(interceptors, logger?): WorkerInterceptors

Appends the default Worker logging interceptors to given interceptor arrays.

Parameters

NameTypeDescription
interceptorsWorkerInterceptors-
logger?Loggera Logger - defaults to the Runtime singleton logger.

Returns

WorkerInterceptors

Deprecated

Calling appendDefaultInterceptors() is no longer required. To configure a custom logger, set the Runtime.logger property instead.


bundleWorkflowCode

bundleWorkflowCode(options): Promise<WorkflowBundleWithSourceMap>

Create a bundle to pass to WorkerOptions.workflowBundle. Helpful for reducing Worker startup time in production.

When using with Worker.runReplayHistory, make sure to pass the same interceptors and payload converter used when the history was generated.

Parameters

NameType
optionsBundleOptions

Returns

Promise<WorkflowBundleWithSourceMap>


defaultSinks

defaultSinks(logger?): InjectedSinks<LoggerSinks>

Build the sink used internally by the SDK to forwards log messages from the Workflow sandbox to an actual logger.

Parameters

NameTypeDescription
logger?Loggera Logger - defaults to the Runtime singleton logger.

Returns

InjectedSinks<LoggerSinks>

Deprecated

Calling defaultSink() is no longer required. To configure a custom logger, set the Runtime.logger property instead.


makeTelemetryFilterString

makeTelemetryFilterString(options): string

A helper to build a filter string for use in RuntimeOptions.telemetryOptions.tracingFilter.

Example:

telemetryOptions: {
logging: {
filter: makeTelemetryFilterString({ core: 'TRACE', other: 'DEBUG' });
// ...
},
}

Parameters

NameType
optionsMakeTelemetryFilterStringOptions

Returns

string


startDebugReplayer

startDebugReplayer(options): void

Start a replayer for debugging purposes.

Use this method to integrate the replayer with external debuggers like the Temporal VS Code debbuger extension.

Parameters

NameType
optionsReplayWorkerOptions

Returns

void


timeOfDayToBigint

timeOfDayToBigint(timeOfDay): bigint

Takes a [seconds, nanos] tuple as returned from getTimeOfDay and turns it into bigint.

Parameters

NameType
timeOfDay[number, number]

Returns

bigint


workflowLogAttributes

workflowLogAttributes(info): Record<string, unknown>

Returns a map of attributes to be set by default on log messages for a given Workflow. Note that this function may be called from outside of the Workflow context (eg. by the worker itself).

Parameters

NameType
infoWorkflowInfo

Returns

Record<string, unknown>