如何开始使用Akka Streams?

阿卡stream库已经有相当丰富的文档 。 然而,对我来说,主要的问题是它提供了太多的材料 – 我感到很多概念,我不得不学习。 这里显示的很多例子都是非常重量级的,不容易被翻译成真实世界的用例,因此非常深奥。 我认为这样做可能会导致太多的细节,而没有解释如何将所有构build模块一起构build,以及如何解决具体问题。

有源,汇,stream,graphics阶段,部分graphics,物化,graphicsDSL和更多,我只是不知道从哪里开始。 快速入门指南是一个开始的地方,但我不明白。 它只是抛出上面提到的概念而不解释它们。 此外,代码示例不能被执行 – 缺less的部分使我几乎不可能跟随文本。

任何人都可以用简单的语言和简单的例子解释概念的来源,汇,stream程,graphics阶段,部分graphics,物化和其他一些事情,而这些简单的例子不能解释每一个细节开始)?

这个答案是基于akka-stream版本2.4.2 。 其他版本的API可能略有不同。 依赖可以被sbt使用 :

 libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2" 

好吧,让我们开始吧。 Akka Streams的API由三种主要types组成。 与反应stream相比 ,这些types更强大,因此更复杂。 假定对于所有的代码示例,已经存在以下定义:

 import scala.concurrent._ import akka._ import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.util._ implicit val system = ActorSystem("TestSystem") implicit val materializer = ActorMaterializer() import system.dispatcher 

types声明需要import语句。 system代表Akka的演员系统, materializer代表stream的评估上下文。 在我们的例子中,我们使用了ActorMaterializer ,这意味着这些stream在actor上面被评估。 这两个值都被标记为implicit ,这使得Scala编译器可以在需要时自动注入这两个依赖项。 我们还导入了system.dispatcher ,这是Futures的执行上下文。

一个新的API

阿卡stream有这些关键属性:

  • 他们实现了Reactive Streams规范 ,其三个主要目标背压,asynchronous和非阻塞边界以及不同实现之间的互操作性也完全适用于Akka Streams。
  • 它们为stream的评估引擎提供抽象,这被称为Materializer
  • 程序被制定为可重复使用的构build块,它们被表示为SourceSourceFlow三种主要types。 构build块形成一个图表,其评估是基于Materializer ,需要明确触发。

下面将深入介绍如何使用这三种主要types。

资源

Source是数据创build者,它用作stream的input源。 每个Source有一个单一的输出通道,没有input通道。 所有的数据都通过输出通道stream向连接到Source任何数据。

资源

图片来自boldradius.com 。

Source可以通过多种方式创build:

 scala> val s = Source.empty s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ... scala> val s = Source.single("single element") s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ... scala> val s = Source(1 to 3) s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ... scala> val s = Source(Future("single value from a Future")) s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ... scala> s runForeach println res0: scala.concurrent.Future[akka.Done] = ... single value from a Future 

在上面的例子中,我们给Source提供了有限的数据,这意味着它们将最终终止。 人们不应该忘记,Reactive Streams是默认的惰性和asynchronous的。 这意味着明确地要求对stream进行评估。 在Akka Streams中,这可以通过run*方法来完成。 runForeach与众所周知的foreach函数没有什么不同 – 通过run添加它明确表示我们要求对stream进行评估。 由于有限的数据是无聊的,我们继续无限的:

 scala> val s = Source.repeat(5) s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ... scala> s take 3 runForeach println res1: scala.concurrent.Future[akka.Done] = ... 5 5 5 

take方法,我们可以创build一个人工停止点,防止我们无限期地进行评估。 由于actor支持是内置的,所以我们也可以很容易地向发送给actor的消息提供stream:

 def run(actor: ActorRef) = { Future { Thread.sleep(300); actor ! 1 } Future { Thread.sleep(200); actor ! 2 } Future { Thread.sleep(100); actor ! 3 } } val s = Source .actorRef[Int](bufferSize = 0, OverflowStrategy.fail) .mapMaterializedValue(run) scala> s runForeach println res1: scala.concurrent.Future[akka.Done] = ... 3 2 1 

我们可以看到, Futures在不同的线程上是asynchronous执行的,这就解释了结果。 在上面的示例中,传入元素的缓冲区不是必需的,因此,如果使用OverflowStrategy.fail我们可以configurationstream在缓冲区溢出时应该失败。 特别是通过这个actor界面,我们可以通过任何数据源来提供stream。 如果数据是由同一个线程创build的,由另一个进程创build的,另一个进程创build的,或者是通过Internet从远程系统创build的,则无关紧要。

