Friday, August 16, 2013

mid-2013, my Spark Odyssey... when Akka sparks Yahoo! finance

This entry is the second in a series about some work/tests I'm doing with Spark. All the context can be found in the very first post of the series.

In the previous post, I explained how easy it is to consume a Twitter stream using Spark, and how seamlessly a sentiment score can be attached to each tweet that passes a given condition (read: filtered).

In this one, I'm going to tackle another part of my test, which is fetching the current stock information for a given list of companies.
This information will be provided by the Yahoo! Finance API...

...well, hum, not exactly an API. Actually, thanks to @kennyhelsens, who showed me this "hack", we'll use a Yahoo! URL that produces a CSV file containing (mostly) real-time information about companies' stock changes. Information about this URL's parameters is really hard to come by; however, you can find some out there.


In this series, we're trying to use real-time streams rather than batch processing to give a real-time view of the fluctuating health of given companies.
But we've just seen that we're going to consume CSVs that give a snapshot of the stock changes.
A common solution to turn a static API into a stream is to poll it and to build a stream by appending the results one after the other.
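Stripped of Spark and Akka, the idea fits in a few lines of Scala. In this minimal sketch, `poll` is a hypothetical stand-in for one request to the CSV URL, and the "stream" is simply the sequence of its results, produced lazily one after the other.

```scala
object PollingSketch {
  /** One snapshot: a hypothetical (symbol -> last price) map returned by a single poll. */
  type Snapshot = Map[String, BigDecimal]

  /**
   * Turns a pull-based source into an unbounded, lazy stream of snapshots:
   * `poll` is called again each time the consumer asks for the next element.
   */
  def asStream(poll: () => Snapshot): Iterator[Snapshot] =
    Iterator.continually(poll())
}
```

Nothing is fetched until someone consumes the iterator, which is exactly the gap the scheduler fills below: something has to keep pulling at a regular pace.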

Here, we'll try a pretty simple solution: we'll delegate to Akka the task of consuming the CSV, parsing it, splitting the data by company, modeling each entry, and finally pushing everything to a DStream.

It's rather straightforward, and we'll see that it's even simpler than we would expect, since Spark already plays nicely with Akka by integrating, out of the box, an API to "consume" actors' messages.


In this scene, there will be several actors, each playing its own role (sorry, Peter Sellers fans -- like me, btw).
Here they are:
  • scheduler: this one is provided by Akka out of the box and can send a given message to a given actor at a given interval.
  • feeder: this actor simply fetches the CSV and produces the data to be sent to the actor below.
  • receiver: this one receives a message whenever data is available for a given company.
  • parentReceiver: this one is created by Spark itself using the StreamingContext and pushes the received data to the right RDD in the DStream it holds (see ActorReceiver).

Now the scenario:
Sequence from the scheduler to the DStream (current RDD)
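Since the sequence boils down to a handful of messages, here is a plain-Scala sketch that lists them in order for a first polling round. The message names (`Tick`, `SubscribeReceiver`, `StockChange`) and the `Hop` trace type are hypothetical; `Data` mirrors the wrapper Spark uses, described further down. Real actors would exchange these with `!` rather than build a list.

```scala
object ProtocolSketch {
  // Hypothetical message types for the four actors; only `Data` mirrors a name used by Spark.
  sealed trait Message
  case object Tick extends Message // scheduler -> feeder, every polling interval
  final case class SubscribeReceiver(receiverUrl: String, stocks: List[String]) extends Message // receiver -> feeder, once
  final case class StockChange(symbol: String, price: BigDecimal, change: BigDecimal) extends Message // feeder -> receiver
  final case class Data[T](d: T) extends Message // receiver -> parentReceiver (via pushBlock)

  /** One message going from one actor to another. */
  final case class Hop(from: String, to: String, msg: Message)

  /**
   * Hops of the first polling round: the receiver subscribes (once, when it starts),
   * then a single tick fans out into one StockChange per company, each forwarded as Data.
   */
  def firstRound(receiverUrl: String, quotes: List[StockChange]): List[Hop] = {
    val subscribe = Hop("receiver", "feeder", SubscribeReceiver(receiverUrl, quotes.map(_.symbol)))
    val tick = Hop("scheduler", "feeder", Tick)
    val perCompany = quotes.flatMap { q =>
      List(Hop("feeder", "receiver", q), Hop("receiver", "parentReceiver", Data(q)))
    }
    subscribe :: tick :: perCompany
  }
}
```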
Does that make sense? Let's have a short overview of the implementation.

Scheduling the poller

First, we have to create a scheduler that will poll the server every 500ms... well, using Akka, separation of concerns and the message-passing style, we'll just create a scheduled action that passes a message to a given actor:

But even before that, let's prepare the ground for Akka:

There are plenty of ways to create an ActorSystem with Akka, but Spark prepares some things for us to create one that respects some of its own internal conventions. This helper, named AkkaUtils, is available in the `spark` package; however, it's package-private... That's why I've created my own utils to create the ActorSystem under the spark package -- I'm bad sometimes, but I'm so often lazy :-D.

Actually, the Spark utility for Akka creates an ActorSystem bound to a host and a port for us, which will be really helpful afterwards... because everything in Spark must be serializable, and thus we cannot easily ship an ActorRef (it needs an ActorSystem, ...). So we'll use URLs to create refs rather than using actors captured within a closure in a Spark action.
Recall: that's how Spark manages to recover from a failure: it takes the lineage of the RDD and rebuilds everything up from the initial data (not possible with streams...). But above all, the actions (functions) are dispatched to the machines where the data resides. So the lineage contains the functions, and a function might contain ActorRefs!
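To make that concrete, here is a sketch of what can safely travel inside a closure: a plain case class describing where the feeder lives, from which the URL is rebuilt on the other side. `FeederLocation` is a hypothetical helper, and the `akka://system@host:port/user/name` form is the remote path format of Akka 2.0-era remoting (later Akka versions use `akka.tcp://`). The round trip uses plain Java serialization, the mechanism Spark uses to ship closures. On the receiving side, something like `context.actorFor(url)` (Akka 2.0/2.1) turns the URL back into a ref.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

object ActorUrlSketch {
  /** Hypothetical: everything needed to find the feeder again, and nothing that isn't serializable. */
  final case class FeederLocation(systemName: String, host: String, port: Int, actorName: String) {
    // Akka 2.0-style remote path of a top-level actor created under /user
    def url: String = s"akka://$systemName@$host:$port/user/$actorName"
  }

  /** Java serialization round trip, as happens when a closure is shipped to a worker. */
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val loader = value.getClass.getClassLoader
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // resolve classes with the loader that defined `value` (matters in REPLs and scripts)
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, loader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

An ActorRef captured in the same closure would drag its ActorSystem along, which is precisely what we want to avoid.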
Now that we have an ActorSystem available, let's create the scheduler (see the last section, Drawbacks, for more info about problems with this methodology):
Nothing relevant to say about that, except maybe the definition of FeederActor...
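For readers without Akka at hand, the scheduler's contract ("send this message to that actor every N ms") can be mimicked with the JDK's `ScheduledExecutorService`. This is a swapped-in stand-in, not the Akka scheduler; with Akka 2.x the equivalent is along the lines of `system.scheduler.schedule(initialDelay, interval, feederRef, message)`. `Tick` and `sendEvery` are hypothetical names.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object SchedulerSketch {
  // Hypothetical message the feeder reacts to on each tick
  case object Tick

  /**
   * Calls `deliver(message)` every `intervalMs` milliseconds (fixed rate, first call
   * immediately) until `times` deliveries have happened, then shuts the timer down.
   */
  def sendEvery(intervalMs: Long, times: Int, message: Any)(deliver: Any => Unit): Unit = {
    val timer = Executors.newSingleThreadScheduledExecutor()
    val remaining = new CountDownLatch(times)
    val task = new Runnable {
      // a single-threaded executor never runs two ticks at once, so check-then-act is safe
      def run(): Unit = if (remaining.getCount > 0) { deliver(message); remaining.countDown() }
    }
    timer.scheduleAtFixedRate(task, 0L, intervalMs, TimeUnit.MILLISECONDS)
    remaining.await()
    timer.shutdownNow()
  }
}
```

`times` only exists so the sketch terminates; the real scheduler keeps firing, every 500ms in our case, until it is cancelled.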

Feeding Spark with Yahoo data

Here comes the feeding phase. We won't see how the CSV is read and parsed; however, we'll see what's going on with the message flow.
What's interesting in this actor is twofold:
  1. it can receive a message containing two pieces of data: 
    1. an actorRef that it will hold in its state: this one will be the receiver
    2. a list of stocks to follow (by building the corresponding call to Yahoo)
  2. when consuming the data and producing the data per stock, it sends them one by one to the receiver held above
Actually, this feeder has two responsibilities: fetching the data and pushing it to Spark.
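As the parsing isn't shown, here is a hedged, actor-free model of the feeder's two messages: remember who to send to and which stocks to follow, then, on each tick, fetch the CSV and push one message per company. `fetchCsv`, `StockChange` and the three-column layout (symbol, last price, change) are assumptions for illustration; the actual Yahoo! columns depend on the URL's parameters.

```scala
object FeederSketch {
  final case class StockChange(symbol: String, price: BigDecimal, change: BigDecimal)

  /** Hypothetical CSV layout: "SYMBOL",price,change (quotes around the symbol optional). */
  def parseLine(line: String): Option[StockChange] =
    line.split(",").map(_.trim.stripPrefix("\"").stripSuffix("\"")) match {
      case Array(sym, price, change) =>
        try Some(StockChange(sym, BigDecimal(price), BigDecimal(change)))
        catch { case _: NumberFormatException => None } // e.g. "N/A" values
      case _ => None
    }

  /** Models the feeder's state; `fetchCsv` is a hypothetical function: symbols => raw CSV body. */
  final class Feeder(fetchCsv: List[String] => String) {
    private var receiver: Option[StockChange => Unit] = None
    private var stocks: List[String] = Nil

    // message 1: "here is the Spark-side receiver, and these are the stocks to follow"
    def subscribe(to: StockChange => Unit, follow: List[String]): Unit = {
      receiver = Some(to)
      stocks = follow
    }

    // message 2 (from the scheduler): fetch, parse, and send one message per company
    def tick(): Unit = receiver.foreach { send =>
      fetchCsv(stocks).split("\\r?\\n").iterator.flatMap(line => parseLine(line).toList).foreach(send)
    }
  }
}
```

Until a receiver has subscribed, ticks are simply ignored, which matches the ordering of the scenario: the receiver announces itself first.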
But how is this receiver constructed and passed?

Receiver -- our spark stuff

The receiver is actually an actor that will be created internally by Spark for us; for that, it must mix in yet another trait, Receiver. And a Receiver can only be an Actor (enforced by a self-type)!
Four things to point out here:
  1. this actor is well-defined by extending the Receiver trait, which provides the pushBlock method.
  2. it holds a reference to an actor that it creates itself, based on a given URL -- an actor URL... this is where AkkaUtils came in handy! This actor will be the feeder actor.
  3. in the preStart hook, we tell the feeder actor that this ActorRef is the Spark end-point where data must be sent to be published in the stream.
  4. when data arrives, we check in the cache whether it's already there (see below); if not, we call Receiver#pushBlock with it.
Why do we have to cache? Actually, when the stock market has closed, Yahoo keeps returning the last change before closing... so, here, we just avoid telling our health checker that the mood is monotonically increasing or decreasing (or even just flat if zero).
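The cache only has to remember the last value seen per company and let a change through when it differs. A minimal sketch, assuming two quotes are "the same" when all their fields are equal; `ChangeCache` is a hypothetical name, and the receiver would call `pushBlock` only when `isNew` returns true.

```scala
object CacheSketch {
  final case class StockChange(symbol: String, price: BigDecimal, change: BigDecimal)

  /** Remembers the last change seen for each symbol and reports whether a new one differs. */
  final class ChangeCache {
    private var last = Map.empty[String, StockChange]

    def isNew(c: StockChange): Boolean =
      if (last.get(c.symbol).contains(c)) false // same quote as last time: drop it
      else { last += c.symbol -> c; true }      // first or different quote: remember and let it through
  }
}
```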

The pushBlock method is the key to publishing to the underlying DStream managed by Spark: it wraps our data into an internal representation before telling the parent actor to use it: context.parent ! Data(y).
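The "a Receiver can only be an Actor" constraint is a Scala self-type. The sketch below reproduces the pattern with stand-in types so it runs without Akka or Spark: `MiniActor`, `MiniReceiver`, `EchoReceiver` and the `parent` function are hypothetical; only the shape (`pushBlock` wrapping the value in `Data` and sending it to the parent) follows the description above.

```scala
object ReceiverSketch {
  final case class Data[T](d: T)

  /** Stand-in for an Akka actor: all we need here is a way to message the parent. */
  trait MiniActor {
    def parent: Any => Unit // plays the role of `context.parent ! _`
  }

  /** Self-type: can only be mixed into a MiniActor, the way Spark's Receiver requires an Actor. */
  trait MiniReceiver { self: MiniActor =>
    def pushBlock[T](t: T): Unit = parent(Data(t))
  }

  /** A receiver that forwards every string it gets. */
  final class EchoReceiver(val parent: Any => Unit) extends MiniActor with MiniReceiver {
    def onMessage(s: String): Unit = pushBlock(s)
  }
}
```

A `class NotAnActor extends MiniReceiver` would be rejected by the compiler, which is exactly the guarantee the self-type buys.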

But what is that context.parent?

parentReceiver -- not our spark stuff

This receiver is totally managed by Spark, so I won't dig into more details than saying that it's created underneath when doing the following:
Fairly simple, yes, it's true! Actually, the StreamingContext we created at the very start of the project contains everything needed to create a DStream based on Akka; all it needs is a reference to a Receiver (defined in the previous section).

What this actorStream does is create an ActorReceiver (with a Supervisor and blah blah), which itself creates and manages a worker for the given Props, but which also holds the reference to the current RDD in the DStream.

The worker can send it messages of the form Data(d) using pushBlock, which it then pushes to the DStream.
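Conceptually, the parent accumulates whatever arrives as `Data(d)` and, at each batch interval, hands the accumulated values over as the next block of the DStream. This is a hedged, single-threaded model: `BlockCollector` and `cutBlock` are hypothetical and gloss over Spark's real block management, supervision and timing.

```scala
import scala.collection.mutable.ArrayBuffer

object ParentSketch {
  final case class Data[T](d: T)

  /** Collects Data(d) messages; each cutBlock() call yields what arrived since the previous one. */
  final class BlockCollector[T] {
    private val current = ArrayBuffer.empty[T]

    def receive(msg: Data[T]): Unit = current += msg.d

    // in Spark this happens on a timer, once per batch interval
    def cutBlock(): Vector[T] = {
      val block = current.toVector
      current.clear()
      block
    }
  }
}
```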

Concise, huh? If only the documentation told us that, I wouldn't have had to dig that deep into the code... the good side effect is that now I understand it pretty well. 
And I hope you do too!

Where are we?

Up to here, we've created a DStream of data for each tweet paired with 'its' company, but we now also have some info about these companies' stock changes.
In the next post, we'll create a multiplexed stream for which we'll compute a value for every 60-minute window.

Drawbacks of the/my methodology

At this level, I can see two problems.

The scheduler

Using a scheduler is bad because we cannot ensure that the messages are processed sequentially by Spark. So the data ends up in the RDD corresponding to the time by which the whole CSV has been processed and the actors' messages have been delivered!

I'm putting this aside for the moment since I'm not building a real-life project (no one will use this tool to make decisions on the market ^^).
Moreover, mathematically, let's say in the limit, the value will be correct: reducing an amount by 2 at t⁽¹⁾ or at t⁽²⁾ is not really problematic; it's like swapping two ints in a list that we're summing over.
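The argument rests on addition being commutative: as long as the changes end up in the same window, the order in which their messages arrive doesn't change the aggregate. A tiny check with made-up integer deltas (in cents, to sidestep floating-point rounding); `batchTotal` and `orderIndependent` are hypothetical helpers.

```scala
object OrderSketch {
  /** Aggregate of one batch: sum of the per-company deltas, in cents. */
  def batchTotal(deltasInCents: Seq[Int]): Int = deltasInCents.sum

  /** True if every arrival order of the same deltas yields the same total. */
  def orderIndependent(deltasInCents: Seq[Int]): Boolean =
    deltasInCents.permutations.map(batchTotal).toSet.size == 1
}
```

For example, `batchTotal(Seq(123, -200, 45))` and `batchTotal(Seq(45, 123, -200))` both give `-32`. Note this only covers reordering; a change that slips into the next window still shifts where it is counted.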

The consumer

I haven't shown its current implementation because it's very simple and bad: I'm simply opening a connection on the URL and consuming everything until the end before applying the processing...

That is: it's blocking and non-reactive.
It also means that we lose the power of parallelism, since the Akka message dispatcher has to wait for a CSV to be completely processed before handling the next one.

However, it's just an implementation detail that can easily be worked around without modifying the solution.
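One possible workaround, sketched with the standard library's `Future`: the still-blocking read runs on a separate execution context, so the actor handling the tick returns immediately and several CSVs can be in flight. In Akka, the completed future could then be sent back to the actor, for instance with `pipeTo` from `akka.pattern`. `fetchBlocking`, `Quote` and the two-column layout are hypothetical stand-ins for the current read-everything-then-process code.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

object AsyncFetchSketch {
  final case class Quote(symbol: String, price: BigDecimal)

  /** Hypothetical two-column layout "SYMBOL,price"; unparsable lines are skipped. */
  def parse(csv: String): List[Quote] =
    csv.split("\\r?\\n").toList.flatMap { line =>
      line.split(",").map(_.trim) match {
        case Array(sym, price) => Try(BigDecimal(price)).toOption.map(Quote(sym, _)).toList
        case _                 => Nil
      }
    }

  /**
   * Runs the (still blocking) fetch plus parsing on `ec` instead of the caller's thread,
   * so the caller, e.g. the actor handling a tick, can return immediately.
   * `fetchBlocking` is a hypothetical function: stock symbols => raw CSV body.
   */
  def fetchAsync(stocks: List[String])(fetchBlocking: List[String] => String)
                (implicit ec: ExecutionContext): Future[List[Quote]] =
    Future(parse(fetchBlocking(stocks)))
}
```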