Akka actors


I'm looking at using Akka's new typed channels as a way to maintain the actor model in my project without the pain of lots of explicit casts where you really do need to wait on a response.

I've had great luck creating typed channels and integrating them, but I'm so far not seeing any easy solutions for creating pools of these actors for parallel execution, which is easy to do with regular untyped actors. Do I need to build my own router system?

E.g., for untyped actors I can do something like this, and I magically get four actors.
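A minimal sketch of what such a pool can look like, assuming the classic `RoundRobinPool` router (Akka 2.3 or later; older releases used a different router API). The `Worker` actor, the pool name and the `startPool` helper are hypothetical and only there for illustration:

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.routing.RoundRobinPool

object PoolSketch {
  // Hypothetical worker; any untyped actor can be pooled the same way.
  class Worker extends Actor {
    def receive: Receive = {
      case n: Int => sender() ! n * 2
    }
  }

  // Returns a single router ActorRef that fronts four Worker routees
  // and hands incoming messages to them in round-robin order.
  def startPool(system: ActorSystem): ActorRef =
    system.actorOf(RoundRobinPool(4).props(Props[Worker]()), "workers")
}
```

Callers only ever see the router's `ActorRef`, which is why the pool feels "magic" from the outside.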

For typed channels, I have to do something like this:

Obviously I could write a second typed channel that creates a pool of actors and rotates between them, but that seems like something that someone would have done before!

As noted in the comments, Akka has dropped support for typed channels and Patrik Nordwall suggested the use of untyped actors. One way to bridge things on the reply side is with the ask pattern, the Future.mapTo method, and pattern matching.

For example, suppose you have some class hierarchy rooted in Base with subclasses like OakLeaf, ShinyLeaf and Needle. From outside an actor, you can then do things like this:
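A hedged sketch of that combination, assuming the classic Akka `ask` pattern. Only the names Base, OakLeaf, ShinyLeaf and Needle come from the answer; the fields, the `GetFoliage` message, the `Tree` actor and the `describe` helper are made up for illustration:

```scala
import akka.actor.{Actor, ActorRef}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object AskSketch {
  sealed trait Base
  case class OakLeaf(veins: Int) extends Base
  case class ShinyLeaf(gloss: Double) extends Base
  case class Needle(lengthMm: Int) extends Base

  case object GetFoliage // hypothetical request message

  // Hypothetical untyped actor that replies with some subclass of Base.
  class Tree extends Actor {
    def receive: Receive = {
      case GetFoliage => sender() ! OakLeaf(veins = 7)
    }
  }

  // `?` returns Future[Any]; mapTo narrows it to Future[Base] and fails the
  // future with a ClassCastException if the reply is of another type.
  def describe(tree: ActorRef)(implicit ec: ExecutionContext): Future[String] = {
    implicit val timeout: Timeout = Timeout(3.seconds)
    (tree ? GetFoliage).mapTo[Base].map {
      case OakLeaf(veins)   => s"oak leaf with $veins veins"
      case ShinyLeaf(gloss) => s"shiny leaf with gloss $gloss"
      case Needle(length)   => s"needle of $length mm"
    }
  }
}
```

Because Base is sealed in this sketch, the compiler can warn when the pattern match misses a subclass, which recovers some of the static checking that typed channels were meant to provide.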

Tutorial: Asynchronous Programming With Akka Actors

From November’s JAX Magazine, Typesafe’s Jamie Allen explains how useful anonymous actors are to capture context while processing asynchronous tasks in Akka.

One of the most difficult tasks in asynchronous programming is trying to capture context so that the state of the world at the time the task was started can be accurately represented at the time the task finishes.

However, creating anonymous instances of Akka actors is a very simple and lightweight solution for capturing the context at the time the message was handled, to be utilized when the tasks are successfully completed.

A great example is an actor which is sequentially handling messages in its mailbox but performing the tasks based on those messages off-thread with Futures. This is a great way to design your actors in that they will not block waiting for responses, allowing them to handle more messages concurrently and increase your application’s performance. However, the state of the actor will likely change with every message.

Let’s take an example of an actor which will act as a proxy to get a customer’s account information for a financial services firm from multiple data sources. Further, let’s assume that each of the subsystem proxies for savings, checking and money market account balances will optionally return a list of accounts of that kind and their balances for this customer. Let’s write some basic Akka actor code to perform this task:

This code is fairly concise. The AccountBalanceRetriever actor receives a message to get account balances for a customer, and then it fires off three futures in parallel. The first will get the customer’s savings account balance, the second will get the checking account balance and the third will get a money market balance. Doing these tasks in parallel allows us to avoid the expensive cost of performing the retrievals sequentially. Also note that the futures return Options of account balances by account ID. If one of them returns None, it will not short-circuit the for comprehension, because the comprehension runs over the Futures and not over the Options inside them. If None is returned from futSavings, the for comprehension will still continue.
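The non-short-circuiting behaviour can be shown without any actors. The following is a sketch using only the Scala standard library; the `Balances` alias, the `AccountBalances` shape and the `combine` helper are assumptions for illustration and not the article's listing:

```scala
import scala.concurrent.{ExecutionContext, Future}

object BalanceFutures {
  // Assumed shape: an optional list of (account ID, balance) pairs.
  type Balances = Option[List[(Long, BigDecimal)]]

  case class AccountBalances(savings: Balances, checking: Balances, moneyMarket: Balances)

  // The three futures are passed in already running, so they execute in
  // parallel. The for comprehension only sequences their *results*; a None
  // inside a successful Future is an ordinary value and does not stop it.
  def combine(futSavings: Future[Balances],
              futChecking: Future[Balances],
              futMoneyMarket: Future[Balances])
             (implicit ec: ExecutionContext): Future[AccountBalances] =
    for {
      savings     <- futSavings
      checking    <- futChecking
      moneyMarket <- futMoneyMarket
    } yield AccountBalances(savings, checking, moneyMarket)
}
```

Only a failed Future (one completed with an exception) would cut the comprehension short; a missing balance list simply ends up as None in the combined result.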

However, there are a couple of things about it that are not ideal. First of all, it is using futures to ask other actors for responses, which creates a new PromiseActorRef for every message sent and is a waste of resources. It would be better to have our AccountBalanceRetriever actor send messages out in a “fire and forget” fashion and collect results asynchronously into *one* actor.

Furthermore, there is a glaring race condition in this code – can you see it? We’re referencing the “sender” in our map operation on the result from futBalances, which may not be the same ActorRef when the future completes, because the AccountBalanceRetriever ActorRef may now be handling another message from a different sender at that point!
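The hazard is not specific to Akka: any closure that runs later and reads a mutable field sees whatever the field holds at that later time. The sketch below reproduces it with plain Scala Futures. The `Handler` class and its `currentSender` field are hypothetical stand-ins for an actor and Akka's `sender`; the Promises are only used to control when the futures complete:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SenderRaceSketch {
  // Stand-in for an actor: currentSender changes with every message handled.
  class Handler {
    @volatile var currentSender: String = "nobody"

    // Unsafe: the closure reads the mutable field when the future completes.
    def handleUnsafe(from: String, work: Future[Int]): Future[(String, Int)] = {
      currentSender = from
      work.map(result => (currentSender, result))
    }

    // Safe: snapshot the field into a local val before going off-thread.
    def handleSafe(from: String, work: Future[Int]): Future[(String, Int)] = {
      currentSender = from
      val originalSender = currentSender
      work.map(result => (originalSender, result))
    }
  }

  def demo(): ((String, Int), (String, Int)) = {
    val handler = new Handler
    val first, second = Promise[Int]()
    val unsafe = handler.handleUnsafe("alice", first.future)
    val safe = handler.handleSafe("bob", second.future)
    handler.currentSender = "carol" // a later message arrives first
    first.success(1)
    second.success(2)
    (Await.result(unsafe, 1.second), Await.result(safe, 1.second))
  }
}
```

In `demo()`, the unsafe variant attributes alice's result to carol, while the safe variant still reports bob. Copying `sender` into a local val inside `receive` before starting a future is the same fix applied to an actor.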

Let’s focus on eliminating the need to ask for responses in our actor first. We can send the messages with the “!” and collect responses into a List of an optional List of balances by account number. But how would we go about doing that?

This is better, but still leaves a lot to be desired. First of all, we’ve created our collection of balances we’ve received back at the instance level, which means we can’t differentiate the aggregation of responses to a single request to get account balances. Worse still, we can’t time out a request back to our original requestor. Finally, while we’ve captured the original sender as an instance variable that may or may not have a value (since there is no originalSender when the AccountBalanceRetriever starts up), we have no way of being sure that the originalSender is who we want it to be when we want to send data back!

The problem is that we’re attempting to take the result of the off-thread operations of retrieving data from multiple sources and return it to whomever sent us the message in the first place. However, the actor will likely have moved on to handling additional messages in its mailbox by the time these futures complete, and the state represented in the AccountBalanceRetriever actor for “sender” at that time could be a completely different actor instance. So how do we get around this?

The trick is to create an anonymous inner actor for each GetCustomerAccountBalances message that is being handled. In doing so, you can capture the state you need to have available when the futures are fulfilled. Let’s see how:

This is much better. We’ve captured the state of each receive and only send it back to the originalSender when all three have values. But there are still two issues here.

First, we haven’t defined how we can time out on the original request for all of the balances back to whomever requested them. Secondly, our originalSender is still getting a wrong value – the “sender” from which it is assigned is actually the sender value of the anonymous inner actor, not the one that sent the original GetCustomerAccountBalances message!

We can use a Promise to handle our need to time out the original request, by allowing another task to compete for the right to complete the operation with a timeout. A Promise in Scala holds its eventual result as a Try: a Failure wrapping a Throwable if something goes wrong, or a Success wrapping a value of whatever type you expected. In this case, we want an AccountBalances instance, so our Promise will be typed as Promise[AccountBalances].
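The "competing tasks" idea can be sketched with the standard library alone. This is not the article's listing: the `withTimeout` helper and the JDK scheduled executor used as a timer are assumptions made so the example needs no Akka scheduler. It relies on `tryComplete` and `tryFailure`, which return false when the promise was already completed by the other party:

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration

object PromiseTimeoutSketch {
  // One daemon timer thread, so the timer never keeps the JVM alive.
  private val timer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "promise-timeout")
      t.setDaemon(true)
      t
    }
  })

  // The real work and a timeout task race to complete the same promise;
  // whichever calls try* first wins and the loser's attempt is ignored.
  def withTimeout[T](work: Future[T], limit: FiniteDuration)
                    (implicit ec: ExecutionContext): Future[T] = {
    val promise = Promise[T]()
    val timeoutTask = timer.schedule(new Runnable {
      def run(): Unit = {
        promise.tryFailure(new TimeoutException(s"no result within $limit"))
        ()
      }
    }, limit.toMillis, TimeUnit.MILLISECONDS)
    work.onComplete { result =>
      // Cancel the timer only if the work actually won the race.
      if (promise.tryComplete(result)) timeoutTask.cancel(false)
    }
    promise.future
  }
}
```

A caller would wrap the aggregated balances future, for example `withTimeout(balancesFuture, 250.millis)`, and handle the TimeoutException like any other failed future.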

Now we can collect our results and check to see if we successfully completed the promise. If so, we can return the appropriate value or the TimeoutException instance. Finally, we must remember to stop our anonymous inner actor so that we do not leak memory for every GetCustomerAccountBalances message we receive!
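Putting the pieces together, here is a hedged sketch of the overall pattern using classic Akka actors. It is not the author's listing: it replaces the Promise with a timeout message scheduled to the inner actor itself, and the message names other than GetCustomerAccountBalances and AccountBalances, the 250 ms limit and the constructor parameters are assumptions:

```scala
import akka.actor.{Actor, ActorRef, Props}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._

object RetrieverSketch {
  type Balances = Option[List[(Long, BigDecimal)]]

  case class GetCustomerAccountBalances(id: Long)
  case class SavingsAccountBalances(balances: Balances)
  case class CheckingAccountBalances(balances: Balances)
  case class MoneyMarketAccountBalances(balances: Balances)
  case class AccountBalances(savings: Balances, checking: Balances, moneyMarket: Balances)
  case object BalanceRetrievalTimeout

  class AccountBalanceRetriever(savingsAccounts: ActorRef,
                                checkingAccounts: ActorRef,
                                moneyMarketAccounts: ActorRef) extends Actor {
    def receive: Receive = {
      case GetCustomerAccountBalances(id) =>
        // Captured in the outer actor, before the inner actor exists.
        val originalSender = sender()

        context.actorOf(Props(new Actor {
          import context.dispatcher

          // Outer None = no reply yet; the inner Option is the reply itself.
          private var savings, checking, moneyMarket: Option[Balances] = None

          private val timeoutTask = context.system.scheduler
            .scheduleOnce(250.millis, self, BalanceRetrievalTimeout)

          // Sent from the inner actor, so replies come back to it.
          savingsAccounts ! GetCustomerAccountBalances(id)
          checkingAccounts ! GetCustomerAccountBalances(id)
          moneyMarketAccounts ! GetCustomerAccountBalances(id)

          def receive: Receive = {
            case SavingsAccountBalances(b)     => savings = Some(b); replyIfComplete()
            case CheckingAccountBalances(b)    => checking = Some(b); replyIfComplete()
            case MoneyMarketAccountBalances(b) => moneyMarket = Some(b); replyIfComplete()
            case BalanceRetrievalTimeout =>
              finish(new TimeoutException("balance retrieval timed out"))
          }

          private def replyIfComplete(): Unit = (savings, checking, moneyMarket) match {
            case (Some(s), Some(c), Some(m)) => finish(AccountBalances(s, c, m))
            case _                           => // still waiting for replies
          }

          // Reply once, then stop the inner actor so it does not leak.
          private def finish(response: Any): Unit = {
            originalSender ! response
            timeoutTask.cancel()
            context.stop(self)
          }
        }))
    }
  }
}
```

Because the timeout arrives as an ordinary message, all state of the inner actor is only ever touched from its own `receive`, and stopping the actor after the first reply makes any late responses harmless.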

Asynchronous programming is simply not easy, even with powerful tools at our disposal. We always must think about the state we need and the context from which we get it. I hope this article has shown you some nice ideas about how to use Actors, Futures and Promises to perform complex tasks, as well as patterns for using them that will help you in your everyday coding. All source code can be found in my Github repository.

Author Bio: Jamie Allen has over 18 years of experience delivering enterprise solutions across myriad industries, platforms, environments and languages. He has been developing enterprise applications with Scala since 2009, primarily using actors for fault tolerance and managing concurrency at scale. Jamie currently works for Typesafe, where he helps users develop actor-based systems using the Akka framework and Scala.

This tutorial originally appeared in JAX Magazine: JavaFX Revitalised.

We have IgniteDataStreamer which is used to load data into Ignite under high load. It was previously named IgniteDataLoader, see ticket IGNITE-394.

See Akka for more information. Given that Akka is a Scala framework, this streamer should be available in Scala.

We should create IgniteAkkaStreamer which will consume messages from Akka Actors and stream them into Ignite caches.

More details to follow, but at the least we should be able to:

  • Convert data from Akka to Ignite using an optional pluggable converter. If not provided, then we should have some default mechanism.
  • Specify the cache name for the Ignite cache to load data into.
  • Specify other flags available on IgniteDataStreamer class.

GitHub Pull Request #1019

GitHub Pull Request #1998

Dmitriy Setrakyan I can't assign this to myself. Not sure why, so would appreciate it if you could assign it to me. Thanks.

Joshua, I have added you to the contributors list. You now are able to pick any Ignite ticket and start working on it.

Dmitriy Setrakyan, what do you think about using akka-streams here? I've been looking at it the last couple of days and think it could work. Essentially, this would be an akka-streams Sink implementation that writes to Ignite. There are a few issues to work through, but at a high level:

[akka actor] --> akka stream --> [akka stream flow (conversion)] --> akka stream --> [ignite sink]

The flow (conversion) would be responsible for key extraction and conversion to an Entry<K,V>, so this ultimately becomes pluggable. Using a merge flow, multiple conversions can feed into a single ignite sink.

If this sounds like a reasonable approach, I can expand on it and throw together a quick diagram that will be far better than my ascii art.

Quick questions: Should the actual writing to Ignite leverage/extend the DataStreamerImpl class?

Where is the appropriate place to create the module?

As far as the proposed sequence, I would need some clarification. It just seems to me that this sequence would work faster than the one you are suggesting:

I also think that we may need this as well

Perhaps we should support both?

I don't think you need to extend DataStreamerImpl class. I think using public Ignite API and delegating from your Akka code directly to IgniteDataStreamer is a better approach, e.g.
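One way this delegation could look, as a rough sketch only: it uses the public `Ignite.dataStreamer`, `IgniteDataStreamer.addData` and `close` calls together with a classic Akka actor. The `Put` message, the actor name and the String key/value types are assumptions for illustration, and the named cache is assumed to exist already:

```scala
import akka.actor.Actor
import org.apache.ignite.{Ignite, IgniteDataStreamer}

object IgniteActorSketch {
  // Hypothetical message carrying one cache entry.
  case class Put(key: String, value: String)

  class IgniteStreamingActor(ignite: Ignite, cacheName: String) extends Actor {
    // Public API only; the cache named cacheName must already exist.
    private val streamer: IgniteDataStreamer[String, String] =
      ignite.dataStreamer[String, String](cacheName)

    def receive: Receive = {
      // Each message is handed straight to the streamer, which batches
      // entries internally before sending them to the grid.
      case Put(key, value) => streamer.addData(key, value)
    }

    // close() flushes whatever is still buffered before releasing resources.
    override def postStop(): Unit = streamer.close()
  }
}
```

A pluggable converter or key extractor would replace the fixed `Put` message in a real module; this sketch leaves that part out.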

Let me know what you think.

OK, I'm catching on now.

I think our best bet is an akka extension. The extension could mixin addData and removeData methods to akka actors something like this:

We could also supply an actor that is a stream consumer to bridge between streams and the extension. This might be abstract in that it would need a "key extractor" built for the domain.

I would suggest that we not use the word Streaming in the name, however, since that means something different in akka. Simply IgniteData or IgniteDataSupport might be better.

How does that sound?

I actually like the design. I do believe that we will have to have the "IgniteKeyExtractor" abstraction to get keys out of Akka messages or streams.

I think we can split this work into 2 parts:

  1. Akka Actor integration
  2. Akka Stream integration

The code snippet that you provided for Akka extension with Scala trait looks nice, so my suggestion would be to complete that first before moving to Akka streams.

What do you think?

I actually like the design too. Akka Streams integration is spot on.

Great idea. Has any work been done on this already?

Hi all, are there any updates on this topic? We are working on a new project that uses Akka for our order manager, and we will evaluate the possibility of using Ignite as the data and service grid behind it. I am wondering whether it is possible to replace the Akka cluster manager with the Ignite one, or at least whether there is data grid integration via streams. I can contribute the needed work as well if you want.

I have started working on it. I will update once it is done.

GitHub user chandresh-pancholi opened a pull request:

IGNITE-532 - Implement IgniteAkkaStreamer to stream data from Akka actors

You can merge this pull request into a Git repository by running:

Alternatively you can review and apply these changes as the patch at:

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #1019

Author: chandresh-pancholi

IGNITE-532 - Implement IgniteAkkaStreamer to stream data from Akka actors

I have started reviewing the PR and will provide comments in the next few days.

Thank you Semen.

(It seems Semen has started the review, but since I had a look at your code, I will write my comments too)

Putting aside minor issues, in your implementation all the actor does is create a new streamer with data on invocation. To process it, you use `AkkaStreamer`, which has no relation to the created actor. I would recommend revising the code, keeping in mind how it will be used by the user.

As I see it, each actor invocation has to inject data into Ignite (as in earlier Dmitriy's and Joshua's discussion).

It looks like you just copy/pasted the streamer code and test from the kafka module without understanding how it works. For example, the executor threads in your AkkaStreamer run an infinite loop that puts the same data from messageMap into the cache on each iteration. Your AkkaActor just calls the AkkaStreamer constructor but does not start the streamer, so it is actually a no-op.

I do not see how the current implementation can be useful. Could you please explain how Akka users can use it? Probably it would be better if you sent an email to the dev list, so more people can provide comments.

Semen Boikov, I will make the necessary changes and update the PR.

I'm not sure it makes sense to fix something in the current implementation. It would be better if you first explained what you are trying to implement and how end users will use it.

While I'm not completely up-to-date what this module is aimed to achieve, I noticed it seems like integrating with Kafka and streaming, so presumably using Akka Streams.

So I just wanted to make sure you're aware of our official Kafka integration: https://github.com/akka/reactive-kafka - feel free to use it if it fits your needs, it just has hit a stable version last week.

Hope this helps.

I have some questions about the issue:

1. An akka-stream workflow implementation has a few stages:

Will it be enough to implement the creation of a Sink object in ignite-stream for the akka-stream workflow?

2. Is support for akka-actor still relevant?

GitHub user dream-x opened a pull request:

IGNITE-532: Implement IgniteAkkaStreamer to stream data from Akka actors.

You can merge this pull request into a Git repository by running:

Alternatively you can review and apply these changes as the patch at:

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #1998

Author: Max Kozlov

Commits:

  • Init akka stream module.
  • Add akka-actor and -stream dependency in pom.xml.
  • Update akka dependency.
  • Add akka-stream streamer.
  • Add akka-stream streamer test.
  • Delete onComplete test.
  • Change version akka to 2.5.
  • Ignite streamer based on Scala Actor.
  • Add test akka actor streamer.
  • Refactoring scala tests.
  • Update akka to 2.5
  • Delete unused code.
  • Remove java classes. Refactor imports.

It seems we have no Scala expertise inside the Ignite project, so the issue can't be reviewed.

So, I see 2 ways here:

1) Rewrite the code in Java.

2) Implement this as a part of the Akka project.

I'm changing the status from PATCH AVAILABLE to IN_PROGRESS until a decision is made.