水槽

Sink基本上是一个Source的对面。 它是stream的端点,因此会消耗数据。 一个Sink有一个input通道,没有输出通道。 当我们想要以可重用的方式指定数据收集器的行为而不评估数据stream时,特别需要Sinks器。 已知的run*方法不允许我们使用这些属性,所以最好使用Sink来代替。

水槽

图片来自boldradius.com 。

Sink一个简短的例子:

 scala> val source = Source(1 to 3) source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ... scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem")) sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ... scala> val flow = source to sink flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ... scala> flow.run() res3: akka.NotUsed = NotUsed sink received: 1 sink received: 2 sink received: 3 

Source连接到Sink器可以使用to方法完成。 它返回一个所谓的RunnableFlow ,就像我们稍后将看到一个Flow的特殊forms – 一个可以通过调用run()方法来执行的stream。

可运行的流程

图片来自boldradius.com 。

当然可以将所有到达水槽的值转发给一个参与者:

 val actor = system.actorOf(Props(new Actor { override def receive = { case msg => println(s"actor received: $msg") } })) scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed") sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ... scala> val runnable = Source(1 to 3) to sink runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ... scala> runnable.run() res3: akka.NotUsed = NotUsed actor received: 1 actor received: 2 actor received: 3 actor received: stream completed 

stream

如果您需要Akkastream和现有系统之间的连接,但数据源和接收器非常好,但是不能真正对它们做任何事情。 stream是Akka Streams基本抽象中最后缺失的一部分。 它们作为不同stream之间的连接器,可用于转换其元素。

流

图片来自boldradius.com 。

如果一个Flow接到一个Source一个新的Source是结果。 同样,连接到SinkFlow创build一个新的Sink 。 同时连接一个Source和一个SinkFlow将产生一个RunnableFlow 。 因此,它们位于input和输出通道之间,但只要它们不连接到SourceSink ,它们本身就不对应于其中的一种。

全流

图片来自boldradius.com 。

为了更好地理解Flows ,我们将看一些例子:

 scala> val source = Source(1 to 3) source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ... scala> val sink = Sink.foreach[Int](println) sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ... scala> val invert = Flow[Int].map(elem => elem * -1) invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ... scala> val doubler = Flow[Int].map(elem => elem * 2) doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ... scala> val runnable = source via invert via doubler to sink runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ... scala> runnable.run() res10: akka.NotUsed = NotUsed -2 -4 -6 

通过via方法,我们可以连接SourceFlow 。 我们需要指定inputtypes,因为编译器无法为我们推断它。 正如我们在这个简单的例子中已经看到的,streaminvertdouble invert完全独立于任何数据生产者和消费者。 他们只转换数据并将其转发到输出通道。 这意味着我们可以在多个stream之间重用stream:

 scala> val s1 = Source(1 to 3) via invert to sink s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ... scala> val s2 = Source(-3 to -1) via invert to sink s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ... scala> s1.run() res10: akka.NotUsed = NotUsed -1 -2 -3 scala> s2.run() res11: akka.NotUsed = NotUsed 3 2 1 

s1s2代表全新的stream – 它们不通过它们的构件块共享任何数据。

无界的数据stream

在我们继续之前,我们应该首先回顾一下Reactive Streams的一些关键方面。 无限数量的元素可以在任何时候到达,并且可以将stream以不同的状态。 除了通常状态的可运行stream之外,stream可以通过错误或通过表示没有更多数据将到达的信号而停止。 通过在时间线上标记事件,可以以graphics的方式对stream进行build模,就像这样:

显示流是按时排序的正在进行的事件序列

图片来自你已经失踪的反应式编程介绍 。

在上一节的例子中,我们已经看到了可运行的stream程。 每当一个stream可以实际化时,我们就得到一个RunnableGraph ,这意味着一个Sink连接到一个Source 。 到目前为止,我们总是物化为价值Unit ,这可以从以下types中看出:

 val source: Source[Int, NotUsed] = Source(1 to 3) val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println) val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x) 

