AWS’ CDK for ECS Fargate, or how to run infrequent large jobs

Yep, it’s one of those titles where, unless you know what it’s about, you probably don’t know what it’s about. If you’re comfortable with these abbreviations, go ahead and scroll down a bit. Otherwise, let me expand: Amazon Web Services’ Cloud Development Kit for Elastic Container Service Fargate. That’s probably not much clearer, so on to the descriptions we go.

Syllabus

Cloud Development Kit

This is an SDK for CloudFormation. It allows you to write infrastructure as if it were code. It currently supports a couple of languages, including Python and Java, but TypeScript feels like the main one. The CDK compiles to native CloudFormation (Cfn), so whatever is missing in Cfn will also be missing in the CDK. Additionally, some modules are in an experimental phase, which means that their API isn’t fully established. I doubt these changes are going to be significant; most likely property naming or different default settings. However, AWS reserves the right to introduce breaking changes to these modules, e.g. Cognito.

Elastic Container Service (ECS)

As with most AWS services, just ignore the “Elastic” part and we’re set: Container Service. It lets you run containers, mainly Docker, away from your machine. It offers Docker Swarm- or Kubernetes-like orchestration and the means to provision resources when needed. Currently there are two types of resource provisioning: self-managed and auto-managed. The self-managed option is simply called “EC2”, as it requires you to provide EC2 instances or an auto-scaling group where ECS can install its agent and use the capacity as needed. The auto-managed option is called…

Fargate

Treat this like a heavy AWS Lambda and you won’t be too far off. The difference is that Lambda is mostly used to run just code, and only sometimes a whole runtime packaged in a single container. With ECS you have to provide at least one container; containers are bundled into a group called a Task Definition. Fargate lets you forget about everything except those task definitions. AWS does the provisioning and scaling for you (not for free), but you need to specify the metrics on which to scale in and out.

How to run infrequent large jobs?

A couple of times I’ve been in a situation where I occasionally need to run a large job/script. By large I mean that its execution on my laptop takes about 10-60 min, and it needs to run every week for 100 different configurations. One use case is retraining a prediction model with the latest weekly report. All in all, I need a medium-sized computational job that bursts once a week. As with any problem, there are many potential solutions. Before stating my preferred design pattern, let’s strike out a couple of candidates.

Amazon Lambdas. These would be awesome if they didn’t have a timeout. Unfortunately, a Lambda invocation is shut down after 15 min (see https://docs.aws.amazon.com/lambda/latest/dg/limits.html) and, besides, at the time of writing their memory was capped at 3 GB (since raised to 10 GB), which can be too little. Smart people might suggest splitting the logic into finer-grained pieces, to which I’d say that they’re not being that smart and shouldn’t try to fix everything with a hammer.

Why not just have one host instance and run all these jobs one after another? Well, why not just pass an exam by changing the question and answering your own? No, I want them all done within an hour of getting the results, so I can plan the following week accordingly.

Ok, maybe have a periodic trigger like a cron job or a CloudWatch Event that runs a Lambda function which provisions EC2 hosts, and… ? This quickly becomes dependency hell. You need to provision a host, then run something on it, deprovision it… it soon turns into a Step Functions workflow, and you need to maintain code for both the infrastructure and its logic. Way too much hassle.

My preferred solution is ECS. Containers have this nice property that once you try them, you like them and you stay with them. What works for me is to have all the logic in a container with a specific entrypoint (simple Dockerfile example below) and to wrap it in a Task Definition that provides arguments (command) to the container. The number of running tasks depends on the size of an SQS queue: if it has more than 0 messages, keep on adding tasks. These messages can carry additional parameters that the logic knows how to extract. Done. That’s it. Autoscaling makes sure that for the majority of the time there are 0 containers, and as soon as someone starts sending messages it increases the number of containers.
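A minimal sketch of what the container’s main.py could look like, assuming the task definition passes `--sqsUrl <url>` as its command (so the container runs `python main.py --sqsUrl <url>`) and that each message body is a JSON object with job parameters. The names `parse_args`, `process_messages` and the JSON message format are hypothetical. To keep it self-contained, receiving messages is an injected function; in a real container it would wrap boto3’s SQS client (`receive_message` on the queue URL, then `delete_message` once a job has finished).

```python
import argparse
import json
from typing import Callable, Dict, List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the arguments that the task definition's command appends to the entrypoint."""
    parser = argparse.ArgumentParser(description="Queue-driven batch job (sketch)")
    parser.add_argument("--sqsUrl", required=True, help="URL of the SQS queue to consume")
    return parser.parse_args(argv)


def process_messages(
    receive: Callable[[], List[str]],
    handle: Callable[[Dict], None],
    max_empty_polls: Optional[int] = None,
) -> int:
    """Poll for message bodies and run the job once per message.

    With max_empty_polls=None the loop runs forever, which suits an ECS service
    (the service replaces tasks that exit, and scaling in stops them).
    A number stops after that many consecutive empty polls, handy for local runs.
    Returns the number of messages handled.
    """
    handled = 0
    empty_polls = 0
    while max_empty_polls is None or empty_polls < max_empty_polls:
        bodies = receive()
        if not bodies:
            empty_polls += 1
            continue
        empty_polls = 0
        for body in bodies:
            # Assumption: each message body is a JSON object of job parameters
            handle(json.loads(body))
            handled += 1
    return handled
```

Keeping the queue access behind `receive` also makes the job logic easy to run locally against a list of hand-written messages.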

How does the CDK come into play here? It provides a solution to do just that with only a few lines of code. The CDK has a module called ECS patterns, which provides recipes for common ECS patterns like Application/Network Load Balanced services or periodic scheduled jobs. The one I described above is called Queue Processing Fargate Service (there’s also an EC2 version). Excluding alarms, the whole infrastructure for this service takes about 5 lines (basic example below). There are of course additional parts that depend on your service, but the infrequent scaling bit is done. Cool, right?

Example of the ECS pattern in CDK TypeScript

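// Module-level imports (CDK v2); the remaining lines belong inside a Stack constructor
import { aws_applicationautoscaling as appscaling, aws_ecr as ecr, aws_ecs as ecs,
         aws_ecs_patterns as ecs_patterns, aws_sqs as sqs } from 'aws-cdk-lib';

const queue = new sqs.Queue(this, 'ResourceQueue', { queueName: 'MySqsQueue' });
const repository = ecr.Repository.fromRepositoryName(this, 'ResourceName', 'container');
const image = ecs.ContainerImage.fromEcrRepository(repository);

// Remove a task when the queue is empty, add one while at least one message is waiting
const scalingSteps: appscaling.ScalingInterval[] = [{ change: -1, upper: 0 }, { change: 1, lower: 1 }];
// Appended to the container's ENTRYPOINT, e.g. python main.py --sqsUrl <url>
const command: string[] = ['--sqsUrl', queue.queueUrl];

const ecsService = new ecs_patterns.QueueProcessingFargateService(this, 'FargateService', {
  image, command, scalingSteps, queue,
});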
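To build some intuition for step scaling on queue size, here’s a toy, discrete-time model in Python, mirroring the idea of removing a task when the queue is empty and adding one while messages are waiting. It is not how Application Auto Scaling is implemented: the real policy is driven by CloudWatch alarms on the queue’s visible-message metric, with evaluation periods and cooldowns. The model only assumes that a step with an `upper` bound fires when the queue depth is at most that bound, a step with a `lower` bound fires when the depth is at least that bound, and the task count is clamped between hypothetical `min_tasks`/`max_tasks` limits.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Step:
    change: int                  # tasks to add (positive) or remove (negative)
    lower: Optional[int] = None  # fires when queue depth >= lower
    upper: Optional[int] = None  # fires when queue depth <= upper


def capacity_change(depth: int, steps: List[Step]) -> int:
    """Adjustment from the first step whose bound matches the queue depth (0 if none)."""
    for step in steps:
        if step.upper is not None and depth <= step.upper:
            return step.change
        if step.lower is not None and depth >= step.lower:
            return step.change
    return 0


def simulate(depths: List[int], steps: List[Step], min_tasks: int = 0, max_tasks: int = 10) -> List[int]:
    """Task count after each evaluation period, given the observed queue depths."""
    tasks = min_tasks
    history = []
    for depth in depths:
        tasks = max(min_tasks, min(max_tasks, tasks + capacity_change(depth, steps)))
        history.append(tasks)
    return history


# Remove a task when the queue is empty, add one while at least one message waits
STEPS = [Step(change=-1, upper=0), Step(change=1, lower=1)]
print(simulate([0, 5, 5, 5, 0, 0, 0], STEPS, max_tasks=2))  # [0, 1, 2, 2, 1, 0, 0]
```

The toy run shows the property that matters for weekly bursts: the service sits at zero tasks, ramps up while the queue has messages, and drains back to zero afterwards.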

Typical Dockerfile for Python jobs


FROM python:3.7-slim

WORKDIR /usr/src/app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

ENTRYPOINT [ "python", "main.py" ]

Workspace with tmux

Most of my projects are now separated from the beginning into logical entities and put into Docker. This makes deployment and replacement much easier to manage, but development requires many ritual commands.

Tmux to the rescue. Being a terminal/command-line guy, I like to have all my logs and dashboards in one view. The script below is what I use for a Flask Python backend, a React JavaScript frontend and MongoDB.

#!/bin/sh
cd ~/project
sudo docker run -d -p 27017:27017 -v ~/project/db/mongo-data:/data/db mongo

tmux new-session -d -s Development -n server -c ~/project/server "./venv/bin/python server.py"
tmux split-window -v -c ~/project/client "yarn start"
tmux split-window -h -c ~/project/db "mongo"
tmux new-window 'mutt'
tmux -2 attach-session -d

Tiny bit about AWS EMR on big data

One of the recent projects I’ve worked on involved processing billions of rows stored in AWS S3, terabytes of data in total. It was the biggest dataset I’ve worked with so far and, even though I don’t like the term, it broke through the big data barrier. Just handling the data with popular toolkits, such as scikit-learn or MXNet, created so many problems that it was easier to create our own solutions. But the biggest surprises came from the places I least expected.

The first surprise came from AWS EMR. With such an amount of data, there is no other way than to use a large computation cluster, and EMR is quite easy to set up. The web UI is nicely explained, and once you know what you want you can use the CLI or SDK to do it programmatically. Cool, right? So what are the problems? There are plenty of things that simply don’t work as they should, or that work differently without it being mentioned. The number of one-click install applications that EMR supports is rather big, and there’s some explanation for each of them. However, nowhere is it mentioned that EMR Spark is a fork of Apache Spark and thus slightly different. It comes with different default settings, so the best practices for Apache Spark don’t carry over, and searching for EMR Spark just doesn’t return anything. It took me a while to find out that S3 should be accessed through s3://, or possibly through s3://’s older sibling s3n://, which is deprecated and slow. The documentation also states that you shouldn’t use s3a://, which, in contrast, is the recommended way with Apache Spark. Oh, and while I’m on S3…

Another big surprise came from AWS S3 itself. Given how global and popular the service is, I was surprised to learn that there are actual limits on request rates. Ok, I wasn’t surprised that there are any, but I thought they were much higher. According to the AWS S3 documentation on Request Rate and Performance Considerations, at the time of writing one shouldn’t exceed 100 PUT/LIST/DELETE or 300 GET requests per second (AWS has since raised this to 3,500 PUT/COPY/POST/DELETE and 5,500 GET/HEAD requests per second per prefix). It is averaged over time, so occasional bursts are Ok, but do it too often and S3 is going to throttle you. Why does this matter? When you are using Spark to save directly to S3, e.g.

    val orders = sparkContext.parallelize(1 to 1000000)
    orders.saveAsTextFile("s3://bucketName/orders")

and you are working with hundreds of executors (processes) on small tasks (batches), then you are going to query S3 a lot. Moreover, by default Spark saves output to a temporary directory and, once the save is done, renames and moves everything. On a file system that’s an almost instantaneous operation, but S3 is an object store without a real rename, so every object is copied again and the original deleted, which takes a significant amount of time. Yes, this can and should be avoided with proper configuration, or by repartitioning to a smaller number of partitions before saving, but learning about it the hard way is not the nicest experience. Also, one would expect the integration between EMR and S3 to be smoother.
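A back-of-the-envelope sketch of why the number of partitions matters, using the 100 PUT/LIST/DELETE requests per second figure quoted above. The simplifying assumptions are loud ones: each partition writes one part file (one PUT), and a “rename” on S3 is a COPY (itself an HTTP PUT) plus a DELETE per object; multipart uploads, LIST calls and `_SUCCESS` markers are ignored. The helper names are hypothetical.

```python
def estimate_save_requests(num_partitions: int, rename_after_write: bool = True) -> int:
    """Rough count of mutating S3 requests for one Spark save to S3.

    One PUT per part file, plus a COPY and a DELETE per file when the
    output is "renamed" from the temporary directory afterwards.
    """
    puts = num_partitions
    renames = 2 * num_partitions if rename_after_write else 0
    return puts + renames


def min_seconds(requests: int, requests_per_second: float) -> float:
    """Lower bound on wall time when requests are capped at a sustained rate."""
    return requests / requests_per_second


# Hundreds of executors on tiny batches vs. the same data repartitioned to 100 files
for partitions in (10_000, 100):
    total = estimate_save_requests(partitions)
    print(partitions, total, min_seconds(total, 100))
# 10000 30000 300.0
# 100 300 3.0
```

Even under these optimistic assumptions, the many-small-files save spends minutes just on request budget, which is why repartitioning (or coalescing) before the write pays off.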

Having said all of that, I need to highlight that working with Spark and Hadoop on AWS EMR is rather simple. It takes a long time to learn the nuances and the proper configuration per task, but once that’s done life only gets better. One of the features I’m looking forward to is better integration with MXNet. Both MXNet and TensorFlow allow for nice utilization of CPU and GPU clusters, so it should only be a matter of time before EMR supports that out of the box, right? Hopefully. In the meantime, Spark + Hadoop + Ganglia + Zeppelin seems to be all I need for big data processing pipelines.

JSON pretty formatting in vim

Quick ad hoc command:

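:%!python3 -m json.tool --sort-keys

For repeated use, add this to your `.vimrc` file:

com! FormatJSON %!python3 -m json.tool --sort-keys

After this you should be able to use the :FormatJSON command. (`--sort-keys` sorts object keys alphabetically, as in the output below; drop it to keep the original key order.)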

This would convert:

[{"classification":"emrfs-site", "properties":{"fs.s3.consistent.retryPeriodSeconds":"10", "fs.s3.consistent":"true", "fs.s3.consistent.retryCount":"5", "fs.s3.consistent.metadata.tableName":"EmrFSMetadata"}, "configurations":[]},{"classification":"spark", "properties":{"maximizeResourceAllocation":"true"}, "configurations":[]},{"classification":"spark-env", "properties":{}, "configurations":[{"classification":"export", "properties":{"PYSPARK_PYTHON":"/usr/bin/python3"}, "configurations":[]}]}]

into

[
    {
        "classification": "emrfs-site",
        "configurations": [],
        "properties": {
            "fs.s3.consistent": "true",
            "fs.s3.consistent.metadata.tableName": "EmrFSMetadata",
            "fs.s3.consistent.retryCount": "5",
            "fs.s3.consistent.retryPeriodSeconds": "10"
        }
    },
    {
        "classification": "spark",
        "configurations": [],
        "properties": {
            "maximizeResourceAllocation": "true"
        }
    },
    {
        "classification": "spark-env",
        "configurations": [
            {
                "classification": "export",
                "configurations": [],
                "properties": {
                    "PYSPARK_PYTHON": "/usr/bin/python3"
                }
            }
        ],
        "properties": {}
    }
]
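The Vim filter simply pipes the buffer through json.tool. When the same formatting is needed inside a Python script, the sketch below does roughly what `python3 -m json.tool --sort-keys` does: json.tool indents with 4 spaces by default, and `--sort-keys` corresponds to `sort_keys=True`. The `format_json` name is hypothetical.

```python
import json


def format_json(text: str) -> str:
    """Pretty-print JSON text with 4-space indentation and alphabetically sorted keys."""
    return json.dumps(json.loads(text), indent=4, sort_keys=True)


compact = '[{"classification":"spark", "properties":{"maximizeResourceAllocation":"true"}, "configurations":[]}]'
print(format_json(compact))
# [
#     {
#         "classification": "spark",
#         "configurations": [],
#         "properties": {
#             "maximizeResourceAllocation": "true"
#         }
#     }
# ]
```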

Multiprocessing in Python – all about pickling

Chances are you’ve heard that multiprocessing in Python is hard. That it takes time and, actually, you shouldn’t even try, because there’s something called the global interpreter lock (GIL), so it isn’t even truly parallel execution. Well, the GIL is real, but the rest is a lie: each process has its own interpreter and its own GIL, so processes do run in parallel. Multiprocessing in Python is rather easy.

One doesn’t have to look far to find nice introductions to multiprocessing in Python [link1, link2]. These are great and I do recommend reading them. Even the first page of Google results should return some comprehensive tutorials. However, what I was missing from these tutorials is information about handling multiprocessing within a class.

Multiprocessing in Python is flexible. You can either define Processes and orchestrate them as you wish, or use one of the excellent methods of Pool, which herds a group of processes. By default, Pool uses as many processes as there are CPU cores, but you can change that by passing the processes parameter. The main methods of Pool are apply and map, which let you run a function with arbitrary arguments or execute a parallel map, respectively. There are also asynchronous versions of these, i.e. apply_async and map_async.
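A short sketch of the four Pool methods mentioned above, with `processes=2` chosen arbitrarily and `pool_demo` a hypothetical helper name. The async variants return AsyncResult objects immediately; `.get()` blocks until the result is ready, so the sketch collects everything before the `with` block shuts the pool down. The `__main__` guard matters on platforms that start workers by re-importing the script (Windows, macOS).

```python
from multiprocessing import Pool


def power(x, n=10):
    return x ** n


def pool_demo(processes=2):
    """Run each of the four main Pool methods once and return their results."""
    # processes defaults to os.cpu_count() when omitted
    with Pool(processes=processes) as pool:
        single = pool.apply(power, (2,), {"n": 3})             # one call, blocks until done
        mapped = pool.map(power, [1, 2, 3])                    # parallel map, blocks until done
        pending_one = pool.apply_async(power, (3,), {"n": 2})  # returns an AsyncResult at once
        pending_many = pool.map_async(power, [1, 2])
        # Collect async results before leaving the block, which terminates the pool
        return single, mapped, pending_one.get(timeout=10), pending_many.get(timeout=10)


if __name__ == "__main__":
    print(pool_demo())  # (8, [1, 1024, 59049], 9, [1, 1024])
```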

Quick example:

from multiprocessing import Pool

def power(x, n=10):
    return x**n

if __name__ == "__main__":
    # The guard is required where workers are started by re-importing the script (Windows, macOS)
    with Pool() as pool:
        pow10 = pool.map(power, range(10, 20))
    print(pow10)
    # [10000000000, 25937424601, 61917364224, 137858491849, 289254654976, 576650390625, 1099511627776, 2015993900449, 3570467226624, 6131066257801]

Simple, right? Yes, this is all that’s needed. Now, go and use multiprocessing!

Actually, before you leave to follow your dreams, there’s a small caveat. To hand work to other processes, Python first pickles the function and its arguments. This creates a bottleneck, as only objects that can be pickled can be passed to processes. Moreover, Pool doesn’t allow parallelizing methods of objects that refer to the pool instance which runs them. It sounds convoluted, so let me exemplify this:

from multiprocessing import Pool

class BigPow:
    def __init__(self, n=10):
        self.pool = Pool()

        self.n = n

    def pow(self, x):
        return x**self.n

    def run(self, args):
        #pows = self.pool.map(ext_pow, args)
        pows = self.pool.map(self.pow, args)
        return sum(pows)

def ext_pow(x):
    return x**10

if __name__ == "__main__":
    big_pow = BigPow(n=10)
    pow_sum = big_pow.run(range(20))
    print(pow_sum)

The code above doesn’t work unless we replace self.pow with ext_pow. Sending self.pow to a worker means pickling self, and self contains the pool instance, which can’t be pickled. We can fix that by removing the pool just before pickling via __getstate__ (there’s a complementary __setstate__ for processing after unpickling).

from multiprocessing import Pool

class BigPow:
    def __init__(self, n=10):
        self.pool = Pool()

        self.n = n

    def pow(self, x):
        return x**self.n

    def run(self, args):
        pows = self.pool.map(self.pow, args)
        return sum(pows)

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

if __name__ == "__main__":
    big_pow = BigPow(n=10)
    pow_sum = big_pow.run(range(20))
    print(pow_sum)
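To see the mechanism without starting any processes, the sketch below uses a `threading.Lock` as a stand-in for the Pool, since neither can be pickled. It also fills in the complementary `__setstate__`, so the unpickled copy (as a worker would see it) gets an explicit `None` instead of a missing attribute. The class names `Holder`/`PicklableHolder` and the `can_pickle` helper are hypothetical.

```python
import pickle
import threading


class Holder:
    """Stand-in for BigPow: plain data plus an unpicklable resource."""

    def __init__(self, n=10):
        self.resource = threading.Lock()  # plays the role of the Pool; locks can't be pickled
        self.n = n

    def pow(self, x):
        return x ** self.n


class PicklableHolder(Holder):
    def __getstate__(self):
        state = self.__dict__.copy()
        del state["resource"]  # drop the unpicklable attribute before pickling
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.resource = None  # the unpickled copy (e.g. in a worker) has no resource


def can_pickle(obj) -> bool:
    """True if pickle can serialize obj, which Pool needs for every task it sends."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, AttributeError, pickle.PickleError):
        return False


print(can_pickle(Holder().pow))           # False: pickling the method pickles the lock too
print(can_pickle(PicklableHolder().pow))  # True
```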

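This is good, but sometimes (notably on Python 2, where instance methods can’t be pickled out of the box) you’ll get an error like “PicklingError: Can’t pickle &lt;type 'instancemethod'&gt;: attribute lookup __builtin__.instancemethod failed”. In that case you have to tell pickle how to reduce instance methods by registering a function with copyreg (copy_reg on Python 2):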

from multiprocessing import Pool

#######################################
import sys
import types
#Difference between Python3 and 2
if sys.version_info[0] < 3:
    import copy_reg as copyreg
else:
    import copyreg

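def _pickle_method(m):
    # Rebuild the method as getattr(owner, name) when unpickling;
    # __self__ and __func__ exist on methods in Python 2.6+ and Python 3
    if m.__self__ is None:  # unbound method, which only exists in Python 2
        return getattr, (m.im_class, m.__func__.__name__)
    return getattr, (m.__self__, m.__func__.__name__)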

copyreg.pickle(types.MethodType, _pickle_method)
#######################################

class BigPow:
    def __init__(self, n=10):
        self.pool = Pool()

        self.n = n

    def pow(self, x):
        return x**self.n

    def run(self, args):
        pows = self.pool.map(self.pow, args)
        return sum(pows)

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

if __name__ == "__main__":
    big_pow = BigPow(n=10)
    pow_sum = big_pow.run(range(20))
    print(pow_sum)
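On Python 3 the copyreg registration above should not be needed: a bound method already pickles as, roughly, `getattr(instance, "pow")`, so it works as long as the instance itself can be pickled. A quick check, with `Scaler` as a hypothetical class holding only picklable attributes:

```python
import pickle


class Scaler:
    """Hypothetical class with only picklable attributes (no Pool)."""

    def __init__(self, n=10):
        self.n = n

    def pow(self, x):
        return x ** self.n


# No copyreg registration here: Python 3 reduces the bound method on its own
method = Scaler(n=2).pow
restored = pickle.loads(pickle.dumps(method))
print(restored(5))  # 25
```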

Yes, this is all because of pickle. What can you do about it? In a sense, not much, as it’s the default batteries-included solution. On the other hand, pickle can’t serialize quite a few things (lambdas, for example), and the community standard for that seems to be dill. It would be nice if something used dill instead of pickle, right? Well, we are in luck, because pathos does exactly that. It has an interface similar to multiprocessing, so it’s also easy to use. Personally I’m Ok with multiprocessing, as I prefer not to import too much, but it’s a good sign that there are developments towards more flexible solutions.

Kuramoto in Stan (PyStan)

tl;dr: Project on github: https://github.com/laszukdawid/pystan-kuramoto


Stan is a programming language focused on probabilistic computations. Although it’s a rather recent language, it’s been nicely received in the data science/Bayesian community for its focus on designing models rather than on programming and getting stuck with computational details. Depending on your background, you might have heard about it either from Facebook and its Prophet project, or as a native implementation of Hamiltonian Monte Carlo (HMC) and its optimised variation, the No-U-Turn Sampler (NUTS).

For ease of including models in other programmes there are some interfaces/wrappers available, including RStan and PyStan.

Stan is not the easiest language to get through. Currently there are about 600 pages of documentation, plus separate “documentation” for the wrappers, which for PyStan isn’t very helpful. Obviously there’s no need to read all of it, but it took me a while to actually understand what goes where and why. The reward, however, is very satisfying.

Since I’ve written a bit about Kuramoto models on this blog, it’s only fitting that I share their implementation in Stan as well. The pystan-kuramoto project uses PyStan, but the actual Stan code is platform independent.

Currently there are two implementations (I couldn’t come up with better names):

  • All-to-all, where the Kuramoto model is fitted to a phase vector $\vec{\Phi}$ of distinct oscillators, i.e. $\vec{\Phi}_{N}(t) = \{\phi_1(t), \phi_2(t), \dots, \phi_N(t)\}$.
  • All-to-one, where the model fits a superposition (sum) of oscillators to a phase time series $\Phi_{N}(t) = \sum_{n=1}^{N} \phi_n(t)$.
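For readers who haven’t met the model yet, the sketch below simulates the classic Kuramoto equation $\dot{\phi}_i = \omega_i + \frac{K}{N}\sum_{j=1}^{N} \sin(\phi_j - \phi_i)$ with plain Euler steps and builds both kinds of target: the per-oscillator phase vectors (all-to-all) and their sum (all-to-one). Assumption: a single global coupling $K$; the Stan models in pystan-kuramoto may parameterize the coupling differently, and all function names here are hypothetical.

```python
import math
from typing import List


def kuramoto_step(phases: List[float], omegas: List[float], K: float, dt: float) -> List[float]:
    """One explicit Euler step of dphi_i/dt = omega_i + K/N * sum_j sin(phi_j - phi_i)."""
    n = len(phases)
    return [
        phi_i + dt * (omega_i + K / n * sum(math.sin(phi_j - phi_i) for phi_j in phases))
        for phi_i, omega_i in zip(phases, omegas)
    ]


def simulate(phases0: List[float], omegas: List[float], K: float, dt: float, steps: int) -> List[List[float]]:
    """Phase vectors at every time step: the 'all-to-all' kind of target."""
    trajectory = [list(phases0)]
    for _ in range(steps):
        trajectory.append(kuramoto_step(trajectory[-1], omegas, K, dt))
    return trajectory


def all_to_one(trajectory: List[List[float]]) -> List[float]:
    """Sum of oscillator phases at each time step: the 'all-to-one' kind of target."""
    return [sum(phases) for phases in trajectory]


def order_parameter(phases: List[float]) -> float:
    """Kuramoto order parameter r in [0, 1]; r close to 1 means synchronized phases."""
    n = len(phases)
    return math.hypot(sum(math.cos(p) for p in phases) / n,
                      sum(math.sin(p) for p in phases) / n)


# Identical frequencies and positive coupling: the oscillators pull each other into sync
synced = simulate([0.0, 1.0, 2.0], [1.0, 1.0, 1.0], K=2.0, dt=0.01, steps=2000)
print(order_parameter(synced[0]), order_parameter(synced[-1]))
```

Fitting reverses this direction: given observed phases (or their sum), the Stan model searches for the frequencies and coupling that best reproduce them.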

In all honesty, this seems to be rather efficient. The fit is performed with penalized maximum likelihood estimation, optimized with L-BFGS. Before using it I wasn’t very knowledgeable about the algorithm, but now I’m simply amazed by its speed and accuracy. Thumbs up.

More links

A while ago I started to get a taste of what it feels like to work in industry, and it feels quite nice. Maybe that’s the specificity of my field projected onto the industry, or being tired of how academia works, but I’m thoroughly enjoying learning all the details about Computer Science, programming and the newest technologies.

In addition to the sources from my last post about Data Science, which are still my main daily go-to, I’ve started to dive deep into computer science. Obviously there are plenty of good information sources and excellent tutorials. The aggregators I’m using right now are:

 

I’m planning to add a subpage with links for further reference. Any suggestions are welcome!