diff --git a/build.sbt b/build.sbt index c5eec8d8..e43cdb21 100644 --- a/build.sbt +++ b/build.sbt @@ -7,8 +7,7 @@ scalaVersion := "2.12.7" libraryDependencies ++= Seq( "org.typelevel" %% "cats-effect" % "1.2.0", - "co.fs2" %% "fs2-core" % "1.0.3", - "co.fs2" %% "fs2-io" % "1.0.3", + "com.typesafe.akka" %% "akka-stream" % "2.5.20", "com.chuusai" %% "shapeless" % "2.3.3", "org.specs2" %% "specs2-core" % "4.4.1" % "test" diff --git a/src/main/scala/io/github/vigoo/prox/ops.scala b/src/main/scala/io/github/vigoo/prox/ops.scala index 2fd7d810..4766640d 100644 --- a/src/main/scala/io/github/vigoo/prox/ops.scala +++ b/src/main/scala/io/github/vigoo/prox/ops.scala @@ -1,8 +1,11 @@ package io.github.vigoo.prox +import akka.Done +import akka.stream.Materializer +import akka.stream.scaladsl._ +import akka.util.ByteString import cats.effect.{Concurrent, ContextShift, IO} import cats.kernel.Monoid -import fs2._ import shapeless._ import shapeless.ops.hlist.{IsHCons, Last, Prepend, Tupler} @@ -30,11 +33,13 @@ trait Start[PN <: ProcessNode[_, _, _, _, _]] { * * @param process The process to be started * @param dontStartOutput Do no start the output redirection stream - * @param blockingExecutionContext Execution context for the blocking stream IO * @param contextShift Context shifter to be used for the streams * @return Returns the [[RunningProcess]] instances of the started system processes */ - def apply(process: PN, dontStartOutput: Boolean = false, blockingExecutionContext: ExecutionContext)(implicit contextShift: ContextShift[IO]): IO[RunningProcesses] + def apply(process: PN, dontStartOutput: Boolean = false) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[RunningProcesses] /** Start the given process * @@ -45,11 +50,13 @@ trait Start[PN <: ProcessNode[_, _, _, _, _]] { * * @param process The process to be started * @param dontStartOutput Do no start the output redirection stream - * @param blockingExecutionContext Execution context for the blocking stream IO * @param contextShift Context shifter to be used for the streams * @return Returns the [[RunningProcess]] instances of the started system processes as a [[shapeless.HList]] */ - def toHList(process: PN, dontStartOutput: Boolean = false, blockingExecutionContext: ExecutionContext)(implicit contextShift: ContextShift[IO]): IO[RunningProcessList] + def toHList(process: PN, dontStartOutput: Boolean = false) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[RunningProcessList] } object Start { @@ -66,8 +73,10 @@ object Start { new Start[Process[Out, Err, OutResult, ErrResult, IRS, ORS, ERS]] { override type RunningProcesses = RunningProcess[Out, OutResult, ErrResult] override type RunningProcessList = RunningProcess[Out, OutResult, ErrResult] :: HNil - override def apply(process: Process[Out, Err, OutResult, ErrResult, IRS, ORS, ERS], dontStartOutput: Boolean, blockingExecutionContext: ExecutionContext) - (implicit contextShift: ContextShift[IO]): IO[RunningProcess[Out, OutResult, ErrResult]] = { + override def apply(process: Process[Out, Err, OutResult, ErrResult, IRS, ORS, ERS], dontStartOutput: Boolean) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[RunningProcess[Out, OutResult, ErrResult]] = { def withWorkingDirectory(builder: ProcessBuilder): ProcessBuilder = process.workingDirectory match { case Some(directory) => builder.directory(directory.toFile) @@ -87,9 +96,9 @@ object Start { builder.redirectError(process.errorTarget.toRedirect) for { proc <- IO(builder.start) - inputStream = process.inputSource.connect(proc, blockingExecutionContext) - outputStream = process.outputTarget.connect(proc, blockingExecutionContext) - errorStream = process.errorTarget.connect(proc, blockingExecutionContext) + inputStream = process.inputSource.connect(proc) + outputStream = process.outputTarget.connect(proc) + errorStream = process.errorTarget.connect(proc) runningInput <- process.inputSource.run(inputStream) runningOutput <- if (dontStartOutput) { Concurrent[IO].start(IO(outResultMonoid.empty)) } else { process.outputTarget.run(outputStream) } runningError <- process.errorTarget.run(errorStream) @@ -101,8 +110,11 @@ object Start { runningError) } - override def toHList(process: Process[Out, Err, OutResult, ErrResult, IRS, ORS, ERS], dontStartOutput: Boolean, blockingExecutionContext: ExecutionContext)(implicit contextShift: ContextShift[IO]): IO[RunningProcessList] = - apply(process, dontStartOutput, blockingExecutionContext).map(runningProcess => runningProcess :: HNil) + override def toHList(process: Process[Out, Err, OutResult, ErrResult, IRS, ORS, ERS], dontStartOutput: Boolean) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[RunningProcessList] = + apply(process, dontStartOutput).map(runningProcess => runningProcess :: HNil) } implicit def startPipedProcess[ @@ -117,26 +129,30 @@ object Start { start1: Start.Aux[PN1, RP1, RPL1], start2: Start.Aux[PN2, RP2, RPL2], last1: Last.Aux[RPL1, RP1Last], - rp1LastType: RP1Last <:< RunningProcess[Byte, _, _], + rp1LastType: RP1Last <:< RunningProcess[ByteString, _, _], hcons2: IsHCons.Aux[RPL2, RP2Head, RP2Tail], prepend: Prepend.Aux[RPL1, RPL2, RPL], tupler: Tupler.Aux[RPL, RPT]): - Aux[PipedProcess[Out, Err, Byte, PN1, PN2, IRS, ORS, ERS], RPT, RPL] = - new Start[PipedProcess[Out, Err, Byte, PN1, PN2, IRS, ORS, ERS]] { + Aux[PipedProcess[Out, Err, ByteString, PN1, PN2, IRS, ORS, ERS], RPT, RPL] = + new Start[PipedProcess[Out, Err, ByteString, PN1, PN2, IRS, ORS, ERS]] { override type RunningProcesses = RPT override type RunningProcessList = RPL - override def apply(pipe: PipedProcess[Out, Err, Byte, PN1, PN2, IRS, ORS, ERS], dontStartOutput: Boolean, blockingExecutionContext: ExecutionContext) - (implicit contextShift: ContextShift[IO]): IO[RPT] = { - toHList(pipe, dontStartOutput, blockingExecutionContext).map(_.tupled) + override def apply(pipe: PipedProcess[Out, Err, ByteString, PN1, PN2, IRS, ORS, ERS], dontStartOutput: Boolean) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[RPT] = { + toHList(pipe, dontStartOutput).map(_.tupled) } - override def toHList(pipe: PipedProcess[Out, Err, Byte, PN1, PN2, IRS, ORS, ERS], dontStartOutput: Boolean, blockingExecutionContext: ExecutionContext) - (implicit contextShift: ContextShift[IO]): IO[RPL] = { - start1.toHList(pipe.from, dontStartOutput = true, blockingExecutionContext).flatMap { runningSourceProcesses => - val runningFrom = runningSourceProcesses.last.asInstanceOf[RunningProcess[Byte, _, _]] + override def toHList(pipe: PipedProcess[Out, Err, ByteString, PN1, PN2, IRS, ORS, ERS], dontStartOutput: Boolean) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[RPL] = { + start1.toHList(pipe.from, dontStartOutput = true).flatMap { runningSourceProcesses => + val runningFrom = runningSourceProcesses.last.asInstanceOf[RunningProcess[ByteString, _, _]] val to = pipe.createTo(PipeConstruction(runningFrom.notStartedOutput.get)) - start2.toHList(to, dontStartOutput, blockingExecutionContext).flatMap { runningTargetProcesses => + start2.toHList(to, dontStartOutput).flatMap { runningTargetProcesses => IO(runningSourceProcesses ::: runningTargetProcesses) } } @@ -350,10 +366,10 @@ trait Piping[PN1 <: ProcessNode[_, _, _, NotRedirected, _], PN2 <: ProcessNode[_ * * @param from The process to use as a source * @param to The process to feed the source process' output to - * @param via The [[fs2.Pipe]] between the two processes + * @param via The [[Flow]] between the two processes * @return Returns the piped process */ - def apply(from: PN1, to: PN2, via: Pipe[IO, Byte, Byte]): ResultProcess + def apply(from: PN1, to: PN2, via: Flow[ByteString, ByteString, Any]): ResultProcess } @@ -369,21 +385,21 @@ object Piping { PN1Redirected <: ProcessNode[_, _, _, Redirected, _], PN2Redirected <: ProcessNode[_, _, Redirected, _, _]] (implicit - pn1SubTyping: PN1 <:< ProcessNode[Byte, _, PN1IRS, NotRedirected, PN1ERS], + pn1SubTyping: PN1 <:< ProcessNode[ByteString, _, PN1IRS, NotRedirected, PN1ERS], pn2SubTyping: PN2 <:< ProcessNode[PN2Out, PN2Err, NotRedirected, PN2ORS, PN2ERS], - redirectPN1Output: RedirectOutput.Aux[PN1, Drain[Byte], Byte, Unit, PN1Redirected], + redirectPN1Output: RedirectOutput.Aux[PN1, Drain[ByteString], ByteString, Unit, PN1Redirected], redirectPN2Input: RedirectInput.Aux[PN2, PN2Redirected]): Aux[PN1, PN2, - PipedProcess[PN2Out, PN2Err, Byte, PN1Redirected, PN2Redirected, PN1IRS, PN2ORS, PN2ERS]] = + PipedProcess[PN2Out, PN2Err, ByteString, PN1Redirected, PN2Redirected, PN1IRS, PN2ORS, PN2ERS]] = new Piping[PN1, PN2] { override type ResultProcess = - PipedProcess[PN2Out, PN2Err, Byte, + PipedProcess[PN2Out, PN2Err, ByteString, PN1Redirected, PN2Redirected, PN1IRS, PN2ORS, PN2ERS] - override def apply(from: PN1, to: PN2, via: Pipe[IO, Byte, Byte]): ResultProcess = { + override def apply(from: PN1, to: PN2, via: Flow[ByteString, ByteString, Any]): ResultProcess = { val channel = Drain(via) new PipedProcess( redirectPN1Output(from, channel), @@ -400,7 +416,7 @@ object Piping { * @param via The custom pipe * @tparam PN Type of the first process */ -class PipeBuilder[PN <: ProcessNode[_, _, _, NotRedirected, _]](processNode: PN, via: Pipe[IO, Byte, Byte]) { +class PipeBuilder[PN <: ProcessNode[_, _, _, NotRedirected, _]](processNode: PN, via: Flow[ByteString, ByteString, Any]) { /** Constructs the piping by providing the target process * * @param to The target process @@ -455,6 +471,11 @@ class PipeBuilder[PN <: ProcessNode[_, _, _, NotRedirected, _]](processNode: PN, * }}} */ object syntax { + implicit val monoidDone: Monoid[Done] = new Monoid[Done] { + override def empty: Done = Done + override def combine(x: Done, y: Done): Done = Done + } + implicit class ProcessNodeOutputRedirect[PN <: ProcessNode[_, _, _, NotRedirected, _]](processNode: PN) { /** Redirects the output channel of a process * @@ -485,14 +506,14 @@ object syntax { def |[PN2 <: ProcessNode[_, _, NotRedirected, _, _], RP <: ProcessNode[_, _, _, _, _]] (to: PN2) (implicit piping: Piping.Aux[PN, PN2, RP]): RP = - piping(processNode, to, identity[Stream[IO, Byte]]) + piping(processNode, to, Flow.apply) /** Creates a piped process by providing a custom pipe * * @param via The custom pipe between the two process * @return Returns a [[PipeBuilder]] instance which can be used to complete the piping specification */ - def via(via: Pipe[IO, Byte, Byte]): PipeBuilder[PN] = + def via(via: Flow[ByteString, ByteString, Any]): PipeBuilder[PN] = new PipeBuilder(processNode, via) } @@ -537,23 +558,29 @@ object syntax { /** Starts the process * * @param start Type class implementing the process starting - * @param blockingExecutionContext Execution context for the blocking stream IO * @param contextShift Context shifter to be used for the streams * @tparam RP Type encoding the [[RunningProcess]] instances * @return Returns the [[RunningProcess]] instances for the system processes which has been started */ - def start[RP](blockingExecutionContext: ExecutionContext)(implicit start: Start.Aux[PN, RP, _], contextShift: ContextShift[IO]): IO[RP] = - start(processNode, dontStartOutput = false, blockingExecutionContext) + def start[RP]() + (implicit start: Start.Aux[PN, RP, _], + contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[RP] = + start(processNode, dontStartOutput = false) /** Starts the process * * @param start Type class implementing the process starting - * @param blockingExecutionContext Execution context for the blocking stream IO * @param contextShift Context shifter to be used for the streams * @tparam RPL Type encoding the [[RunningProcess]] instances in a [[shapeless.HList]] * @return Returns the HList of [[RunningProcess]] instances for the system processes which has been started */ - def startHL[RPL <: HList](blockingExecutionContext: ExecutionContext)(implicit start: Start.Aux[PN, _, RPL], contextShift: ContextShift[IO]): IO[RPL] = - start.toHList(processNode, dontStartOutput = false, blockingExecutionContext) + def startHL[RPL <: HList]() + (implicit start: Start.Aux[PN, _, RPL], + contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[RPL] = + start.toHList(processNode, dontStartOutput = false) } } diff --git a/src/main/scala/io/github/vigoo/prox/process.scala b/src/main/scala/io/github/vigoo/prox/process.scala index 22106f55..11ecd87a 100644 --- a/src/main/scala/io/github/vigoo/prox/process.scala +++ b/src/main/scala/io/github/vigoo/prox/process.scala @@ -1,10 +1,12 @@ package io.github.vigoo.prox +import akka.stream.scaladsl._ import java.lang.ProcessBuilder.Redirect import java.nio.file.Path +import akka.stream.Materializer +import akka.util.ByteString import cats.effect.{ContextShift, Fiber, IO} -import fs2._ import scala.concurrent.ExecutionContext import scala.language.{higherKinds, implicitConversions} @@ -38,7 +40,8 @@ trait ProcessIO[O, R] { * @param contextShift Context shifter * @return Returns the not yet started redirection stream */ - def connect(systemProcess: java.lang.Process, blockingExecutionContext: ExecutionContext)(implicit contextShift: ContextShift[IO]): Stream[IO, O] + def connect(systemProcess: java.lang.Process) + (implicit contextShift: ContextShift[IO]): Source[O, Any] /** Runs the redirection stream * @@ -46,7 +49,10 @@ trait ProcessIO[O, R] { * @param contextShift Context shifter * @return Returns the async result of running the stream */ - def run(stream: Stream[IO, O])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, R]] + def run(stream: Source[O, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, R]] } @@ -55,7 +61,7 @@ trait ProcessIO[O, R] { * @param outStream Output stream of the first process, to be used as the input stream of the second one * @tparam Out Element type of the piping stream */ -case class PipeConstruction[Out](outStream: Stream[IO, Out]) +case class PipeConstruction[Out](outStream: Source[Out, Any]) /** Phantom type representing the redirection state of a process */ @@ -184,8 +190,8 @@ object Process { */ def apply(command: String, arguments: List[String] = List.empty, - workingDirectory: Option[Path] = None): Process[Byte, Byte, Unit, Unit, NotRedirected, NotRedirected, NotRedirected] = - new Process[Byte, Byte, Unit, Unit, NotRedirected, NotRedirected, NotRedirected](command, arguments, workingDirectory, StdIn, StdOut, StdError, Map.empty) + workingDirectory: Option[Path] = None): Process[ByteString, ByteString, Unit, Unit, NotRedirected, NotRedirected, NotRedirected] = + new Process[ByteString, ByteString, Unit, Unit, NotRedirected, NotRedirected, NotRedirected](command, arguments, workingDirectory, StdIn, StdOut, StdError, Map.empty) } @@ -233,7 +239,7 @@ trait RunningProcess[Out, OutResult, ErrResult] { * * @return Returns the output stream for the piping */ - private[prox] def notStartedOutput: Option[Stream[IO, Out]] + private[prox] def notStartedOutput: Option[Source[Out, Any]] } @@ -251,7 +257,7 @@ trait RunningProcess[Out, OutResult, ErrResult] { * @tparam ErrResult Result type of running the redirected error stream. [[Unit]] if there is no such result. */ private[prox] class WrappedProcess[Out, OutResult, ErrResult](systemProcess: java.lang.Process, - val notStartedOutput: Option[Stream[IO, Out]], + val notStartedOutput: Option[Source[Out, Any]], runningInput: Fiber[IO, Unit], runningOutput: Fiber[IO, OutResult], runningError: Fiber[IO, ErrResult]) diff --git a/src/main/scala/io/github/vigoo/prox/sources.scala b/src/main/scala/io/github/vigoo/prox/sources.scala index 34ceb80f..129851f4 100644 --- a/src/main/scala/io/github/vigoo/prox/sources.scala +++ b/src/main/scala/io/github/vigoo/prox/sources.scala @@ -4,14 +4,19 @@ import java.lang import java.lang.ProcessBuilder.Redirect import java.nio.file.Path +import akka.Done +import akka.stream.Materializer +import akka.stream.scaladsl.StreamConverters._ +import akka.stream.scaladsl._ +import akka.util.ByteString import cats.effect.{Concurrent, ContextShift, Fiber, IO} -import fs2._ import scala.concurrent.ExecutionContext import scala.language.higherKinds +import scala.util.{Failure, Success} /** Base trait for input redirection handlers */ -trait ProcessInputSource extends ProcessIO[Byte, Unit] +trait ProcessInputSource extends ProcessIO[ByteString, Unit] /** Type class for creating input redirection handlers * @@ -29,27 +34,27 @@ trait CanBeProcessInputSource[From] { * There are instances for the following types: * * - [[java.nio.file.Path]] to use a file as input - * - [[fs2.Stream]] to use a byte stream as input + * - [[Source]] to use a byte stream as input */ object CanBeProcessInputSource { implicit val pathAsSource: CanBeProcessInputSource[Path] = (path: Path) => new FileSource(path) - implicit def streamAsSource: CanBeProcessInputSource[Stream[IO, Byte]] = - (source: Stream[IO, Byte]) => new InputStreamingSource(source) - - implicit def pureStreamAsSource: CanBeProcessInputSource[Stream[Pure, Byte]] = - (source: Stream[Pure, Byte]) => new InputStreamingSource(source) + implicit def streamAsSource: CanBeProcessInputSource[Source[ByteString, Any]] = + (source: Source[ByteString, Any]) => new InputStreamingSource(source) } /** Default input source representing no redirection */ object StdIn extends ProcessInputSource { override def toRedirect: Redirect = Redirect.INHERIT - override def connect(systemProcess: lang.Process, blockingExecutionContext: ExecutionContext)(implicit contextShift: ContextShift[IO]): Stream[IO, Byte] = - Stream.empty + override def connect(systemProcess: lang.Process)(implicit contextShift: ContextShift[IO]): Source[ByteString, Any] = + Source.empty - override def run(stream: Stream[IO, Byte])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Unit]] = + override def run(stream: Source[ByteString, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Unit]] = Concurrent[IO].start(IO.unit) } @@ -59,10 +64,13 @@ object StdIn extends ProcessInputSource { */ class FileSource(path: Path) extends ProcessInputSource { override def toRedirect: Redirect = Redirect.from(path.toFile) - override def connect(systemProcess: lang.Process, blockingExecutionContext: ExecutionContext)(implicit contextShift: ContextShift[IO]): Stream[IO, Byte] = - Stream.empty + override def connect(systemProcess: lang.Process)(implicit contextShift: ContextShift[IO]): Source[ByteString, Any] = + Source.empty - override def run(stream: Stream[IO, Byte])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Unit]] = + override def run(stream: Source[ByteString, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Unit]] = Concurrent[IO].start(IO.unit) } @@ -70,17 +78,21 @@ class FileSource(path: Path) extends ProcessInputSource { * * @param source The input byte stream */ -class InputStreamingSource(source: Stream[IO, Byte]) extends ProcessInputSource { +class InputStreamingSource(source: Source[ByteString, Any]) extends ProcessInputSource { override def toRedirect: Redirect = Redirect.PIPE - override def connect(systemProcess: lang.Process, blockingExecutionContext: ExecutionContext)(implicit contextShift: ContextShift[IO]): Stream[IO, Byte] = { - source.observe( - io.writeOutputStream[IO]( - IO { systemProcess.getOutputStream }, - closeAfterUse = true, - blockingExecutionContext = blockingExecutionContext)) + override def connect(systemProcess: lang.Process)(implicit contextShift: ContextShift[IO]): Source[ByteString, Any] = + source.alsoTo(fromOutputStream(() => systemProcess.getOutputStream, autoFlush = true)) + + override def run(stream: Source[ByteString, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Unit]] = { + Concurrent[IO].start(IO.async { finish => + stream.runWith(Sink.ignore).onComplete { + case Success(Done) => finish(Right(())) + case Failure(reason) => finish(Left(reason)) + } + }) } - - override def run(stream: Stream[IO, Byte])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Unit]] = - Concurrent[IO].start(stream.compile.drain) } \ No newline at end of file diff --git a/src/main/scala/io/github/vigoo/prox/targets.scala b/src/main/scala/io/github/vigoo/prox/targets.scala index 893744f3..3ef5b6d6 100644 --- a/src/main/scala/io/github/vigoo/prox/targets.scala +++ b/src/main/scala/io/github/vigoo/prox/targets.scala @@ -5,11 +5,14 @@ import java.lang import java.lang.ProcessBuilder.Redirect import java.nio.file.Path +import akka.stream.Materializer +import akka.stream.scaladsl._ +import akka.util.ByteString import cats.Monoid import cats.effect.{Concurrent, ContextShift, Fiber, IO} -import fs2._ -import scala.concurrent.ExecutionContext +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} /** Base trait for output redirection handlers * @@ -41,43 +44,51 @@ trait CanBeProcessOutputTarget[To] { /** Wraps a pipe to modify how the stream is executed * * If a pipe used as an output or error target is wrapped by [[Drain]], the stream will be executed - * by [[fs2.Stream.CompileOps.drain]] and the result type will be [[Unit]]. + * by [[Sink.ignore]] and the result type will be [[Unit]]. * * @param pipe The pipe to wrap - * @tparam O Stream element type + * @tparam O Stream element type */ -case class Drain[O](pipe: Pipe[IO, Byte, O]) +case class Drain[O](pipe: Flow[ByteString, O, Any]) /** Wraps a pipe to modify how the stream is executed * * If a pipe used as an output or error target is wrapped by [[ToVector]], the stream will be executed - * by [[fs2.Stream.CompileOps.toVector]] and the result type will be a [[Vector]] of its element + * by [[Sink.seq]] and the result type will be a [[Vector]] of its element * type. * * @param pipe The pipe to wrap - * @tparam O Stream element type + * @tparam O Stream element type */ -case class ToVector[O](pipe: Pipe[IO, Byte, O]) +case class ToVector[O](pipe: Flow[ByteString, O, Any]) /** Wraps the pipe to modify how the stream is executed * * If a pipe used as an output or error target is wrapped by [[Fold]], the stream will be executed - * by [[fs2.Stream.CompileOps.fold]] and the result type will be the result type of the provided + * by [[Sink.fold]] and the result type will be the result type of the provided * fold function. * * @param pipe The pipe to wrap * @param init Initial value for the fold * @param f The fold function - * @tparam O Stream element type - * @tparam R Fold result type + * @tparam O Stream element type + * @tparam R Fold result type */ -case class Fold[O, R](pipe: Pipe[IO, Byte, O], init: R, f: (R, O) => R) +case class Fold[O, R](pipe: Flow[ByteString, O, Any], init: R, f: (R, O) => R) trait LowPriorityCanBeProcessOutputTarget { - implicit def pipeAsTarget[Out]: CanBeProcessOutputTarget.Aux[Pipe[IO, Byte, Out], Out, Vector[Out]] = - CanBeProcessOutputTarget.create((pipe: Pipe[IO, Byte, Out]) => new OutputStreamingTarget(pipe) with ProcessOutputTarget[Out, Vector[Out]] { - override def run(stream: Stream[IO, Out])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Vector[Out]]] = - Concurrent[IO].start(stream.compile.toVector) + implicit def pipeAsTarget[Out, Mat]: CanBeProcessOutputTarget.Aux[Flow[ByteString, Out, Mat], Out, Vector[Out]] = + CanBeProcessOutputTarget.create((pipe: Flow[ByteString, Out, Mat]) => new OutputStreamingTarget(pipe) with ProcessOutputTarget[Out, Vector[Out]] { + override def run(stream: Source[Out, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Vector[Out]]] = + Concurrent[IO].start(IO.async { complete => + stream.runWith(Sink.seq).onComplete { + case Success(value) => complete(Right(value.toVector)) + case Failure(reason) => complete(Left(reason)) + } + }) }) } @@ -86,9 +97,9 @@ trait LowPriorityCanBeProcessOutputTarget { * There are instances for the following types: * * - [[java.nio.file.Path]] to redirect the output to a file - * - [[fs2.Sink]] to redirect the output to a sink. The result type is [[Unit]]. - * - [[fs2.Pipe]] if the pipe's output element type is a [[cats.Monoid]]. The result type is its element type. - * - [[fs2.Pipe]] if the pipe's output element type is not a [[cats.Monoid]]. The result type is a [[Vector]] of its element type. + * - [[Sink]] to redirect the output to a sink. The result type is [[Unit]]. + * - [[Flow]] if the pipe's output element type is a [[cats.Monoid]]. The result type is its element type. + * - [[Flow]] if the pipe's output element type is not a [[cats.Monoid]]. The result type is a [[Vector]] of its element type. * - [[Drain]] * - [[ToVector]] * - [[Fold]] @@ -103,44 +114,85 @@ object CanBeProcessOutputTarget extends LowPriorityCanBeProcessOutputTarget { new CanBeProcessOutputTarget[To] { override type Out = Out0 override type OutResult = OutResult0 + override def apply(to: To): ProcessOutputTarget[Out, OutResult] = fn(to) } - implicit val pathAsTarget: Aux[Path, Byte, Unit] = + implicit val pathAsTarget: Aux[Path, ByteString, Unit] = create((path: Path) => new FileTarget(path)) - implicit def sinkAsTarget: Aux[Pipe[IO, Byte, Unit], Unit, Unit] = - create((pipe: Pipe[IO, Byte, Unit]) => new OutputStreamingTarget(pipe) with ProcessOutputTarget[Unit, Unit] { - - override def run(stream: Stream[IO, Unit])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Unit]] = - Concurrent[IO].start(stream.compile.drain) + implicit def sinkAsTarget[R]: Aux[Sink[ByteString, Future[R]], ByteString, R] = + create((sink: Sink[ByteString, Future[R]]) => new OutputStreamingTarget(Flow.fromFunction(identity)) with ProcessOutputTarget[ByteString, R] { + + override def run(stream: Source[ByteString, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, R]] = + Concurrent[IO].start(IO.async { complete => + stream.runWith(sink).onComplete { + case Success(value) => complete(Right(value)) + case Failure(reason) => complete(Left(reason)) + } + }) }) - implicit def monoidPipeAsTarget[Out](implicit monoid: Monoid[Out]): Aux[Pipe[IO, Byte, Out], Out, Out] = - create((pipe: Pipe[IO, Byte, Out]) => new OutputStreamingTarget(pipe) with ProcessOutputTarget[Out, Out] { - override def run(stream: Stream[IO, Out])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Out]] = { - Concurrent[IO].start(stream.compile.foldMonoid) + implicit def monoidPipeAsTarget[Out, Mat](implicit monoid: Monoid[Out]): Aux[Flow[ByteString, Out, Mat], Out, Out] = + create((pipe: Flow[ByteString, Out, Mat]) => new OutputStreamingTarget(pipe) with ProcessOutputTarget[Out, Out] { + override def run(stream: Source[Out, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Out]] = { + Concurrent[IO].start(IO.async { complete => + stream.runFold(monoid.empty)(monoid.combine).onComplete { + case Success(value) => complete(Right(value)) + case Failure(reason) => complete(Left(reason)) + } + }) } }) implicit def ignorePipeAsOutputTarget[Out]: Aux[Drain[Out], Out, Unit] = create((ignore: Drain[Out]) => new OutputStreamingTarget(ignore.pipe) with ProcessOutputTarget[Out, Unit] { - override def run(stream: Stream[IO, Out])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Unit]] = { - Concurrent[IO].start(stream.compile.drain) + override def run(stream: Source[Out, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Unit]] = { + Concurrent[IO].start(IO.async { complete => + stream.runWith(Sink.ignore).onComplete { + case Success(value) => complete(Right(())) + case Failure(reason) => complete(Left(reason)) + } + }) } }) implicit def logPipeAsOutputTarget[Out]: Aux[ToVector[Out], Out, Vector[Out]] = create((log: ToVector[Out]) => new OutputStreamingTarget(log.pipe) with ProcessOutputTarget[Out, Vector[Out]] { - override def run(stream: Stream[IO, Out])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Vector[Out]]] = { - Concurrent[IO].start(stream.compile.toVector) + override def run(stream: Source[Out, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Vector[Out]]] = { + Concurrent[IO].start(IO.async { complete => + stream.runWith(Sink.seq).onComplete { + case Success(value) => complete(Right(value.toVector)) + case Failure(reason) => complete(Left(reason)) + } + }) } }) implicit def foldPipeAsOutputTarget[Out, Res]: Aux[Fold[Out, Res], Out, Res] = create((fold: Fold[Out, Res]) => new OutputStreamingTarget(fold.pipe) with ProcessOutputTarget[Out, Res] { - override def run(stream: Stream[IO, Out])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Res]] = { - Concurrent[IO].start(stream.compile.fold(fold.init)(fold.f)) + override def run(stream: Source[Out, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Res]] = { + Concurrent[IO].start(IO.async { complete => + stream.runFold(fold.init)(fold.f).onComplete { + case Success(value) => complete(Right(value)) + case Failure(reason) => complete(Left(reason)) + } + }) } }) } @@ -152,14 +204,23 @@ object CanBeProcessOutputTarget extends LowPriorityCanBeProcessOutputTarget { trait CanBeProcessErrorTarget[To] { type Err type ErrResult + def apply(to: To): ProcessErrorTarget[Err, ErrResult] } trait LowPriorityCanBeProcessErrorTarget { - implicit def pipeAsErrorTarget[Err]: CanBeProcessErrorTarget.Aux[Pipe[IO, Byte, Err], Err, Vector[Err]] = - CanBeProcessErrorTarget.create((pipe: Pipe[IO, Byte, Err]) => new ErrorStreamingTarget(pipe) with ProcessErrorTarget[Err, Vector[Err]] { - override def run(stream: Stream[IO, Err])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Vector[Err]]] = { - Concurrent[IO].start(stream.compile.toVector) + implicit def pipeAsErrorTarget[Err, Mat]: CanBeProcessErrorTarget.Aux[Flow[ByteString, Err, Mat], Err, Vector[Err]] = + CanBeProcessErrorTarget.create((pipe: Flow[ByteString, Err, Mat]) => new ErrorStreamingTarget(pipe) with ProcessErrorTarget[Err, Vector[Err]] { + override def run(stream: Source[Err, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Vector[Err]]] = { + Concurrent[IO].start(IO.async { complete => + stream.runWith(Sink.seq).onComplete { + case Success(value) => complete(Right(value.toVector)) + case Failure(reason) => complete(Left(reason)) + } + }) } }) } @@ -169,9 +230,9 @@ trait LowPriorityCanBeProcessErrorTarget { * There are instances for the following types: * * - [[java.nio.file.Path]] to redirect the error channel to a file - * - [[fs2.Sink]] to redirect the error channel to a sink. The result type is [[Unit]]. - * - [[fs2.Pipe]] if the pipe's output element type is a [[cats.Monoid]]. The result type is its element type. - * - [[fs2.Pipe]] if the pipe's output element type is not a [[cats.Monoid]]. The result type is a [[Vector]] of its element type. + * - [[Sink]] to redirect the error channel to a sink. The result type is [[Unit]]. + * - [[Flow]] if the pipe's output element type is a [[cats.Monoid]]. The result type is its element type. + * - [[Flow]] if the pipe's output element type is not a [[cats.Monoid]]. The result type is a [[Vector]] of its element type. * - [[Drain]] * - [[ToVector]] * - [[Fold]] @@ -186,60 +247,114 @@ object CanBeProcessErrorTarget extends LowPriorityCanBeProcessErrorTarget { new CanBeProcessErrorTarget[To] { override type Err = Err0 override type ErrResult = ErrResult0 + override def apply(to: To): ProcessErrorTarget[Err, ErrResult] = fn(to) } - implicit val pathAsErrorTarget: Aux[Path, Byte, Unit] = + implicit val pathAsErrorTarget: Aux[Path, ByteString, Unit] = create((path: Path) => new FileTarget(path)) - implicit def monoidPipeAsErrorTarget[Err](implicit monoid: Monoid[Err]): Aux[Pipe[IO, Byte, Err], Err, Err] = - create((pipe: Pipe[IO, Byte, Err]) => new ErrorStreamingTarget(pipe) with ProcessErrorTarget[Err, Err] { - override def run(stream: Stream[IO, Err])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Err]] = { - Concurrent[IO].start(stream.compile.foldMonoid) + implicit def sinkAsErrorTarget[R]: Aux[Sink[ByteString, Future[R]], ByteString, R] = + create((sink: Sink[ByteString, Future[R]]) => new ErrorStreamingTarget(Flow.fromFunction(identity)) with ProcessErrorTarget[ByteString, R] { + + override def run(stream: Source[ByteString, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, R]] = + Concurrent[IO].start(IO.async { complete => + stream.runWith(sink).onComplete { + case Success(value) => complete(Right(value)) + case Failure(reason) => complete(Left(reason)) + } + }) + }) + + implicit def monoidPipeAsErrorTarget[Err, Mat](implicit monoid: Monoid[Err]): Aux[Flow[ByteString, Err, Mat], Err, Err] = + create((pipe: Flow[ByteString, Err, Mat]) => new ErrorStreamingTarget(pipe) with ProcessErrorTarget[Err, Err] { + override def run(stream: Source[Err, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Err]] = { + Concurrent[IO].start(IO.async { complete => + stream.runFold(monoid.empty)(monoid.combine).onComplete { + case Success(value) => complete(Right(value)) + case Failure(reason) => complete(Left(reason)) + } + }) } }) implicit def logPipeAsErrorTarget[Err]: Aux[ToVector[Err], Err, Vector[Err]] = create((log: ToVector[Err]) => new ErrorStreamingTarget(log.pipe) with ProcessErrorTarget[Err, Vector[Err]] { - override def run(stream: Stream[IO, Err])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Vector[Err]]] = { - Concurrent[IO].start(stream.compile.toVector) + override def run(stream: Source[Err, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Vector[Err]]] = { + Concurrent[IO].start(IO.async { complete => + stream.runWith(Sink.seq).onComplete { + case Success(value) => complete(Right(value.toVector)) + case Failure(reason) => complete(Left(reason)) + } + }) } }) implicit def ignorePipeAsErrorTarget[Err]: Aux[Drain[Err], Err, Unit] = create((ignore: Drain[Err]) => new ErrorStreamingTarget(ignore.pipe) with ProcessErrorTarget[Err, Unit] { - override def run(stream: Stream[IO, Err])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Unit]] = { - Concurrent[IO].start(stream.compile.drain) + override def run(stream: Source[Err, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Unit]] = { + Concurrent[IO].start(IO.async { complete => + stream.runWith(Sink.ignore).onComplete { + case Success(value) => complete(Right(())) + case Failure(reason) => complete(Left(reason)) + } + }) } }) implicit def foldPipeAsErrorTarget[Err, Res]: Aux[Fold[Err, Res], Err, Res] = create((fold: Fold[Err, Res]) => new ErrorStreamingTarget(fold.pipe) with ProcessErrorTarget[Err, Res] { - override def run(stream: Stream[IO, Err])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Res]] = { - Concurrent[IO].start(stream.compile.fold(fold.init)(fold.f)) + override def run(stream: Source[Err, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Res]] = { + Concurrent[IO].start(IO.async { complete => + stream.runFold(fold.init)(fold.f).onComplete { + case Success(value) => complete(Right(value)) + case Failure(reason) => complete(Left(reason)) + } + }) } }) } /** Default implementation of [[ProcessOutputTarget]] representing no redirection */ -object StdOut extends ProcessOutputTarget[Byte, Unit] { +object StdOut extends ProcessOutputTarget[ByteString, Unit] { override def toRedirect: Redirect = Redirect.INHERIT - override def connect(systemProcess: lang.Process, blockingExecutionContext: ExecutionContext)(implicit contextShift: ContextShift[IO]): Stream[IO, Byte] = - Stream.empty + override def connect(systemProcess: lang.Process)(implicit contextShift: ContextShift[IO]): Source[ByteString, Any] = + Source.empty - override def run(stream: Stream[IO, Byte])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Unit]] = + override def run(stream: Source[ByteString, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Unit]] = Concurrent[IO].start(IO.unit) } /** Default implementation of [[ProcessErrorTarget]] representing no redirection */ -object StdError extends ProcessErrorTarget[Byte, Unit] { +object StdError extends ProcessErrorTarget[ByteString, Unit] { override def toRedirect: Redirect = Redirect.INHERIT - override def connect(systemProcess: lang.Process, blockingExecutionContext: ExecutionContext)(implicit contextShift: ContextShift[IO]): Stream[IO, Byte] = - Stream.empty + override def connect(systemProcess: lang.Process)(implicit contextShift: ContextShift[IO]): Source[ByteString, Any] = + Source.empty - override def run(stream: Stream[IO, Byte])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Unit]] = + override def run(stream: Source[ByteString, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Unit]] = Concurrent[IO].start(IO.unit) } @@ -247,13 +362,16 @@ object StdError extends ProcessErrorTarget[Byte, Unit] { * * @param path Path to the file to be written */ -class FileTarget(path: Path) extends ProcessOutputTarget[Byte, Unit] with ProcessErrorTarget[Byte, Unit] { +class FileTarget(path: Path) extends ProcessOutputTarget[ByteString, Unit] with ProcessErrorTarget[ByteString, Unit] { override def toRedirect: Redirect = Redirect.to(path.toFile) - override def connect(systemProcess: lang.Process, blockingExecutionContext: ExecutionContext)(implicit contextShift: ContextShift[IO]): Stream[IO, Byte] = - Stream.empty + override def connect(systemProcess: lang.Process)(implicit contextShift: ContextShift[IO]): Source[ByteString, Any] = + Source.empty - override def run(stream: Stream[IO, Byte])(implicit contextShift: ContextShift[IO]): IO[Fiber[IO, Unit]] = + override def run(stream: Source[ByteString, Any]) + (implicit contextShift: ContextShift[IO], + materializer: Materializer, + executionContext: ExecutionContext): IO[Fiber[IO, Unit]] = Concurrent[IO].start(IO.unit) } @@ -261,43 +379,40 @@ class FileTarget(path: Path) extends ProcessOutputTarget[Byte, Unit] with Proces * * @param target Target stream * @param chunkSize Chunk size - * @tparam Out Stream output element type + * @tparam Out Stream output element type */ -abstract class OutputStreamingTargetBase[Out](target: Pipe[IO, Byte, Out], chunkSize: Int = 4096) { +abstract class OutputStreamingTargetBase[Out](target: Flow[ByteString, Out, Any], chunkSize: Int = 4096) { def toRedirect: Redirect = Redirect.PIPE - def connect(systemProcess: lang.Process, blockingExecutionContext: ExecutionContext)(implicit contextShift: ContextShift[IO]): Stream[IO, Out] = { - io.readInputStream[IO]( - getStream(systemProcess), - chunkSize, - closeAfterUse = true, - blockingExecutionContext = blockingExecutionContext) - .through(target) + + def connect(systemProcess: lang.Process) + (implicit contextShift: ContextShift[IO]): Source[Out, Any] = { + StreamConverters.fromInputStream(() => getStream(systemProcess), chunkSize).via(target) } - def getStream(systemProcess: java.lang.Process): IO[InputStream] + def getStream(systemProcess: java.lang.Process): InputStream } /** Output target implementation using a stream pipe as the target * - * @param target Target stream - * @tparam Out Stream output element type + * @param target Target stream + * @tparam Out Stream output element type */ -abstract class OutputStreamingTarget[Out](target: Pipe[IO, Byte, Out]) +abstract class OutputStreamingTarget[Out, Mat](target: Flow[ByteString, Out, Mat]) extends OutputStreamingTargetBase(target) { - override def getStream(systemProcess: java.lang.Process): IO[InputStream] = - IO(systemProcess.getInputStream) + override def getStream(systemProcess: java.lang.Process): InputStream = + systemProcess.getInputStream } /** Error target implementation using a stream pipe as the target * - * @param target Target stream - * @tparam Err Stream output element type + * @param target Target stream + * @tparam Err Stream output element type */ -abstract class ErrorStreamingTarget[Err](target: Pipe[IO, Byte, Err]) +abstract class ErrorStreamingTarget[Err, Mat](target: Flow[ByteString, Err, Mat]) extends OutputStreamingTargetBase(target) { - override def getStream(systemProcess: java.lang.Process): IO[InputStream] = - IO(systemProcess.getErrorStream) + override def getStream(systemProcess: java.lang.Process): InputStream = + systemProcess.getErrorStream } diff --git a/src/test/scala/io/github/vigoo/prox/ProcessSpecs.scala b/src/test/scala/io/github/vigoo/prox/ProcessSpecs.scala index e4e16a96..0ad3be05 100644 --- a/src/test/scala/io/github/vigoo/prox/ProcessSpecs.scala +++ b/src/test/scala/io/github/vigoo/prox/ProcessSpecs.scala @@ -1,19 +1,24 @@ package io.github.vigoo.prox import java.io.File +import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} import java.util.concurrent.Executors +import akka.{Done, NotUsed} +import akka.actor.ActorSystem +import akka.stream.{ActorMaterializer, Materializer} +import akka.stream.scaladsl._ +import akka.util.ByteString import cats.effect.{ContextShift, IO} import cats.implicits._ -import fs2._ import org.specs2.Specification import scala.concurrent.ExecutionContext.Implicits.global import shapeless.test.illTyped import syntax._ -import scala.concurrent.ExecutionContext +import scala.concurrent.{ExecutionContext, Future} // scalastyle:off public.methods.have.type // scalastyle:off public.member.have.type @@ -64,12 +69,16 @@ class ProcessSpecs extends Specification { def is = s2""" """ implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - val blockingExecutionContext = ExecutionContext.fromExecutor(Executors.newCachedThreadPool()) + implicit val actorSystem: ActorSystem = ActorSystem("test") + implicit val materializer: Materializer = ActorMaterializer() + + val utf8Decode: Sink[ByteString, Future[String]] = + Flow[ByteString].reduce(_ ++ _).map(_.utf8String).toMat(Sink.head)(Keep.right) def simpleProcessGetExitCode = { val program = for { - trueRunning <- Process("true").start(blockingExecutionContext) - falseRunning <- Process("false").start(blockingExecutionContext) + trueRunning <- Process("true").start() + falseRunning <- Process("false").start() trueResult <- trueRunning.waitForExit() falseResult <- falseRunning.waitForExit() } yield (trueResult.exitCode, falseResult.exitCode) @@ -79,7 +88,7 @@ class ProcessSpecs extends Specification { def is = s2""" def workingDirectoryWorks = { val tempDirectory = Files.createTempDirectory("prox") val program = for { - pwdRunning <- ((Process("pwd") in tempDirectory) > text.utf8Decode[IO]).start(blockingExecutionContext) + pwdRunning <- ((Process("pwd") in tempDirectory) > utf8Decode).start() pwdResult <- pwdRunning.waitForExit() } yield pwdResult.fullOutput.trim @@ -89,9 +98,9 @@ class ProcessSpecs extends Specification { def is = s2""" def simpleProcessFileOutput = { val tempFile = File.createTempFile("test", "txt") val program = for { - running <- (Process("echo", List("Hello world!")) > tempFile.toPath).start(blockingExecutionContext) + running <- (Process("echo", List("Hello world!")) > tempFile.toPath).start() _ <- running.waitForExit() - contents <- io.file.readAll[IO](tempFile.toPath, ExecutionContext.global, 1024).through(text.utf8Decode).compile.foldMonoid + contents <- IO(new String(Files.readAllBytes(tempFile.toPath), StandardCharsets.UTF_8)) } yield contents program.unsafeRunSync() must beEqualTo("Hello world!\n") @@ -99,7 +108,7 @@ class ProcessSpecs extends Specification { def is = s2""" def simpleProcessStreamOutput = { val program = for { - running <- (Process("echo", List("Hello world!")) > text.utf8Decode[IO]).start(blockingExecutionContext) + running <- (Process("echo", List("Hello world!")) > utf8Decode).start() result <- running.waitForExit() } yield result.fullOutput @@ -109,9 +118,9 @@ class ProcessSpecs extends Specification { def is = s2""" def simpleProcessSinkOutput = { val builder = StringBuilder.newBuilder - val target: Sink[IO, Byte] = Sink(byte => IO { builder.append(byte.toChar) }) + val target: Sink[ByteString, Future[Done]] = Sink.foreach((bs: ByteString) => builder.append(bs.utf8String)) val program = for { - running <- (Process("echo", List("Hello world!")) > target).start(blockingExecutionContext) + running <- (Process("echo", List("Hello world!")) > target).start() _ <- running.waitForExit() } yield builder.toString() @@ -120,7 +129,7 @@ class ProcessSpecs extends Specification { def is = s2""" def simpleProcessStreamError = { val program = for { - running <- (Process("perl", List("-e", """print STDERR "Hello"""")) redirectErrorTo text.utf8Decode[IO]).start(blockingExecutionContext) + running <- (Process("perl", List("-e", """print STDERR "Hello"""")) redirectErrorTo utf8Decode).start() result <- running.waitForExit() } yield result.fullError @@ -130,9 +139,9 @@ class ProcessSpecs extends Specification { def is = s2""" def simpleProcessSinkError = { val builder = StringBuilder.newBuilder - val target: Sink[IO, Byte] = Sink(byte => IO { builder.append(byte.toChar) }) + val target: Sink[ByteString, Future[Done]] = Sink.foreach((bs: ByteString) => builder.append(bs.utf8String)) val program = for { - running <- (Process("perl", List("-e", """print STDERR "Hello"""")) redirectErrorTo target).start(blockingExecutionContext) + running <- (Process("perl", List("-e", """print STDERR "Hello"""")) redirectErrorTo target).start() _ <- running.waitForExit() } yield builder.toString @@ -140,9 +149,9 @@ class ProcessSpecs extends Specification { def is = s2""" } def simpleProcessStreamInput = { - val source = Stream("This is a test string").through(text.utf8Encode) + val source: Source[ByteString, Any] = Source.single("This is a test string").map(ByteString.apply) val program = for { - running <- (Process("wc", List("-w")) < source > text.utf8Decode[IO]).start(blockingExecutionContext) + running <- (Process("wc", List("-w")) < source > utf8Decode).start() result <- running.waitForExit() } yield result.fullOutput.trim @@ -154,7 +163,7 @@ class ProcessSpecs extends Specification { def is = s2""" Files.write(tempFile, "This is a test string".getBytes("UTF-8")) val pipedProcess = Process("cat") | Process("wc", List("-w")) val program = for { - runningProcesses <- (pipedProcess < tempFile > text.utf8Decode[IO]).start(blockingExecutionContext) + runningProcesses <- (pipedProcess < tempFile > utf8Decode).start() (runningCat, runningWc) = runningProcesses _ <- runningCat.waitForExit() wcResult <- runningWc.waitForExit() @@ -164,10 +173,10 @@ class ProcessSpecs extends Specification { def is = s2""" } def pipedProcessStreamInput = { - val source: Stream[IO, Byte] = Stream("This is a test string").through(text.utf8Encode) + val source: Source[ByteString, Any] = Source.single("This is a test string").map(ByteString.apply) val pipedProcess = Process("cat") | Process("wc", List("-w")) val program = for { - runningProcesses <- (pipedProcess < source > text.utf8Decode[IO]).start(blockingExecutionContext) + runningProcesses <- (pipedProcess < source > utf8Decode).start() (runningCat, runningWc) = runningProcesses _ <- runningCat.waitForExit() wcResult <- runningWc.waitForExit() @@ -178,7 +187,7 @@ class ProcessSpecs extends Specification { def is = s2""" def pipedProcessStreamError = { val program = for { - rps <- ((Process("true") | Process("perl", List("-e", """print STDERR "Hello""""))) redirectErrorTo text.utf8Decode[IO]).start(blockingExecutionContext) + rps <- ((Process("true") | Process("perl", List("-e", """print STDERR "Hello""""))) redirectErrorTo utf8Decode).start() (_, running) = rps result <- running.waitForExit() } yield result.fullError @@ -188,7 +197,7 @@ class ProcessSpecs extends Specification { def is = s2""" def simpleProcessPiping = { val program = for { - rps <- (Process("echo", List("This is a test string")) | (Process("wc", List("-w")) > text.utf8Decode[IO])).start(blockingExecutionContext) + rps <- (Process("echo", List("This is a test string")) | (Process("wc", List("-w")) > utf8Decode)).start() (runningEcho, runningWc) = rps _ <- runningEcho.waitForExit() wcResult <- runningWc.waitForExit() @@ -198,28 +207,29 @@ class ProcessSpecs extends Specification { def is = s2""" } def customProcessPiping = { - val customPipe: Pipe[IO, Byte, Byte] = - (s: Stream[IO, Byte]) => s - .through(text.utf8Decode) - .through(text.lines) - .map(_.split(' ').toVector) - .map(v => v.map(_ + " !!!").mkString(" ")) - .intersperse("\n") - .through(text.utf8Encode) + val customPipe = Framing.delimiter( + delimiter = ByteString("\n"), + maximumFrameLength = 10000, + allowTruncation = true + ).map(_.utf8String) + .map(_.split(' ').toVector) + .map(v => v.map(_ + " !!!").mkString(" ")) + .intersperse("\n") + .map(ByteString.apply) val program = for { - rps <- (Process("echo", List("This is a test string")).via(customPipe).to(Process("wc", List("-w")) > text.utf8Decode[IO])).start(blockingExecutionContext) + rps <- (Process("echo", List("This is a test string")).via(customPipe).to(Process("wc", List("-w")) > utf8Decode)).start() (runningEcho, runningWc) = rps _ <- runningEcho.waitForExit() wcResult <- runningWc.waitForExit() } yield wcResult.fullOutput.trim - program.unsafeRunSync() must beEqualTo("11") + program.unsafeRunSync() must beEqualTo("10") } def simpleProcessPipingHList = { val program = for { - rpHL <- (Process("echo", List("This is a test string")) | (Process("wc", List("-w")) > text.utf8Decode[IO])).startHL(blockingExecutionContext) + rpHL <- (Process("echo", List("This is a test string")) | (Process("wc", List("-w")) > utf8Decode)).startHL() runningEcho = rpHL.head runningWc = rpHL.tail.head _ <- runningEcho.waitForExit() @@ -231,7 +241,7 @@ class ProcessSpecs extends Specification { def is = s2""" def multiProcessPiping = { val program = for { - rps <- (Process("echo", List("cat\ncat\ndog\napple")) | Process("sort") | (Process("uniq", List("-c")) > text.utf8Decode[IO])).start(blockingExecutionContext) + rps <- (Process("echo", List("cat\ncat\ndog\napple")) | Process("sort") | (Process("uniq", List("-c")) > utf8Decode)).start() (runningEcho, runningSort, runningUniq) = rps _ <- runningEcho.waitForExit() _ <- runningSort.waitForExit() @@ -242,9 +252,14 @@ class ProcessSpecs extends Specification { def is = s2""" } def multiProcessPipingWithErrorRedir = { - val errorTarget = ToVector(text.utf8Decode[IO].andThen(text.lines[IO])) + val errorTarget = ToVector( + Framing.delimiter( + delimiter = ByteString("\n"), + maximumFrameLength = 100000, + allowTruncation = true).map(_.utf8String) + ) val program = for { - rps <- ((Process("perl", List("-e", """print STDERR "Hello\nworld"""")) redirectErrorTo errorTarget) | (Process("sort") redirectErrorTo errorTarget) | (Process("uniq", List("-c")) redirectErrorTo errorTarget)).start(blockingExecutionContext) + rps <- ((Process("perl", List("-e", """print STDERR "Hello\nworld"""")) redirectErrorTo errorTarget) | (Process("sort") redirectErrorTo errorTarget) | (Process("uniq", List("-c")) redirectErrorTo errorTarget)).start() (runningPerl, runningSort, runningUniq) = rps perlResult <- runningPerl.waitForExit() sortResult <- runningSort.waitForExit() @@ -256,7 +271,7 @@ class ProcessSpecs extends Specification { def is = s2""" def isAlive = { val program = for { - running <- Process("sleep", List("10")).start(blockingExecutionContext) + running <- Process("sleep", List("10")).start() isAliveBefore <- running.isAlive _ <- running.terminate() isAliveAfter <- running.isAlive @@ -267,7 +282,7 @@ class ProcessSpecs extends Specification { def is = s2""" def terminateSignal = { val program = for { - running <- Process("perl", List("-e", """$SIG{TERM} = sub { exit 1 }; sleep 30; exit 0""")).start(blockingExecutionContext) + running <- Process("perl", List("-e", """$SIG{TERM} = sub { exit 1 }; sleep 30; exit 0""")).start() _ <- IO { Thread.sleep(250); } result <- running.terminate() } yield (result.exitCode) @@ -277,7 +292,7 @@ class ProcessSpecs extends Specification { def is = s2""" def killSignal = { val program = for { - running <- Process("perl", List("-e", """$SIG{TERM} = 'IGNORE'; sleep 30; exit 2""")).start(blockingExecutionContext) + running <- Process("perl", List("-e", """$SIG{TERM} = 'IGNORE'; sleep 30; exit 2""")).start() _ <- IO { Thread.sleep(250); } result <- running.kill() } yield (result.exitCode) @@ -287,7 +302,7 @@ class ProcessSpecs extends Specification { def is = s2""" def customEnvVariables = { val program = for { - running <- ((Process("sh", List("-c", "echo \"Hello $TEST1! I am $TEST2!\"")) `with` ("TEST1" -> "world") `with` ("TEST2" -> "prox")) > text.utf8Decode[IO]).start(blockingExecutionContext) + running <- ((Process("sh", List("-c", "echo \"Hello $TEST1! I am $TEST2!\"")) `with` ("TEST1" -> "world") `with` ("TEST2" -> "prox")) > utf8Decode).start() result <- running.waitForExit() } yield result.fullOutput @@ -296,7 +311,7 @@ class ProcessSpecs extends Specification { def is = s2""" def outFoldMonoid = { val program = for { - running <- (Process("echo", List("Hello\nworld!")) > text.utf8Decode[IO].andThen(text.lines[IO])).start(blockingExecutionContext) + running <- (Process("echo", List("Hello\nworld!")) > Framing.delimiter(ByteString("\n"), 10000, true).map(_.utf8String)).start() result <- running.waitForExit() } yield result.fullOutput @@ -307,25 +322,25 @@ class ProcessSpecs extends Specification { def is = s2""" def outLogNonMonoid = { val program = for { - running <- (Process("echo", List("Hello\nworld!")) > text.utf8Decode[IO].andThen(text.lines[IO]).andThen(_.map(s => StringLength(s.length)))).start(blockingExecutionContext) + running <- (Process("echo", List("Hello\nworld!")) > Framing.delimiter(ByteString("\n"), 10000, true).map(_.utf8String).map(s => StringLength(s.length))).start() result <- running.waitForExit() } yield result.fullOutput - program.unsafeRunSync() must beEqualTo(Vector(StringLength(5), StringLength(6), StringLength(0))) + program.unsafeRunSync() must beEqualTo(Vector(StringLength(5), StringLength(6))) } def outLogMonoid = { val program = for { - running <- (Process("echo", List("Hello\nworld!")) > ToVector(text.utf8Decode[IO].andThen(text.lines[IO]).andThen(_.map(_.length)))).start(blockingExecutionContext) + running <- (Process("echo", List("Hello\nworld!")) > ToVector(Framing.delimiter(ByteString("\n"), 10000, true).map(_.utf8String).map(_.length))).start() result <- running.waitForExit() } yield result.fullOutput - program.unsafeRunSync() must beEqualTo(Vector(5, 6, 0)) + program.unsafeRunSync() must beEqualTo(Vector(5, 6)) } def outIgnore = { val program = for { - running <- (Process("echo", List("Hello\nworld!")) > Drain(text.utf8Decode[IO].andThen(text.lines[IO]))).start(blockingExecutionContext) + running <- (Process("echo", List("Hello\nworld!")) > Drain(Framing.delimiter(ByteString("\n"), 10000, true).map(_.utf8String))).start() result <- running.waitForExit() } yield result.fullOutput @@ -335,16 +350,16 @@ class ProcessSpecs extends Specification { def is = s2""" def outCustomFold = { val program = for { running <- (Process("echo", List("Hello\nworld!")) > - Fold(text.utf8Decode[IO].andThen(text.lines[IO]), Vector.empty, (l: Vector[Option[Char]], s: String) => l :+ s.headOption)).start(blockingExecutionContext) + Fold(Framing.delimiter(ByteString("\n"), 10000, true).map(_.utf8String), Vector.empty, (l: Vector[Option[Char]], s: String) => l :+ s.headOption)).start() result <- running.waitForExit() } yield result.fullOutput - program.unsafeRunSync() must beEqualTo(Vector(Some('H'), Some('w'), None)) + program.unsafeRunSync() must beEqualTo(Vector(Some('H'), Some('w'))) } def errFoldMonoid = { val program = for { - running <- (Process("perl", List("-e", "print STDERR 'Hello\nworld!\n'")) redirectErrorTo text.utf8Decode[IO].andThen(text.lines[IO])).start(blockingExecutionContext) + running <- (Process("perl", List("-e", "print STDERR 'Hello\nworld!\n'")) redirectErrorTo Framing.delimiter(ByteString("\n"), 10000, true).map(_.utf8String)).start() result <- running.waitForExit() } yield result.fullError @@ -353,25 +368,25 @@ class ProcessSpecs extends Specification { def is = s2""" def errLogNonMonoid = { val program = for { - running <- (Process("perl", List("-e", "print STDERR 'Hello\nworld!\n'")) redirectErrorTo text.utf8Decode[IO].andThen(text.lines[IO]).andThen(_.map(s => StringLength(s.length)))).start(blockingExecutionContext) + running <- (Process("perl", List("-e", "print STDERR 'Hello\nworld!\n'")) redirectErrorTo Framing.delimiter(ByteString("\n"), 10000, true).map(_.utf8String).map(s => StringLength(s.length))).start() result <- running.waitForExit() } yield result.fullError - program.unsafeRunSync() must beEqualTo(Vector(StringLength(5), StringLength(6), StringLength(0))) + program.unsafeRunSync() must beEqualTo(Vector(StringLength(5), StringLength(6))) } def errLogMonoid = { val program = for { - running <- (Process("perl", List("-e", "print STDERR 'Hello\nworld!\n'")) redirectErrorTo ToVector(text.utf8Decode[IO].andThen(text.lines[IO]).andThen(_.map(_.length)))).start(blockingExecutionContext) + running <- (Process("perl", List("-e", "print STDERR 'Hello\nworld!\n'")) redirectErrorTo ToVector(Framing.delimiter(ByteString("\n"), 10000, true).map(_.utf8String).map(_.length))).start() result <- running.waitForExit() } yield result.fullError - program.unsafeRunSync() must beEqualTo(Vector(5, 6, 0)) + program.unsafeRunSync() must beEqualTo(Vector(5, 6)) } def errIgnore = { val program = for { - running <- (Process("perl", List("-e", "print STDERR 'Hello\nworld!\n'")) redirectErrorTo Drain(text.utf8Decode[IO].andThen(text.lines[IO]))).start(blockingExecutionContext) + running <- (Process("perl", List("-e", "print STDERR 'Hello\nworld!\n'")) redirectErrorTo Drain(Framing.delimiter(ByteString("\n"), 10000, true).map(_.utf8String))).start() result <- running.waitForExit() } yield result.fullError @@ -381,11 +396,11 @@ class ProcessSpecs extends Specification { def is = s2""" def errCustomFold = { val program = for { running <- (Process("perl", List("-e", "print STDERR 'Hello\nworld!\n'")) redirectErrorTo - Fold(text.utf8Decode[IO].andThen(text.lines[IO]), Vector.empty, (l: Vector[Option[Char]], s: String) => l :+ s.headOption)).start(blockingExecutionContext) + Fold(Framing.delimiter(ByteString("\n"), 10000, true).map(_.utf8String), Vector.empty, (l: Vector[Option[Char]], s: String) => l :+ s.headOption)).start() result <- running.waitForExit() } yield result.fullError - program.unsafeRunSync() must beEqualTo(Vector(Some('H'), Some('w'), None)) + program.unsafeRunSync() must beEqualTo(Vector(Some('H'), Some('w'))) } def doubleOutputRedirectIsIllegal = {