作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.
迭戈一直是意大利电信巨头Italtel等公司的高级自由工程师. 他还共同创立了一家基于网络的CRM业务.
写作 concurrent programs 是很困难的. Having to deal with threads, 锁, race conditions, 等等都非常容易出错,并可能导致代码难以阅读, 测试, maintain.
因此,许多人倾向于完全避免多线程. Instead, 它们专门使用单线程进程, 依赖于外部服务(如数据库), 队列, 等.)来处理任何需要的并发或异步操作. 虽然这种方法在某些情况下是一种合法的替代方法, 在许多情况下,它根本不是一个可行的选择. 许多实时系统-例如交易或银行应用程序, 或者实时游戏——没有等待单线程进程完成的奢侈(他们现在就需要答案)!). 其他系统是计算密集型或资源密集型的,如果没有在代码中引入并行化,它们将花费大量的时间(在某些情况下是数小时甚至数天)来运行.
一种相当常见的单线程方法(广泛用于 节点.js 例如,World)是使用基于事件的非阻塞范式. 这避免了上下文切换,从而提高了性能, 锁, blocking, 它仍然没有解决并发使用多个处理器的问题(这样做需要启动), coordinating between, multiple independent processes).
所以这是否意味着你别无选择,只能深入到线程的内部, 锁, 和竞争条件,以便构建并发应用程序?
感谢Akka框架,答案是否定的. 本教程介绍了Akka示例,并探讨了它促进和简化并发实现的方式, distributed applications.
Akka 用于构建的工具箱和运行时是高度并发的吗, distributed, 以及JVM上的容错应用程序. Akka is written in Scala,同时为Scala和Java提供了语言绑定.
Akka处理并发性的方法基于 演员 Model. In an 演员-based 系统, everything is an 演员, 就像在面向对象设计中,一切都是对象一样. A key difference, 但是,与我们的讨论特别相关的是,演员模型是专门设计和架构为并发模型的,而面向对象模型则不是. More specifically, in a Scala 演员 系统, 参与者交互并共享信息, 没有任何预设的顺序. 参与者相互共享信息的机制, t问 one another, is 消息 passing.
Akka在参与者和底层系统之间创建了一个层,这样参与者只需要处理消息. 创建和调度线程的所有复杂性, 接收和发送消息, 处理竞争条件和同步, 是否降级到框架处理透明.
Akka strictly adheres to the The Reactive Manifesto. 响应式应用旨在用满足以下一个或多个要求的架构取代传统的多线程应用:
参与者本质上只不过是接收消息并采取操作来处理消息的对象. 它与消息源解耦,它唯一的职责是正确识别它所接收到的消息类型并采取相应的操作.
在收到消息后,参与者可以采取以下一个或多个操作:
或者,参与者可以选择完全忽略该消息.e.(它可以选择不作为)如果它认为这样做是合适的.
要实现演员,必须扩展akka.演员.演员特征并实现接收方法. 当消息被发送到参与者时,参与者的receive方法将被调用(由Akka调用). 它的典型实现由模式匹配组成, 如下面的Akka示例所示, 要识别消息类型并作出相应的反应:
import akka.演员.演员
import akka.演员.道具
import akka.事件.Logging
class My演员 extends 演员 {
def receive = {
case value: String => doSomething(value)
case _ => println("received unknown 消息")
}
}
模式匹配是处理消息的一种相对优雅的技术, 比起基于回调的类似实现,哪一种倾向于生成“更干净”且更易于导航的代码. 例如,考虑一个简单的HTTP请求/响应实现.
首先,让我们在JavaScript中使用基于回调的范例来实现它:
route(url, function(request){
var query = buildQuery(request);
dbCall(查询功能(dbResponse) {
var wsRequest = buildWebServiceRequest(dbResponse);
wcall (wsRequest, function(wsResponse)) {
发送Reply(wsResponse);
});
});
});
现在让我们将其与基于模式匹配的实现进行比较:
msg match {
case HttpRequest(request) => {
val query = buildQuery(request)
dbCall(query)
}
case DbResponse(dbResponse) => {
var wsRequest = buildWebServiceRequest(dbResponse);
wsCall(dbResponse)
}
case WsResponse(wsResponse) => 发送Reply(wsResponse)
}
虽然基于回调的JavaScript代码确实很紧凑, 当然,阅读和导航难度更大. In comparison, 基于模式匹配的代码使正在考虑的情况以及如何处理每种情况更加明显.
把一个复杂的问题递归地分解成更小的子问题,通常是一种可靠的问题解决技术. 这种方法在计算机科学中特别有益(与 单一责任原则), as it tends to yield clean, modularized code, with little or no redundancy, 这相对容易维护.
In an 演员-based design, 使用这种技术可以方便地将参与者的逻辑组织成一个层次结构,称为 演员 System. 参与者系统提供了参与者相互交互的基础设施.
在Akka中,与演员交流的唯一方式是通过an 演员Ref
. An 演员Ref
表示对参与者的引用,该引用阻止其他对象直接访问或操纵该参与者的内部和状态. 消息可以通过控件发送给参与者 演员Ref
使用以下语法协议之一:
!
(" tell ")——发送消息并立即返回?
(" 问 ")——发送消息并返回 Future representing a possible reply每个参与者都有一个邮箱,将传入的消息传递到该邮箱. 有多种邮箱实现可供选择, 默认实现是FIFO.
参与者包含许多实例变量,以便在处理多个消息时保持状态. Akka确保参与者的每个实例在自己的轻量级线程中运行,并且每次处理一个消息. In this way, 每个参与者的状态都可以可靠地维护,而开发人员无需显式地担心同步或竞争条件.
通过Akka 演员 API,每个演员都提供了以下有用的信息来执行其任务:
发送方
: an 演员Ref
到当前正在处理的消息的发送方context
:与参与者运行的上下文相关的信息和方法(包括, for example, an 演员Of
方法(用于实例化新参与者)supervisionStrategy
:定义用于从错误中恢复的策略自我
: 演员Ref
for the 演员 it自我将这些教程联系在一起, 让我们考虑一个计算文本文件中单词数量的简单示例.
对于我们的Akka例子来说, we’ll decompose the problem into two subt问s; namely, (1)计算单行单词数的“子”任务;(2)计算每行单词数之和的“父”任务,得到文件中的单词总数.
父角色将从文件中加载每行,然后将计算该行字数的任务委托给子角色. 当子进程完成后,它将把结果发送回父进程. 父进程将接收带有单词计数的消息(每行),并为整个文件中的单词总数保留一个计数器, 然后在完成后返回给调用者.
(请注意,下面提供的Akka教程代码示例仅用于教学,因此不一定涉及所有边缘条件, performance optimizations, so on. 此外,本文还提供了下面所示代码示例的完整可编译版本 要点.)
让我们首先看一下子节点的样例实现 StringCounter演员
类:
case类ProcessStringMsg(string: string)
case类StringProcessedMsg(字:整型)
类StringCounter演员扩展演员 {
def receive = {
case ProcessStringMsg(string) => {
val wordsInLine = string.split(" ").长度
发送方 ! StringProcessedMsg(wordsInLine)
}
case _ => println("Error: 消息 not recognized")
}
}
这个演员有一个非常简单的任务:消费 ProcessStringMsg
消息(包含一行文本), 计算指定行上的单词数, 并将结果通过a返回给发送方 StringProcessedMsg
消息. 请注意,我们已经将类实现为使用 !
(“tell”) method to 发送 the StringProcessedMsg
消息 (i.e.,发送消息并立即返回).
好了,现在让我们把注意力转向父母 WordCounter演员
类:
1. case类StartProcessFileMsg()
2.
3. 类WordCounter演员(文件名:String)扩展演员 {
4.
5. private var running = false
6. private var totalLines = 0
7. private var linesProcessed = 0
8. private var 结果 = 0
9. private var fileSender: Option[演员Ref] = None
10.
11. def receive = {
12. case StartProcessFileMsg() => {
13. if (running) {
14. // println仅用于示例目的;
15. //应该使用Akka记录器
16. println("警告:收到重复的启动消息")
17. } else {
18. running = true
19. fileSender = Some(发送方) //保存对进程调用者的引用
20. import scala.io.Source._
21. fromFile(filename).getLines.foreach { line =>
22. context.演员Of(道具(StringCounter演员)) ! ProcessStringMsg(line)
23. totalLines += 1
24. }
25. }
26. }
27. case StringProcessedMsg(words) => {
28. 结果 += words
29. linesProcessed += 1
30. if (linesProcessed == totalLines) {
31. fileSender.地图(_ ! Result) //向进程调用者提供结果
32. }
33. }
34. case _ => println("消息 not recognized!")
35. }
36. }
这里发生了很多事情,所以让我们更详细地检查每一个 (请注意,下面讨论中引用的行号是基于上面的代码示例的)…
首先,注意要处理的文件的名称被传递给 WordCounter演员
constructor (line 3). 这表明参与者仅用于处理单个文件. 这也简化了开发人员的编码工作, 通过避免重置状态变量(running
, totalLines
, linesProcessed
, 结果
),因为实例只被使用一次(i.e.(处理单个文件),然后丢弃.
Next, observe that the WordCounter演员
h和les two types of 消息s:
StartProcessFileMsg
(line 12)
WordCounter演员
.WordCounter演员
首先检查它是否没有接收到冗余请求.WordCounter演员
生成一个警告,不再执行任何操作(第16行).WordCounter演员
控件中存储对发送方的引用 fileSender
实例变量(注意,这是一个 Option[演员Ref]
rather than an Option[演员]
- see line 9). 这 演员Ref
是否需要在处理最终结果时访问和响应它 StringProcessedMsg
(which is received from a StringCounter演员
child, as described below).WordCounter演员
然后读取文件,在加载文件中的每一行时,a StringCounter演员
创建子节点,并将包含要处理的行的消息传递给它(第21-24行)。.StringProcessedMsg
(line 27)
StringCounter演员
当它完成处理时,分配给它的行.WordCounter演员
增加文件的行计数器,如果文件中的所有行都已处理(i.e.,当 totalLines
和 linesProcessed
是相等的),它将最终结果发送给原始 fileSender
(lines 28-31).再次注意,在Akka中,参与者间通信的唯一机制是消息传递. 消息是参与者之间唯一共享的东西, 因为参与者可能并发地访问相同的消息, 对它们来说,不可变是很重要的, 以避免竞争条件和意外行为.
因此,通常以case类的形式传递消息,因为它们在默认情况下是不可变的,并且由于它们与模式匹配的无缝集成.
让我们用运行整个应用程序的代码示例来结束这个示例.
object Sample extends App {
import akka.跑龙套.超时
import scala.concurrent.duration._
import akka.模式.问
import akka.dispatch.ExecutionContexts._
implicit val ec = global
重载def main(args: Array[String]) {
val 系统 = 演员System(" 系统 ")
val 演员 = 系统.演员Of(道具(新WordCounter演员 (args (0))))
隐val timeout =超时(25秒)
val Future = 演员 ? StartProcessFileMsg()
Future.map { 结果 =>
println("总字数" +结果)
系统.shutdown
}
}
}
Notice how this time the ?
方法用于发送消息. 这样,调用者就可以使用返回的 Future 在可用时打印最终结果,并通过关闭演员System退出程序.
在演员系统中,每个演员都是其子系统的监督者. 如果参与者未能处理消息, 它将自己及其所有子进程挂起,并发送一条消息, 通常以异常的形式出现, to its supervisor.
In Akka, 管理器对从子进程渗透到它的异常作出反应和处理的方式称为管理器策略. Supervisor strategies 定义系统容错行为的主要和直接的机制是什么.
当表示失败的消息到达主管时,它可以采取以下操作之一:
Moreover, 演员可以决定只对失败的子节点或它的兄弟节点应用操作. 这里有两个预先定义的策略:
OneForOneStrategy
:只对失败的子节点应用指定的操作AllForOneStrategy
:将指定的操作应用于它的所有子操作下面是一个简单的例子 OneForOneStrategy
:
import akka.演员.OneForOneStrategy
import akka.演员.SupervisorStrategy._
import scala.concurrent.duration._
overoverval overoverval superorstrategy =
OneForOneStrategy() {
case _: ArithmeticException => Resume
case _: NullPointerException => Restart
case _: IllegalArgumentException => Stop
case _: Exception => Escalate
}
如果不指定策略,则采用如下默认策略:
akka提供的这个默认策略的实现如下:
defaultStrategy: superorstrategy = {
def defaultDecider: Decider = {
case _: 演员InitializationException⇒停止
case _: 演员killledexception⇒停止
case _: Exception:重启
}
OneForOneStrategy () (defaultDecider)
}
Akka允许实现 custom supervisor strategies, 但正如阿卡文件所警告的那样, 这样做要谨慎,因为不正确的实现可能会导致诸如阻塞参与者系统(i.e. permanently suspended 演员s).
The Akka architecture supports location transparency,使参与者完全不知道他们收到的消息来自哪里. 消息的发送方可能与参与者位于相同的JVM中,也可能位于单独的JVM中(运行在相同节点或不同节点上)。. Akka允许以一种对参与者(因此对开发人员)完全透明的方式处理这些情况。. 唯一需要注意的是,跨多个节点发送的消息必须是可序列化的.
参与者系统被设计为在分布式环境中运行,而不需要任何专门的代码. Akka只需要配置文件(application.相依
),它指定要发送消息的节点. 下面是一个简单的配置文件示例:
akka {
演员{
provider = "akka.远程.Remote演员RefProvider"
}
远程 {
transport = "akka.远程.网状的.NettyRemoteTransport"
网状的{
hostname = "127.0.0.1"
port = 2552
}
}
}
我们已经看到Akka框架是如何帮助实现并发性和高性能的. However, as this tutorial pointed out, 为了充分利用Akka的力量,在设计和实施系统时要记住以下几点:
演员s should h和le 事件s (i.e., 异步处理消息,不应该阻塞, 否则将发生上下文切换,从而对性能产生不利影响. 具体来说,最好执行阻塞操作(IO等).) in a Future so as not to block the 演员; i.e.:
case evt => blockingCall() // BAD
case evt => Future {
blockingCall() //好
}
Akka, written in Scala, 简化和促进高度并发的开发, distributed, 以及容错应用程序, 对开发人员隐藏了许多复杂性. 完全公正地对待Akka需要的远不止这一篇教程, 但希望这个介绍和它的例子足够吸引你,让你想要读更多.
亚马逊、VMWare和CSC只是积极使用Akka的领先公司的几个例子. Visit the official Akka website 以了解更多信息,并探索Akka是否也可以成为您项目的正确答案.
Located in Prague, Czech Republic
Member since March 5, 2014
迭戈一直是意大利电信巨头Italtel等公司的高级自由工程师. 他还共同创立了一家基于网络的CRM业务.
17
世界级的文章,每周发一次.
世界级的文章,每周发一次.
Join the Toptal® community.