对于SourceSink第二个types参数,对于Flow第三个types参数表示物化值。 在整个这个答案中,物化的全部意义不应被解释。 但是,关于具体化的更多细节可以在官方文档中find。 现在我们唯一需要知道的是物化值是我们在运行stream时所得到的。 由于我们目前只对副作用感兴趣,所以我们得到了Unit的物化价值。 这个例外是一个汇的实现,导致了一个Future 。 它给了我们一个Future ,因为这个值可以表示何时连接到接收器的stream已经结束。 到目前为止,前面的代码示例很好地解释了这个概念,但是它们也很无聊,因为我们只处理有限的stream或者非常简单的无限stream。 为了使它更有趣,下面将解释一个完整的asynchronous和无界stream。

ClickStream示例

举个例子,我们想要一个捕捉点击事件的stream。 为了使它更具挑战性,比方说,我们也希望将发生在短时间内的点击事件分组。 这样,我们可以轻松发现双倍,三倍或十倍的点击次数。 而且,我们希望过滤掉所有的单击。 深吸一口气,想象一下你将如何以一种迫切的方式来解决这个问题。 我敢打赌,没有人能够实施一次正确的解决scheme。 这个问题在反应方面是微不足道的。 事实上,这个解决scheme非常简单直接,我们甚至可以用直接描述代码行为的图表来expression它:

点击流示例的逻辑

图片来自你已经失踪的反应式编程介绍 。

灰色框是描述一个stream如何转换成另一个stream的函数。 使用throttlefunction我们在250毫秒内累计点击数, mapfilterfunction应该是不言自明的。 彩色的球体代表一个事件,箭头描述了它们如何stream经我们的function。 在后面的处理步骤中,我们将越来越less的元素stream过我们的stream,因为我们将它们组合在一起并将其过滤掉。 这个图像的代码看起来像这样:

 val multiClickStream = clickStream .throttle(250.millis) .map(clickEvents => clickEvents.length) .filter(numberOfClicks => numberOfClicks >= 2) 

整个逻辑只能用四行代码表示! 在斯卡拉,我们可以写得更短:

 val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2) 

clickStream的定义稍微复杂一些,但是仅仅是这种情况,因为示例程序运行在JVM上,在那里捕获点击事件是不容易的。 另一个复杂因素是Akka默认不提供throttlefunction。 相反,我们必须自己写。 由于这个函数(和mapfilter函数一样)可以在不同的用例中重用,所以我不会把这些行计算到我们实现这个逻辑所需要的行数。 然而,在命令式语言中,正常情况下逻辑不能被轻易地重复使用,而且不同的逻辑步骤全部在一个地方发生,而不是按顺序应用,这意味着我们可能会用限制逻辑来使我们的代码变形。 完整的代码示例可作为要点提供,此处不再赘述。

SimpleWebServer示例

应该讨论的是另一个例子。 虽然点击stream是一个很好的例子,让阿卡stream处理一个真实世界的例子,它缺乏动力显示并行执行的力量。 下一个例子将代表一个可以并行处理多个请求的小型Web服务器。 networking服务器应能够接受传入的连接并从它们接收表示可打印的ASCII标志的字节序列。 这些字节序列或string应该在所有换行字符处拆分成较小的部分。 之后,服务器将用每个分割线响应客户端。 或者,它可以做些别的事情,给出一个特殊的答案标记,但是我们希望在这个例子中保持简单,所以不要引入任何奇特的特性。 请记住,服务器需要能够同时处理多个请求,这基本上意味着不允许请求阻止任何其他请求进一步执行。 解决所有这些要求可能是一个必不可less的方式 – 与Akka Streams,但我们不应该需要超过几行解决任何这些。 首先,让我们来看看服务器本身:

服务器

基本上,只有三个主要的组成部分。 第一个需要接受传入的连接。 第二个需要处理传入的请求,第三个需要发送响应。 实现所有这三个构build块只比实现点击stream更复杂一点:

 def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = { import system.dispatcher val connectionHandler: Sink[Tcp.IncomingConnection, Future[Unit]] = Sink.foreach[Tcp.IncomingConnection] { conn => println(s"Incoming connection from: ${conn.remoteAddress}") conn.handleWith(serverLogic) } val incomingCnnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] = Tcp().bind(address, port) val binding: Future[Tcp.ServerBinding] = incomingCnnections.to(connectionHandler).run() binding onComplete { case Success(b) => println(s"Server started, listening on: ${b.localAddress}") case Failure(e) => println(s"Server could not be bound to $address:$port: ${e.getMessage}") } } 

