For a durable elastic scaling system Akka is hard to beat. However some of the advanced configuration is not easy to find end to end examples of.
Things change with versions. This is relevant to
Akka 2.4
SBT 0.13.12
Scala 2.10.6
One decision is what to configure in code and what to configure in the application.conf file.
Do you have a muti-actorsystem build process? If so you need each actor system to have a namespace in the config.
If you have a single system there is no need for the root namespace
And Akka will find the akka{} section
Sample App Object
*REFS*
http://doc.akka.io/docs/akka/2.0.2/intro/getting-started-first-scala.html
Supervisor Strategy
Fault Tolerance Primer
http://danielwestheide.com/blog/2013/03/20/the-neophytes-guide-to-scala-part-15-dealing-with-failure-in-actor-systems.html
routers-in-akka-with-parameterized-actors
Router Choices
Things change with versions. This is relevant to
Akka 2.4
SBT 0.13.12
Scala 2.10.6
One decision is what to configure in code and what to configure in the application.conf file.
Do you have a muti-actorsystem build process? If so you need each actor system to have a namespace in the config.
logging-system {akka{}...}
ai-system{akka{}...}
/** An instance of the entire Config object loaded
* with the application.conf data * @see src/main/resources/application.conf */ val config = ConfigFactory.load() /** it is important to get a specifc system config namespace
* if you have a multi akka system project * for our single system setup we could have akka as a root node in the config*/ val akkaConfig = config.getConfig("ai-system").withFallback(config) /** An akka actor system setup for the ai system application */ val actorSystem = ActorSystem(name = "ai-system", config = akkaConfig)
If you have a single system there is no need for the root namespace
akka{}...
And Akka will find the akka{} section
/** An instance of the entire Config object loaded
* with the application.conf data * @see src/main/resources/application.conf */ val config = ConfigFactory.load()/ ** An akka actor system setup for the ai system application */ val actorSystem = ActorSystem(name = "ai-system", config = config)
Sample App Object
Sample Configpackage com.company.aisystem import akka.actor.SupervisorStrategy.{Decider, Escalate, Restart, Resume, Stop} import akka.actor.{ActorInitializationException, ActorKilledException, ActorSystem, Inbox, OneForOneStrategy, Props, SupervisorStrategy} import akka.routing._ import com.company.aisystem.actors.{OrchestratorActor, PersistorActor, ValidationActor} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ /** Acts as the "main" class for the application to start * * It extends the scala: "App" trait */ object aiSystemApp extends App { /** An instance of the entire Config object loaded with the application.conf data * @see src/main/resources/application.conf */ val config = ConfigFactory.load() /** it is important to get a specifc system config namespace if you have a multi akka system project * for our single system setup we could have akka as a root node in the config*/ val aisystemAkkaConfig = config.getConfig("aisystem").withFallback(config) /** An akka actor system setup for the ai system application */ val actorSystem = ActorSystem(name = "aisystem", config = aisystemAkkaConfig) /** A one for one supervisor strategy to be used in the router */ val superviseOneForOne = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 60.seconds){ case _: ArithmeticException => Resume case _: NullPointerException => Restart case _: IllegalArgumentException => Stop case _: Exception => Escalate } /** A default akka inbox for the ai system actor system. not sure we need to declare this */ val inbox = Inbox.create(actorSystem) /** An akka actor ref of the persistor actor */ val persistorActor = actorSystem.actorOf(Props[PersistorActor] .withDispatcher("aisystem.akka.fork-join-dispatcher"), config.getString("aisystem.persistor-actor-name")) /** An akka actor ref of the validation actor * ///TODO why are we passing in another actor ref as the whole props? val validationActor = actorSystem.actorOf(ValidationActor .props(persistorActor) .withDispatcher("aisystem.akka.ai-thread-pool-dispatcher"), config.getString("aisystem.validation-actor-name")) final val defaultStrategy: SupervisorStrategy = { def defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: Exception ⇒ Restart } OneForOneStrategy()(defaultDecider) } /** An akka actor ref of the orchestrator actor */ //todo: setup resizer = Option[DefaultOptimalSizeExploringResizer] val orchestratorActor = actorSystem.actorOf(OrchestratorActor .props(validationActor) .withRouter(SmallestMailboxPool( nrOfInstances = 5, supervisorStrategy = superviseOneForOne, routerDispatcher = "aisystem.akka.ai-thread-pool-dispatcher", usePoolDispatcher = false)), config.getString("aisystem.orchestrator-actor-name")) ...other app init code
aisystem { queueBroker{} database{} akka { # expose actor system level debug logging for local loglevel = DEBUG //options include DEBUG and off # stdout-loglevel is only in effect during system startup and shutdown stdout-loglevel = DEBUG #options include DEBUG and off log-config-on-start = on actor { debug { # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill et.c.) autoreceive = on # enable DEBUG logging of subscription changes on the eventStream event-stream = on # enable DEBUG logging of all LoggingFSMs for events, transitions and timers fsm = on # enable DEBUG logging of actor lifecycle changes lifecycle = on # enable DEBUG logging of message receive events receive = on # enable DEBUG logging of unhandled messages unhandled = on } # Timeout for actors timeout = 30s # Timeout for ActorSystem.actorOf creation-timeout = 30s serialize-messages = off #generally only enable for testing with serialzers } fork-join-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # What kind of ExecutionService to use executor = "fork-join-executor" # Configuration for the fork join pool fork-join-executor { # Min number of threads to cap factor-based parallelism number to parallelism-min = 2 # Parallelism (threads) ... ceil(available processors * factor) parallelism-factor = 2.0 # Max number of threads to cap factor-based parallelism number to parallelism-max = 10 } # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. # Set to 1 for as fair as possible. throughput = 100 throughput-deadline-time = 10ms supervisorStrategy = "Restart" } ai-thread-pool-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # What kind of ExecutionService to use executor = "thread-pool-executor" # Configuration for the thread pool thread-pool-executor { # minimum number of threads to cap factor-based core number to core-pool-size-min = 2 # No of core threads ... ceil(available processors * factor) core-pool-size-factor = 2.0 # maximum number of threads to cap factor-based number to core-pool-size-max = 10 } # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. # Set to 1 for as fair as possible. throughput = 1 supervisorStrategy = "Restart" } } akka.actor.deployment { //TODO move this from App object to deployment Orchestrator { router = smallest-mailbox-pool nr-of-instances = 5 dispatcher = aisystem.akka.aii-dispatcher } Persistor { router = smallest-mailbox-pool nr-of-instances = 5 dispatcher = aisystem.akka.ai-dispatcher } Validator { router = smallest-mailbox-pool nr-of-instances = 5 } }
*REFS*
http://doc.akka.io/docs/akka/2.0.2/intro/getting-started-first-scala.html
Supervisor Strategy
Fault Tolerance Primer
http://danielwestheide.com/blog/2013/03/20/the-neophytes-guide-to-scala-part-15-dealing-with-failure-in-actor-systems.html
routers-in-akka-with-parameterized-actors
Router Choices
No comments:
Post a Comment