Streams in Node.js Explained
How Node.js streams work, why they matter, and how to use them with simple code examples.
Introduction
Imagine you ordered a huge pizza. You do not wait for the whole pizza to arrive before eating. You eat slice by slice as it comes out of the oven.
That is exactly how Node.js streams work. Instead of loading an entire file into memory and then processing it, streams let you work with data chunk by chunk as it arrives.
This guide explains every stream type in Node.js in plain English, with code examples you can copy and use right away.
Why This Matters
Many core Node.js APIs are built on streams. HTTP requests and responses, file streams from fs.createReadStream, TCP sockets, and even process.stdin are all streams. When you ignore streams and load everything into memory at once, problems show up quickly:
- Memory spikes when reading large files
- Users wait longer for the first byte of a response
- The app can crash on large video or CSV files
- With synchronous reads, the event loop is blocked until the whole payload is loaded
Streams fix this by processing data one piece at a time, so memory use depends on the stream's buffer size rather than on the size of the file.
Problem
The simple approach looks like this:
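```typescript
import { readFileSync } from "fs";

// Reads the entire 2 GB file into RAM before doing anything
const data = readFileSync("huge.csv");
handleData(data); // handleData is a placeholder for your own processing
```
This works fine for small files. On large, real-world data, it causes: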
- The Node process eating gigabytes of RAM
- Garbage collector running constantly, stalling the app
- Service crashing inside Docker containers with limited memory
- Users waiting for the last byte before seeing the first byte
The real problem is that you are forcing all the data to exist at once when it could just flow through.
Solution
Node.js has four types of streams. Each one has a specific job:
| Stream Type | What It Does | Real Example |
|---|---|---|
| Readable | Produces data for you to consume | fs.createReadStream, HTTP request body |
| Writable | Accepts data and writes it somewhere | fs.createWriteStream, HTTP response |
| Duplex | Reads and writes, with independent sides | TCP socket (net.Socket) |
| Transform | Reads data, changes it, then passes it on | zlib.createGzip, CSV parser |
Data flows like a pipeline:
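```
Readable -> Transform -> Transform -> Writable
```
Memory stays low because each stream buffers only a limited amount of data (its highWaterMark) and releases a chunk once it has been passed on, so the whole file never has to sit in memory.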
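To see that flow in one place, here is a minimal sketch that runs entirely in memory: Readable.from stands in for a file, a Transform rewrites each chunk, and a Writable collects the results. The helper name csvToTsv and the sample rows are illustrative assumptions, and the comma-to-tab conversion is deliberately naive (it ignores quoted fields).

```typescript
import { Readable, Transform, Writable } from "stream";
import { pipeline } from "stream/promises";

// Hypothetical helper: Readable -> Transform -> Writable, entirely in memory
async function csvToTsv(rows: string[]): Promise<string> {
  const received: string[] = [];

  // Readable: emits each array element as its own chunk (stand-in for a file)
  const source = Readable.from(rows);

  // Transform: rewrites each chunk as it passes through (naive, ignores quoting)
  const commasToTabs = new Transform({
    transform(chunk, _encoding, callback) {
      callback(null, chunk.toString().replace(/,/g, "\t"));
    },
  });

  // Writable: receives each transformed chunk (stand-in for a file or socket)
  const sink = new Writable({
    write(chunk, _encoding, callback) {
      received.push(chunk.toString());
      callback();
    },
  });

  await pipeline(source, commasToTabs, sink);
  return received.join("");
}

csvToTsv(["name,age\n", "Ada,36\n"]).then((tsv) => process.stdout.write(tsv));
```

Only the chunks currently moving between stages are held in memory; nothing here ever builds the full output until the final join, which exists only to make the demo easy to print.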
Implementation
Readable Streams
A readable stream is a source of data. The simplest way to consume one is with for await...of.
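You do not need a file to get a readable stream. As a small sketch, Readable.from turns any iterable or async iterable into one, and for await...of consumes it. The generator fetchPages below is a made-up data source standing in for something like paginated API results.

```typescript
import { Readable } from "stream";

// Hypothetical source: yields one "page" of records at a time
async function* fetchPages(): AsyncGenerator<string[]> {
  for (let page = 1; page <= 3; page++) {
    // A real source would await a network or database call here
    yield [`record-${page}a`, `record-${page}b`];
  }
}

async function collectRecords(): Promise<string[]> {
  // Readable.from defaults to object mode, so each yielded array is one chunk
  const stream = Readable.from(fetchPages());
  const records: string[] = [];
  for await (const page of stream) {
    records.push(...(page as string[]));
  }
  return records;
}

collectRecords().then((records) => console.log(records.length, "records"));
```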
Reading a large file chunk by chunk:
```typescript
import { createReadStream } from "fs";

// No encoding is set, so each chunk is a Buffer and .length is in bytes
const stream = createReadStream("large.csv");
for await (const chunk of stream) {
  console.log("Got a chunk that is", chunk.length, "bytes");
}
```
Building your own readable stream:
```typescript
import { Readable } from "stream";

class CounterStream extends Readable {
  private current = 0;
  private max: number;

  constructor(max: number) {
    super();
    this.max = max;
  }

  // Node calls _read whenever the consumer wants more data
  _read() {
    if (this.current >= this.max) {
      this.push(null); // null tells Node the stream is done
      return;
    }
    this.push(`${this.current++}\n`);
  }
}

const counter = new CounterStream(5);
counter.pipe(process.stdout);
// Prints 0 through 4, one number per line
```
Writable Streams
A writable stream is a destination for data. You send it data with .write() and signal you are done with .end().
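One detail that trips people up: .end() only starts the shutdown, and buffered data may still be flushing when it returns. A minimal sketch, assuming you want to know when everything has been written, is to await finished from stream/promises. The helper name writeAll and the slow in-memory sink are illustrative.

```typescript
import { Writable } from "stream";
import { finished } from "stream/promises";

// Hypothetical helper: write a few chunks, end the stream, and wait until it is done
async function writeAll(stream: Writable, chunks: string[]): Promise<void> {
  for (const chunk of chunks) {
    // Fine for a handful of small chunks; for large volumes, respect write()'s
    // return value (see Backpressure below)
    stream.write(chunk);
  }
  stream.end();
  // Resolves once all buffered data has been flushed; rejects if the stream errors
  await finished(stream);
}

// In-memory destination that simulates a slow disk or socket
const written: string[] = [];
const sink = new Writable({
  write(chunk, _encoding, callback) {
    setTimeout(() => {
      written.push(chunk.toString());
      callback();
    }, 5);
  },
});

const done = writeAll(sink, ["Hello, ", "streams!\n"]);
done.then(() => process.stdout.write(written.join("")));
```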
Writing to a file:
```typescript
import { createWriteStream } from "fs";

const output = createWriteStream("output.txt");
output.write("Hello, ");
output.write("streams!\n");
output.end(); // no more data: flush what is buffered, then close the file
```
Building a custom writable that collects chunks:
```typescript
import { Writable } from "stream";

class CollectorStream extends Writable {
  private chunks: Buffer[] = [];

  _write(chunk: Buffer, _encoding: BufferEncoding, callback: (error?: Error | null) => void) {
    this.chunks.push(chunk);
    callback(); // tell Node you are ready for the next chunk
  }

  getResult(): string {
    return Buffer.concat(this.chunks).toString("utf8");
  }
}
```
Transform Streams
A transform stream sits in the middle of a pipeline. It receives a chunk, does something to it, and pushes a new chunk forward. Think of it as a filter or converter.
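Besides subclassing, you can build a one-off transform by passing a transform function in the constructor options. This sketch, assuming plain text input, tags each chunk with a running counter so you can see where chunk boundaries fall; the helper names and the label format are arbitrary choices.

```typescript
import { Transform } from "stream";

// Tags each chunk with its position so chunk boundaries are visible
function createChunkTagger(): Transform {
  let seen = 0;
  return new Transform({
    transform(chunk, _encoding, callback) {
      seen++;
      // callback(null, data) is shorthand for this.push(data) followed by callback()
      callback(null, `[chunk ${seen}] ${chunk.toString()}\n`);
    },
  });
}

// Hypothetical helper: write inputs on the writable side, read results from the readable side
async function tagAll(inputs: string[]): Promise<string> {
  const tagger = createChunkTagger();
  for (const input of inputs) tagger.write(input);
  tagger.end();
  let output = "";
  for await (const piece of tagger) output += piece.toString();
  return output;
}

tagAll(["alpha", "beta"]).then((text) => process.stdout.write(text));
// prints "[chunk 1] alpha" and "[chunk 2] beta" on separate lines
```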
Convert text to uppercase:
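```typescript
import { Transform, type TransformCallback } from "stream";

class UpperCaseTransform extends Transform {
  _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback) {
    // Assumes single-byte text such as ASCII: a multi-byte UTF-8 character
    // split across two chunks would be garbled by chunk.toString()
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

process.stdin.pipe(new UpperCaseTransform()).pipe(process.stdout);
```
Split a stream into individual lines: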
```typescript
import { Transform, type TransformCallback } from "stream";

class LineSplitter extends Transform {
  private buffer = "";

  constructor() {
    // Object mode on the readable side so each line stays a separate chunk
    super({ readableObjectMode: true });
  }

  _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split("\n");
    this.buffer = lines.pop() ?? ""; // save the incomplete last line
    for (const line of lines) {
      this.push(line);
    }
    callback();
  }

  _flush(callback: TransformCallback) {
    // Called when the source ends: push any leftover data
    if (this.buffer) this.push(this.buffer);
    callback();
  }
}
```
Duplex Streams
A duplex stream can both read and write at the same time. The two sides are independent. A TCP socket is the most common example.
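Because a net.Socket is a duplex stream, a classic illustration is an echo server: the server pipes each socket's readable side straight into its own writable side. This is a minimal sketch using the built-in net module; listening on port 0 (any free port) on 127.0.0.1 and the helper names are choices made for the example.

```typescript
import { connect, createServer, type AddressInfo, type Server } from "net";

// Server side: each connection is a Duplex, so its readable side can feed its own writable side
function startEchoServer(): Promise<Server> {
  const server = createServer((socket) => {
    socket.pipe(socket);
  });
  // Port 0 asks the operating system for any free port
  return new Promise((resolve) => server.listen(0, "127.0.0.1", () => resolve(server)));
}

// Client side: write on one side of the duplex, read the reply from the other
async function roundTrip(port: number, message: string): Promise<string> {
  const socket = connect(port, "127.0.0.1");
  socket.end(message); // send the message, then close our writable side
  let reply = "";
  for await (const chunk of socket) {
    reply += chunk.toString();
  }
  return reply;
}

async function demo(): Promise<string> {
  const server = await startEchoServer();
  const { port } = server.address() as AddressInfo;
  try {
    return await roundTrip(port, "ping");
  } finally {
    server.close();
  }
}

demo().then((reply) => console.log("Echoed back:", reply));
```

Closing the client's writable side with end() does not close its readable side, which is exactly the independence described above: the client keeps reading until the server ends its side.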
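Building a duplex that echoes whatever you write back out of its readable side:
```typescript
import { Duplex } from "stream";

class EchoStream extends Duplex {
  _read() {
    // Nothing to do here: data is pushed in from _write below
  }

  _write(chunk: Buffer, _encoding: BufferEncoding, callback: (error?: Error | null) => void) {
    // Echo the same data to the readable side. This simple version ignores
    // push()'s return value, so it does not apply backpressure.
    this.push(chunk);
    callback();
  }

  _final(callback: (error?: Error | null) => void) {
    this.push(null); // the writable side ended, so end the readable side too
    callback();
  }
}
```
The pipeline Utility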
You might have seen .pipe() used to connect streams. It works, but it has a big problem: when one stream fails, .pipe() does not forward the error or destroy the other streams, so they can be left open, holding file descriptors and memory.
Always use pipeline from stream/promises instead:
```typescript
import { pipeline } from "stream/promises";
import { createReadStream, createWriteStream } from "fs";
import { createGzip } from "zlib";

await pipeline(
  createReadStream("input.txt"),
  createGzip(),
  createWriteStream("input.txt.gz")
);
console.log("Done! File is now compressed.");
```
If any stream fails, pipeline destroys every stream in the chain and rejects with that error. Because it returns a Promise, you can await it and handle failures with an ordinary try/catch.
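A quick way to see this cleanup is to make one stage fail on purpose. In the sketch below (in-memory streams, with a deliberately failing transform invented for the demo), the awaited pipeline rejects with the transform's own error, and the source stream has been destroyed rather than left open.

```typescript
import { Readable, Transform, Writable } from "stream";
import { pipeline } from "stream/promises";

async function demoFailure(): Promise<{ message: string; sourceDestroyed: boolean }> {
  const source = Readable.from(["ok", "bad", "never reached"]);

  // Fails on the chunk "bad" to simulate, for example, a parse error
  const validate = new Transform({
    transform(chunk, _encoding, callback) {
      if (chunk.toString() === "bad") {
        callback(new Error("invalid chunk: bad"));
        return;
      }
      callback(null, chunk);
    },
  });

  // Discards everything it receives
  const sink = new Writable({
    write(_chunk, _encoding, callback) {
      callback();
    },
  });

  try {
    await pipeline(source, validate, sink);
    return { message: "no error", sourceDestroyed: source.destroyed };
  } catch (err) {
    // By the time the promise rejects, pipeline has destroyed source, validate and sink
    return { message: (err as Error).message, sourceDestroyed: source.destroyed };
  }
}

demoFailure().then((result) => console.log(result));
```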
Streaming an HTTP Response
You can send a large file to the browser without ever loading the whole thing into memory:
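```typescript
import http from "http";
import { createReadStream } from "fs";
import { createGzip } from "zlib";
import { pipeline } from "stream/promises";

const server = http.createServer(async (_req, res) => {
  res.writeHead(200, {
    "Content-Type": "text/plain",
    "Content-Encoding": "gzip",
  });
  try {
    await pipeline(createReadStream("large.txt"), createGzip(), res);
  } catch (err) {
    // pipeline has already destroyed every stream, including res.
    // Catching here stops a missing file or a client that disconnects early
    // from becoming an unhandled promise rejection.
    console.error("Streaming failed:", err);
  }
});

server.listen(3000);
```
The browser starts receiving data immediately. It does not wait for the whole file to be read first.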
Object Mode
By default, streams pass around Buffer and string values. You can turn on objectMode: true to pass plain JavaScript objects instead. This is useful when streaming database rows or parsed records.
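As a small illustration, assuming records are already plain objects (here a hard-coded array with a made-up Order shape), an object-mode transform can drop records it does not want simply by not passing them on.

```typescript
import { Readable, Transform, Writable } from "stream";
import { pipeline } from "stream/promises";

// Made-up record shape for the example
interface Order {
  id: number;
  total: number;
}

async function keepLargeOrders(orders: Order[], minTotal: number): Promise<Order[]> {
  const kept: Order[] = [];

  // Object mode on both sides: receives Order objects and emits Order objects
  const filter = new Transform({
    objectMode: true,
    transform(order: Order, _encoding, callback) {
      // Passing data forwards the record; calling callback() with no data drops it
      if (order.total >= minTotal) callback(null, order);
      else callback();
    },
  });

  // Object-mode destination that collects the surviving records
  const collect = new Writable({
    objectMode: true,
    write(order: Order, _encoding, callback) {
      kept.push(order);
      callback();
    },
  });

  await pipeline(Readable.from(orders), filter, collect);
  return kept;
}

keepLargeOrders(
  [{ id: 1, total: 40 }, { id: 2, total: 250 }, { id: 3, total: 120 }],
  100,
).then((orders) => console.log(orders.map((o) => o.id))); // [ 2, 3 ]
```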
```typescript
import { Transform, type TransformCallback } from "stream";

interface RawRow {
  first_name: string;
  last_name: string;
  age: string;
}

interface User {
  fullName: string;
  age: number;
}

class NormalizeUser extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(row: RawRow, _enc: BufferEncoding, cb: TransformCallback) {
    this.push({
      fullName: `${row.first_name} ${row.last_name}`,
      age: parseInt(row.age, 10),
    } satisfies User);
    cb();
  }
}
```
Code Examples
Compress and encrypt a file in one go:
```typescript
import { pipeline } from "stream/promises";
import { createReadStream, createWriteStream } from "fs";
import { createGzip } from "zlib";
import { createCipheriv, randomBytes } from "crypto";

// Store the key securely and keep the IV (it is not secret) next to the file:
// without both, the output can never be decrypted
const key = randomBytes(32);
const iv = randomBytes(16);

await pipeline(
  createReadStream("data.json"),
  createGzip(), // compress first: encrypted data does not compress
  createCipheriv("aes-256-cbc", key, iv),
  createWriteStream("data.json.gz.enc")
);
```
Stream database rows directly to an HTTP response as NDJSON:
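```typescript
import pg from "pg";
import QueryStream from "pg-query-stream";
import { Transform } from "stream";
import { pipeline } from "stream/promises";

// Assumes `res` is the http.ServerResponse from your request handler
const client = new pg.Client(); // connection settings come from the PG* environment variables
await client.connect();

const query = new QueryStream("SELECT * FROM events ORDER BY created_at");
const dbStream = client.query(query); // rows arrive as objects, fetched in batches

// Objects in, text out: one JSON document per line
const toNDJSON = new Transform({
  writableObjectMode: true,
  transform(row, _enc, cb) {
    cb(null, JSON.stringify(row) + "\n");
  },
});

await pipeline(dbStream, toNDJSON, res);
await client.end();
```
Count lines in a 1 GB log file without loading it: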
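```typescript
import { createReadStream } from "fs";

let lines = 0;
const stream = createReadStream("server.log");
for await (const chunk of stream) {
  // No encoding was set, so each chunk is a Buffer
  for (const byte of chunk as Buffer) {
    if (byte === 0x0a) lines++; // 0x0a is the byte value for a newline character
  }
}
// Counts newline characters, so a final line without a trailing "\n" is not included
console.log("Total lines:", lines);
```
Backpressure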
Backpressure is a built-in safety mechanism. It stops a fast data source from flooding a slow destination.
Here is how it works: when the writable's internal buffer reaches its highWaterMark, .write() returns false. At that point the readable should pause until the writable emits a 'drain' event, which signals that its buffer has emptied.
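```typescript
// readable and writable are any Readable and Writable streams
readable.on("data", (chunk) => {
  const ok = writable.write(chunk);
  if (!ok) {
    readable.pause(); // stop sending data
    writable.once("drain", () => {
      readable.resume(); // start again once the buffer clears
    });
  }
});
readable.on("end", () => writable.end()); // no more data: close the writable
```
The good news: pipeline() (and .pipe()) handles all of this for you automatically. You only need code like the above when you feed a writable by hand instead of connecting streams.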
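If you do write a custom consumer, an alternative to the pause/resume dance is a for await...of loop that waits for the 'drain' event whenever write() returns false, using once from the events module. This is a sketch; the slow sink with a 4-byte highWaterMark exists only to force backpressure, and the helper name copyWithBackpressure is made up.

```typescript
import { once } from "events";
import { Readable, Writable } from "stream";
import { finished } from "stream/promises";

// Copies source into destination, waiting whenever the destination's buffer is full
async function copyWithBackpressure(source: Readable, destination: Writable): Promise<void> {
  for await (const chunk of source) {
    if (!destination.write(chunk)) {
      // Buffer is at highWaterMark: stop pulling from source until it drains
      await once(destination, "drain");
    }
  }
  destination.end();
  await finished(destination);
}

// Slow sink with a 4-byte buffer, so write() returns false almost immediately
let drains = 0;
const received: string[] = [];
const slowSink = new Writable({
  highWaterMark: 4,
  write(chunk, _encoding, callback) {
    setTimeout(() => {
      received.push(chunk.toString());
      callback();
    }, 1);
  },
});
slowSink.on("drain", () => drains++);

const done = copyWithBackpressure(Readable.from(["hello", "stream", "world"]), slowSink);
done.then(() => console.log(received.join(" "), "- drain events:", drains));
```

Because the loop only asks the source for its next chunk after the await completes, the source is naturally paused while the destination catches up; no explicit pause() or resume() is needed.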
Common Mistakes
Not handling errors with pipe()
```typescript
// Bad: errors are not forwarded and the other streams are not cleaned up
readable.pipe(transform).pipe(writable);

// Good: pipeline destroys every stream and reports the error
await pipeline(readable, transform, writable);
```
Calling callback twice inside _transform
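```typescript
// Bad: the second cb() fails with ERR_MULTIPLE_CALLBACK ("Callback called multiple times")
_transform(chunk, enc, cb) {
  this.push(transformChunk(chunk)); // transformChunk is a placeholder for your own logic
  cb();
  cb(); // this breaks everything
}
```
Forgetting _flush on Transform streams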
If your transform holds onto partial data between chunks (like a line splitter or JSON parser), you must implement _flush. It runs when the source stream ends and lets you push out anything still sitting in your buffer.
```typescript
_flush(callback: () => void) {
  if (this.buffer.length > 0) {
    this.push(this.buffer);
  }
  callback();
}
```
Mixing object mode and byte mode in one pipeline
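If an object-mode stream emits plain objects into a stream that is not in object mode, the write fails with an ERR_INVALID_ARG_TYPE error as soon as the first object arrives. Make sure each stream's output matches what the next stream expects; a Transform with writableObjectMode or readableObjectMode set can act as the bridge between the two.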
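A minimal bridging sketch: the Transform below accepts objects on its writable side (writableObjectMode: true) and emits text on its byte-mode readable side, so it can feed files, sockets or HTTP responses. Newline-delimited JSON is an assumed output format, and the helper names are illustrative.

```typescript
import { Readable, Transform, Writable } from "stream";
import { pipeline } from "stream/promises";

// Objects in, bytes out
function toJsonLines(): Transform {
  return new Transform({
    writableObjectMode: true, // accepts any JavaScript value
    // The readable side stays in byte mode, so strings pushed here become Buffers
    transform(record, _encoding, callback) {
      callback(null, JSON.stringify(record) + "\n");
    },
  });
}

async function serialize(records: object[]): Promise<string> {
  const parts: Buffer[] = [];
  // Byte-mode destination, like a file or socket would be
  const sink = new Writable({
    write(chunk: Buffer, _encoding, callback) {
      parts.push(chunk);
      callback();
    },
  });
  await pipeline(Readable.from(records), toJsonLines(), sink);
  return Buffer.concat(parts).toString("utf8");
}

serialize([{ id: 1 }, { id: 2 }]).then((text) => process.stdout.write(text));
// {"id":1}
// {"id":2}
```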
Blocking the event loop inside stream callbacks
Never use readFileSync, execSync, or any other synchronous blocking call inside a stream handler. It cancels out all the benefits of streaming and stalls every other request on your server.
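If a chunk needs I/O, such as a lookup, do it asynchronously and call the callback only when the work finishes; the stream will not hand you the next chunk until you do. In this sketch, classifyAddress is a hypothetical stand-in for a real async call such as a database query, and the setTimeout only simulates latency.

```typescript
import { Readable, Transform, Writable } from "stream";
import { pipeline } from "stream/promises";

// Hypothetical async lookup standing in for a database query or HTTP call
async function classifyAddress(ip: string): Promise<string> {
  await new Promise((resolve) => setTimeout(resolve, 5)); // simulated I/O latency
  return ip.startsWith("10.") ? "internal" : "external";
}

async function enrichAll(ips: string[]): Promise<string[]> {
  const results: string[] = [];

  const enrich = new Transform({
    objectMode: true,
    transform(ip: string, _encoding, callback) {
      // The event loop stays free while the lookup is pending;
      // the next chunk is not delivered until callback runs
      classifyAddress(ip).then(
        (zone) => callback(null, `${ip} ${zone}`),
        (err) => callback(err),
      );
    },
  });

  const collect = new Writable({
    objectMode: true,
    write(line: string, _encoding, callback) {
      results.push(line);
      callback();
    },
  });

  await pipeline(Readable.from(ips), enrich, collect);
  return results;
}

enrichAll(["10.0.0.1", "8.8.8.8"]).then((lines) => console.log(lines));
```

Using the two-argument form of .then() ensures callback is called exactly once, even if the success branch were to throw.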
Best Practices
- Always use pipeline from stream/promises instead of .pipe() in production code
- Use for await...of to consume readable streams; it is cleaner than attaching event listeners
- Set highWaterMark to match your use case (byte streams default to 16 KiB before Node.js 22 and 64 KiB from Node.js 22, fs.createReadStream uses 64 KiB, and object mode defaults to 16 objects)
- Always implement _flush when your transform holds state between chunks
- Use a Transform instead of a separate Readable plus Writable pair; it handles backpressure for you
- Use object mode for structured data inside your app, and byte streams at the I/O boundary
- Call stream.destroy(err) explicitly when you catch an error outside a pipeline
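To check which buffer sizes a stream is actually using, you can read readableHighWaterMark and writableHighWaterMark. This sketch uses in-memory streams so it runs anywhere; the 1 MiB value is an arbitrary example of tuning, and the byte-mode default it reports depends on your Node.js version.

```typescript
import { Readable, Writable } from "stream";

// Byte mode with no options: uses the default for your Node.js version (16 KiB or 64 KiB)
const defaultBytes = new Readable({ read() {} });

// Object mode: highWaterMark counts objects, not bytes
const objects = new Readable({ objectMode: true, read() {} });

// Tuned: buffer up to 1 MiB before write() starts returning false
const bigBuffer = new Writable({
  highWaterMark: 1024 * 1024,
  write(_chunk, _encoding, callback) {
    callback();
  },
});

console.log("byte mode default:", defaultBytes.readableHighWaterMark);
console.log("object mode default:", objects.readableHighWaterMark); // 16
console.log("tuned writable:", bigBuffer.writableHighWaterMark); // 1048576
```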
Conclusion
Streams are not a complex trick for advanced developers. They are just the natural way Node.js handles data that flows in over time.
Once you get comfortable with the four stream types, start using pipeline for everything, and learn when to reach for object mode, you will write Node.js code that is faster, cheaper to run, and easier to reason about.
Start small: replace one readFileSync with createReadStream. Add a createGzip transform. Connect it with pipeline. That is all it takes to begin.