Thursday, November 10, 2016

slick

For a durable elastic scaling system Akka is hard to beat. However some of the advanced configuration is not easy to find end to end examples of.

Things change with versions. This is relevant to
Akka 2.4
SBT 0.13.12
Scala 2.10.6

One decision is what to configure in code and what to configure in the application.conf file.

Do you have a muti-actorsystem build process? If so you need each actor system to have a namespace in the config.

logging-system {akka{}...}
ai-system{akka{}...}

/** An instance of the entire Config object loaded 
  * with the application.conf data  * @see src/main/resources/application.conf */
val config = ConfigFactory.load()
/** it is important to get a specifc system config namespace
  * if you have a multi akka system project  
* for our single system setup we could have akka as a root node in the config*/
val akkaConfig = config.getConfig("ai-system").withFallback(config)

/** An akka actor system setup for the ai system application */
val actorSystem = ActorSystem(name = "ai-system", config = akkaConfig)

If you have a single system there is no need for the root namespace
akka{}...

And Akka will find the akka{} section
/** An instance of the entire Config object loaded 
  * with the application.conf data  * @see src/main/resources/application.conf */
val config = ConfigFactory.load()/
** An akka actor system setup for the ai system application */
val actorSystem = ActorSystem(name = "ai-system", config = config)

Sample App Object
package com.company.aisystem

import akka.actor.SupervisorStrategy.{Decider, Escalate, Restart, Resume, Stop}
import akka.actor.{ActorInitializationException, ActorKilledException, 
ActorSystem, Inbox, OneForOneStrategy, Props, SupervisorStrategy}
import akka.routing._
import com.company.aisystem.actors.{OrchestratorActor, PersistorActor, 
ValidationActor}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

/** Acts as the "main" class for the application to start  *  
* It extends the scala: "App" trait  */
object aiSystemApp extends App {
  /** An instance of the entire Config object loaded with the application.conf data    
* @see src/main/resources/application.conf */  
val config = ConfigFactory.load()
  /** it is important to get a specifc system config namespace 
if you have a multi akka system project
    * for our single system setup we could have akka as a root node in the config*/ 
 val aisystemAkkaConfig = config.getConfig("aisystem").withFallback(config)

  /** An akka actor system setup for the ai system application */ 
 val actorSystem = ActorSystem(name = "aisystem", 
config = aisystemAkkaConfig)

  /** A one for one supervisor strategy to be used in the router */ 
 val superviseOneForOne =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 60.seconds){
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  /** A default akka inbox for the ai system actor system. not sure 
we need to declare this */  
val inbox = Inbox.create(actorSystem)

  /** An akka actor ref of the persistor actor */  val persistorActor = 
actorSystem.actorOf(Props[PersistorActor]
.withDispatcher("aisystem.akka.fork-join-dispatcher"), 
config.getString("aisystem.persistor-actor-name"))

  /** An akka actor ref of the validation actor *
///TODO why are we passing in another actor ref as the whole props?  
val validationActor = actorSystem.actorOf(ValidationActor
.props(persistorActor)
.withDispatcher("aisystem.akka.ai-thread-pool-dispatcher"), 
config.getString("aisystem.validation-actor-name"))

  final val defaultStrategy: SupervisorStrategy = {
    def defaultDecider: Decider = {
      case _: ActorInitializationException ⇒ Stop
      case _: ActorKilledException         ⇒ Stop
      case _: Exception                    ⇒ Restart
    }
    OneForOneStrategy()(defaultDecider)
  }

  /** An akka actor ref of the orchestrator actor */   
 //todo: setup resizer = Option[DefaultOptimalSizeExploringResizer]  
val orchestratorActor = actorSystem.actorOf(OrchestratorActor
      .props(validationActor)
      .withRouter(SmallestMailboxPool(
        nrOfInstances = 5,        supervisorStrategy = superviseOneForOne,      
  routerDispatcher = "aisystem.akka.ai-thread-pool-dispatcher",        
usePoolDispatcher = false)),     
 config.getString("aisystem.orchestrator-actor-name"))

  ...other app init code
Sample Config
aisystem {

queueBroker{}

database{}
akka {
    # expose actor system level debug logging for local    
loglevel = DEBUG //options include DEBUG and off    
# stdout-loglevel is only in effect during system startup and shutdown   
 stdout-loglevel = DEBUG 
#options include DEBUG and off  
  log-config-on-start = on
    actor {
      debug {
        # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill et.c.)  
        autoreceive = on
        # enable DEBUG logging of subscription changes on the eventStream
        event-stream = on
        # enable DEBUG logging of all LoggingFSMs for events, transitions and timers  
        fsm = on
        # enable DEBUG logging of actor lifecycle changes
        lifecycle = on
        # enable DEBUG logging of message receive events
        receive = on
        # enable DEBUG logging of unhandled messages
        unhandled = on
      }
      # Timeout for actors
      timeout = 30s
      # Timeout for ActorSystem.actorOf
      creation-timeout = 30s
      serialize-messages = off #generally only enable for testing with serialzers  
  }
    fork-join-dispatcher {
      # Dispatcher is the name of the event-based dispatcher
      type = Dispatcher
      # What kind of ExecutionService to use
      executor = "fork-join-executor"  
    # Configuration for the fork join pool      fork-join-executor {
        # Min number of threads to cap factor-based parallelism number to
        parallelism-min = 2 
       # Parallelism (threads) ... ceil(available processors * factor)
        parallelism-factor = 2.0
        # Max number of threads to cap factor-based parallelism number to
        parallelism-max = 10      }
      # Throughput defines the maximum number of messages to be 
     # processed per actor before the thread jumps to the next actor. 
     # Set to 1 for as fair as possible.
      throughput = 100
      throughput-deadline-time = 10ms
      supervisorStrategy = "Restart"    }
    ai-thread-pool-dispatcher {
      # Dispatcher is the name of the event-based dispatcher 
     type = Dispatcher
      # What kind of ExecutionService to use
      executor = "thread-pool-executor"
      # Configuration for the thread pool
      thread-pool-executor {
        # minimum number of threads to cap factor-based core number to 
       core-pool-size-min = 2 
       # No of core threads ... ceil(available processors * factor)
        core-pool-size-factor = 2.0 
       # maximum number of threads to cap factor-based number to
        core-pool-size-max = 10      }
      # Throughput defines the maximum number of messages to be
      # processed per actor before the thread jumps to the next actor. 
     # Set to 1 for as fair as possible. 
     throughput = 1      supervisorStrategy = "Restart"    }
  }
  akka.actor.deployment {
    //TODO move this from App object to deployment    Orchestrator {
      router = smallest-mailbox-pool
      nr-of-instances = 5      dispatcher = aisystem.akka.aii-dispatcher
    }
    Persistor {
      router = smallest-mailbox-pool
      nr-of-instances = 5      dispatcher = aisystem.akka.ai-dispatcher
    }
    Validator {
      router = smallest-mailbox-pool
      nr-of-instances = 5    }
}

*REFS*
http://doc.akka.io/docs/akka/2.0.2/intro/getting-started-first-scala.html
Supervisor Strategy
Fault Tolerance Primer
http://danielwestheide.com/blog/2013/03/20/the-neophytes-guide-to-scala-part-15-dealing-with-failure-in-actor-systems.html
routers-in-akka-with-parameterized-actors
Router Choices

No comments:

Post a Comment