Use ReadableStream with Durable Object and Workers
Stream ReadableStream from Durable Objects.
This example demonstrates:
- A Worker receives a request, and forwards it to a Durable Object
my-id
. - The Durable Object streams an incrementing number every second, until it receives
AbortSignal
. - The Worker reads and logs the values from the stream.
- The Worker then cancels the stream after 5 values.
import { DurableObject } from "cloudflare:workers";
// Send incremented counter value every secondasync function* dataSource(signal) { let counter = 0; while (!signal.aborted) { yield counter++; await new Promise((resolve) => setTimeout(resolve, 1_000)); }
console.log("Data source cancelled");}
export class MyDurableObject extends DurableObject { async fetch(request) { const abortController = new AbortController();
const stream = new ReadableStream({ async start(controller) { if (request.signal.aborted) { controller.close(); abortController.abort(); return; }
for await (const value of dataSource(abortController.signal)) { controller.enqueue(new TextEncoder().encode(String(value))); } }, cancel() { console.log("Stream cancelled"); abortController.abort(); }, });
const headers = new Headers({ "Content-Type": "application/octet-stream", });
return new Response(stream, { headers }); }}
export default { async fetch(request, env, ctx) { const id = env.MY_DURABLE_OBJECT.idFromName("my-id"); const stub = env.MY_DURABLE_OBJECT.get(id); const response = await stub.fetch(request, { ...request }); if (!response.ok || !response.body) { return new Response("Invalid response", { status: 500 }); }
const reader = response.body .pipeThrough(new TextDecoderStream()) .getReader();
let data = []; let i = 0; while (true) { // Cancel the stream after 5 messages if (i > 5) { reader.cancel(); break; } const { value, done } = await reader.read();
if (value) { console.log(`Got value ${value}`); data = [...data, value]; }
if (done) { break; } i++; }
return Response.json(data); },};
import { DurableObject } from 'cloudflare:workers';
// Send incremented counter value every secondasync function* dataSource(signal: AbortSignal) { let counter = 0; while (!signal.aborted) { yield counter++; await new Promise((resolve) => setTimeout(resolve, 1_000)); }
console.log('Data source cancelled');}
export class MyDurableObject extends DurableObject<Env> { async fetch(request: Request): Promise<Response> { const abortController = new AbortController();
const stream = new ReadableStream({ async start(controller) { if (request.signal.aborted) { controller.close(); abortController.abort(); return; }
for await (const value of dataSource(abortController.signal)) { controller.enqueue(new TextEncoder().encode(String(value))); } }, cancel() { console.log('Stream cancelled'); abortController.abort(); }, });
const headers = new Headers({ 'Content-Type': 'application/octet-stream', });
return new Response(stream, { headers }); }}
export default { async fetch(request, env, ctx): Promise<Response> { const id: DurableObjectId = env.MY_DURABLE_OBJECT.idFromName('my-id'); const stub = env.MY_DURABLE_OBJECT.get(id); const response = await stub.fetch(request, { ...request }); if (!response.ok || !response.body) { return new Response('Invalid response', { status: 500 }); }
const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
let data = [] as string[]; let i = 0; while (true) { // Cancel the stream after 5 messages if (i > 5) { reader.cancel(); break; } const { value, done } = await reader.read();
if (value) { console.log(`Got value ${value}`); data = [...data, value]; }
if (done) { break; } i++; }
return Response.json(data); },} satisfies ExportedHandler<Env>;
Was this helpful?
- Resources
- API
- New to Cloudflare?
- Products
- Sponsorships
- Open Source
- Support
- Help Center
- System Status
- Compliance
- GDPR
- Company
- cloudflare.com
- Our team
- Careers
- © 2025 Cloudflare, Inc.
- Privacy Policy
- Terms of Use
- Report Security Issues
- Trademark
-