A connector owns a piece of the outside world — a socket, a client, a pool, a file — and acquires it on start, releases it on stop. Connectors do two things: some provide a source that feeds messages into a flow, and some provide a block that a flow calls by name (a logger, a DB, an HTTP client, an LLM).

| Connector | Source? | Provides block | What it is |
|---|---|---|---|
| cron | yes | none | Fires a message on a schedule. |
| http | yes | none | An HTTP server; each request is a message, the reply is the flow's result. |
| http-client | no | rest | An outbound HTTP client with base URL, auth, and optional caching. |
| database | no | sql | A connection pool for Postgres or SQLite. |
| logger | no | log | A configured structured logger (stdout/stderr/file). |
| llm-anthropic | no | ai-mapping + AI composites | Anthropic Claude client. |
| llm-openai | no | ai-mapping + AI composites | OpenAI client. |
| llm-gemini | no | ai-mapping + AI composites | Google Gemini client. |

A block names its connector with its connector: (or logger:) setting. A flow's source: names its connector with connector: and a type:. Settings accept ${ENV_VAR} interpolation from the flow's declared env:.

cron

Emits a message on a schedule. Supports six-field cron (seconds enabled, e.g. 0,30 * * * * *) and descriptors like @every 30s. The payload expression builds the body and can read now (the fire time).

| Setting | Type | Notes |
|---|---|---|
| schedule | string | Required. Cron expression or @every <dur>. |
| payload | string (CEL) | Builds the message body. Sees now. Empty → null body. |
| correlationID | string | Optional; set on every emitted message. |

    source:
      connector: ticker
      type: cron
      settings:
        schedule: "@every 30s"
        payload: '{"firedAt": string(now)}'

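The six-field form described above can be sketched the same way. This is a minimal variant of the source shown earlier; the connector name ticker is reused from it, and the correlationID value heartbeat is a hypothetical label chosen for illustration.

```yaml
source:
  connector: ticker
  type: cron
  settings:
    # Six fields, seconds first: fires at second 0 and second 30 of every minute.
    schedule: "0,30 * * * * *"
    # CEL expression; now is the fire time.
    payload: '{"firedAt": string(now)}'
    # Optional; stamped on every emitted message (hypothetical value).
    correlationID: heartbeat
```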
http

One server per connector. Each source registers a route; a request becomes a message, the flow runs, and its final body is written back as JSON. The connector holds server-wide settings; the source holds the route.

| Connector setting | Default |
|---|---|
| port | 8080 (0 = OS-assigned) |
| host | all interfaces |
| basePath | none (prefix for every route) |
| requestTimeout | 30s |
| readTimeout / writeTimeout / idleTimeout | unset |
| keepAlive | Go default |

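As a rough sketch, an http connector declaration using these server-wide settings might look like the following. The connectors: list shape is assumed to match the llm-anthropic example elsewhere in this reference; the connector name api and the basePath value are hypothetical.

```yaml
connectors:
  - name: api              # hypothetical connector name
    type: http
    settings:
      port: 8080           # 0 would let the OS assign a port
      basePath: /api       # hypothetical prefix applied to every route
      requestTimeout: 30s  # default wait for routes that set no timeout
```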
| Source setting | Notes |
|---|---|
| path | Required. e.g. /orders/{id}; {id} → vars.id. |
| headers | List of request headers to capture as vars["X-…"]. |
| correlationIdHeader | Header to read the message's correlationID from. |
| timeout | Per-route wait; defaults to requestTimeout. |
| maxBodyBytes | Request body cap. Default 1 MiB. |

Each request reaches the flow as a message with these fields:

| Message field | Contents |
|---|---|
| body | The JSON request body (400 if not valid JSON; empty is allowed). |
| vars.method | "GET", "POST", … |
| vars.query | Always a map (empty if no query string). |
| vars.<param> | Each {param} in the path. |
| vars["X-Header"] | Each header listed in headers (empty string if absent). |

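To make the route settings concrete, here is a hedged sketch of an http source. The connector name api and the header name X-Request-Id are hypothetical; the source: shape follows the cron example in this reference.

```yaml
source:
  connector: api                       # hypothetical http connector
  type: http
  settings:
    path: /orders/{id}                 # {id} becomes vars.id
    headers: [ X-Request-Id ]          # captured as vars["X-Request-Id"]
    correlationIdHeader: X-Request-Id  # the message's correlationID is read from this header
    timeout: 10s                       # per-route wait; falls back to requestTimeout if omitted
```

With this route, a request for /orders/42 would give the flow vars.id from the path, vars.method from the request method, and an empty vars.query map when there is no query string.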
| Flow outcome | HTTP status |
|---|---|
| completed | 200 (override with vars.httpStatus) |
| dropped (nil) | 204 No Content |
| failed | 500 Internal Server Error |
| timed out | 504 Gateway Timeout |
See Error handling for setting vars.httpStatus and recovering from failures.

http-client

An outbound HTTP client that provides the rest block. It concentrates client-wide policy: base URL, default headers, auth, timeout, and an opt-in in-memory GET cache.

| Setting | Notes |
|---|---|
| baseURL | Required, absolute. Prepended to request paths. |
| timeout | Per-request. Default 30s. |
| headers | Map applied to every request (unless the request sets it). |
| auth | type: bearer (+token) or basic (+username/password). |
| cache | enabled, ttl (60s), maxEntries (256). GET only. |
| maxResponseBytes | Response cap. Default 1 MiB. |

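A sketch of two http-client connectors built from the settings above. The nesting of the auth and cache keys is an assumption read off the table, not a confirmed schema; billing-api, its URL, and the BILLING_TOKEN variable are hypothetical.

```yaml
connectors:
  - name: open-meteo
    type: http-client
    settings:
      baseURL: https://api.open-meteo.com
      timeout: 10s
      headers:
        Accept: application/json
      cache:                 # opt-in, applies to GET only
        enabled: true
        ttl: 60s
        maxEntries: 256

  - name: billing-api        # hypothetical second client with bearer auth
    type: http-client
    settings:
      baseURL: https://billing.example.com
      auth:
        type: bearer
        token: ${BILLING_TOKEN}
```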
rest block

| Setting | Notes |
|---|---|
| connector | Required. The http-client to use. |
| method / path | Method (default GET) and path appended to baseURL. |
| query / headers | Maps of name → CEL expression. |
| body | CEL expression (write methods). Strings sent as-is; else JSON. |
| failOnError | Status ≥ 400 → error. Default true. |
| statusVar | Variable for the response status. Default statusCode. |
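
    - type: rest
      settings:
        connector: open-meteo
        method: GET
        path: /v1/forecast
        # Query values are CEL expressions, so string literals carry their own quotes.
        # The forecast endpoint expects a longitude alongside the latitude.
        query: { latitude: '"52.52"', longitude: '"13.41"', current: '"temperature_2m"' }
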
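For a write method, a rest block might look roughly like this. It is a sketch only: the connector billing-api, the path, the X-Source header, and the variable name billingStatus are hypothetical, and failOnError and statusVar are used as the settings table describes them.

```yaml
- type: rest
  settings:
    connector: billing-api       # hypothetical http-client connector
    method: POST
    path: /v1/invoices           # hypothetical path, appended to baseURL
    headers: { X-Source: '"orders-flow"' }   # values are CEL; this one is a string literal
    # CEL map literal; a non-string result is sent as JSON.
    body: '{"item": body.item, "amount": body.amount}'
    failOnError: false           # let the flow inspect 4xx/5xx instead of erroring
    statusVar: billingStatus     # response status stored under this variable name
```

Turning failOnError off only makes sense when a later step reads the status variable and decides what to do with an error response.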

database

A connection pool for Postgres (pgx) or SQLite. Both drivers are pure Go and need no CGO. Provides the sql block. Use the driver's native placeholders: $1, $2 for Postgres, ? for SQLite.

| Setting | Notes |
|---|---|
| driver | Required. postgres or sqlite. |
| dsn | Required. Connection string / file path. |
| maxOpenConns / maxIdleConns | Pool sizing. |
| connMaxLifetime | e.g. 5m. |

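A minimal sketch of a database connector for SQLite, assuming the same connectors: list shape as the other connector declarations in this reference. The file path and pool sizes are illustrative values, not recommendations.

```yaml
connectors:
  - name: orders-db
    type: database
    settings:
      driver: sqlite
      dsn: ./orders.db       # hypothetical file path; a connection string for postgres
      maxOpenConns: 4
      maxIdleConns: 2
      connMaxLifetime: 5m
```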
sql block

| Setting | Notes |
|---|---|
| connector | Required. The database to use. |
| query | Required. SQL with placeholders. |
| args | List of CEL expressions for bind params. |
| exec | No result set → {"rowsAffected": N}. |
| single | Return the first row (object) instead of an array. |
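
    - type: sql
      settings:
        connector: orders-db
        # SQLite placeholders (?); on Postgres this would use $1, $2.
        query: "INSERT INTO orders (item, amount) VALUES (?, ?) RETURNING *"
        args: [ body.item, body.amount ]
        # Return the inserted row as an object rather than a one-element array.
        single: true
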
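For comparison, a statement with no result set against Postgres could be sketched as below. The connector name orders-pg is hypothetical, and exec is assumed to be a boolean flag in the same style as single.

```yaml
- type: sql
  settings:
    connector: orders-pg     # hypothetical Postgres database connector
    # Postgres placeholders are numbered: $1, $2.
    query: "UPDATE orders SET amount = $1 WHERE item = $2"
    args: [ body.amount, body.item ]
    # No result set is expected; the block yields {"rowsAffected": N}.
    exec: true
```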

logger

A configured structured logger providing the log block: a wire tap that logs and passes the message through unchanged. A file logger opens on start and closes on stop.

| Setting | Notes |
|---|---|
| output | stdout (default), stderr, or a file path. |
| format | text (default) or json. |
| level | debug / info (default) / warn / error. |
| addSource | Include source file:line. Default false. |

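A sketch of a file-backed logger connector using these settings. The name audit-log and the file path are hypothetical; the declaration shape is assumed to match the other connectors in this reference.

```yaml
connectors:
  - name: audit-log          # hypothetical connector name
    type: logger
    settings:
      output: ./audit.log    # a file path; opened on start, closed on stop
      format: json
      level: info
      addSource: false
```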
log block

| Setting | Notes |
|---|---|
| message | CEL expression for the line. Sees body, vars, eventID, correlationID. |
| level | Emission level for this block. |
| logger | Connector name (empty = process default). |
| full | Attach the whole message as structured attributes. |

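A log block dropped into a flow might look like this. It is a sketch under two assumptions: audit-log is a hypothetical logger connector, and full takes a boolean.

```yaml
- type: log
  settings:
    logger: audit-log        # hypothetical; leave empty to use the process default
    level: info
    # CEL expression; correlationID is one of the names the message expression can see.
    message: '"order received, correlation " + correlationID'
    full: true               # attach the whole message as structured attributes
```

Because the block is a wire tap, the message leaves it unchanged, so it can sit between any two steps without affecting the result.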
ai-mapping · ai-router · ai-agent · ai-retry
Three providers behind one provider-agnostic interface: llm-anthropic, llm-openai, and llm-gemini. Any of them backs the AI blocks. The settings are identical across providers; only the default model differs.

| Setting | Notes |
|---|---|
| apiKey | Required. Use ${ANTHROPIC_API_KEY} etc. |
| model | Defaults per provider: Anthropic claude-sonnet-4-6, OpenAI gpt-5.4, Gemini gemini-3.5-flash. |
| maxTokens | Response cap (Anthropic default 4096; others use the model default). |
| baseURL | Endpoint override (proxies, Azure, testing). |

    connectors:
      - name: claude
        type: llm-anthropic
        settings:
          apiKey: ${ANTHROPIC_API_KEY}
          # model defaults to claude-sonnet-4-6

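Since the settings are the same across providers, switching provider should mostly be a change of type. A hedged sketch for OpenAI follows; the connector name gpt, the OPENAI_API_KEY variable, and the proxy URL are hypothetical.

```yaml
connectors:
  - name: gpt                # hypothetical connector name
    type: llm-openai
    settings:
      apiKey: ${OPENAI_API_KEY}
      maxTokens: 1024        # response cap; omit to use the model default
      # Optional endpoint override, e.g. for a proxy (hypothetical URL).
      baseURL: https://llm-proxy.example.com/v1
```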
ai-mapping

Reshape the body to a target shape from a prompt, optional examples, and an optional output JSON Schema. On a schema-validation failure it errors, so it composes with ai-retry / handle-errors.

ai-router

The model picks one of several named, described routes per message. A guardrail describes when to fall through to the default path.

ai-agent

The model accomplishes a task by calling your sub-flows as tools in a loop, up to maxIterations, then returns a result.

ai-retry

Runs a protected chain; on failure the model inspects vars.error, revises the message, and retries up to maxAttempts before falling to the error chain.