This is the v3 HyperIndex documentation. Still on an older version? Open the v2 documentation and consider migrating to v3.

Redis Streams

Push decoded events into a Redis Stream with XADD. Pair with consumer groups on the read side for at-least-once processing.

Installation

pnpm add ioredis

Configure the client

src/clients/redis.ts
import Redis from "ioredis";

export const redis = new Redis(process.env.REDIS_CONNECTION_URI!);
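
The `!` above only satisfies the type checker; it does not verify that the variable is set at runtime. One option is to fail fast with a clear message, as in this minimal sketch. `requireEnv` is a hypothetical helper name, not part of ioredis or envio.

```typescript
// Hypothetical helper (not part of ioredis or envio): read a required
// environment variable and fail fast with a clear message if it is missing.
function requireEnv(
  env: Record<string, string | undefined>,
  name: string,
): string {
  const value = env[name];
  if (value === undefined || value.trim() === "") {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

// Possible usage in src/clients/redis.ts:
//   export const redis = new Redis(requireEnv(process.env, "REDIS_CONNECTION_URI"));
```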

Define the effect

src/effects/redis.ts
import { createEffect, S } from "envio";
import { redis } from "../clients/redis";

export const xaddRedis = createEffect(
  {
    name: "xaddRedis",
    input: {
      stream: S.string,
      payload: S.string, // JSON-encoded
    },
    rateLimit: { calls: 500, per: "second" },
    mode: "unorderedAfterCommit",
  },
  async ({ input }) => {
    await redis.xadd(input.stream, "*", "data", input.payload);
  }
);

Call it from a handler

The rindexer config…

streams:
  redis:
    streams:
      - stream_name: "ethereum_rocketpool_transfer_stream"
        events:
          - event_name: Transfer
            conditions:
              - "value": ">=2000000000000000000 && value <=4000000000000000000"
              - "from": "0x0338ce5020c447f7e668dc2ef778025ce3982662"

…becomes:

src/EventHandlers.ts
import { RocketPoolETH } from "generated";
import { xaddRedis } from "./effects/redis";

const WHALE = "0x0338ce5020c447f7e668dc2ef778025ce3982662";
const MIN = 2_000_000_000_000_000_000n;
const MAX = 4_000_000_000_000_000_000n;

RocketPoolETH.Transfer.handler(async ({ event, context }) => {
  const { from, to, value } = event.params;

  if (from.toLowerCase() === WHALE && value >= MIN && value <= MAX) {
    context.effect(xaddRedis, {
      stream: "ethereum_rocketpool_transfer_stream",
      payload: JSON.stringify({
        from,
        to,
        value: value.toString(),
        txHash: event.transaction.hash,
        blockNumber: event.block.number,
      }),
    });
  }
});
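
The handler calls value.toString() because JSON.stringify throws a TypeError when it meets a bigint. If a payload carries several bigint fields, a replacer can do the conversion in one place. The sketch below assumes a hypothetical helper named `encodePayload`; the sample values are illustrative only.

```typescript
// Hypothetical helper: JSON-encode a payload, converting every bigint to a
// decimal string so JSON.stringify does not throw a TypeError.
function encodePayload(payload: Record<string, unknown>): string {
  return JSON.stringify(payload, (_key, value) =>
    typeof value === "bigint" ? value.toString() : value,
  );
}

// Producer side: bigint fields can be passed as-is.
const encoded = encodePayload({
  from: "0x0338ce5020c447f7e668dc2ef778025ce3982662",
  value: 2_500_000_000_000_000_000n, // illustrative amount
  blockNumber: 19_000_000, // illustrative block number
});

// Consumer side: the amount arrives as a string and is converted back explicitly.
const decoded = JSON.parse(encoded) as { value: string };
const parsedValue = BigInt(decoded.value);
```

Encoding amounts as decimal strings keeps them exact; a JSON number would lose precision above 2^53 - 1.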

Redis assigns every entry in a stream a monotonically increasing ID (that is what the * argument requests), so each stream is totally ordered and unorderedAfterCommit is fine here: concurrent XADDs within a batch simply interleave in arrival order. Use orderedAfterCommit only if downstream consumers need entries in strict handler order across the batch.
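
On the read side, the consumer-group pairing mentioned at the top of this page could look roughly like the sketch below. It is not part of HyperIndex. `GroupReader`, `Entry`, and `processBatch` are hypothetical names, and the interface is kept independent of any Redis library so the acknowledgement logic stands on its own. It assumes the group already exists (created once with XGROUP CREATE) and that entries carry the JSON payload under the "data" field, as written by the effect above.

```typescript
// One stream entry as this sketch sees it: the Redis-assigned ID plus the
// JSON string the producer stored under the "data" field.
type Entry = { id: string; data: string };

// Hypothetical minimal client interface. With ioredis, readGroup would wrap
//   redis.xreadgroup("GROUP", group, consumer, "COUNT", count, "STREAMS", stream, ">")
// and ack would wrap
//   redis.xack(stream, group, id)
interface GroupReader {
  readGroup(count: number): Promise<Entry[]>;
  ack(id: string): Promise<void>;
}

// Read one batch and acknowledge each entry only after its handler succeeds.
// An entry whose handler throws is left unacknowledged, so it stays in the
// group's pending list and can be retried: at-least-once, not exactly-once.
async function processBatch(
  reader: GroupReader,
  handle: (payload: unknown, id: string) => Promise<void>,
  count = 10,
): Promise<{ acked: string[]; failed: string[] }> {
  const acked: string[] = [];
  const failed: string[] = [];
  for (const entry of await reader.readGroup(count)) {
    try {
      await handle(JSON.parse(entry.data), entry.id);
      await reader.ack(entry.id);
      acked.push(entry.id);
    } catch {
      failed.push(entry.id);
    }
  }
  return { acked, failed };
}
```

Because an entry can be delivered more than once, handlers are best kept idempotent, for example by keying writes on the entry ID or on txHash.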