函数mkServer (除了服务器的地址和端口)还有一个mkServer系统和一个实现者作为隐式参数。 服务器的控制stream由binding来表示,该binding需要传入连接的来源并将它们转发到传入连接的接收器。 在我们的接收器connectionHandler内部,我们通过stream程serverLogic处理每个连接,这将在后面介绍。 binding返回一个Future ,在服务器启动或启动失败时完成,当端口已被另一个进程占用时可能会发生这种情况。 但是,代码并不能完全反映graphics,因为我们无法看到处理响应的构build块。 这是因为连接本身已经提供了这个逻辑。 这是一个双向stream动,而不仅仅是一个单向stream动,就像我们在前面的例子中看到的那样。 就实际情况而言,这种复杂的stream动在这里不作解释。 官方文件有大量的材料来覆盖更复杂的stream程图。 现在知道Tcp.IncomingConnection代表一个知道如何接收请求以及如何发送响应的连接就足够了。 仍然缺less的部分是serverLogic构build块。 它可以看起来像这样:

服务器逻辑

再一次,我们能够将逻辑分成几个简单的构build模块,这些模块一起构成了我们程序的stream程。 首先,我们要将我们的字节序列分成几行,每当我们find一个换行符时,我们都必须这样做。 之后,每行的字节需要转换为string,因为使用原始字节是麻烦的。 总的来说,我们可以收到一个复杂协议的二进制stream,这将使得处理传入的原始数据极具挑战性。 一旦我们有一个可读的string,我们可以创build一个答案。 为了简单起见,答案可以是我们的任何事情。 最后,我们必须将我们的答案转换成可以通过线路发送的一系列字节。 整个逻辑的代码可能如下所示:

 val serverLogic: Flow[ByteString, ByteString, Unit] = { val delimiter = Framing.delimiter( ByteString("\n"), maximumFrameLength = 256, allowTruncation = true) val receiver = Flow[ByteString].map { bytes => val message = bytes.utf8String println(s"Server received: $message") message } val responder = Flow[String].map { message => val answer = s"Server hereby responds to message: $message\n" ByteString(answer) } Flow[ByteString] .via(delimiter) .via(receiver) .via(responder) } 

我们已经知道serverLogic是一个需要一个ByteString并且必须产生一个ByteString 。 使用delimiter我们可以将ByteString分成较小的部分 – 在我们的情况下,只要发生换行符就需要发生。 receiver是将所有分割字节序列都转换为string的stream程。 这当然是一个危险的转换,因为只有可打印的ASCII字符应该被转换为string,但是对于我们的需要来说,这已经足够了。 responder是最后一个组件,负责创build答案并将答案转换回字节序列。 与graphics相反,我们并没有将这个最后的组件分成两部分,因为逻辑是微不足道的。 最后,我们通过via函数连接所有的stream程。 在这一点上,人们可能会问我们是否照顾到开始提到的多用户财产。 事实上,我们确实做到了,尽pipe它可能不是很明显。 通过查看这个graphics应该更清楚:

服务器和服务器逻辑相结合

serverLogic组件不过是一个包含更小stream量的stream。 这个组件接受一个input,这是一个请求,并产生一个输出,这就是响应。 由于stream可以被多次构build,并且彼此独立工作,所以我们通过嵌套我们的多用户属性来实现。 每个请求都在自己的请求中处理,因此一个短的运行请求可以超过以前启动的长时间运行的请求。 如果您想知道,以前显示的serverLogic的定义当然可以通过内联的大部分内部定义来缩短:

 val serverLogic = Flow[ByteString] .via(Framing.delimiter( ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)) .map(_.utf8String) .map(msg => s"Server hereby responds to message: $msg\n") .map(ByteString(_)) 

Web服务器的testing可能如下所示:

 $ # Client $ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666 Server hereby responds to message: Hello World Server hereby responds to message: How are you? 

为了使上面的代码示例正常工作,我们首先需要启动由startServer脚本描述的服务器:

 $ # Server $ ./startServer 127.0.0.1 6666 [DEBUG] Server started, listening on: /127.0.0.1:6666 [DEBUG] Incoming connection from: /127.0.0.1:37972 [DEBUG] Server received: Hello World [DEBUG] Server received: How are you? 

这个简单的TCP服务器的完整的代码示例可以在这里find。 我们不仅可以用阿卡stream写一个服务器,而且还可以写客户端。 它可能看起来像这样:

 val connection = Tcp().outgoingConnection(address, port) val flow = Flow[ByteString] .via(Framing.delimiter( ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)) .map(_.utf8String) .map(println) .map(_ ⇒ StdIn.readLine("> ")) .map(_+"\n") .map(ByteString(_)) connection.join(flow).run() 

