Skip to content

Fetch request body as ReadableStream #628

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Nov 17, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP
  • Loading branch information
ptrdom committed Nov 12, 2021
commit 1620172be49b412d21a3cc03b010310b81b93c4a
12 changes: 6 additions & 6 deletions api-reports/2_12.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15625,6 +15625,8 @@ PushSubscriptionJSON[JT] val expirationTime: java.lang.Double
PushSubscriptionJSON[JT] val keys: js.Dictionary[String]
PushSubscriptionOptions[JT] var applicationServerKey: js.UndefOr[Uint8Array]
PushSubscriptionOptions[JT] var userVisibleOnly: js.UndefOr[Boolean]
QueuingStrategy[JT] var highWaterMark: Int
QueuingStrategy[JT] var size: js.Function1[T, Int]
RTCBundlePolicy[JT]
RTCBundlePolicy[SO] val balanced: RTCBundlePolicy
RTCBundlePolicy[SO] val `max-bundle` = "max-bundle".asInstanceOf[RTCBundlePolicy]
Expand Down Expand Up @@ -15851,13 +15853,11 @@ ReadableStream[JT] def locked: Boolean
ReadableStream[JT] def pipeThrough[U](pair: Any, options: Any?): ReadableStream[U]
ReadableStream[JT] def pipeTo(dest: WriteableStream[T], options: Any?): Unit
ReadableStream[JT] def tee(): js.Array[_ <: ReadableStream[T]]
ReadableStream[SO] def apply[T](underlyingSource: js.UndefOr[ReadableStreamUnderlyingSource[T]], queuingStrategy: js.UndefOr[ReadableStreamQueuingStrategy[T]]?): ReadableStream[T]
ReadableStream[SO] def apply[T](underlyingSource: js.UndefOr[ReadableStreamUnderlyingSource[T]]?, queuingStrategy: js.UndefOr[QueuingStrategy[T]]?): ReadableStream[T]
ReadableStreamController[JC] def close(): Unit
ReadableStreamController[JC] def desiredSize: Int
ReadableStreamController[JC] def enqueue(chunk: T): js.UndefOr[Int]
ReadableStreamController[JC] def error(e: Any): Unit
ReadableStreamQueuingStrategy[JT] var highWaterMark: Double
ReadableStreamQueuingStrategy[JT] var size: js.Function1[Chunk[T], Unit]
ReadableStreamReader[JC] def cancel(): js.Promise[Unit]
ReadableStreamReader[JC] def cancel[U](reason: U): js.Promise[U]
ReadableStreamReader[JC] def closed: js.Promise[ReadableStreamReader[T]]
Expand All @@ -15866,9 +15866,9 @@ ReadableStreamReader[JC] def releaseLock(): Unit
ReadableStreamType[JT]
ReadableStreamType[SO] val bytes: ReadableStreamType
ReadableStreamUnderlyingSource[JT] var autoAllocateChunkSize: js.UndefOr[Double]
ReadableStreamUnderlyingSource[JT] var cancel: js.UndefOr[js.Function1[js.Any, Unit | js.Promise[Unit]]]
ReadableStreamUnderlyingSource[JT] var pull: js.UndefOr[js.Function1[ReadableStreamController[T], Unit | js.Promise[Unit]]]
ReadableStreamUnderlyingSource[JT] var start: js.UndefOr[js.Function1[ReadableStreamController[T], Unit | js.Promise[Unit]]]
ReadableStreamUnderlyingSource[JT] var cancel: js.UndefOr[js.Function1[js.Any, js.UndefOr[js.Promise[Unit]]]]
ReadableStreamUnderlyingSource[JT] var pull: js.UndefOr[js.Function1[ReadableStreamController[T], js.UndefOr[js.Promise[Unit]]]]
ReadableStreamUnderlyingSource[JT] var start: js.UndefOr[js.Function1[ReadableStreamController[T], js.UndefOr[js.Promise[Unit]]]]
ReadableStreamUnderlyingSource[JT] var `type`: js.UndefOr[ReadableStreamType]
ReferrerPolicy[JT]
ReferrerPolicy[SO] val empty: ReferrerPolicy
Expand Down
12 changes: 6 additions & 6 deletions api-reports/2_13.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15625,6 +15625,8 @@ PushSubscriptionJSON[JT] val expirationTime: java.lang.Double
PushSubscriptionJSON[JT] val keys: js.Dictionary[String]
PushSubscriptionOptions[JT] var applicationServerKey: js.UndefOr[Uint8Array]
PushSubscriptionOptions[JT] var userVisibleOnly: js.UndefOr[Boolean]
QueuingStrategy[JT] var highWaterMark: Int
QueuingStrategy[JT] var size: js.Function1[T, Int]
RTCBundlePolicy[JT]
RTCBundlePolicy[SO] val balanced: RTCBundlePolicy
RTCBundlePolicy[SO] val `max-bundle` = "max-bundle".asInstanceOf[RTCBundlePolicy]
Expand Down Expand Up @@ -15851,13 +15853,11 @@ ReadableStream[JT] def locked: Boolean
ReadableStream[JT] def pipeThrough[U](pair: Any, options: Any?): ReadableStream[U]
ReadableStream[JT] def pipeTo(dest: WriteableStream[T], options: Any?): Unit
ReadableStream[JT] def tee(): js.Array[_ <: ReadableStream[T]]
ReadableStream[SO] def apply[T](underlyingSource: js.UndefOr[ReadableStreamUnderlyingSource[T]], queuingStrategy: js.UndefOr[ReadableStreamQueuingStrategy[T]]?): ReadableStream[T]
ReadableStream[SO] def apply[T](underlyingSource: js.UndefOr[ReadableStreamUnderlyingSource[T]]?, queuingStrategy: js.UndefOr[QueuingStrategy[T]]?): ReadableStream[T]
ReadableStreamController[JC] def close(): Unit
ReadableStreamController[JC] def desiredSize: Int
ReadableStreamController[JC] def enqueue(chunk: T): js.UndefOr[Int]
ReadableStreamController[JC] def error(e: Any): Unit
ReadableStreamQueuingStrategy[JT] var highWaterMark: Double
ReadableStreamQueuingStrategy[JT] var size: js.Function1[Chunk[T], Unit]
ReadableStreamReader[JC] def cancel(): js.Promise[Unit]
ReadableStreamReader[JC] def cancel[U](reason: U): js.Promise[U]
ReadableStreamReader[JC] def closed: js.Promise[ReadableStreamReader[T]]
Expand All @@ -15866,9 +15866,9 @@ ReadableStreamReader[JC] def releaseLock(): Unit
ReadableStreamType[JT]
ReadableStreamType[SO] val bytes: ReadableStreamType
ReadableStreamUnderlyingSource[JT] var autoAllocateChunkSize: js.UndefOr[Double]
ReadableStreamUnderlyingSource[JT] var cancel: js.UndefOr[js.Function1[js.Any, Unit | js.Promise[Unit]]]
ReadableStreamUnderlyingSource[JT] var pull: js.UndefOr[js.Function1[ReadableStreamController[T], Unit | js.Promise[Unit]]]
ReadableStreamUnderlyingSource[JT] var start: js.UndefOr[js.Function1[ReadableStreamController[T], Unit | js.Promise[Unit]]]
ReadableStreamUnderlyingSource[JT] var cancel: js.UndefOr[js.Function1[js.Any, js.UndefOr[js.Promise[Unit]]]]
ReadableStreamUnderlyingSource[JT] var pull: js.UndefOr[js.Function1[ReadableStreamController[T], js.UndefOr[js.Promise[Unit]]]]
ReadableStreamUnderlyingSource[JT] var start: js.UndefOr[js.Function1[ReadableStreamController[T], js.UndefOr[js.Promise[Unit]]]]
ReadableStreamUnderlyingSource[JT] var `type`: js.UndefOr[ReadableStreamType]
ReferrerPolicy[JT]
ReferrerPolicy[SO] val empty: ReferrerPolicy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import scala.scalajs.js
* @tparam T
* Type of the Chunks returned by the Stream
*/
trait ReadableStreamQueuingStrategy[T] extends js.Object {
trait QueuingStrategy[T] extends js.Object {

/** A non-negative number indicating the high water mark of the stream using this queuing strategy. */
var highWaterMark: Double
var highWaterMark: Int

/** (non-byte streams only)
*
Expand All @@ -19,5 +19,5 @@ trait ReadableStreamQueuingStrategy[T] extends js.Object {
*
* A function that computes and returns the finite non-negative size of the given chunk value.
*/
var size: js.Function1[Chunk[T], Unit]
var size: js.Function1[T, Int]
}
4 changes: 2 additions & 2 deletions dom/src/main/scala/org/scalajs/dom/ReadableStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ trait ReadableStream[+T] extends js.Object {
object ReadableStream {

def apply[T](
underlyingSource: js.UndefOr[ReadableStreamUnderlyingSource[T]],
queuingStrategy: js.UndefOr[ReadableStreamQueuingStrategy[T]] = js.undefined
underlyingSource: js.UndefOr[ReadableStreamUnderlyingSource[T]] = js.undefined,
queuingStrategy: js.UndefOr[QueuingStrategy[T]] = js.undefined
): ReadableStream[T] = {
js.Dynamic
.newInstance(js.Dynamic.global.selectDynamic("ReadableStream"))(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import scala.scalajs.js.annotation.JSGlobal
*/
@js.native
@JSGlobal
class ReadableStreamController[-T] private[this] (stream: ReadableStream[T] = null) extends js.Object {
class ReadableStreamController[-T] private[this] () extends js.Object {

/** The desiredSize getter returns the desired size to fill the controlled stream’s internal queue. It can be
* negative, if the queue is over-full. An underlying source should use this information to determine when and how to
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.scalajs.dom

import scala.scalajs.js
import scala.scalajs.js.|

/** See [[https://streams.spec.whatwg.org/#underlying-source-api ¶4.2.3. The underlying source API]] of whatwg streams
* spec.
Expand All @@ -16,7 +15,7 @@ trait ReadableStreamUnderlyingSource[T] extends js.Object {
* If this setup process is asynchronous, it can return a promise to signal success or failure; a rejected promise
* will error the stream. Any thrown exceptions will be re-thrown by the [[ReadableStream]] constructor.
*/
var start: js.UndefOr[js.Function1[ReadableStreamController[T], Unit | js.Promise[Unit]]] = js.undefined
var start: js.UndefOr[js.Function1[ReadableStreamController[T], js.UndefOr[js.Promise[Unit]]]] = js.undefined

/** A function that is called whenever the stream’s internal queue of chunks becomes not full, i.e. whenever the
* queue’s desired size becomes positive. Generally, it will be called repeatedly until the queue reaches its high
Expand All @@ -31,7 +30,7 @@ trait ReadableStreamUnderlyingSource[T] extends js.Object {
* returned represents the process of acquiring a new chunk. Throwing an exception is treated the same as returning a
* rejected promise.
*/
var pull: js.UndefOr[js.Function1[ReadableStreamController[T], Unit | js.Promise[Unit]]] = js.undefined
var pull: js.UndefOr[js.Function1[ReadableStreamController[T], js.UndefOr[js.Promise[Unit]]]] = js.undefined

/** A function that is called whenever the consumer cancels the stream, via [[ReadableStream.cancel]] or
* [[ReadableStreamReader.cancel():scala\.scalajs\.js\.Promise[Unit]*]]. It takes as its argument the same value as
Expand All @@ -40,7 +39,7 @@ trait ReadableStreamUnderlyingSource[T] extends js.Object {
* called. Additionally, a rejected promise will error the stream, instead of letting it close. Throwing an exception
* is treated the same as returning a rejected promise.
*/
var cancel: js.UndefOr[js.Function1[js.Any, Unit | js.Promise[Unit]]] = js.undefined
var cancel: js.UndefOr[js.Function1[js.Any, js.UndefOr[js.Promise[Unit]]]] = js.undefined

/** Can be set to "bytes" to signal that the constructed [[ReadableStream]] is a readable byte stream.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.scalajs.dom.tests.shared

import org.junit.Assert.assertEquals
import org.junit.Test
import org.scalajs.dom.QueuingStrategy
import org.scalajs.dom.ReadableStream
import org.scalajs.dom.ReadableStreamController
import org.scalajs.dom.ReadableStreamUnderlyingSource
Expand All @@ -12,12 +13,11 @@ import org.scalajs.dom.tests.shared.AsyncTesting.async
import scala.concurrent.Future
import scala.scalajs.js
import scala.scalajs.js.Thenable.Implicits._
import scala.scalajs.js.|

trait BrowserTests {

@Test
final def ReadableStreamTest: AsyncResult = async {
final def ReadableStreamConstructionAndConsumptionTest: AsyncResult = async {
case class Tuna(color: String)

val expectedTunas = Seq(
Expand All @@ -30,8 +30,8 @@ trait BrowserTests {
start = js.defined({ (controller: ReadableStreamController[Tuna]) =>
controller.enqueue(Tuna("blue"))
controller.enqueue(Tuna("red"))
controller.close(): Unit | js.Promise[Unit]
}): js.UndefOr[js.Function1[ReadableStreamController[Tuna], Unit | js.Promise[Unit]]]
controller.close(): js.UndefOr[js.Promise[Unit]]
}): js.UndefOr[js.Function1[ReadableStreamController[Tuna], js.UndefOr[js.Promise[Unit]]]]
}
)

Expand All @@ -53,4 +53,46 @@ trait BrowserTests {
assertEquals(receivedTunas, expectedTunas)
}
}

@Test
final def ReadableStreamQueueingStrategyTest: AsyncResult = async {
val expectedStrings = Seq(
"short one",
"definitely a longer one"
)

val stream = ReadableStream[String](
new ReadableStreamUnderlyingSource[String] {
start = js.defined({ (controller: ReadableStreamController[String]) =>
controller.enqueue("short one")
controller.enqueue("definitely a longer one")
controller.close(): js.UndefOr[js.Promise[Unit]]
}): js.UndefOr[js.Function1[ReadableStreamController[String], js.UndefOr[js.Promise[Unit]]]]
},
new QueuingStrategy[String] {
var highWaterMark = 1
var size: js.Function1[String, Int] = { (chunk: String) =>
chunk.length
}
}
)

val reader = stream.getReader()

def read(strings: Seq[String]): Future[Seq[String]] = {
reader
.read()
.flatMap { chunk =>
if (chunk.done) {
Future.successful(strings)
} else {
read(strings :+ chunk.value)
}
}
}
read(Seq.empty)
.map { receivedStrings =>
assertEquals(receivedStrings, expectedStrings)
}
}
}