stream: experimental stream/iter implementation #62066
jasnell wants to merge 42 commits into nodejs:main
Conversation
ronag
left a comment
Super impressed! This is amazing.
One note. Since this is supposed to be "web compatible", it looks to me like everything is based on Uint8Array, which is a bit unfortunate for Node. Could the Node implementation use Buffer? It would still be compatible; we could just access the Buffer prototype methods without doing hacks like Buffer.prototype.write.call(...).
Also, could you do some mitata-based benchmarks so that we can see the GC and memory pressure relative to Node streams?
Another thing: in the async generator case, can we pass an optional AbortSignal? i.e.
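A hypothetical sketch of the shape being asked about (the `{ signal }` options argument is an assumption, not the PR's actual API): the stream hands the generator an AbortSignal so async work inside it can observe cancellation.

```javascript
// Hypothetical API sketch (the options-bag shape is assumed, not the
// PR's actual signature): the source generator receives an AbortSignal.
async function* source({ signal } = {}) {
  for (let i = 0; i < 3; i++) {
    signal?.throwIfAborted(); // stop promptly if the stream was aborted
    yield Uint8Array.of(i);   // stand-in for an async read
  }
}

// Driving it with a signal that is never aborted:
const collected = (async () => {
  const ac = new AbortController();
  const out = [];
  for await (const chunk of source({ signal: ac.signal })) out.push(chunk[0]);
  return out;
})();
collected.then((out) => console.log(out)); // [ 0, 1, 2 ]
```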
This makes me a bit nervous for code portability. If someone starts working with this in Node.js, they would end up writing code that depends on the values being Buffers.
benjamingr
left a comment
> just to explore implementation feasibility, performance, etc

Sounds fine, as this isn't exposed outside at this time.
```js
// Buffer is full
switch (this._backpressure) {
  case 'strict':
```
I'm not sure `strict` should be the default, and whether it should block here.
That'll be a big part of the discussion around this. A central challenge with web streams is that backpressure can be fully ignored. One of the design principles for this new approach is to apply it strictly by default. We'll need to debate this; I recommend opening an issue at https://github.com/jasnell/new-streams
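A minimal sketch of the policy difference under debate (hypothetical code, far simpler than the PR's actual queue): under `'strict'` a full buffer makes `write()` return a promise the producer must await, while a lossy policy lets the producer run ahead, analogous to a web-streams producer that never checks `desiredSize`.

```javascript
// Hypothetical illustration of backpressure policies (not the PR's
// implementation). 'strict' blocks the producer when the buffer is
// full; 'drop-newest' ignores backpressure entirely.
class TinyQueue {
  constructor({ backpressure = 'strict', highWaterMark = 2 } = {}) {
    this.backpressure = backpressure;
    this.highWaterMark = highWaterMark;
    this.buffer = [];
    this.waiters = []; // producers blocked by 'strict'
  }
  write(chunk) {
    if (this.buffer.length >= this.highWaterMark) {
      if (this.backpressure === 'strict') {
        // Producer must await; resolves once a reader frees a slot.
        return new Promise((resolve) => {
          this.waiters.push(() => {
            this.buffer.push(chunk);
            resolve();
          });
        });
      }
      return Promise.resolve(); // 'drop-newest': chunk silently lost
    }
    this.buffer.push(chunk);
    return Promise.resolve();
  }
  read() {
    const chunk = this.buffer.shift();
    this.waiters.shift()?.(); // wake one blocked producer
    return chunk;
  }
}
```

Under `'strict'` a misbehaving producer cannot outrun the consumer, which is the design principle described above; the cost is that `write()` must always be awaited.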
benjamingr
left a comment
Sorry, meant to approve. Regardless of design changes/suggestions regarding timing and a lot of other stuff, as experimental this is fine.
I would maybe update the docs to emphasize the experimental status even more strongly than normal.
@ronag ... implemented a couple of mitata benchmarks.

**Memory Benchmark Results**

Environment: Node 25.6.0, Intel Xeon w9-3575X, `--expose-gc`, mitata with `.gc('inner')`

Per-Operation Allocations (New Streams vs Web Streams): [table not captured]

Pipeline scenarios (pull, pipeTo) show the biggest gains: 16-25x less heap, because transforms are inline function calls rather than stream-to-stream pipes with internal queues. Push is faster but uses slightly more heap due to batch iteration (`Uint8Array[]`). Broadcast/tee are comparable at this scale.

Sustained Load (97.7 MB volume): [table not captured]

pipeTo and broadcast show the largest sustained-load heap difference. Web Streams' pipeThrough chain buffers ~50% of the total volume in flight; the new streams' pipeTo pulls synchronously through the transform. Compare broadcast's shared ring buffer (0.5 MB) with tee's per-branch queues (42.8 MB). Zero retained memory for both APIs after completion -- no leaks.
@ronag passing a signal to an async generator allows the underlying source to abort it, but we're lacking a built-in way for the consumer iterating the async generator to safely cancel the stream. Barring an improvement at the language level, the consumer can only safely cancel the underlying source if it has a reference to an AbortController. WHATWG Streams don't have this problem if the consumer cancels its reader. Happy to create examples to reproduce this if it's not clear what I'm talking about.
I think you misunderstand. The signal would be for any async calls inside the generator.
Yes, I'm just saying that doesn't allow the consumer to abort calls the async generator is making, and the consumer often decides when streaming should be aborted. For example, say I'm using a library that handles subscriptions from the frontend. When it gets a subscription, it asks me to build an async iterable of events to stream back. It is then responsible for iterating, and for cancelling once the frontend unsubscribes. If the iterable I pass to that library comes from an async generator, I'll have to also pass an AbortController to that library for it to safely clean up once the client unsubscribes. If all it has is an AsyncIterable interface, it may leak resources after the client unsubscribes. This is a fundamental weakness in using async generators for transformation, and my longtime frustration with async iteration in general. In contrast, with WHATWG streams, when a consumer cancels its reader, the underlying source and any TransformStreams in between get notified to clean up right away.
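A small repro of the pitfall described above (illustrative names and timings, not code from this PR): `return()` on an async generator is queued behind whatever `await` the generator is parked on, so the consumer cannot interrupt in-flight upstream work, only wait it out.

```javascript
// Repro sketch of the async-generator cancellation gap (illustrative).
let cleanedUp = false;

async function* events() {
  try {
    // Stand-in for a slow upstream await (e.g. waiting for the next event):
    yield await new Promise((resolve) => setTimeout(resolve, 100, 'evt'));
  } finally {
    cleanedUp = true; // resource cleanup lives here
  }
}

const demo = (async () => {
  const it = events();
  it.next();              // generator is now parked on the internal await
  const t0 = Date.now();
  await it.return();      // queued behind that await; cannot interrupt it
  return Date.now() - t0; // ~100ms: "cancellation" had to wait out the source
})();
demo.then((elapsed) => console.log(elapsed, cleanedUp));
```

With a WHATWG stream, `reader.cancel()` notifies the underlying source immediately so it can tear down in-flight work instead of waiting for it.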
@benjamingr was actually talking about the same thing I'm trying to resurrect awareness of in this old issue in the async-iteration proposal. Note one of his comments: tc39/proposal-async-iteration#126 (comment). This was eight years ago, but there hasn't been much improvement on this front, unfortunately. I'm really hoping I can get everyone to fully understand this pitfall and have a good plan for how to help people avoid it before getting too far along with this new proposed API.
Force-pushed from 9f8af01 to e1e1911
Refactors the cancellation per updates in the design doc
Invalid backpressure values like `'banana'` would fall through switch statements at write time with a confusing `ERR_INVALID_STATE` error about "`writeSync` should have handled non-strict policy". Add `validateBackpressure()` in utils.js and call it from the `PushQueue`, `BroadcastImpl`, `ShareImpl`, and `SyncShareImpl` constructors. Invalid values now throw `ERR_INVALID_ARG_VALUE` immediately at construction.
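The fix described above can be sketched as follows. This is a reimplementation of the idea, not the PR's actual `validateBackpressure` from utils.js, and the set of valid policy names besides `'strict'` is an assumption:

```javascript
// Sketch of construction-time validation (illustrative; policy names
// other than 'strict' are assumed, not taken from the PR).
const VALID_BACKPRESSURE = ['strict', 'loose'];

function validateBackpressure(value, name = 'options.backpressure') {
  if (!VALID_BACKPRESSURE.includes(value)) {
    // Mirror Node's error-code convention so callers can match on code.
    const err = new TypeError(
      `The argument '${name}' must be one of: ` +
      `${VALID_BACKPRESSURE.join(', ')}. Received '${value}'`);
    err.code = 'ERR_INVALID_ARG_VALUE';
    throw err;
  }
}

// Called from a constructor, the error surfaces immediately:
class PushQueueSketch {
  constructor({ backpressure = 'strict' } = {}) {
    validateBackpressure(backpressure); // fail fast, not at first write
    this.backpressure = backpressure;
  }
}
```

Failing at construction turns a confusing write-time `ERR_INVALID_STATE` into an immediate, attributable `ERR_INVALID_ARG_VALUE`.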
Optimize the stream/iter implementation based on benchmark analysis comparing classic streams, web streams, and stream/iter.

- Eliminate `withFlushSignalAsync`/`withFlushSignalSync` generator wrappers from the stateless transform pipeline. Stateless transforms now handle their own flush (`null`) signal internally after the for-await loop, removing an entire async generator layer per pipeline. Stateful transforms retain the wrapper since their cost is dominated by the transform operation itself (compression, encryption, etc).
- Hoist writer capability checks in `pipeTo`/`pipeToSync`. Property lookups for `writeSync`/`writevSync`/`endSync`/`failSync` are done once before the loop instead of per-chunk via optional chaining. Split signal/no-signal loops to avoid per-batch null checks. Added `writevSync` batch write support to `pipeToSync`.
- Optimize `isUint8ArrayBatch` with a single-element fast path and a plain for loop. Replaces `ArrayPrototypeEvery` (function call per element) with a direct indexed loop. Short-circuits on length 1 (most common from transforms) and checks first/last before iterating the middle.
- Make broadcast consumer `next()`/`return()`/`throw()` non-async. Returns `PromiseResolve()` directly on the fast path (data in buffer) instead of wrapping through async function machinery. Caches the done result.
- `RingBuffer`: replace modulo with bitwise AND. Capacity is always a power of 2, so index computation uses `& mask` instead of `% capacity`.
- `Broadcast`: incremental min-cursor tracking. Replaces an O(N) full scan of all consumers on every `next()` call with a cached min cursor that is only recomputed when dirty (consumer at the minimum advances or detaches). Eliminates O(N^2) per-write-cycle scaling.
- `Broadcast`: separate waiters list for `notifyConsumers`. Only iterates consumers with pending resolve callbacks instead of scanning all consumers on every write.
- `concatBytes`: cache per-chunk byte lengths to avoid reading `byteLength` twice per chunk (once for the total, once for the offset advance). Remove dead `totalByteLength` function.

Benchmark results (MB/s, higher is better):

| Benchmark | classic | webstream | iter | iter-sync | iter vs classic |
| ---------------- | --------- | --------- | ---------- | --------- | --------------- |
| Identity 1MB | 1,245 | 582 | **3,110** | 16,658 | 2.5x |
| Identity 64MB | 31,410 | 14,980 | **33,894** | 62,111 | 1.1x |
| Transform 1MB | 287 | 227 | **325** | 327 | 1.1x |
| Transform 64MB | 595 | 605 | **605** | 573 | 1.0x |
| Compression 1MB | **123** | 98 | 110 | -- | 0.9x |
| Compression 64MB | **329** | 303 | 308 | -- | 0.9x |
| pipeTo 1MB | 1,137 | 494 | **2,740** | 13,611 | 2.4x |
| pipeTo 64MB | 22,081 | 15,377 | **30,036** | 60,976 | 1.4x |
| Broadcast 1c 1MB | 1,365 | 521 | **1,991** | -- | 1.5x |
| Broadcast 2c 1MB | 1,285 | 439 | **1,962** | -- | 1.5x |
| Broadcast 4c 1MB | **1,217** | 322 | 750 | -- | 0.6x |
| File read 16MB | 1,469 | 537 | **1,639** | -- | 1.1x |

The creation benchmarks show the raw cost of constructing the various objects without any other activity. The `classic` Node.js streams are faster here simply because they do less work on actual creation.

| Creation (ops/sec) | classic | webstream | iter | iter vs classic |
| ------------------ | --------- | --------- | --------- | --------------- |
| readable | 8,662,361 | 505,889 | 1,144,385 | 0.1x |
| writable | 3,856,139 | 269,950 | 1,285,210 | 0.3x |
| pair | 3,120,224 | 141,988 | 349,176 | 0.1x |
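The `RingBuffer` bullet above relies on a classic trick that can be sketched like this (illustrative class, not the PR's `RingBuffer`): when capacity is a power of 2, `index & (capacity - 1)` is equivalent to `index % capacity` for non-negative indices, without the division.

```javascript
// Illustrative ring buffer: power-of-2 capacity lets index wrapping
// use a bitwise AND instead of modulo.
class RingBufferSketch {
  constructor(capacity = 8) {
    if (capacity <= 0 || (capacity & (capacity - 1)) !== 0)
      throw new RangeError('capacity must be a power of 2');
    this.mask = capacity - 1;      // e.g. 8 -> 0b111
    this.slots = new Array(capacity);
    this.head = 0;                 // monotonic write counter
    this.tail = 0;                 // monotonic read counter
  }
  get size() { return this.head - this.tail; }
  push(value) {
    if (this.size > this.mask) throw new RangeError('buffer full');
    this.slots[this.head++ & this.mask] = value; // head & mask === head % capacity
  }
  shift() {
    if (this.size === 0) return undefined;
    return this.slots[this.tail++ & this.mask];
  }
}
```

Monotonic head/tail counters also make the full/empty checks a simple subtraction, avoiding the classic "one empty slot" ambiguity.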
This comment was marked as outdated.
I've updated the implementation to address the remaining outstanding issues, round out tests, add benchmarks, fix bugs, etc. It's also now behind an experimental CLI flag. This is ready for review.
Codecov Report

❌ Patch coverage — additional details and impacted files:

@@ Coverage Diff @@
## main #62066 +/- ##
==========================================
- Coverage 89.68% 89.50% -0.18%
==========================================
Files 676 688 +12
Lines 206575 211999 +5424
Branches 39549 40498 +949
==========================================
+ Hits 185262 189756 +4494
- Misses 13446 14355 +909
- Partials 7867 7888 +21
Opening this for discussion. Not intending to land this yet. It adds an implementation of the "new streams" to core and adds support to `FileHandle`, with tests and benchmarks, just to explore implementation feasibility, performance, etc.

This is an implementation of the "new streams" API for Node.js along with an example integration with `FileHandle`. It covers the core part of the implementation.

The module is `stream/iter`. It is gated behind the `--experimental-stream-iter` CLI flag.

Benchmark results comparing Node.js streams, Web streams, and stream/iter (higher number is better):

It's worth noting that the performance of the `FileHandle` benchmark added, which reads files, converts them to upper case, and then compresses them, is on par with Node.js streams and twice as fast as web streams (though web streams are not perf-optimized in any way, so take that 2x with a grain of salt). The majority of the perf cost in the benchmark is due to compression overhead. Without the compression transform, the new stream can be up to 15% faster than reading the file with classic Node.js streams.

The main thing this shows is that the new streams impl can (a) perform reasonably and (b) sit comfortably alongside the existing impls without any backwards-compat concerns.
Benchmark runs:
Opencode/Opus 4.6 were leveraged heavily in the process of creating this PR following a strict iterative jasnell-in-the-loop process.