本网页所有文字内容由 imapbox邮箱云存储,邮箱网盘, iurlBox网页地址收藏管理器 下载并得到。
ImapBox 邮箱网盘 工具地址: https://www.imapbox.com/download/ImapBox.5.5.1_Build20141205_CHS_Bit32.exe
PC6下载站地址:PC6下载站分流下载
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox 网页视频 工具地址: https://www.imapbox.com/download/ImovieBox4.7.0_Build20141115_CHS.exe
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
写并发程序很难。程序员不得不处理线程、锁和竞态条件等等,这个过程很容易出错,而且会导致程序代码难以阅读、测试和维护。
所以,很多人不倾向于使用多线程编程。取而代之的是,他们使用单线程进程(译者注:只含有一个线程的进程),依赖外部服务(如数据库、队列等)处理所需的并发或异步操作。虽然这种方法在有些情况下是可行的,但还有很多其他情况不能奏效。很多实时系统——例如交易或银行业务应用,或实时游戏——等待一个单线程进程完成就太奢侈了(他们需要立即应答!)。其他的一些对于计算或资源要求非常高的系统,如果在程序中不引入并行机制就会耗时很久(有些情况下可以达到几个小时或数天)。
常用的一种单线程方法(例如,在 Node.js里广泛应用)是使用基于事件的、非阻塞模式(Event-Based, NON-Blocking Paradigm,其中Paradigm也有译作成例)。虽然这种方法可以避免上下文切换、锁和阻塞,的确能提高性能,但还是不能解决并发使用多个处理器(需要启动和协调多个独立的处理器)的问题。
那么,这是不是意味着为了构建一个并发程序,除了深入到线程、锁和竞态条件之外没有别的选择呢?
感谢Akka框架,它为我们提供了一种选择。本教程介绍了Akka的示例,并仔细研究它如何帮助并简化分布式并发应用的实现。
这篇文章介绍了Akka并仔细研究它如何帮助并简化分布式并发应用的实现。
Akka是JVM(JAVA虚拟机,下同)平台上构建高并发、分布式和容错应用的工具包和运行时。Akka用 Scala语言写成,同时提供了Scala和JAVA的开发接口。
Akka处理并发的方法基于 Actor(没有惯用译法,文中使用原词)模型。在基于Actor的系统里,所有的事物都是Actor,就好像在面向对象设计里面所有的事物都是对象一样。但是有一个重要区别——特别是和我们的讨论相关的——那就是Actor模型是作为一个并发模型设计和架构的,而面向对象模式则不是。更具体一点,在Scala的Actor系统里,Actor互相交互并共享信息但并不对交互顺序作出预设。Actor之间共享信息和发起任务的机制是消息传递。
创建和调度线程、接收和分发消息以及处理竞态条件和同步的所有复杂性,都委托给框架,框架的处理对应用来说是透明的。
Akka在多个Actor和下面的系统之间建立了一个层次(Layer),这样一来,Actor只需要处理消息就可以了。创建和调度线程、接收和分发消息以及处理竞态条件和同步的所有复杂性,都委托给框架,框架的处理对应用来说是透明的。
Actor严格遵守 响应式声明。响应式应用的目标是通过满足以下一个或多个条件来代替传统的多线程应用:
Actor本质上就是接收消息并采取行动处理消息的对象。它从消息源中解耦出来,只负责正确识别接收到的消息类型,并采取相应的行动。
收到一条消息之后,一个Actor可能会采取以下一个或多个行动:
或者,如果这个Actor认为合适的话,可能会完全忽略这条消息(也就是说,它可能选择不响应)。
为了实现一个Actor,需要继承Akka.Actor.Actor这个Trait(一般译为“特征”,译法有一定争议,文中保留原词)并实现Receive方法。当一个消息发送给Actor时,它的Receive方法会被(Akka)调用。典型的实现包括使用模式匹配(Pattern Matching)来识别消息类型并作出响应,参见下面的Akka示例:
import akka.actor.Actor import akka.actor.Props import akka.event.Logging class MyActor extends Actor { def receive = { case value: String => doSomething(value) case _ => println("received unknown message") } }
模式匹配是一种相对优雅的处理消息的技术,相比基于回调的实现,更倾向于产生“更整洁”以及更容易浏览的代码。例如,考虑一个简化版的HTTP请求/响应实现。
首先,我们使用JavaScript中基于回调的方式实现:
route(url, function(request){ var query = buildQuery(request); dbCall(query, function(dbResponse){ var wsRequest = buildWebServiceRequest(dbResponse); wsCall(wsRequest, function(wsResponse) { sendReply(wsResponse); }); }); });
现在,我们把它和基于模式匹配的实现做个比较:
msg match { case HttpRequest(request) => { val query = buildQuery(request) dbCall(query) } case DbResponse(dbResponse) => { var wsRequest = buildWebServiceRequest(dbResponse); wsCall(dbResponse) } case WsResponse(wsResponse) => sendReply(wsResponse) }
虽然基于回调的JavaScript代码更紧凑,但确实更难以阅读和浏览。相比而言,基于模式匹配的代码对于需要考虑哪些情况、每种情况都是怎么处理的写法更加清晰。
把一个复杂的问题不断分解成更小规模的子问题通常是一种可靠的解决问题的技术。这个方法对于计算机科学特别有效(和 单一职责原则一致),因为这样容易产生整洁的、模块化的代码,产生的冗余很少甚至没有,而且维护起来相对容易。
在基于Actor的设计里,使用这种技术有助于把Actor的逻辑组织变成一个层级结构,也就是所谓的 Actor系统。Actor系统提供了一个基础框架,通过这个系统Actor之间可以进行交互。
在Akka里面,和Actor通信的唯一方式就是通过ActorRef
。ActorRef
代表Actor的一个引用,可以阻止其他对象直接访问或操作这个Actor的内部信息和状态。消息可以通过一个ActorRef
以下面的语法协议中的一种发送到一个Actor:
–!
(“告知”) —— 发送消息并立即返回
–?
(“请求”) —— 发送消息并返回一个Future对象,代表一个可能的应答
每个Actor都有一个收件箱,用来接收发送过来的消息。收件箱有多种实现方式可以选择,缺省的实现是先进先出(FIFO)队列。
在处理多条消息时,一个Actor包含多个实例变量来保持状态。Akka确保Actor的每个实例都运行在自己的轻量级线程里,并保证每次只处理一条消息。这样一来,开发者不必担心同步或竞态条件,而每个Actor的状态都可以被可靠地保持。
Akka的Actor API中提供了每个Actor执行任务所需要的有用信息:
sender
:当前处理消息的发送者的一个ActorRef
引用context
:Actor运行上下文相关的信息和方法(例如,包括实例化一个新Actor的方法ActorOf
)supervisionStrategy
:定义用来从错误中恢复的策略self
:Actor本身的ActorRef
引用Akka确保Actor的每个实例都运行在自己的轻量级线程里,并保证每次只处理一条消息。这样一来,开发者不必担心同步或竞态条件,而每个Actor的状态都可以被可靠地保持。
为了把这些教程组织起来,让我们来考虑一个简单的例子:统计一个文本文件中单词的数量。
为了达到演示Akka示例的目的,我们把这个问题分解为两个子任务;即(1)统计每行单词数量的“孩子”任务和(2)汇总这些单行单词数量、得到文件里单词总数的“父亲”任务。
父Actor会从文件中装载每一行,然后委托一个子Actor来计算某一行的单词数量。当子Actor完成之后,它会把结果用消息发回给父Actor。父Actor会收到(每一行的)单词数量的消息并维持一个整个文件单词总数的计数器,这个计数器会在完成后返回给调用者。
(注意以下提供的Akka教程的例子只是为了教学目的,所以没有顾及所有的边界条件、性能优化等。同时,完整可编译版本的代码示例可以在这个GIST中找到)
让我们首先看一个子类StringCounterActor
的示例实现:
case class ProcessStringMsg(string: String) case class StringProcessedMsg(words: Integer) class StringCounterActor extends Actor { def receive = { case ProcessStringMsg(string) => { val wordsInLine = string.split(" ").length sender ! StringProcessedMsg(wordsInLine) } case _ => println("Error: message not recognized") } }
这个Actor有一个非常简单的任务:接收ProcessStringMsg
消息(包含一行文本),计算这行文本中单词的数量,并把结果通过一个StringProcessedMsg
消息返回给发送者。请注意我们已经实现了我们的类,使用!
(“告知”)方法发出StringProcessedMsg
消息(发出消息并立即返回)。
好了,现在我们来关注父WordCounterActor
类:
case class StartProcessFileMsg() class WordCounterActor(filename: String) extends Actor { private var running = false private var totalLines = 0 private var linesProcessed = 0 private var result = 0 private var fileSender: Option[ActorRef] = None def receive = { case StartProcessFileMsg() => { if (running) { // println just used for example purposes; // Akka logger should be used instead println("Warning: duplicate start message received") } else { running = true fileSender = Some(sender) // save reference to process invoker import scala.io.Source._ fromFile(filename).getLines.foreach { line => context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line) totalLines += 1 } } } case StringProcessedMsg(words) => { result += words linesProcessed += 1 if (linesProcessed == totalLines) { fileSender.map(_ ! result) // provide result to process invoker } } case _ => println("message not recognized!") } }
这里面有很多细节,我们来逐一考察(注意讨论中所引用的行号基于以上代码示例)。
首先,请注意要处理的文件名被传给了WordCounterActor
的构造方法(第3行)。这意味着这个Actor只会用来处理一个单独的文件。这样通过避免重置状态变量(running
,totalLines
,linesProcessed
和result
)也简化了开发者的编码工作,因为这个实例只使用一次(也就是说处理一个单独的文件),然后就丢弃了。
接下来,我们看到WordCounterActor
处理了两种类型的消息:
StartProcessFileMsg
(第12行)
WordCounterActor
的外部Actor接收到的消息WordCounterActor
首先检查它收到的是不是一个重复的请求WordCounterActor
生成一个警告,然后就不做别的事了(第16行)WordCounterActor
在FileSender
实例变量(注意这是一个Option[ActorRef]
而不是一个Option[Actor]
)中保存发送者的一个引用。当处理最终的StringProcessedMsg
(从一个StringCounterActor
子类中接收,如下文所述)时,为了以后的访问和响应,这个ActorRef
是必需的。WordCounterActor
读取文件,当文件中每行都装载之后,就会创建一个StringCounterActor
,需要处理的包含行文本的消息就会传递给它(第21-24行)。StringProcessedMsg
(第27行)
StringCounterActor
处接收到的消息WordCounterActor
会把文件的行计数器增加,如果所有的行都处理完毕(也就是说,当totalLines
和linesProcessed
相等),它会把最终结果发给原来的FileSender
(第28-31行)。再次需要注意的是,在Akka里,Actor之间通信的唯一机制就是消息传递。消息是Actor之间唯一共享的东西,而且因为多个Actor可能会并发访问同样的消息,所以为了避免竞态条件和不可预期的行为,消息的不可变性非常重要。
因为Case Class默认是不可变的并且可以和模式匹配无缝集成,所以用Case Class的形式来传递消息是很常见的。(Scala中的Case Class就是正常的类,唯一不同的是通过模式匹配提供了可以递归分解的机制)。
让我们通过运行整个应用的示例代码来结束这个例子。
object Sample extends App { import akka.util.Timeout import scala.concurrent.duration._ import akka.pattern.ask import akka.dispatch.ExecutionContexts._ implicit val ec = global override def main(args: Array[String]) { val system = ActorSystem("System") val actor = system.actorOf(Props(new WordCounterActor(args(0)))) implicit val timeout = Timeout(25 seconds) val future = actor ? StartProcessFileMsg() future.map { result => println("Total number of words " + result) system.shutdown } } }
请注意这里的?
方法是怎样发送一条消息的。用这种方法,调用者可以使用返回的 Future对象,当完成之后可以打印出最后结果并最终通过停掉Actor系统退出程序。
在Actor系统里,每个Actor都是其子孙的监管者。如果Actor处理消息时失败,它就会暂停自己及其子孙并发送一个消息给它的监管者,通常是以异常的形式。
在Akka里面,监管者策略是定义你的系统容错行为的主要并且直接的机制。
在Akka里面,一个监管者对于从子孙传递上来的异常的响应和处理方式称作监管者策略。 监管者策略是定义你的系统容错行为的主要并且直接的机制。
当一条消息指示有一个错误到达了一个监管者,它会采取如下行动之一:
而且,一个Actor可以决定是否把行动应用在失败的子孙上抑或是应用到它的兄弟上。有两种预定义的策略:
OneForOneStrategy
:只把指定行动应用到失败的孩子上AllForOneStrategy
:把指定行动应用到所有子孙上下面是一个使用OneForOneStrategy
的简单例子:
import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy._ import scala.concurrent.duration._ override val supervisorStrategy = OneForOneStrategy() { case _: ArithmeticException => Resume case _: NullPointerException => Restart case _: IllegalArgumentException => Stop case _: Exception => Escalate }
如果没有指定策略,那么就使用如下默认的策略:
Akka提供的默认策略的实现如下:
final val defaultStrategy: SupervisorStrategy = { def defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: Exception ⇒ Restart } OneForOneStrategy()(defaultDecider) }
Akka也考虑到对 定制化监管者策略的实现,但正如Akka文档也提出了警告,这么做要小心,因为错误的实现会产生诸如Actor系统被阻塞的问题(也就是说,其中的多个Actor被永久挂起了)。
Akka架构支持 本地透明性,使得Actor完全不知道他们接受的消息是从哪里发出来的。消息的发送者可能驻留在同一个JVM,也有可能是存在于其他的JVM(或者运行在同一个节点,或者运行在不同的节点)。Akka处理这些情况对于Actor(也即对于开发者)来说是完全透明的。唯一需要说明的是跨越节点的消息必须要被序列化。
Akka架构支持本地透明性,使得Actor完全不知道他们接受的消息是从哪里发出来的。
Actor系统设计的初衷,就是不需要任何专门的代码就可以运行在分布式环境中。Akka只需要一个配置文件(Application.Conf),用以说明发送消息到哪些节点。下面是配置文件的一个例子:
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { transport = "akka.remote.netty.NettyRemoteTransport" netty { hostname = "127.0.0.1" port = 2552 } } }
我们已经了解了Akka框架帮助完成并发和高性能的方法。然而,正如这篇教程指出的,为了充分发挥Akka的能力,在设计和实现系统时,有些要点值得考虑:
case evt => blockingCall() // BAD case evt => Future { blockingCall() // GOOD }
Akka用Scala语言写成,简化并为开发高并发、分布式和容错式应用提供了便利,对开发者隐藏了很大程度的复杂性。把Akka用好肯定需要了解比这个教程更多的内容,但是希望这里的介绍和示例能够引起你的注意并继续了解Akka。
Amazon、VMWare和CSC只是现在积极使用Akka的一部分领军企业。可以访问 Akka的官方网站学到更多的知识,并多花点时间研究Akka是否适合你的项目。
原文链接:Akka Tutorial with Code: Concurrency and Fault Tolerance
免费订阅“ImapBox云计算(左)和ImapBox大数据(右)”微信公众号,实时掌握第一手云中消息,了解最新的大数据进展!
ImapBox发布虚拟化、Docker、OpenStack、CloudStack、数据中心等相关云计算资讯, 分享Hadoop、Spark、NoSQL/NewSQL、HBase、Impala、内存计算、流计算、机器学习和智能算法等相关大数据观点,提供云计算和大数据技术、平台、实践和产业信息等服务。
阅读和此文章类似的: 全球云计算