完整的代码TCP客户端可以在这里find。 代码看起来很相似,但与服务器相反,我们不必pipe理传入的连接了。

复杂的graphics

在前面的章节中,我们已经看到了如何用stream构造简单的程序。 但是,实际上仅仅依靠已经内置的函数来构造更复杂的stream通常是不够的。 如果我们希望能够将Akka Streams用于任意程序,我们需要知道如何构build自己的自定义控制结构和可组合的stream程,从而使我们能够处理我们应用程序的复杂性。 好消息是,Akka Streams旨在根据用户的需求进行扩展,为了简单介绍Akka Streams中更为复杂的部分,我们在客户端/服务器示例中添加了更多function。

我们还不能做的一件事是closures连接。 在这一点上,它开始变得复杂一些了,因为我们目前看到的streamAPI不允许我们在任意点停止stream。 但是, GraphStage抽象可用于创build任意数量的input或输出端口的任意graphics处理阶段。 我们先来看看服务器端,在这里我们引入一个叫做closeConnection的新组件:

 val closeConnection = new GraphStage[FlowShape[String, String]] { val in = Inlet[String]("closeConnection.in") val out = Outlet[String]("closeConnection.out") override val shape = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { setHandler(in, new InHandler { override def onPush() = grab(in) match { case "q" ⇒ push(out, "BYE") completeStage() case msg ⇒ push(out, s"Server hereby responds to message: $msg\n") } }) setHandler(out, new OutHandler { override def onPull() = pull(in) }) } } 

这个API看起来比streamAPI更麻烦。 难怪,我们必须在这里做很多必要的步骤。 作为交换,我们对stream的行为有更多的控制权。 在上面的例子中,我们只指定一个input和一个输出端口,并通过覆盖shape值使它们可用于系统。 此外,我们定义了一个所谓的InHandler和一个OutHandler ,它们依次负责接收和发送元素。 如果仔细查看完整的点击stream示例,则应该已经识别这些组件。 在InHandler我们抓取一个元素,如果它是一个带有单个字符'q'的string,我们要closures这个stream。 为了让客户有机会发现stream将很快closures,我们发出string"BYE" ,然后我们立即closures舞台。 closeConnection组件可以通过via方法与stream相结合,这是在关于stream的部分中介绍的。

除了能够closures连接之外,如果我们可以向新创build的连接显示一条欢迎消息,那也不错。 为了做到这一点,我们再一次要进一步:

 def serverLogic (conn: Tcp.IncomingConnection) (implicit system: ActorSystem) : Flow[ByteString, ByteString, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b ⇒ import GraphDSL.Implicits._ val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n")) val logic = b.add(internalLogic) val concat = b.add(Concat[ByteString]()) welcome ~> concat.in(0) logic.outlet ~> concat.in(1) FlowShape(logic.in, concat.out) }) 

serverLogic函数现在将传入连接作为参数。 在它的内部,我们使用了一个DSL,允许我们描述复杂的stream行为。 随着welcome我们创build一个只能发出一个元素的stream – 欢迎消息。 logic就是上一节描述的serverLogic 。 唯一显着的区别是我们增加了closeConnection 。 现在实际上是DSL的有趣的部分。 GraphDSL.create函数使得构build器b可用,其用于将stream表示为graphics。 使用~>function可以将input和输出端口相互连接。 示例中使用的Concat组件可以连接元素,并在这里用于在来自internalLogic的其他元素前加上欢迎消息。 在最后一行中,我们只使服务器逻辑的input端口和连接stream的输出端口可用,因为所有其他端口应该保持serverLogic组件的实现细节。 有关Akka Streams图DSL的深入介绍,请访问官方文档中的相应部分。 可以在这里find复杂的TCP服务器和可以与之通信的客户端的完整代码示例。 每当你从客户端打开一个新的连接,你应该看到一个欢迎信息,并通过在客户端键入"q"你应该看到一条消息,告诉你,连接已被取消。

还有一些话题没有被这个答案所覆盖。 特别是物化可能吓倒一个读者或另一个读者,但是我确信在这里覆盖的材料每个人都应该能够自己下一步。 如前所述, 官方文件是继续学习Akka Streams的好地方。