ReactPHP PromiseStream: From Promise To Stream And Vice Versa
The ReactPHP PromiseStream component is a link between promise-land and stream-land.
From Stream To Promise
One of the patterns that are used to deal with streams is spooling: we need the entire resource data available before we start processing it. One approach is to collect each chunk of data received from the stream:
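A minimal sketch of this chunk-collecting approach. The path file.txt is a placeholder, and the code assumes react/stream 1.2 or later, where the event loop argument is optional and the default loop runs automatically when the script ends.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\Stream\ReadableResourceStream;

// 'file.txt' is a hypothetical path used only for illustration.
$stream = new ReadableResourceStream(fopen('file.txt', 'r'));

$spool = '';

// Append every chunk to the spool as it arrives.
$stream->on('data', function ($chunk) use (&$spool) {
    $spool .= $chunk;
});

// The complete content is available only once the stream has ended.
$stream->on('end', function () use (&$spool) {
    echo $spool;
});
```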
But imagine that we have some client code that wants to process some data from a file. It doesn’t care about streams; it only needs to receive the entire contents of the file. With this approach, that code has to be called inside the callback for the end event of the stream. So the client code has to know about streams, events, and callbacks, and sometimes that is impossible. Consider this pretty simple example:
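A sketch of the situation, assuming react/stream 1.2 or later. The method names process() and get() are hypothetical and chosen only for illustration.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\Stream\ReadableResourceStream;

class Processor
{
    // Needs the complete data, knows nothing about streams.
    public function process(string $data): void
    {
        echo strlen($data) . ' bytes received' . PHP_EOL;
    }
}

class Provider
{
    public function get(string $path): void
    {
        $stream = new ReadableResourceStream(fopen($path, 'r'));

        $spool = '';
        $stream->on('data', function ($chunk) use (&$spool) {
            $spool .= $chunk;
        });

        $stream->on('end', function () use (&$spool) {
            // The data is complete only here, inside the callback.
            // get() has already returned, so the spool cannot simply
            // be handed back to the caller.
        });
    }
}
```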
We have two separate classes:
- Processor for processing data from the file
- Provider for collecting this data from the file
Once the data is completely collected we need to pass it to the Processor, but how? How can we encapsulate this stream logic and provide the client code only with the data from the stream? The answer is promises.
We can hide the spooling logic behind a promise. Once all data has been read from the file we can resolve the promise with this data.
buffer()
The \React\Promise\Stream\buffer() function creates a promise which resolves with the stream data. All data chunks from the stream are concatenated, and once the stream closes the promise resolves with the result. Now, let’s rewrite the previous example:
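One way the rewritten classes might look. As before, the method names and the path file.txt are hypothetical, and react/stream 1.2 or later is assumed so that no loop has to be passed explicitly.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\Promise\PromiseInterface;
use React\Stream\ReadableResourceStream;
use function React\Promise\Stream\buffer;

class Processor
{
    // Receives a promise and does not care where the data comes from.
    public function process(PromiseInterface $promise): void
    {
        $promise->then(function (string $data) {
            echo $data . PHP_EOL;
        });
    }
}

class Provider
{
    // Hides the stream behind a promise for its complete contents.
    public function get(string $path): PromiseInterface
    {
        return buffer(new ReadableResourceStream(fopen($path, 'r')));
    }
}

$provider = new Provider();
$processor = new Processor();

// 'file.txt' is a placeholder path.
$processor->process($provider->get('file.txt'));
```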
Now we can easily pass the data from the stream via promises. The Processor knows nothing about the source of the data, and the Provider only cares about collecting the entire resource data. The promise becomes the glue between these two classes. Promises also allow us to create chains of callbacks (like we did with the pipe() method for streams), where every next promise receives the data from the previous one:
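A small sketch of such a chain. To keep it runnable without a file, it uses React\Stream\ThroughStream as a stand-in source; the sample text is made up.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\Stream\ThroughStream;
use function React\Promise\Stream\buffer;

// ThroughStream stands in for a real file stream here.
$stream = new ThroughStream();

buffer($stream)
    ->then(function (string $data) {
        return trim($data);
    })
    ->then(function (string $data) {
        return str_replace(' ', '-', $data);
    })
    ->then(function (string $data) {
        return strtolower($data);
    })
    ->then(function (string $data) {
        echo $data . PHP_EOL; // prints "hello-async-world"
    });

// Feed two chunks and close the stream, which resolves the promise.
$stream->write('  Hello ');
$stream->end('Async World  ');
```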
In the example above, we collect the data from the stream and return a promise that resolves with this data. Then the data is trimmed, all spaces are replaced with dashes, and all characters are lowercased. Finally, we output the result.
all()
If you need to deal with chunks of data rather than with the concatenated content, you can use the \React\Promise\Stream\all() function. It accepts an instance of the stream (readable or writable) and the name of an event. Each time the specified event occurs the function collects its data. The promise resolves with an array of whatever the events emitted, with null in place of any event that didn’t pass data.
By default, it collects data chunks from the data event, but you can manually specify the name of the event you are interested in as the second argument: all($stream, 'event-name').
In case of chunks, the previous example looks like this:
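A sketch of the chunk-based variant, under the same assumptions as before (hypothetical method names, placeholder path file.txt, react/stream 1.2 or later).

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\Promise\PromiseInterface;
use React\Stream\ReadableResourceStream;
use function React\Promise\Stream\all;

class Processor
{
    public function process(PromiseInterface $promise): void
    {
        // The promise resolves with an array of chunks, not one string.
        $promise->then(function (array $chunks) {
            echo 'Received ' . count($chunks) . ' chunk(s)' . PHP_EOL;
            foreach ($chunks as $chunk) {
                echo $chunk;
            }
        });
    }
}

class Provider
{
    public function get(string $path): PromiseInterface
    {
        return all(new ReadableResourceStream(fopen($path, 'r')));
    }
}

$provider = new Provider();
$processor = new Processor();

// 'file.txt' is a placeholder path.
$processor->process($provider->get('file.txt'));
```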
The Provider wraps the stream in a promise using the all() function. Then, in the Processor, this promise resolves with an array of data chunks from the stream.
Resolving and rejection
- The promise will resolve with an array once the stream closes.
- If the stream is already closed the promise resolves with an empty array.
- If the stream emits an error the promise rejects.
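The three rules above can be tried out with a short sketch. It uses ThroughStream as a stand-in stream and emits the error event by hand, purely for demonstration.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\Stream\ThroughStream;
use function React\Promise\Stream\all;

// 1. Resolves with all chunks once the stream closes.
$stream = new ThroughStream();
all($stream)->then(function (array $chunks) {
    echo json_encode($chunks) . PHP_EOL; // prints ["a","b"]
});
$stream->write('a');
$stream->write('b');
$stream->end();

// 2. Resolves with an empty array if the stream is already closed.
$closed = new ThroughStream();
$closed->close();
all($closed)->then(function (array $chunks) {
    echo json_encode($chunks) . PHP_EOL; // prints []
});

// 3. Rejects if the stream emits an error.
$failing = new ThroughStream();
all($failing)->then(null, function (\Exception $e) {
    echo 'Rejected with ' . get_class($e) . PHP_EOL;
});
$failing->emit('error', [new \RuntimeException('Read failed')]);
```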
first()
Let’s update our Provider so it can return both the data and an error from the stream. To return the error we can use the \React\Promise\Stream\first() function. It creates a promise which resolves once the given event triggers for the first time. Once an error event is emitted the stream closes, so there will be no more error events. That’s why the first() function is exactly what we need in this case:
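A possible shape for such a Provider. Creating the stream in the constructor is an assumption made for this sketch, as is the use of react/stream 1.2 or later.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\Promise\PromiseInterface;
use React\Stream\ReadableResourceStream;
use function React\Promise\Stream\buffer;
use function React\Promise\Stream\first;

class Provider
{
    /** @var ReadableResourceStream */
    private $stream;

    public function __construct(string $path)
    {
        $this->stream = new ReadableResourceStream(fopen($path, 'r'));
    }

    // Resolves with the complete contents of the stream.
    public function getData(): PromiseInterface
    {
        return buffer($this->stream);
    }

    // Resolves with the exception passed to the first 'error' event.
    public function getError(): PromiseInterface
    {
        return first($this->stream, 'error');
    }
}
```

Note that the promise from getError() rejects when the stream closes without an error, so its consumer should also register a rejection handler.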
Now the Provider has two simple methods that both return promises. getData() returns a promise which resolves with the contents of the stream. getError() returns a promise which resolves with an exception when an error occurs. Then we can simply pass the promise from the getError() method to a logger:
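A sketch of such a logger. The Logger class is hypothetical, and the error is emitted by hand on a ThroughStream so that the snippet runs without a real failing file.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\Promise\PromiseInterface;
use React\Stream\ThroughStream;
use function React\Promise\Stream\first;

// Hypothetical logger that accepts a promise for an error.
class Logger
{
    public function log(PromiseInterface $promise): void
    {
        $promise->then(
            function (\Exception $e) {
                echo 'Error: ' . $e->getMessage() . PHP_EOL;
            },
            function () {
                // The stream closed without an error: nothing to log.
            }
        );
    }
}

$stream = new ThroughStream();

$logger = new Logger();
$logger->log(first($stream, 'error'));

// Simulate a failure on the stream.
$stream->emit('error', [new \RuntimeException('Disk failure')]);
// prints "Error: Disk failure"
```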
The first() function returns a promise which rejects if:
- the stream emits an error, unless you’re waiting for the error event, in which case it will resolve.
- the stream closes, unless you’re waiting for the close event, in which case it will resolve.
- the stream is already closed.
- it is canceled.
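A short sketch of some of these cases, again with ThroughStream as a stand-in stream.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\Stream\ThroughStream;
use function React\Promise\Stream\first;

// Resolves with the first chunk; later chunks are ignored.
$stream = new ThroughStream();
first($stream)->then(function ($chunk) {
    echo 'First chunk: ' . $chunk . PHP_EOL; // prints "First chunk: a"
});
$stream->write('a');
$stream->write('b');

// Rejects when the stream closes before any data arrives.
$closing = new ThroughStream();
first($closing)->then(null, function (\RuntimeException $e) {
    echo 'Rejected: no data before close' . PHP_EOL;
});
$closing->close();

// Resolves on close when 'close' is the event being awaited.
$awaitingClose = new ThroughStream();
first($awaitingClose, 'close')->then(function () {
    echo 'Stream closed' . PHP_EOL;
});
$awaitingClose->close();
```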
From Promise To Stream
unwrapReadable()
When a promise resolves with a stream we can extract, or unwrap, this stream. For readable streams we can use the \React\Promise\Stream\unwrapReadable() function. It returns an instance of a stream that implements ReadableStreamInterface. Consider this stream a proxy for the future promise resolution:
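A minimal sketch that reads from the standard input. It assumes react/stream 1.2 or later with react/event-loop 1.2 or later, so the default loop is used and runs automatically at the end of the script; a Deferred stands in for whatever would produce the stream asynchronously.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\Promise\Deferred;
use React\Stream\ReadableResourceStream;
use function React\Promise\Stream\unwrapReadable;

$deferred = new Deferred();

// The proxy stream exists before the real stream is known.
$stream = unwrapReadable($deferred->promise());

$stream->on('data', function ($chunk) {
    echo 'Received: ' . $chunk;
});

// Later the promise resolves with the real stream. From now on
// its data is forwarded to the proxy created above.
$deferred->resolve(new ReadableResourceStream(STDIN));
```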
In the example above, when the readable stream receives data from the standard input (console), this data is piped to the resulting stream.
If the promise is either rejected or fulfilled with anything but an instance of ReadableStreamInterface, then the resulting stream will emit an error event and close:
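A sketch of the rejection case. The promise is still pending when unwrapReadable() is called, so both listeners are attached in time; the exception message is made up.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\Promise\Deferred;
use function React\Promise\Stream\unwrapReadable;

$deferred = new Deferred();
$stream = unwrapReadable($deferred->promise());

$stream->on('error', function (\Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

$stream->on('close', function () {
    echo 'Stream closed' . PHP_EOL;
});

// Rejecting the pending promise triggers 'error' and then 'close'.
$deferred->reject(new \RuntimeException('Could not open the stream'));
```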
To receive the error event the promise should be pending when calling the unwrapReadable() function. If the promise is already settled and does not resolve with an instance of ReadableStreamInterface, you will not be able to receive the error event. For example, this code never reports the error:
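A sketch with a promise that is already fulfilled with a plain string instead of a stream.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use function React\Promise\resolve;
use function React\Promise\Stream\unwrapReadable;

// The promise is already settled, and not with a readable stream.
$stream = unwrapReadable(resolve('not a stream'));

// This listener is attached too late and is never called.
$stream->on('error', function (\Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

// The proxy stream is already closed at this point.
var_dump($stream->isReadable());
```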
You can close() the resulting stream at any time. This will either cancel() the pending promise or, if the promise has already resolved, close() the underlying stream.
unwrapWritable()
This function can be used to unwrap a promise which resolves with a WritableStreamInterface. It returns an instance of WritableStreamInterface which acts as a proxy for the future promise resolution. Any data you write to this resulting stream will be piped to the underlying writable stream:
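A minimal sketch that writes to the console. It assumes react/stream 1.2 or later with react/event-loop 1.2 or later (default loop, started automatically); the Deferred again stands in for an asynchronous source of the stream.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use React\Promise\Deferred;
use React\Stream\WritableResourceStream;
use function React\Promise\Stream\unwrapWritable;

$deferred = new Deferred();

// The proxy can be written to before the real stream exists.
$proxy = unwrapWritable($deferred->promise());

// Written while the promise is pending: kept in memory for now.
$proxy->write('Hello' . PHP_EOL);

// The promise resolves with a stream that writes to the console,
// and the buffered data is forwarded to it.
$deferred->resolve(new WritableResourceStream(STDOUT));

// Written after resolution: passed straight to the console stream.
$proxy->end('World' . PHP_EOL);
```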
In this simple example, the promise resolves with an instance of a writable stream which writes data to the console. Then we create a proxy for this stream. When we write data to the proxy it is also written to the unwrapped stream.
unwrapWritable() follows the same rules as unwrapReadable() does:
- If the promise is either rejected or fulfilled with anything but an instance of WritableStreamInterface, then the resulting stream will emit an error event and close.
- To receive the error event the promise should be pending when calling the unwrapWritable() function.
- You can close() the resulting stream at any time. This will either cancel() the pending promise or close() the underlying stream.
You can find examples from this article on GitHub.
This article is a part of the ReactPHP Series.