juglr
Class Actor

java.lang.Object
  extended by juglr.Actor
Direct Known Subclasses:
DelegatingActor, MulticastActor, TCPChannelActor, TCPServerActor

public abstract class Actor
extends java.lang.Object

Base class for all actors. An Actor in the Juglr framework sends and receives Messages over a MessageBus. All communications between actors are fully asynchronous.

A very important rule when coding with actors is the idea of "shared nothing". This is enforced by routing all inter-actor communuications through the message bus. It must be emphasized that actors should not have direct references to each other. Instead they simply store the Addresses of the actors they need to send messages to.

Although the standard Message class can indeed hold shared state it is strongly advised to avoid this. One way to assert that there is no shared state is to only use Box messages which can only store simple data types (and also have the added benefit of mapping cleanly to JSON).

There are two central callback methods actors can override, namely react(juglr.Message) and start(). As a rule of thumb these methods should never block in order not to starvate the underlying threadpool of the message bus. There are three legal ways for an actor to block, notably awaitMessage(), await(Callable), and awaitTimeout(long).

Parallelizing Work

Each actor is guaranteed to only be handling one message at a time. In technical terms this means that react(Message) is guaranteed to be called from a context synchronized on the actor. Juglr provides some helper classes for parallelizing work, namely DelegatingActor and MulticastActor.

See Also:
Message, MessageBus

Constructor Summary
Actor()
          Create an actor connected to the default message bus
Actor(MessageBus bus)
          Create an actor connected to the MessageBus bus
 
Method Summary
<T> T
await(java.util.concurrent.Callable<T> closure)
          Do a blocking call and return its value.
 Message awaitMessage()
          Block until a message is received and return the message.
 void awaitTimeout(long millis)
          Sleep for millis milliseconds and resume operation.
 Address getAddress()
          Get the unique address of this actor assigned by the message bus upon connection time
 MessageBus getBus()
          Get the message bus this actor is connected to
abstract  void react(Message msg)
          Primary method for handling incoming messages, override it with your message handling logic.
 void send(Message msg, Address receiver)
          Send a message to another actor.
 void start()
          Initiate the actor life cycle, you may start sending messages from within this method.
 java.lang.String toString()
          Returns the externalized form of this actor's Address
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Constructor Detail

Actor

public Actor()
Create an actor connected to the default message bus

See Also:
MessageBus.getDefault()

Actor

public Actor(MessageBus bus)
Create an actor connected to the MessageBus bus

Parameters:
bus - the message bus the actor should connect to
Method Detail

getAddress

public final Address getAddress()
Get the unique address of this actor assigned by the message bus upon connection time

Returns:
the unique bus name for this actor

getBus

public MessageBus getBus()
Get the message bus this actor is connected to

Returns:

toString

public java.lang.String toString()
Returns the externalized form of this actor's Address

Overrides:
toString in class java.lang.Object
Returns:

send

public final void send(Message msg,
                       Address receiver)
Send a message to another actor. To send a reply to an incoming message do send(myMsg, msg.getSender()

Parameters:
msg - the message to send
receiver - the address of the actor to send to
See Also:
Message.getSender()

awaitMessage

public final Message awaitMessage()
Block until a message is received and return the message. The blocking is done in a manner where the thread pool of the message bus does not risk starvation.

This method must only be called within the react(juglr.Message) and start() methods of the actor.

In general it is most effective to not use this method and simply rely on react(juglr.Message) and some sort of state machine. However there are cases where the business logic becomes complex or where optimal performance is less important.

Returns:
the newly arrived message or null in case the actor was interrupted while waiting for a message

await

public final <T> T await(java.util.concurrent.Callable<T> closure)
              throws java.lang.reflect.InvocationTargetException,
                     java.lang.InterruptedException
Do a blocking call and return its value. This is useful for doing IO or other blocking operations. The blocking will be done in a manner such that the thread pool of the message bus will not be affected.

This method must only be called within the react(juglr.Message) and start() methods of the actor.

If you need to do a lot of blocking operations consider batching them into one call to this method. Ie. don't read 128 bit blocks from a file in sequential calls to this method, but read big chunks or even the whole file in one go.

Parameters:
closure - the callable to execute
Returns:
the return value of closure.call()
Throws:
java.lang.InterruptedException - if interrupted while processing the blocking call
java.lang.reflect.InvocationTargetException - if closure.call() throws an exception. In this case the cause of the InvocationTargetException is guaranteed to be set to the original exception from closure.call().

awaitTimeout

public void awaitTimeout(long millis)
                  throws java.lang.InterruptedException
Sleep for millis milliseconds and resume operation. The blocking is done in a cooperative manner and the thread pool of the message bus will not be starved because of threads blocking on awaitTimeout.

Parameters:
millis - number of milliseconds to sleep
Throws:
java.lang.InterruptedException - if interruped while sleeping

start

public void start()
Initiate the actor life cycle, you may start sending messages from within this method. This method is guaranteed to be run in a calling context synchronized on this actor.


react

public abstract void react(Message msg)
Primary method for handling incoming messages, override it with your message handling logic. This method is guaranteed to be run in a calling context synchronized on this actor. In effect this means that actors only handle one message at a time. For a discussion on how to parallelize message processing see the section in the class documentation.

You can await messages from withing this method by calling awaitMessage(). To prepare for handling the next message in a clean context simply return from this method call.

Blocking operations, such as IO, should be done within an await(Callable) call.

Parameters:
msg - the incoming message