Guide

Connectors

A connector owns a piece of the outside world — a socket, a client, a pool, a file — and acquires it on start, releases it on stop. Connectors do two things: some provide a source that feeds messages into a flow, and some provide a block that a flow calls by name (a logger, a DB, an HTTP client, an LLM).

ConnectorSource?Provides blockWhat it is
cronyesFires a message on a schedule.
httpyesAn HTTP server; each request is a message, the reply is the flow's result.
http-clientrestAn outbound HTTP client with base URL, auth, and optional caching.
databasesqlA connection pool for Postgres or SQLite.
loggerlogA configured structured logger (stdout/stderr/file).
llm-anthropicai-mapping + AI compositesAnthropic Claude client.
llm-openaiai-mapping + AI compositesOpenAI client.
llm-geminiai-mapping + AI compositesGoogle Gemini client.
Referencing connectors. A block names the connector it uses via a connector: (or logger:) setting. A flow's source: names its connector with connector: and a type:. Settings accept ${ENV_VAR} interpolation from the flow's declared env:.
Source

cron

Emits a message on a schedule. Supports six-field cron (seconds enabled, e.g. 0,30 * * * * *) and descriptors like @every 30s. The payload expression builds the body and can read now (the fire time).

SettingTypeNotes
schedulestringRequired. Cron expression or @every <dur>.
payloadstring (CEL)Builds the message body. Sees now. Empty → null body.
correlationIDstringOptional; set on every emitted message.
source:
  connector: ticker
  type: cron
  settings:
    schedule: "@every 30s"
    payload: '{"firedAt": string(now)}'
Source

http

One server per connector. Each source registers a route; a request becomes a message, the flow runs, and its final body is written back as JSON. The connector holds server-wide settings; the source holds the route.

Connector settings (the server)

SettingDefault
port8080 (0 = OS-assigned)
hostall interfaces
basePath— (prefix for every route)
requestTimeout30s
readTimeout / writeTimeout / idleTimeoutunset
keepAliveGo default

Source settings (the route)

SettingNotes
pathRequired. e.g. /orders/{id}; {id}vars.id.
headersList of request headers to capture as vars["X-…"].
correlationIdHeaderHeader to read the message's correlationID from.
timeoutPer-route wait; defaults to requestTimeout.
maxBodyBytesRequest body cap. Default 1 MiB.

What a request puts on the message

bodyThe JSON request body (400 if not valid JSON; empty is allowed).
vars.method"GET", "POST", …
vars.queryAlways a map (empty if no query string).
vars.<param>Each {param} in the path.
vars["X-Header"]Each header listed in headers (empty string if absent).

How the flow result becomes a response

Flow outcomeHTTP status
completed200 (override with vars.httpStatus)
dropped (nil)204 No Content
failed500 Internal Server Error
timed out504 Gateway Timeout

See Error handling for setting vars.httpStatus and recovering from failures.

Block · rest

http-client

An outbound HTTP client that provides the rest block. It concentrates client-wide policy — base URL, default headers, auth, timeout, and an opt-in in-memory GET cache.

Connector settings

SettingNotes
baseURLRequired, absolute. Prepended to request paths.
timeoutPer-request. Default 30s.
headersMap applied to every request (unless the request sets it).
authtype: bearer (+token) or basic (+username/password).
cacheenabled, ttl (60s), maxEntries (256). GET only.
maxResponseBytesResponse cap. Default 1 MiB.

The rest block

SettingNotes
connectorRequired. The http-client to use.
method / pathMethod (default GET) and path appended to baseURL.
query / headersMaps of name → CEL expression.
bodyCEL expression (write methods). Strings sent as-is; else JSON.
failOnErrorStatus ≥ 400 → error. Default true.
statusVarVariable for the response status. Default statusCode.
- type: rest
  settings:
    connector: open-meteo
    method: GET
    path: /v1/forecast
    query: { latitude: '"52.52"', current: '"temperature_2m"' }
Block · sql

database

A connection pool for Postgres (pgx) or SQLite — both pure Go, no CGO. Provides the sql block. Use the driver's native placeholders: $1, $2 for Postgres, ? for SQLite.

Connector settings

driverRequired. postgres or sqlite.
dsnRequired. Connection string / file path.
maxOpenConns / maxIdleConnsPool sizing.
connMaxLifetimee.g. 5m.

The sql block

connectorRequired. The database to use.
queryRequired. SQL with placeholders.
argsList of CEL expressions for bind params.
execNo result set → {"rowsAffected": N}.
singleReturn the first row (object) instead of an array.
- type: sql
  settings:
    connector: orders-db
    query: "INSERT INTO orders (item, amount) VALUES (?, ?) RETURNING *"
    args: [ body.item, body.amount ]
    single: true
Block · log

logger

A configured structured logger providing the log block — a wire tap that logs and passes the message through unchanged. A file logger opens on start and closes on stop.

Connector settings

outputstdout (default), stderr, or a file path.
formattext (default) or json.
leveldebug / info (default) / warn / error.
addSourceInclude source file:line. Default false.

The log block

messageCEL expression for the line. Sees body, vars, eventID, correlationID.
levelEmission level for this block.
loggerConnector name (empty = process default).
fullAttach the whole message as structured attributes.
Blocks · ai-mapping · ai-router · ai-agent · ai-retry

LLM connectors

Three providers behind one provider-agnostic interface: llm-anthropic, llm-openai, and llm-gemini. Any of them backs the AI blocks. The settings are identical across providers; only the default model differs.

SettingNotes
apiKeyRequired. Use ${ANTHROPIC_API_KEY} etc.
modelDefaults per provider: Anthropic claude-sonnet-4-6, OpenAI gpt-5.4, Gemini gemini-3.5-flash.
maxTokensResponse cap (Anthropic default 4096; others use the model default).
baseURLEndpoint override (proxies, Azure, testing).
connectors:
  - name: claude
    type: llm-anthropic
    settings:
      apiKey: ${ANTHROPIC_API_KEY}
      # model defaults to claude-sonnet-4-6
🗺️

ai-mapping

Reshape the body to a target shape from a prompt, optional examples, and an optional output JSON Schema. On a schema-validation failure it errors — so it composes with ai-retry / handle-errors.

🔀

ai-router

The model picks one of several named, described routes per message. A guardrail describes when to fall through to the default path.

🛠️

ai-agent

The model accomplishes a task by calling your sub-flows as tools in a loop, up to maxIterations, then returns a result.

♻️

ai-retry

Runs a protected chain; on failure the model inspects vars.error, revises the message, and retries up to maxAttempts before falling to the error chain.

See the AI samples ↗