Cloudflare Queues
Cloudflare Queues exposes a REST API for sending messages, so no SDK is required — just fetch.
Installation
Nothing to install.
Define the effect
src/effects/cloudflare.ts
import { createEffect, S } from "envio";
const ENDPOINT = (queueId: string) =>
`https://api.cloudflare.com/client/v4/accounts/${process.env.CLOUDFLARE_ACCOUNT_ID}/queues/${queueId}/messages`;
export const sendToCloudflareQueue = createEffect(
{
name: "sendToCloudflareQueue",
input: {
queueId: S.string,
body: S.string, // JSON-encoded
},
rateLimit: { calls: 100, per: "second" },
mode: "unorderedAfterCommit",
},
async ({ input, context }) => {
const res = await fetch(ENDPOINT(input.queueId), {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${process.env.CLOUDFLARE_API_TOKEN}`,
},
body: JSON.stringify({ body: input.body, content_type: "json" }),
});
if (!res.ok) {
context.log.error(`CF Queues failed: ${res.status} ${await res.text()}`);
throw new Error(`CF Queues ${res.status}`);
}
}
);
Call it from a handler
The rindexer config…
streams:
cloudflare_queues:
queues:
- queue_id: blockchain-transfers
events:
- event_name: Transfer
conditions:
- "value": ">=2000000000000000000"
…becomes:
src/EventHandlers.ts
import { RocketPoolETH } from "generated";
import { sendToCloudflareQueue } from "./effects/cloudflare";
const MIN = 2_000_000_000_000_000_000n;
RocketPoolETH.Transfer.handler(async ({ event, context }) => {
const { from, to, value } = event.params;
if (value >= MIN) {
context.effect(sendToCloudflareQueue, {
queueId: "blockchain-transfers",
body: JSON.stringify({
from,
to,
value: value.toString(),
txHash: event.transaction.hash,
chainId: event.chainId,
}),
});
}
});
Use orderedAfterCommit if your queue consumer relies on per-batch ordering.