diff --git a/CHANGELOG.md b/CHANGELOG.md index 56b1684..3dffce4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,21 @@ Bugfixes: Other improvements: +## [v5.0.0](https://github.com/purescript-node/purescript-node-streams-aff/releases/tag/v5.0.0) + +Breaking changes: + +- Readers return record instead of `Tuple`. (#11 by @jamesdbrock) + +New features: + +- New `Internal` functions: `onceClose`, `writable`, `newStreamPassThrough`. (#11 by @jamesdbrock) + +Bugfixes: + +- More cleanup for event handlers. (#11 by @jamesdbrock) +- Complete reads when stream is closed. (#11 by @jamesdbrock) + ## v4.0.1 Breaking changes: diff --git a/bower.json b/bower.json index 0996b24..79d4aff 100644 --- a/bower.json +++ b/bower.json @@ -26,6 +26,6 @@ "purescript-prelude": "^v6.0.1", "purescript-refs": "^v6.0.0", "purescript-st": "^v6.2.0", - "purescript-tuples": "^v7.0.0" + "purescript-tailrec": "^v6.1.0" } } diff --git a/packages.dhall b/packages.dhall index 9e56005..3c9268a 100644 --- a/packages.dhall +++ b/packages.dhall @@ -99,7 +99,7 @@ in upstream ------------------------------- -} let upstream = - https://github.com/purescript/package-sets/releases/download/psc-0.15.4-20221013/packages.dhall - sha256:21000b190e1ef14c92feb1400816022319bc40a30280d20f24c0dcacfb85e966 + https://github.com/purescript/package-sets/releases/download/psc-0.15.7-20230112/packages.dhall + sha256:3e5a439b9975949016eac07660ea2c80531ef0eb08903fcbeacc1d291a05cea0 in upstream diff --git a/spago-dev.dhall b/spago-dev.dhall index 0305dcf..1d1bedb 100644 --- a/spago-dev.dhall +++ b/spago-dev.dhall @@ -18,6 +18,8 @@ conf // , "node-process" , "node-fs" , "console" + , "foldable-traversable" + , "parallel" , "partial" , "unsafe-coerce" , "control" diff --git a/spago.dhall b/spago.dhall index 43bb5e4..ef9b739 100644 --- a/spago.dhall +++ b/spago.dhall @@ -12,22 +12,23 @@ to generate this file without the comments in this block. 
-} { name = "node-streams-aff" , dependencies = - [ "aff" - , "effect" - , "exceptions" - , "node-buffer" - , "node-streams" - , "nullable" - , "st" - , "refs" - , "arrays" - , "either" - , "maybe" - , "prelude" - , "tuples" - ] + [ "aff" + , "effect" + , "exceptions" + , "node-buffer" + , "node-streams" + , "nullable" + , "st" + , "refs" + , "arrays" + , "either" + , "maybe" + , "prelude" + , "tailrec" + ] , packages = ./packages.dhall , sources = [ "src/**/*.purs" ] , license = "MIT" -, repository = "https://github.com/purescript-node/purescript-node-streams-aff.git" +, repository = + "https://github.com/purescript-node/purescript-node-streams-aff.git" } diff --git a/src/Node/Stream/Aff.purs b/src/Node/Stream/Aff.purs index 42cbe3c..fc6d25d 100644 --- a/src/Node/Stream/Aff.purs +++ b/src/Node/Stream/Aff.purs @@ -29,36 +29,38 @@ -- | #### Result Buffers -- | -- | The result of a reading function may be chunked into more than one `Buffer`. --- | The `fst` element of the result `Tuple` is an `Array Buffer` of what +-- | The `buffers` element of the result is an `Array Buffer` of what -- | was read. -- | To concatenate the result into a single `Buffer`, use --- | `Node.Buffer.concat :: Array Buffer -> Buffer`. +-- | [`Node.Buffer.concat :: Array Buffer -> m Buffer`](https://pursuit.purescript.org/packages/purescript-node-buffer/docs/Node.Buffer#t:MutableBuffer). -- | -- | ``` --- | input :: Buffer <- liftEffect <<< concat <<< fst =<< readSome stdin +-- | input :: Buffer +-- | <- liftEffect <<< concat <<< _.buffers =<< readSome stdin -- | ``` -- | -- | To calculate the number of bytes read, use -- | `Node.Buffer.size :: Buffer -> m Int`. 
-- | -- | ``` --- | Tuple inputs _ :: Array Buffer <- readSome stdin +-- | {buffers} :: Array Buffer <- readSome stdin -- | bytesRead :: Int --- | <- liftEffect $ Array.foldM (\a b -> (a+_) <$> size b) 0 inputs +-- | <- liftEffect $ Array.foldM (\a b -> (a+_) <$> size b) 0 buffers -- | ``` -- | -- | #### Result `readagain` flag -- | --- | The `snd` element of the result `Tuple` is a `Boolean` flag which +-- | The `readagain` field of the result is a `Boolean` flag which -- | is `true` if the stream has not reached End-Of-File (and also if the stream -- | has not errored or been destroyed), so we know we can read again. --- | If the flag is `false` then +-- | If the flag is `false` then the stream is not `readable` -- | no more bytes will ever be produced by the stream. -- | -- | Reading from an ended, closed, errored, or destroyed stream --- | will complete immediately with `Tuple [] false`. +-- | will complete immediately with `{buffers:[], readagain:false}`. -- | --- | The `readagain` flag will give the same answer as a call to `Internal.readable`. +-- | The `readagain` flag will give the same answer as a +-- | subsequent call to `Internal.readable`. -- | -- | ## Writing -- | @@ -71,8 +73,12 @@ -- | function on each of the `Buffer`s, -- | and will asychronously wait if there is “backpressure” from the stream. -- | +-- | #### Result +-- | -- | The writing functions will complete after all the data is flushed to the -- | stream. +-- | +-- | If a write fails then it will `throwError` in the `Aff`. 
module Node.Stream.Aff ( readSome , readAll @@ -85,14 +91,14 @@ module Node.Stream.Aff import Prelude +import Control.Monad.Rec.Class (Step(..), tailRecM) import Control.Monad.ST.Class (liftST) import Data.Array as Array import Data.Array.ST as Array.ST import Data.Either (Either(..)) import Data.Maybe (Maybe(..)) -import Data.Tuple (Tuple(..)) import Effect (Effect, untilE) -import Effect.Aff (effectCanceler, makeAff, nonCanceler) +import Effect.Aff (effectCanceler, error, makeAff, nonCanceler) import Effect.Aff.Class (class MonadAff, liftAff) import Effect.Class (class MonadEffect, liftEffect) import Effect.Exception (catchException) @@ -102,7 +108,7 @@ import Node.Buffer as Buffer import Node.Encoding as Encoding import Node.Stream (Readable, Writable) import Node.Stream as Stream -import Node.Stream.Aff.Internal (onceDrain, onceEnd, onceError, onceReadable, readable) +import Node.Stream.Aff.Internal (onceClose, onceDrain, onceEnd, onceError, onceReadable, readable) -- | Wait until there is some data available from the stream, then read it. -- | @@ -112,94 +118,123 @@ readSome :: forall m r . MonadAff m => Readable r - -> m (Tuple (Array Buffer) Boolean) -readSome r = liftAff <<< makeAff $ \res -> do + -> m { buffers :: Array Buffer, readagain :: Boolean } +readSome r = liftAff <<< makeAff $ \complete -> do bufs <- liftST $ Array.ST.new - removeError <- onceError r $ res <<< Left + removeError <- onceError r \err -> complete (Left err) + + removeClose <- onceClose r do + -- Don't error, instead return whatever we've read. + removeError + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right { buffers: ret, readagain: false }) removeEnd <- onceEnd r do removeError + removeClose ret <- liftST $ Array.ST.unsafeFreeze bufs - res (Right (Tuple ret false)) + complete (Right { buffers: ret, readagain: false }) - -- try to read right away. 
- catchException (res <<< Left) do + let + cleanupRethrow err = do + removeError + removeClose + removeEnd + complete (Left err) + pure nonCanceler + + catchException cleanupRethrow do ifM (readable r) do + -- try to read right away. untilE do Stream.read r Nothing >>= case _ of Nothing -> pure true Just chunk -> do void $ liftST $ Array.ST.push chunk bufs pure false + + ret1 <- liftST $ Array.ST.unsafeFreeze bufs + readagain <- readable r + if readagain && Array.length ret1 == 0 then do + -- if still readable and we couldn't read anything right away, + -- then wait for the readable event. + -- “The 'readable' event will also be emitted once the end of the + -- stream data has been reached but before the 'end' event is emitted.” + -- if not readable then this was a zero-length Readable stream. + -- https://nodejs.org/api/stream.html#event-readable + removeReadable <- onceReadable r do + untilE do + Stream.read r Nothing >>= case _ of + Nothing -> pure true + Just chunk -> do + void $ liftST $ Array.ST.push chunk bufs + pure false + ret2 <- liftST $ Array.ST.unsafeFreeze bufs + removeError + removeClose + removeEnd + readagain2 <- readable r + complete (Right { buffers: ret2, readagain: readagain2 }) + -- canceller might be called while waiting for `onceReadable` + pure $ effectCanceler do + removeError + removeClose + removeEnd + removeReadable + -- else return what we read right away + else do + removeError + removeClose + removeEnd + complete (Right { buffers: ret1, readagain }) + pure nonCanceler do removeError + removeClose removeEnd - res (Right (Tuple [] false)) - - ret1 <- liftST $ Array.ST.unsafeFreeze bufs - readagain <- readable r - removeReadable <- - if readagain && Array.length ret1 == 0 then do - -- if still readable and we couldn't read anything right away, - -- then wait for the readable event. 
- -- “The 'readable' event will also be emitted once the end of the - -- stream data has been reached but before the 'end' event is emitted.” - -- if not readable then this was a zero-length Readable stream. - -- https://nodejs.org/api/stream.html#event-readable - onceReadable r do - catchException (res <<< Left) do - untilE do - Stream.read r Nothing >>= case _ of - Nothing -> pure true - Just chunk -> do - void $ liftST $ Array.ST.push chunk bufs - pure false - ret2 <- liftST $ Array.ST.unsafeFreeze bufs - removeError - removeEnd - readagain2 <- readable r - res (Right (Tuple ret2 readagain2)) + complete (Right { buffers: [], readagain: false }) + pure nonCanceler - -- return what we read right away - else do - removeError - removeEnd - res (Right (Tuple ret1 readagain)) - pure (pure unit) -- dummy canceller - - -- canceller might by called while waiting for `onceReadable` - pure $ effectCanceler do - removeError - removeEnd - removeReadable - --- | Read all data until the end of the stream. +-- | Read all data until the end of the stream. After completion the stream +-- | will no longer be `readable`. -- | -- | Note that __stdin__ will never end. readAll :: forall m r . MonadAff m => Readable r - -> m (Tuple (Array Buffer) Boolean) -readAll r = liftAff <<< makeAff $ \res -> do + -> m (Array Buffer) +readAll r = liftAff <<< makeAff $ \complete -> do bufs <- liftST $ Array.ST.new removeReadable <- Ref.new (pure unit :: Effect Unit) - removeError <- onceError r $ res <<< Left + removeError <- onceError r \err -> do + join $ Ref.read removeReadable + complete (Left err) + + removeClose <- onceClose r do + -- Don't error, instead return whatever we've read. + removeError + join $ Ref.read removeReadable -- can 'close' be raised while waiting for 'readable'? Maybe? 
+ ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right ret) removeEnd <- onceEnd r do removeError + removeClose ret <- liftST $ Array.ST.unsafeFreeze bufs - res (Right (Tuple ret false)) + complete (Right ret) let cleanupRethrow err = do removeError + removeClose removeEnd join $ Ref.read removeReadable - res (Left err) + complete (Left err) + pure nonCanceler -- try to read right away. catchException cleanupRethrow do @@ -211,34 +246,36 @@ readAll r = liftAff <<< makeAff $ \res -> do Just chunk -> do void $ liftST $ Array.ST.push chunk bufs pure false + + -- then wait for the stream to be readable until the stream has ended. + let + waitToRead = do + removeReadable' <- onceReadable r do + -- “The 'readable' event will also be emitted once the end of the + -- stream data has been reached but before the 'end' event is emitted.” + untilE do + Stream.read r Nothing >>= case _ of + Nothing -> pure true + Just chunk -> do + _ <- liftST $ Array.ST.push chunk bufs + pure false + waitToRead -- this is not recursion + Ref.write removeReadable' removeReadable + + waitToRead + -- canceller might be called while waiting for `onceReadable` + pure $ effectCanceler do + removeError + removeClose + removeEnd + join $ Ref.read removeReadable + do removeError + removeClose removeEnd - res (Right (Tuple [] false)) - - -- then wait for the stream to be readable until the stream has ended. 
- let - waitToRead = do - removeReadable' <- onceReadable r do - -- “The 'readable' event will also be emitted once the end of the - -- stream data has been reached but before the 'end' event is emitted.” - catchException cleanupRethrow do - untilE do - Stream.read r Nothing >>= case _ of - Nothing -> pure true - Just chunk -> do - _ <- liftST $ Array.ST.push chunk bufs - pure false - waitToRead -- this is not recursion - Ref.write removeReadable' removeReadable - - waitToRead - - -- canceller might by called while waiting for `onceReadable` - pure $ effectCanceler do - removeError - removeEnd - join $ Ref.read removeReadable + complete (Right []) + pure nonCanceler -- | Wait for *N* bytes to become available from the stream. -- | @@ -252,32 +289,43 @@ readN . MonadAff m => Readable r -> Int - -> m (Tuple (Array Buffer) Boolean) -readN r n = liftAff <<< makeAff $ \res -> do - redRef <- Ref.new 0 - bufs <- liftST $ Array.ST.new - removeReadable <- Ref.new (pure unit :: Effect Unit) - - -- TODO on error, we're not calling removeEnd... - removeError <- onceError r $ res <<< Left - - -- The `end` event is sometimes raised after we have read N bytes, even - -- if there are more bytes in the stream? - removeEnd <- onceEnd r do - removeError - ret <- liftST $ Array.ST.unsafeFreeze bufs - res (Right (Tuple ret false)) + -> m { buffers :: Array Buffer, readagain :: Boolean } +readN r n = liftAff <<< makeAff $ \complete -> + if n < 0 then complete (Left $ error "read bytes must be > 0") *> pure nonCanceler + else do + redRef <- Ref.new 0 + bufs <- liftST $ Array.ST.new + removeReadable <- Ref.new (pure unit :: Effect Unit) + + -- On error, we're not calling removeClose or removeEnd... maybe that's fine? + removeError <- onceError r \err -> do + join $ Ref.read removeReadable + complete (Left err) - let - cleanupRethrow err = do + removeClose <- onceClose r do + -- Don't error, instead return whatever we've read. 
removeError - removeEnd join $ Ref.read removeReadable - res (Left err) + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right { buffers: ret, readagain: false }) + + removeEnd <- onceEnd r do + removeError + removeClose + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right { buffers: ret, readagain: false }) + + let + cleanupRethrow err = do + removeError + removeClose + removeEnd + join $ Ref.read removeReadable + complete (Left err) + pure nonCanceler - -- try to read N bytes and then either return N bytes or run a continuation - tryToRead continuation = do - catchException cleanupRethrow do + -- try to read N bytes and then either return N bytes or run a continuation + tryToRead continuation = do untilE do red <- Ref.read redRef -- https://nodejs.org/docs/latest-v15.x/api/stream.html#stream_readable_read_size @@ -297,35 +345,38 @@ readN r n = liftAff <<< makeAff $ \res -> do red <- Ref.read redRef if red >= n then do removeError + removeClose removeEnd ret <- liftST $ Array.ST.unsafeFreeze bufs readagain <- readable r - res (Right (Tuple ret readagain)) + complete (Right { buffers: ret, readagain }) else continuation unit - -- try to read right away. - ifM (readable r) - do - tryToRead (\_ -> pure unit) - do - removeError - removeEnd - res (Right (Tuple [] false)) - - -- if there were not enough bytes right away, then wait for bytes to come in. - let - waitToRead _ = do - removeReadable' <- onceReadable r do - tryToRead waitToRead - Ref.write removeReadable' removeReadable - waitToRead unit - - -- canceller might by called while waiting for `onceReadable` - pure $ effectCanceler do - removeError - removeEnd - join $ Ref.read removeReadable + -- if there were not enough bytes right away, then wait for bytes to come in. + waitToRead _ = do + removeReadable' <- onceReadable r do + tryToRead waitToRead -- not recursion + Ref.write removeReadable' removeReadable + + catchException cleanupRethrow do + -- try to read right away. 
+ ifM (readable r) + do + tryToRead waitToRead + -- canceller might be called while waiting for `onceReadable` + pure $ effectCanceler do + removeError + removeClose + removeEnd + join $ Ref.read removeReadable + do + removeError + removeClose + removeEnd + -- If the stream is not readable should that be a fail? No. + complete (Right { buffers: [], readagain: false }) + pure nonCanceler -- | Write to a stream. -- | @@ -336,47 +387,36 @@ write => Writable w -> Array Buffer -> m Unit -write w bs = liftAff <<< makeAff $ \res -> do - bufs <- liftST $ Array.ST.thaw bs +write w bs = liftAff <<< makeAff $ \complete -> do removeDrain <- Ref.new (pure unit :: Effect Unit) - removeError <- onceError w $ res <<< Left - let - callback = case _ of - Just err -> res (Left err) - Nothing -> pure unit - - callbackLast = case _ of - Just err -> do - removeError - res (Left err) - Nothing -> do - removeError - res (Right unit) - - oneWrite = do - catchException (res <<< Left) do - untilE do - chunkMay <- liftST $ Array.ST.shift bufs - case chunkMay of + oneWrite i' = flip tailRecM i' \i -> do + case Array.index bs i of + Nothing -> do + complete (Right unit) + pure (Done unit) + Just b -> do + -- “write … calls the supplied callback once the data has been fully handled. + -- If an error occurs, the callback will be called with the error + -- as its first argument. 
The callback is called asynchronously and + -- before 'error' is emitted.” + nobackpressure <- Stream.write w b $ case _ of Nothing -> do - pure true - Just chunk -> do - isLast <- liftST $ (_ == 0) <$> Array.length <$> Array.ST.unsafeFreeze bufs - nobackpressure <- Stream.write w chunk (if isLast then callbackLast else callback) - if nobackpressure then do - pure false - else do - removeDrain' <- onceDrain w oneWrite - void $ Ref.write removeDrain' removeDrain - pure true - - oneWrite + pure unit + Just err -> do + complete (Left err) + + if nobackpressure then do + pure (Loop (i + 1)) + else do + removeDrain' <- onceDrain w (oneWrite (i + 1)) + Ref.write removeDrain' removeDrain + pure (Done unit) + oneWrite 0 -- canceller might be called while waiting for `onceDrain` pure $ effectCanceler do - removeError join $ Ref.read removeDrain -- | Signal that no more data will be written to the `Writable`. Will complete @@ -392,16 +432,28 @@ end . MonadAff m => Writable w -> m Unit -end w = liftAff <<< makeAff $ \res -> do +end w = liftAff <<< makeAff $ \complete -> do Stream.end w $ case _ of - Nothing -> res (Right unit) - Just err -> res (Left err) - pure $ nonCanceler + Nothing -> complete (Right unit) + Just err -> complete (Left err) + pure nonCanceler -- | Concatenate an `Array` of UTF-8 encoded `Buffer`s into a `String`. +-- | +-- | Example: +-- | +-- | ``` +-- | inputstring <- toStringUTF8 =<< readAll stream +-- | ``` toStringUTF8 :: forall m. MonadEffect m => Array Buffer -> m String toStringUTF8 bs = liftEffect $ Buffer.toString Encoding.UTF8 =<< Buffer.concat bs -- | Encode a `String` as an `Array` containing one UTF-8 encoded `Buffer`. +-- | +-- | Example: +-- | +-- | ``` +-- | write stream =<< fromStringUTF8 "outputstring" +-- | ``` fromStringUTF8 :: forall m. 
MonadEffect m => String -> m (Array Buffer) fromStringUTF8 s = liftEffect $ map pure $ Buffer.fromString s Encoding.UTF8 diff --git a/src/Node/Stream/Internal.js b/src/Node/Stream/Internal.js index 0f9d048..1123ab3 100644 --- a/src/Node/Stream/Internal.js +++ b/src/Node/Stream/Internal.js @@ -10,24 +10,39 @@ export const onceEnd = s => f => () => { return () => {s.removeListener('end', f);}; } +export const onceClose = s => f => () => { + s.once('close', f); + return () => {s.removeListener('close', f);}; +} + export const onceDrain = s => f => () => { s.once('drain', f); return () => {s.removeListener('drain', f);}; } +// https://nodejs.org/api/events.html#emitteronceeventname-listener export const onceError = s => f => () => { - s.once('error', error => f(error)()); - return () => {s.removeListener('error', f);}; + const listener = error => f(error)(); + s.once('error', listener); + return () => {s.removeListener('error', listener);}; } export const readable = s => () => { return s.readable; } +export const writable = s => () => { + return s.writable; +} + export const push = s => buf => () => { return s.push(buf); } export const newReadable = () => { return new stream.Readable(); -} \ No newline at end of file +} + +export const newStreamPassThroughImpl = () => { + return new stream.PassThrough(); +} diff --git a/src/Node/Stream/Internal.purs b/src/Node/Stream/Internal.purs index c79b2fc..a01b450 100644 --- a/src/Node/Stream/Internal.purs +++ b/src/Node/Stream/Internal.purs @@ -4,23 +4,27 @@ module Node.Stream.Aff.Internal ( onceDrain , onceEnd + , onceClose , onceError , onceReadable , readable + , writable , push , newReadable , newReadableStringUTF8 + , newStreamPassThrough ) where import Prelude import Data.Nullable (Nullable, notNull, null) import Effect (Effect) +import Effect.Class (class MonadEffect, liftEffect) import Effect.Exception (Error) import Node.Buffer (Buffer) import Node.Buffer as Buffer import Node.Encoding as Encoding -import Node.Stream 
(Readable, Stream, Writable) +import Node.Stream (Readable, Stream, Writable, Duplex) -- | Listen for one `readable` event, call the callback, then remove -- | the event listener. @@ -44,6 +48,17 @@ foreign import onceEnd -> Effect Unit -> Effect (Effect Unit) +-- | Listen for one `close` event, call the callback, then remove +-- | the event listener. +-- | +-- | Returns an effect for removing the event listener before the event +-- | is raised. +foreign import onceClose + :: forall s + . Stream s + -> Effect Unit + -> Effect (Effect Unit) + -- | Listen for one `drain` event, call the callback, then remove -- | the event listener. -- | @@ -76,6 +91,11 @@ foreign import readable . Readable r -> Effect Boolean +foreign import writable + :: forall w + . Writable w + -> Effect Boolean + -- | [`readable.push(chunk[, encoding])`](https://nodejs.org/api/stream.html#readablepushchunk-encoding) foreign import push :: forall r @@ -88,13 +108,23 @@ foreign import newReadable :: forall r . Effect (Readable r) --- | Construct a `Readable` from a `String`. +-- | Construct a UTF-8 `Readable` from a `String`. newReadableStringUTF8 - :: forall r - . String - -> Effect (Readable r) -newReadableStringUTF8 strng = do + :: forall r m + . MonadEffect m + => String + -> m (Readable r) +newReadableStringUTF8 strng = liftEffect do rstream <- newReadable _ <- push rstream =<< (notNull <$> Buffer.fromString strng Encoding.UTF8) _ <- push rstream null -- the end of the stream pure rstream + +-- | “A trivial implementation of a `Transform` stream that simply passes the +-- | input bytes across to the output.” +-- | +-- | [__Class: `stream.PassThrough`__](https://nodejs.org/api/stream.html#class-streampassthrough) +newStreamPassThrough :: forall m. 
MonadEffect m => m Duplex +newStreamPassThrough = liftEffect newStreamPassThroughImpl + +foreign import newStreamPassThroughImpl :: Effect Duplex diff --git a/test/Main.purs b/test/Main.purs index 77c72eb..cefe3f8 100644 --- a/test/Main.purs +++ b/test/Main.purs @@ -9,18 +9,20 @@ module Test.Main where import Prelude +import Control.Parallel (parSequence_) +import Data.Array ((..)) import Data.Array as Array +import Data.Foldable (for_) import Data.Maybe (Maybe(..)) -import Data.Tuple (Tuple(..), fst) import Effect (Effect) -import Effect.Aff (Milliseconds(..), launchAff_) +import Effect.Aff (Aff, Milliseconds(..), launchAff_) import Effect.Class (liftEffect) import Node.Buffer (Buffer, concat) import Node.Buffer as Buffer -import Node.Encoding (Encoding(..)) import Node.FS.Stream (createReadStream, createWriteStream) -import Node.Stream.Aff (end, readAll, readN, readSome, toStringUTF8, write) -import Node.Stream.Aff.Internal (newReadableStringUTF8) +import Node.Stream (destroy) +import Node.Stream.Aff (end, fromStringUTF8, readAll, readN, readSome, toStringUTF8, write) +import Node.Stream.Aff.Internal (newReadableStringUTF8, newStreamPassThrough) import Partial.Unsafe (unsafePartial) import Test.Spec (describe, it) import Test.Spec.Assertions (expectError, shouldEqual) @@ -32,38 +34,128 @@ main = unsafePartial $ do launchAff_ do runSpec' (defaultConfig { timeout = Just (Milliseconds 40000.0) }) [ consoleReporter ] do describe "Node.Stream.Aff" do - it "writes and reads" do + it "PassThrough" do + s <- newStreamPassThrough + _ <- write s =<< fromStringUTF8 "test" + end s + b1 <- toStringUTF8 =<< readAll s + shouldEqual b1 "test" + it "overflow PassThrough" do + s <- newStreamPassThrough + let magnitude = 10000 + [ outstring ] <- fromStringUTF8 "aaaaaaaaaa" + parSequence_ + [ write s $ Array.replicate magnitude outstring + , void $ readSome s + ] + it "reads from a zero-length Readable" do + r <- newReadableStringUTF8 "" + -- readSome should return readagain false + 
shouldEqual { buffers: "", readagain: true } =<< toStringBuffers =<< readSome r + shouldEqual "" =<< toStringUTF8 =<< readAll r + shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readN r 0 + it "readN cleans up event handlers" do + s <- newReadableStringUTF8 "" + for_ (0 .. 100) \_ -> void $ readN s 0 + it "readSome cleans up event handlers" do + s <- newReadableStringUTF8 "" + for_ (0 .. 100) \_ -> void $ readSome s + it "readAll cleans up event handlers" do + s <- newReadableStringUTF8 "" + for_ (0 .. 100) \_ -> void $ readAll s + it "write cleans up event handlers" do + s <- newStreamPassThrough + [ b ] <- fromStringUTF8 "x" + for_ (0 .. 100) \_ -> void $ write s [ b ] + it "readSome from PassThrough" do + s <- newStreamPassThrough + write s =<< fromStringUTF8 "test" + end s + -- The first readSome readagain will be true, that's not good + shouldEqual { buffers: "test", readagain: true } =<< toStringBuffers =<< readSome s + shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readSome s + it "readSome from PassThrough concurrent" do + s <- newStreamPassThrough + parSequence_ + [ do + shouldEqual { buffers: "test", readagain: true } =<< toStringBuffers =<< readSome s + -- This is ridiculous behavior + shouldEqual { buffers: "", readagain: true } =<< toStringBuffers =<< readSome s + shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readSome s + , do + write s =<< fromStringUTF8 "test" + end s + ] + it "readAll from PassThrough concurrent" do + s <- newStreamPassThrough + parSequence_ + [ do + shouldEqual "test" =<< toStringUTF8 =<< readAll s + , do + write s =<< fromStringUTF8 "test" + end s + ] + it "readAll from empty PassThrough concurrent" do + s <- newStreamPassThrough + parSequence_ + [ shouldEqual "" =<< toStringUTF8 =<< readAll s + , end s + ] + it "readSome from destroyed PassThrough" do + s <- newStreamPassThrough + liftEffect $ destroy s + shouldEqual { buffers: "", readagain: false } =<< 
toStringBuffers =<< readSome s + it "readSome from destroyed PassThrough concurrent" do + s <- newStreamPassThrough + parSequence_ + [ shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readSome s + , liftEffect $ destroy s + ] + it "readAll from destroyed PassThrough concurrent " do + s <- newStreamPassThrough + parSequence_ + [ shouldEqual "" =<< toStringUTF8 =<< readAll s + , liftEffect $ destroy s + ] + it "readN from destroyed PassThrough concurrent " do + s <- newStreamPassThrough + parSequence_ + [ shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readN s 1 + , liftEffect $ destroy s + ] + it "write to destroyed PassThrough" do + s <- newStreamPassThrough + liftEffect $ destroy s + expectError $ write s =<< fromStringUTF8 "test" + it "writes and reads to file" do let outfilename = "/tmp/test1.txt" let magnitude = 100000 outfile <- liftEffect $ createWriteStream outfilename - outstring <- liftEffect $ Buffer.fromString "aaaaaaaaaa" UTF8 + [ outstring ] <- fromStringUTF8 "aaaaaaaaaa" write outfile $ Array.replicate magnitude outstring infile <- liftEffect $ createReadStream outfilename - Tuple input1 _ <- readSome infile - Tuple input2 _ <- readN infile (5 * magnitude) - Tuple input3 readagain <- readAll infile - shouldEqual readagain false - _ :: Buffer <- liftEffect <<< concat <<< fst =<< readSome infile + { buffers: input1 } <- readSome infile + { buffers: input2 } <- readN infile (5 * magnitude) + input3 <- readAll infile + _ :: Buffer <- liftEffect <<< concat <<< _.buffers =<< readSome infile void $ readN infile 1 void $ readAll infile let inputs = input1 <> input2 <> input3 input :: Buffer <- liftEffect $ concat inputs inputSize <- liftEffect $ Buffer.size input shouldEqual inputSize (10 * magnitude) - it "writes and closes" do + it "writes and closes file" do let outfilename = "/tmp/test2.txt" outfile <- liftEffect $ createWriteStream outfilename - b <- liftEffect $ Buffer.fromString "test" UTF8 - write outfile [ b 
] + write outfile =<< fromStringUTF8 "test" end outfile - expectError $ write outfile [ b ] - it "reads from a zero-length Readable" do - r <- liftEffect $ newReadableStringUTF8 "" - b1 <- toStringUTF8 =<< (fst <$> readSome r) - shouldEqual "" b1 - b2 <- toStringUTF8 =<< (fst <$> readAll r) - shouldEqual "" b2 - b3 <- toStringUTF8 =<< (fst <$> readN r 0) - shouldEqual "" b3 + expectError $ write outfile =<< fromStringUTF8 "test2" pure unit + +toStringBuffers + :: { buffers :: Array Buffer, readagain :: Boolean } + -> Aff { buffers :: String, readagain :: Boolean } +toStringBuffers { buffers, readagain } = do + buffers' <- toStringUTF8 buffers + pure { buffers: buffers', readagain } diff --git a/test/Main3.purs b/test/Main3.purs index d96092c..b095eb0 100644 --- a/test/Main3.purs +++ b/test/Main3.purs @@ -9,7 +9,6 @@ import Prelude import Data.Array as Array import Data.Either (Either(..)) -import Data.Tuple (Tuple(..)) import Effect (Effect) import Effect.Aff (Error, runAff_) import Effect.Class (liftEffect) @@ -38,11 +37,11 @@ main = unsafePartial $ do describe "Node.Stream.Aff" do it "reads 1" do infile <- liftEffect $ createReadStream =<< pure <<< flip Array.unsafeIndex 2 =<< argv - Tuple inputs1 _ <- readN infile 500000 + { buffers: inputs1 } <- readN infile 500000 bytesRead1 :: Int <- liftEffect $ Array.foldM (\a b -> (a + _) <$> Buffer.size b) 0 inputs1 shouldEqual 500000 bytesRead1 - Tuple inputs2 _ <- readSome infile - Tuple inputs3 _ <- readAll infile + { buffers: inputs2 } <- readSome infile + inputs3 <- readAll infile let inputs = inputs1 <> inputs2 <> inputs3 -- TODO read after EOF will hang -- inputs4 <- readAll infile