我如何在Scala中执行多个任务?
我有50,000个任务,想用10个线程执行它们。 在Java中,我应该创buildExecuters.threadPool(10)并传递runnable,然后等待处理所有。 据我所知,Scala对于这个任务特别有用,但我无法在文档中find解决scheme。
斯卡拉2.9.3和更高版本
最简单的方法是使用scala.concurrent.Future
类和相关的基础结构。 scala.concurrent.future
方法asynchronous评估传递给它的块,并立即返回表示asynchronous计算的Future[A]
。 期货可以通过多种非封锁方式进行操纵,包括制图,平铺,过滤,错误恢复等。
例如,下面是一个创build10个任务的示例,其中每个任务都会hibernate一段时间,然后返回传递给它的值的平方。
import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global val tasks: Seq[Future[Int]] = for (i <- 1 to 10) yield future { println("Executing task " + i) Thread.sleep(i * 1000L) i * i } val aggregated: Future[Seq[Int]] = Future.sequence(tasks) val squares: Seq[Int] = Await.result(aggregated, 15.seconds) println("Squares: " + squares)
在这个例子中,我们首先创build一系列单独的asynchronous任务,完成时提供一个int。 然后,我们使用Future.sequence
将这些asynchronous任务合并到一个asynchronous任务中 – 交换Future
的位置和Seq
的types。 最后,我们在等待结果的同时阻塞当前线程长达15秒。 在这个例子中,我们使用了由fork / join线程池支持的全局执行上下文。 对于非平凡的例子,你可能会想使用一个特定于应用程序的ExecutionContext
。
一般来说,尽可能避免阻塞。 Future
类还有其他一些组合器,可以帮助编程为asynchronous风格,包括onSuccess
, onFailure
和onComplete
。
此外,请考虑调查Akka库,该库为Scala和Java提供基于angular色的并发性,并与scala.concurrent
互操作。
斯卡拉2.9.2和之前
这个最简单的方法是使用Scala的Future类,它是Actors框架的一个子组件。 scala.actors.Futures.future方法为传递给它的块创build一个Future。 然后你可以使用scala.actors.Futures.awaitAll来等待所有的任务完成。
例如,下面是一个创build10个任务的示例,其中每个任务都会hibernate一段时间,然后返回传递给它的值的平方。
import scala.actors.Futures._ val tasks = for (i <- 1 to 10) yield future { println("Executing task " + i) Thread.sleep(i * 1000L) i * i } val squares = awaitAll(20000L, tasks: _*) println("Squares: " + squares)
你想看看斯卡拉演员图书馆或阿卡。 Akka有更清晰的语法,但是两者都可以做到。
所以这听起来像你需要创build一个知道如何处理你的任务的演员池。 一个Actor可以基本上是任何具有接收方法的类 – 来自Akka教程( http://doc.akkasource.org/tutorial-chat-server-scala ):
class MyActor extends Actor { def receive = { case "test" => println("received test") case _ => println("received unknown message") }} val myActor = Actor.actorOf[MyActor] myActor.start
您将需要创build一个actor实例池,并将其作为消息发送给它们。 这里有一篇关于Akka演员合集的文章,可能会有所帮助: http : //vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/
在你的情况下,每个任务一个actor可能是合适的(与线程相比,参与者非常轻量级,所以你可以在单个VM中拥有很多),或者你可能需要更复杂的负载平衡。
编辑:使用上面的示例actor,发送消息就像这样简单:
myActor ! "test"
演员然后将输出“接收testing”到标准输出。
消息可以是任何types的,当与Scala的模式匹配结合使用时,可以为构build灵活的并发应用程序提供强大的模式。
一般来说,Akka演员将在线程共享方面“做正确的事情”,对于OP的需求,我想像默认是好的。 但是如果你需要的话,你可以设置演员应该使用的调度器到几种types之一:
* Thread-based * Event-based * Work-stealing * HawtDispatch-based event-driven
为演员设置一个调度员是微不足道的:
class MyActor extends Actor { self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("thread-pool-dispatch") .withNewThreadPoolWithBoundedBlockingQueue(100) .setCorePoolSize(10) .setMaxPoolSize(10) .setKeepAliveTimeInMillis(10000) .build }
见http://doc.akkasource.org/dispatchers-scala
通过这种方式,可以限制线程池的大小,但是原来的用例可能会使用默认的调度器来满足50K个Akka actor实例,并且它会很好地并行化。
这实际上只是抓住了阿卡可以做的事情的表面。 它带来了很多Erlang提供给Scala语言的东西。 参与者可以监视其他参与者并重新启动它们,创build自我修复应用程序。 Akka还提供软件事务内存和许多其他function。 这可以说是Scala的“杀手级应用”或“杀手级架构”。
如果你想“执行10个线程”,然后使用线程。 Scala的演员模型,通常人们在说Scala时说的是并发的好处, 隐藏了这些细节,所以你不会看到它们。
使用演员或者所有你所拥有的未来都是简单的计算,你只需要创build50000个并让它们运行。 你可能会遇到问题,但是性质不同。
这里有另一个类似于mpilquist响应的答案,但是没有弃用的API,并且通过一个自定义的ExecutionContext包含了线程设置:
import java.util.concurrent.Executors import scala.concurrent.{ExecutionContext, Await, Future} import scala.concurrent.duration._ val numJobs = 50000 var numThreads = 10 // customize the execution context to use the specified number of threads implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numThreads)) // define the tasks val tasks = for (i <- 1 to numJobs) yield Future { // do something more fancy here i } // aggregate and wait for final result val aggregated = Future.sequence(tasks) val oneToNSum = Await.result(aggregated, 15.seconds).sum