A tiny bit about AWS EMR and big data

One of the recent projects I’ve worked on involved processing billions of rows stored in AWS S3, terabytes of data in total. That was the biggest dataset I’ve worked with so far and, even though I don’t like the term, it broke through the big data barrier. Just handling the data with popular toolkits, such as scikit-learn or MXNet, created so many problems that it was easier to create our own solutions. But the biggest surprises came from the places least expected.

The first surprise came from AWS EMR. With that amount of data there is no way around a large computation cluster, and EMR is quite easy to set up. The web UI is rather nicely explained and, once you know what you want, you can use the CLI or SDK to do it programmatically. Cool, right? So what are the problems? There are plenty of things that simply don’t work as they should, or work differently without that being mentioned anywhere. The number of one-click install applications that EMR supports is rather big, and each comes with some explanation. However, nowhere is it mentioned that EMR Spark is a fork of Apache Spark and thus slightly different. It comes with different default settings, so the best practices for Apache Spark don’t carry over, and searching for EMR Spark just doesn’t return anything. It took me a while to find out that S3 should be accessed through s3://, or possibly through the deprecated and slow s3n://. The EMR documentation also states that you shouldn’t use s3a://, which, in contrast, is the recommended scheme for Apache Spark.
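For illustration, a minimal PySpark sketch of how paths look with EMR’s s3:// scheme; the bucket name and paths are hypothetical:

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    # EMRFS backs the s3:// scheme on EMR; s3n:// is deprecated and slow,
    # and s3a:// is the recommended scheme only for vanilla Apache Spark.
    lines = sc.textFile("s3://bucketName/input/")
    lines.saveAsTextFile("s3://bucketName/output/")

Oh, and while I’m on the S3…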

Another big surprise came from AWS S3 itself. Given how global and popular the service is, I was surprised to learn that there are actual limitations on the connection. Ok, I wasn’t surprised that there are any, but I thought they were much higher. According to the AWS S3 documentation on Request Rate and Performance Considerations, one shouldn’t exceed 100 PUT/LIST/DELETE or 300 GET requests per second. This is averaged over time, so occasional bursts are Ok, but do it too often and S3 is going to throttle you. Why does this matter? When you are using Spark to save directly to S3, e.g.

    val orders = sparkContext.parallelize(1 to 1000000)
    orders.saveAsTextFile("s3://bucketName/orders")

and you are working with hundreds of executors (processes) on small tasks (batches), then you are going to query S3 a lot. Moreover, by default Spark saves output to a temporary directory and, once the save is done, renames and moves everything. On a file system that’s an almost instantaneous operation, but on object storage, such as S3, it’s another big read and write, and it takes a significant amount of time. Yes, this can and should be avoided by proper configuration, or by repartitioning to a smaller number of partitions before saving, but learning about this the hard way is not the nicest experience. Moreover, one would expect the integration between EMR and S3 to be smoother.
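In PySpark the repartitioning workaround is essentially one call; a sketch with a hypothetical partition count that would need tuning per job:

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    # Collapsing to fewer partitions before saving means fewer output files,
    # and therefore far fewer PUT/LIST requests against S3.
    orders = sc.parallelize(range(1, 1000001))
    orders.coalesce(16).saveAsTextFile("s3://bucketName/orders")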

Having said all of that, I need to highlight that working with Spark and Hadoop on AWS EMR is rather simple. It takes a long time to learn the nuances and the proper configuration per task, but once that’s done life only gets better. One of the features I’m looking forward to is better integration with MXNet. Both MXNet and Tensorflow allow for nice utilization of CPU and GPU clusters, so it should be a matter of time for EMR to support that out of the box, right? Hopefully. In the meantime Spark + Hadoop + Ganglia + Zeppelin seems to be all I need for big data processing pipelines.


AWS Polly GUI

Although learning and book knowledge are the best, my personal relationship with reading is not the friendliest. Staying focused on the text is a huge struggle and I often need to re-read sentences to actually absorb them. That’s why I sometimes use text-to-speech (TTS) software or services.

A few years ago I discovered Ivona, a text-to-speech program which was far superior to any other TTS solution. It was able to quickly read out loud (and clearly) text from my clipboard. Not only was it better than the others, it also supported Polish – my language. Even though the default software wasn’t useful for my use cases, i.e. scientific papers have unusual formatting, it wasn’t that difficult to write a wrapper and GUI around Ivona. Unfortunately, it’s not supported anymore and one cannot download the offline version.

Currently, Ivona is owned by Amazon and its voices are accessible through the AWS Polly service. It’s a relatively cheap service, but one still has to have an internet connection, and it doesn’t come with any GUI. At least officially.
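For the curious, a minimal sketch of what a single Polly request looks like through boto3, assuming configured AWS credentials; the voice and file name are just example choices (Ewa is one of the Polish voices):

import boto3

polly = boto3.client("polly")

# Ask Polly for an mp3 stream of a short text.
response = polly.synthesize_speech(
    Text="Hello from Polly!",
    OutputFormat="mp3",
    VoiceId="Ewa",
)

# The audio comes back as a streaming body that can be written to a file.
with open("speech.mp3", "wb") as f:
    f.write(response["AudioStream"].read())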

I’ve written an application to use AWS Polly. It’s a simple graphical interface with some formatting options for the text, but it does its job. The AWS Polly GUI is accessible from my GitHub page. It runs on Python 3 with PyQt5.

Features are updated as needed, so if something might be helpful to anyone, feel free to contact me or create an issue on the repository. I’m using this for my personal work, so I’m not planning on putting it aside.

JSON pretty formatting in vim

Quick ad hoc command:

:%!python -m json.tool

To reuse it without all the typing, update your `.vimrc` file with:

com! FormatJSON %!python -m json.tool

After this you should be able to use the :FormatJSON command.

This would convert:

[{"classification":"emrfs-site", "properties":{"fs.s3.consistent.retryPeriodSeconds":"10", "fs.s3.consistent":"true", "fs.s3.consistent.retryCount":"5", "fs.s3.consistent.metadata.tableName":"EmrFSMetadata"}, "configurations":[]},{"classification":"spark", "properties":{"maximizeResourceAllocation":"true"}, "configurations":[]},{"classification":"spark-env", "properties":{}, "configurations":[{"classification":"export", "properties":{"PYSPARK_PYTHON":"/usr/bin/python3"}, "configurations":[]}]}]

into

[
    {
        "classification": "emrfs-site",
        "configurations": [],
        "properties": {
            "fs.s3.consistent": "true",
            "fs.s3.consistent.metadata.tableName": "EmrFSMetadata",
            "fs.s3.consistent.retryCount": "5",
            "fs.s3.consistent.retryPeriodSeconds": "10"
        }
    },
    {
        "classification": "spark",
        "configurations": [],
        "properties": {
            "maximizeResourceAllocation": "true"
        }
    },
    {
        "classification": "spark-env",
        "configurations": [
            {
                "classification": "export",
                "configurations": [],
                "properties": {
                    "PYSPARK_PYTHON": "/usr/bin/python3"
                }
            }
        ],
        "properties": {}
    }
]

Multiprocessing in Python – all about pickling

Chances are you’ve heard that multiprocessing in Python is hard. That it takes time and, actually, you shouldn’t even try, because there’s something like the global interpreter lock (GIL), so it isn’t even true parallel execution. Well, the GIL is real, but the rest is a lie. Multiprocessing in Python is rather easy.

One doesn’t have to look far to find nice introductions to multiprocessing in Python [link1, link2]. These are great and I do recommend reading them. Even the first page of Google results should return some comprehensible tutorials. However, what I was missing from these tutorials is some information about handling multiprocessing within a class.

Multiprocessing in Python is flexible. You can either define Processes and orchestrate them as you wish, or use one of the excellent methods of the Pool class to herd a pool of processes. By default Pool assumes the number of processes to be equal to the number of CPU cores, but you can change that by passing the processes parameter. The main methods included in Pool are apply and map, which let you run a process with arbitrary arguments or execute a parallel map, respectively. There are also asynchronous versions of these, i.e. apply_async and map_async.

Quick example:

from multiprocessing import Pool

def power(x, n=10):
    return x**n

pool = Pool()
pow10 = pool.map(power, range(10, 20))
print(pow10)
# [10000000000, 25937424601, 61917364224, 137858491849, 289254654976, 576650390625, 1099511627776, 2015993900449, 3570467226624, 6131066257801]

Simple, right? Yes, this is all that’s needed. Now, go and use multiprocessing!
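And since the asynchronous variants were mentioned, here is an equally small sketch with map_async; the call returns immediately and the results are collected later with get():

from multiprocessing import Pool

def power(x, n=10):
    return x**n

pool = Pool()
# map_async returns an AsyncResult right away instead of blocking.
result = pool.map_async(power, range(10, 20))
print(result.get(timeout=60))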

Actually, before you leave to follow your dreams, there’s a small caveat. When executing processes, Python first pickles the methods to run. This creates a bottleneck, as only objects that are picklable can be passed to processes. Moreover, Pool doesn’t allow parallelizing methods of objects that hold a reference to the pool that runs them. It sounds convoluted, so let me exemplify this:

from multiprocessing import Pool

class BigPow:
    def __init__(self, n=10):
        self.pool = Pool()

        self.n = n

    def pow(self, x):
        return x**self.n

    def run(self, args):
        #pows = self.pool.map(ext_pow, args)
        pows = self.pool.map(self.pow, args)
        return sum(pows)

def ext_pow(x):
    return x**10

if __name__ == "__main__":
    big_pow = BigPow(n=10)
    pow_sum = big_pow.run(range(20))
    print(pow_sum)

The code above doesn’t work unless we replace self.pow with ext_pow. This is because self contains the pool instance. We can work around that by removing the pool just before pickling, through __getstate__ (there’s a complementary function, __setstate__, for processing the object after unpickling).

from multiprocessing import Pool

class BigPow:
    def __init__(self, n=10):
        self.pool = Pool()

        self.n = n

    def pow(self, x):
        return x**self.n

    def run(self, args):
        pows = self.pool.map(self.pow, args)
        return sum(pows)

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

if __name__ == "__main__":
    big_pow = BigPow(n=10)
    pow_sum = big_pow.run(range(20))
    print(pow_sum)

This is good, but sometimes you’ll get an error stating something like “PicklingError: Can’t pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed” (Python 2’s way of saying that bound methods aren’t picklable). In such a case you have to update pickle’s registry to tell it how such methods should actually be pickled.

from multiprocessing import Pool

#######################################
import sys
import types
#Difference between Python3 and 2
if sys.version_info[0] < 3:
    import copy_reg as copyreg
else:
    import copyreg

def _pickle_method(m):
    # Reduce a bound method to (getattr, (instance, method_name)).
    if sys.version_info[0] < 3:
        class_self = m.im_class if m.im_self is None else m.im_self
        return getattr, (class_self, m.im_func.func_name)
    return getattr, (m.__self__, m.__func__.__name__)

copyreg.pickle(types.MethodType, _pickle_method)
#######################################

class BigPow:
    def __init__(self, n=10):
        self.pool = Pool()

        self.n = n

    def pow(self, x):
        return x**self.n

    def run(self, args):
        pows = self.pool.map(self.pow, args)
        return sum(pows)

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

if __name__ == "__main__":
    big_pow = BigPow(n=10)
    pow_sum = big_pow.run(range(20))
    print(pow_sum)

Yes, this is all because of pickle. What can you do about it? In a sense, not much, as it’s the default battery-included solution. On the other hand, pickle is generally slow, and the community standard now seems to be dill. It would be nice if something used dill instead of pickle, right? Well, we are in luck, because pathos does exactly that. It has a similar interface to multiprocessing and it’s also easy to use. Personally I’m Ok with multiprocessing, as I like not to import too much, but it’s a good sign that there are developments towards more flexible solutions.
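A minimal sketch of the pathos equivalent, assuming the package is installed (pip install pathos); since it serializes with dill, the bound method goes through without any __getstate__ tricks:

from pathos.multiprocessing import ProcessingPool

class BigPow:
    def __init__(self, n=10):
        self.n = n

    def pow(self, x):
        return x**self.n

big_pow = BigPow(n=10)
pool = ProcessingPool()
# dill can serialize bound methods, so self.pow just works here.
print(sum(pool.map(big_pow.pow, range(20))))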

Complete Ensemble EMD with Adaptive Noise (CEEMDAN) in Python

CEEMDAN is available in Python through PyEMD.

There are as many empirical mode decomposition (EMD) variations as there are teams working on it. Everyone notices that EMD is in general a very helpful method, yet there’s room for improvement. A few years back I stopped making modifications myself in exchange for working on a mathematically sound model of coupled oscillators. Nevertheless, since I spent quite a lot of time on EMDs and have enjoyed playing with them, from time to time something will catch my eye. This is what happened with Complete Ensemble EMD with Adaptive Noise (CEEMDAN).

As the name suggests, this is an expansion of the ensemble EMD, which was already covered. What’s the difference? In the case of CEEMDAN we’re also decomposing our perturbation to the system, i.e. the added noise. The method creates an ensemble of many perturbations, decomposes them using EMD, and the resulting IMFs are included when evaluating components of the input. I will refer to these components as cIMFs. The actual algorithm was first proposed by Torres et al. [1], but shortly after an improvement in efficiency was proposed [2]. These updates mainly come from noticing that, for this purpose, one doesn’t need to compute all IMFs, and that the weight parameter can be progressively scaled as well. What exactly is this algorithm?

Let’s define an operator IMF_{i}(S), which returns the ith IMF (pure EMD) of its input S, and M(S), which returns the local mean, i.e. M(S) = S - IMF_{1}(S). The algorithm is then as follows:

  1. Create a Gaussian noise ensemble W = \{w^{i}\}, where i \in [1..N], and decompose it using EMD.
  2. For the input signal S calculate the grand average of local means of the signal perturbed by the scaled first IMF of the noise:
    R_{1} = \frac{1}{N}\sum_{i=1}^{N} M(S + \beta_{0} IMF_{1}(w^{i})).
  3. Assign the first cIMF to be C_{1} = S - R_{1}.
  4. Compute R_{k} = \frac{1}{N}\sum_{i=1}^{N} M(R_{k-1} + \beta_{k-1} IMF_{k}(w^{i})).
  5. Calculate the kth cIMF as C_{k} = R_{k-1} - R_{k}.
  6. Iterate 4. and 5. until the set \{S, C_{k}\} fulfils the EMD stopping criteria.

As can be seen, a family of parameters β has been included in the algorithm. These scalars control the amount of decomposed noise used to compute the cIMFs, and this is what the authors refer to as noise adaptive. The parameters are arbitrary, but the improved version [2] suggests setting them as \beta_{k} = \epsilon_{0} \sigma(R_{k}), where σ is the standard deviation of its argument and \epsilon_{0} is another arbitrary parameter. Looking at point 4, one can see that for the kth residue we are using the kth IMF computed from the noise. This is a problem, because EMD decomposes a signal into a finite set of components, and it can happen that there is no kth IMF. In this case the authors suggest assuming the component to be equal to 0.

The advantage of this variation comes from the fact that the created decomposition \{C_{k}\} fully reconstructs the input. This is in contrast to EEMD, which doesn’t guarantee such completeness. However, with CEEMDAN questions arise regarding the meaning of the added scaled IMFs of noise. Augmenting the signal with an ensemble of pure noise creates perturbations of the input without any distinguished direction. As observed by Flandrin et al. [3], when decomposing white noise EMD acts as a dyadic filter bank. This means that the extracted IMFs have a preferred structure, and adding them to the input is similar to adding a vector with random length but a particular direction.

Regardless of all that, CEEMDAN is definitely an interesting method. Purely by the number of citations, it seems that I’m not the only one thinking so. I’ve included it in my Python PyEMD package, so feel free to play with it and leave some feedback.
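A minimal usage sketch, assuming PyEMD is installed and based on its documented interface; the toy signal is of course just an assumption:

import numpy as np
from PyEMD import CEEMDAN

# A toy signal: two tones plus a linear trend.
t = np.linspace(0, 1, 1000)
s = np.sin(2 * np.pi * 5 * t) + 0.5 * np.sin(2 * np.pi * 20 * t) + t

# Decompose into cIMFs; the CEEMDAN instance is callable on the signal.
ceemdan = CEEMDAN()
cIMFs = ceemdan(s)
print(cIMFs.shape)  # (number of cIMFs, len(s))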

References

[1] Torres ME, Colominas MA, Schlotthauer G, Flandrin P. A complete ensemble empirical mode decomposition with adaptive noise. In: Acoustics, Speech and Signal Processing (ICASSP), 2011 IEEE International Conference on, 22 May 2011, pp. 4144-4147. IEEE.
[2] Colominas MA, Schlotthauer G, Torres ME. Improved complete ensemble EMD: A suitable tool for biomedical signal processing. Biomedical Signal Processing and Control.
[3] Flandrin P, Rilling G, Goncalves P. Empirical mode decomposition as a filter bank. IEEE Signal Processing Letters.

Google wants back my microphone

My “writing” work currently goes somewhere else, so I have little motivation to write anything here. But there’s something that only the internet can help with, whether through actual help or simply by absorbing my annoyance.

In the past few days/weeks there has been some uproar about Facebook listening to us and later subtly suggesting products we talked about with others. It’s hard to point at anyone objective here, so I’ll just link to web searches and I’m sure you’ll find some “evidence” – Google, Bing and DuckDuckGo. Let me also suggest the Reply All podcast, which recently had an episode on this, mysteriously called Is Facebook Spying on You?. Obviously Facebook denies all of this, but they do confirm having lots of information about you, whether that comes from you directly or from your friends.

Facebook and I haven’t been on good terms for a long time. To me it’s more a fun social experiment than an actual social platform. Since it isn’t on my phone there’s nothing to complain about, but there’s another omnipresent god – Google. I actually have one of its branded phones with Google Assistant turned on, so it had to be there and it had to listen to me.

Long story short, I removed microphone permissions from all Google services. Obviously some of them weren’t happy with this, but I can’t see how it should affect their usage. Except for Google Assistant or occasional voice-input features, nothing should care, right? No. This is a really tough break-up, as from time to time I get vocal suggestions that are close to being commands. Google calls out to me: “when it’s safe you’ll first need to use your phone’s screen and tap the notification, then you can let the Google App access some things on your device”. This is especially annoying when I’m listening to podcasts or music.

In the beginning this would go on and on, but now it’s more like once a day. I don’t think it has some “time decreasing” variable built in, so it’s definitely something I did. More surprising is that even if I quickly unlock the phone, there isn’t anything new to give permissions to. Also, it might only be happening when the phone is locked, as I haven’t had it happen otherwise.

Free AWS is good. Not awesome, but good.

Amazon with its Amazon Web Services (AWS) is pretty cool. It gives you access to remote machines which you don’t have to maintain. Actually, you don’t have to do anything other than use them. The machines come in different flavours, but what tastes better than free? Granted, the free one is extremely limited, but surely we can squeeze something out of it. Right?

AWS instances, i.e. remote machines, differ in the amount of RAM, disk space, operating system, whether they have GPU access, and so on. As you can expect, the free tier instance is pretty low on all of these. To be more precise, the free tier instance is of the t2.micro type, which is a general purpose burstable instance with a single CPU, 1 GiB of memory, and EBS data storage (4 GB by default).

What is this good for? Depending on the needs, this might be good for almost anything that doesn’t require whatever these instances are lacking. (Did I help?) Obviously, it’s not so good for heavy computations, training machine learning models, or storing data. First of all, for those it’s better to use other services like S3, DynamoDB, Lex or the general machine learning offerings. And in case of specific requirements, it’s always better just to rent(?) a more powerful instance.

These cheap instances, in my opinion, are very good for a few tasks. The main one is web scraping. This is a tedious task that requires little CPU power, but constant access to the internet. Moreover, we don’t really want to make many calls in a small time period, so there needs to be a delay between each download. That’s either because we would like to avoid being detected as a bot, or simply out of politeness to the owner of the server (not clogging their bandwidth).
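A minimal sketch of such a polite downloader; the URL list and delay are hypothetical, and a real scraper should also respect robots.txt:

import time

import requests

# Hypothetical list of files to fetch.
urls = [
    "https://example.org/track/1.mp3",
    "https://example.org/track/2.mp3",
]

for url in urls:
    response = requests.get(url)
    filename = url.rsplit("/", 1)[-1]
    with open(filename, "wb") as f:
        f.write(response.content)
    # Pause between downloads: politeness, and less bot-like traffic.
    time.sleep(5)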

The internet is full of examples of scrapers for different types of data. I’m adding my own to the collection with the r-u-listening project. The core of the project is to allow users to find music similar to their input. It is a bit more than a recommender, but more on this project probably in the future. The scraper itself comes in two parts, i.e. crawler.py and scraper.py. The source that I’m using is FreeMusicArchive.org, which goes with the slogan “It’s not just free music; it’s good music”. I do recommend it, and once I have something valuable I’d like to share it with them.

Unfortunately these instances don’t come with much memory or storage by default. With only 4 GB of storage, downloading mp3 tracks gives room for about 800 of them (assuming about 5 MB per track). Again, as always, it depends on the task, but for machine learning algorithms we go with “the more, the merrier”.

Free tier instances do, however, allow volumes of up to 32 GB. To resize one, go to the EC2 service in your AWS console. In the navigation pane (left side) find Elastic Block Store (EBS) and select Volumes. Then select your volume and, under Actions, choose Modify Volume. Simple, right? In all honesty, like many things in AWS.
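The same can be done programmatically; a sketch with boto3, using a made-up volume id:

import boto3

ec2 = boto3.client("ec2")
# The volume id is hypothetical; yours is listed under EC2 -> Volumes.
ec2.modify_volume(VolumeId="vol-0123456789abcdef0", Size=32)

Note that after the volume grows, the file system on the instance still has to be extended (e.g. with growpart and resize2fs on Amazon Linux) before the extra space shows up.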

I’ve been using AWS for a while now. I’ve even finished a general AWS course, the AWS Essentials, and a 3-day onsite workshop on Architecting on AWS. All of it is pretty simple and consistent. I like it.