Skip to content
Cloudflare Docs

Use ReadableStream with Durable Object and Workers

Stream data from a Durable Object to a Worker using a ReadableStream.

This example demonstrates:

  • A Worker receives a request and forwards it to the Durable Object named my-id.
  • The Durable Object streams an incrementing number every second until its AbortSignal is aborted.
  • The Worker reads and logs the values from the stream.
  • The Worker then cancels the stream after 5 values.
import { DurableObject } from "cloudflare:workers";
// Resolve after `ms` milliseconds, or as soon as `signal` aborts.
function abortableDelay(ms, signal) {
  return new Promise((resolve) => {
    if (signal.aborted) {
      resolve();
      return;
    }
    const onAbort = () => {
      // Drop the pending timer so it does not outlive the cancellation.
      clearTimeout(timer);
      resolve();
    };
    const timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal.addEventListener("abort", onAbort, { once: true });
  });
}

/**
 * Send an incremented counter value every second.
 *
 * Yields 0, 1, 2, ... with a one-second pause after each value. The loop
 * stops once `signal` is aborted; the pause itself is cut short on abort so
 * the generator finishes promptly instead of waiting out the timer.
 *
 * @param {AbortSignal} signal - Signal that stops the generator when aborted.
 * @yields {number} The next counter value, starting at 0.
 */
async function* dataSource(signal) {
  let counter = 0;
  while (!signal.aborted) {
    yield counter++;
    await abortableDelay(1_000, signal);
  }
  console.log("Data source cancelled");
}
/**
 * Durable Object whose `fetch` handler answers with a streaming body fed by
 * the values produced by `dataSource`.
 */
export class MyDurableObject extends DurableObject {
  /**
   * Build a streaming response for the incoming request.
   *
   * @param {Request} request - Incoming request; only `request.signal.aborted`
   *   is inspected, once, when the stream starts.
   * @returns {Promise<Response>} Response with an `application/octet-stream`
   *   body carrying each counter value encoded as text bytes.
   */
  async fetch(request) {
    const abortController = new AbortController();
    const encoder = new TextEncoder();

    const underlyingSource = {
      start: async (controller) => {
        // Nothing to produce if the request was already aborted.
        if (request.signal.aborted) {
          controller.close();
          abortController.abort();
          return;
        }

        // Runs until the data source stops, i.e. until `cancel` below aborts it.
        const values = dataSource(abortController.signal);
        for await (const value of values) {
          const chunk = encoder.encode(String(value));
          controller.enqueue(chunk);
        }
      },

      // Invoked when the consumer cancels the stream; stops the data source.
      cancel: () => {
        console.log("Stream cancelled");
        abortController.abort();
      },
    };

    const body = new ReadableStream(underlyingSource);
    return new Response(body, {
      headers: new Headers({
        "Content-Type": "application/octet-stream",
      }),
    });
  }
}
/**
 * Worker entry point.
 *
 * Forwards the request to the Durable Object named "my-id", reads the
 * streamed response as text, collects the first five values, cancels the
 * stream, and returns the collected values as JSON.
 */
const worker = {
  /**
   * @param {Request} request - Incoming request, forwarded to the Durable Object.
   * @param {object} env - Bindings; `env.MY_DURABLE_OBJECT` must provide
   *   `idFromName` and `get`.
   * @param {object} ctx - Execution context (unused).
   * @returns {Promise<Response>} JSON array of the received values, or a 500
   *   response when the Durable Object reply is not OK or has no body.
   */
  async fetch(request, env, ctx) {
    // Number of values to collect before cancelling the stream.
    const maxValues = 5;

    const id = env.MY_DURABLE_OBJECT.idFromName("my-id");
    const stub = env.MY_DURABLE_OBJECT.get(id);
    const response = await stub.fetch(request, { ...request });
    if (!response.ok || !response.body) {
      return new Response("Invalid response", { status: 500 });
    }

    const reader = response.body
      .pipeThrough(new TextDecoderStream())
      .getReader();

    const data = [];
    while (true) {
      // Cancel the stream after 5 messages
      if (data.length >= maxValues) {
        // Await the cancellation so it is not left as a floating promise.
        await reader.cancel();
        break;
      }
      const { value, done } = await reader.read();
      if (done) {
        break;
      }
      if (value) {
        console.log(`Got value ${value}`);
        data.push(value);
      }
    }
    return Response.json(data);
  },
};

export default worker;