AWS’ CDK for ECS Fargate, or how to run infrequent large jobs

Yep, it’s one of those titles where unless you already know what it’s about, you probably don’t know what it’s about. If you’re comfortable with these abbreviations, go ahead and scroll down a bit. Otherwise, let me expand them: Amazon Web Services’ Cloud Development Kit for the Elastic Container Service Fargate. That’s probably not much clearer, so on to the descriptions we go.

Syllabus

Cloud Development Kit

This is an SDK for CloudFormation. It allows you to write infrastructure as if it were code. It currently supports a couple of languages, including Python and Java, but TypeScript feels like the primary one. The CDK compiles to native CloudFormation (Cfn), so whatever is missing in Cfn will also be missing in the CDK. Additionally, some modules are in an experimental phase, which means their API isn’t fully settled. I doubt these changes will be significant; most likely property renaming or different default settings. However, they reserve the option to introduce breaking changes to these modules, e.g. Cognito.

Elastic Container Service (ECS)

As with most AWS services, just ignore the “Elastic” part and we’re set – a container service. It lets you run containers, mainly Docker, away from your machine. It has some functionality for Docker Swarm or Kubernetes-like orchestration and means to provision resources when needed. Currently there are two types of resource provisioning – self-managed and auto-managed. The self-managed option is simply called “EC2”, as it requires you to provide EC2 instances or an auto-scaling group where ECS can install its agent and use as much capacity as it needs. The auto-managed option is called…

Fargate

Treat this like a heavy AWS Lambda and you won’t be too far off. The difference is that a Lambda is often used to run just some code, and sometimes a whole runtime provided in a single container. With ECS you have to provide at least one container; containers are bundled into a group called a Task Definition. The Fargate service allows you to forget about everything except the mentioned task definitions. It’ll do the provisioning and scaling for you (not for free), but you need to specify the metrics that drive scaling in and out.

How to run infrequent large jobs?

A couple of times I’ve been in a situation where occasionally I need to run a large job/script. By large I mean that its execution on my laptop takes about 10–60 min. It needs to run every week for 100 different configurations. A use case is retraining a prediction model with the latest weekly report. All in all, I need a medium computational job that bursts once a week. As with any problem, there are many potential solutions. Before stating my preferred design pattern, let’s strike out a couple of candidates.

Amazon Lambdas. These would be awesome if they didn’t have a timeout. Unfortunately, access to their process is shut down after 15 min (https://docs.aws.amazon.com/lambda/latest/dg/limits.html) and, besides, their memory tops out at 3 GB, which sometimes might be too little. Smart people might suggest dividing the logic into finer granularity, to which I’d say that they’re not that smart and shouldn’t try to fix everything with a hammer.

Why not just have one host instance and run all these jobs one after another? Well, why not just pass an exam by changing the question and answering your own? No, I want them all done within an hour of getting the results so I can plan the following week accordingly.

Ok, maybe have a periodic trigger like a cron job or CloudWatch event run a lambda function that provisions EC2 hosts, and… ? This quickly becomes dependency hell. You need to provision a host, run something there, deprovision it… it quickly turns into a Step Functions workflow and you have to maintain code for both the infrastructure and its logic. Way too much hassle.

My preferred solution is ECS. Containers have this nice property that once you try them, you like them and you stay with them. What works for me is to have all the logic in a container with a specific entrypoint (simple Dockerfile and entrypoint examples below), wrapped into a Task Definition that provides arguments (command) to the container. The number of running tasks depends on the size of an SQS queue; if it has more than 0 messages, keep adding tasks. These messages can carry additional parameters that the logic knows how to extract. Done. That’s it. The auto-scaling property makes sure that for the majority of time there are 0 containers, and as soon as something starts sending messages the number of containers goes up.

How does the CDK come into play here? It provides a way to do just that with only a few lines of code. The CDK has a module called ECS patterns which provides recipes for common ECS patterns like Application/Network Load Balanced clusters or periodic scheduled jobs. The one I talked about is called Queue Processing Fargate Service (there’s also an EC2 version). Excluding alarms, the whole infrastructure for the mentioned service takes about 5 lines (basic example below). There are of course additional parts dependent on your service, but the infrequent scaling bit is done. Cool, right?

Example of ECS’s CDK TypeScript

const queue: sqs.Queue = new sqs.Queue(this, 'ResourceQueue', { queueName: 'MySqsQueue' });
const image = ecs.ContainerImage.fromEcrRepository(ecr.Repository.fromRepositoryName(this, 'ResourceName', 'container'));

const scalingSteps = [ { change: -1, upper: 0 }, { change: +1, lower: 1 } ];
const command = [ "--sqsUrl", queue.queueUrl ];

const ecsService = new ecs_patterns.QueueProcessingFargateService(this, "FargateService", {
  image, command, scalingSteps, queue,
});

Typical Dockerfile for Python jobs


FROM python:3.7-slim

WORKDIR /usr/src/app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

ENTRYPOINT [ "python", "main.py" ]

Habituating to AWS Glue

Despite my strong opinion against AWS Glue, with its unfriendly documentation and strange approach to just about everything… I ended up using it as the main framework for an AWS-native ETL (Extract, Transform, Load) service. The whole journey felt like trying to make divorced parents get back together. They’re working together now, but the process felt artificial and I’m not sure they’re meant for each other. The success was due to finding out some of Glue’s dirty secrets.

What’s the problem?

To be completely fair, the problem with Glue comes from a use case that seems trivial but is surprisingly challenging. The goal is to have a workflow of dependent jobs, all of which lift and transform a few Redshift tables and upsert the result into the same cluster. Simple, right?
For starters, although the Glue context allows reading from JDBC (Redshift connector), it can only read data by specifying a table name, so it lifts the whole table. That would be fine if we were dealing with tables up to a few GB, since the data is transferred using UNLOAD, which is fast, to S3, which is cheap. In my use case, however, some tables will soon be in the TB range, so lifting the whole table is a waste of bandwidth, connection time and, most importantly, money spent on Glue, Redshift and S3.

The first workaround was to use the Spark context directly with its JDBC connector. It works nicely for lifting data with custom SQL clauses, allowing for joins and wheres, and results in a DataFrame. Great, almost done. But now the problem is with upserts. Redshift does not support upserts. The recommended method is to insert into a temporary table, delete all duplicates from the target and then append the new data. The Spark connector has an “execute SQL” method but… it doesn’t support transactions. We definitely want to avoid a situation where the deletion succeeds but the insert is corrupted.
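
To make the lifting half concrete, here’s a minimal sketch of that workaround in PySpark. The cluster, table and column names are made up, and it assumes a Redshift (or generic PostgreSQL) JDBC driver is available on the classpath; the point is that a subquery, rather than a bare table name, gets pushed down to Redshift.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("redshift-lift").getOrCreate()

# wrap the custom SQL in a subquery alias so only the needed rows leave Redshift
query = "(SELECT id, amount FROM sales WHERE report_week = '2020-01-06') AS src"

df = (spark.read
      .format("jdbc")
      .option("url", "jdbc:redshift://my-cluster.example.eu-west-1.redshift.amazonaws.com:5439/analytics")
      .option("dbtable", query)
      .option("user", "etl_user")
      .option("password", "...")  # in practice, fetch this from Secrets Manager
      .load())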

At this point, it felt like being betrayed by the Glue promises; no native support for such a simple use case, despite the promise of a one-stop shop for all ETL. AWS is pushing hard to make Redshift the default analytics DB, yet most tools either support big data dumps into Redshift or exports to other services. If I can’t get AWS’ support then let me help myself. Let’s import a custom package. But, since I’m going to do that, why not use Lambdas with Step Functions or Data Pipeline?

Why not Lambda (Step Function) or Data Pipeline?

Both are viable options and both have their quirks. For one, Data Pipeline is significantly limited in what it can do. Never mind that it looks and feels like a service its owners would like to deprecate, but it does something OK and has users depending on it. Not much has changed in the last couple of years and, besides, how seriously can you treat something that requires names to start with “My”, like “MyFirstDataPipeline”? There are RedshiftCopyActivity and SqlActivity, which might be helpful here, but they still require provisioning a resource (EC2 or EMR) to run on. A micro EC2 should be fine, but if I’m going to define everything step by step I might as well not limit my activity options and go straight to AWS Step Functions.

AWS Step Functions seem to be the proper solution. The list of triggering events and executable actions is constantly growing. It seems easily extensible and, given that many new services get a hook soon after their release, it gives hope that this is The AWS orchestration solution. What’s the quirk? Well, we still need to run the query somewhere. The obvious choice is Lambda. In the majority of cases that should be enough, but there’s a max timeout of 15 min and we already have some queries that take about 20 min. There was hope that, since the query can be written as a Redshift procedure without any output, it shouldn’t require an active connection to finish. Unfortunately, even though neither pg8000 nor psycopg cancels its job on the timeout, Redshift treats the dropped connection as a broken transaction and rolls back. Since a Lambda is a process that lives until another one requests the same resource, some hacking might keep the connection from being killed on the timeout, but this wouldn’t be reliable. So, a two-way-door plan: let’s focus on Glue, and if its workflow turns out to be too limited we can execute Glue jobs via Step Functions. Either way, there’s going to be boilerplate written, so it might as well start with Glue.

Revelation…

I’ve been using “Glue job” and “Glue Shell job” somewhat interchangeably, but only to say that I tried both Glue flavours. In reality, these two are completely different beasts and shouldn’t stand close to each other (and the documentation should definitely be clearer on this). A Glue job can be either an EMR-backed job (1 DPU) or an EC2-backed shell job (0.0625 DPU), in which case it’s essentially a Lambda with a max timeout of 24 hours. Strangely, the mechanism for importing custom packages is significantly different. For the Glue/EMR job, one can zip packages, upload them to S3 and add them via job arguments (--extra-py-files). For the Glue/EC2 job, the extra packages need to be bundled into a single .egg file and uploaded at the job’s creation. Either case requires pure Python code without any binary/C bindings, so no psycopg as a connector package and no use of pip. Difficult, challenging, but fine.

Whilst debugging an unsuccessful import, I printed out the packages available in the environment and, lo and behold, the solution was within reach all along. It turns out that there are some officially unmentioned packages, and one of them is PyGreSQL – a PostgreSQL/Redshift connector. This allows executing any query on Redshift without any special magic; just import PyGreSQL and enjoy. With question marks still flying above my head, we reached out to AWS support and followed up with the AWS Glue team. Long story short, they’ll add the package to the officially supported list.
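
For reference, a minimal sketch of how the transactional upsert ends up looking in a Glue Python Shell job, assuming PyGreSQL is importable as described above; the cluster, table and column names are made up.

import pgdb  # PyGreSQL's DB-API module

con = pgdb.connect(
    host='my-cluster.example.eu-west-1.redshift.amazonaws.com:5439',
    database='analytics', user='etl_user', password='...')
try:
    cur = con.cursor()
    # everything below runs in a single transaction
    cur.execute("CREATE TEMP TABLE stage (LIKE reports);")
    cur.execute("INSERT INTO stage SELECT * FROM weekly_report_source;")
    cur.execute("DELETE FROM reports USING stage WHERE reports.id = stage.id;")
    cur.execute("INSERT INTO reports SELECT * FROM stage;")
    con.commit()  # either the whole upsert lands or Redshift rolls it back
except Exception:
    con.rollback()
    raise
finally:
    con.close()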

Final solution

After a whole lot of frustration and complaining, we managed to get a lean and extensible solution. Every few hours a trigger executes a workflow of a series of dependent jobs; some are pure SQL and some are Spark jobs. There’s no timeout problem, we have retries and alarms, and everything is in a CloudFormation script. P1 is ready and now it’s time to start pulling data instead of waiting for pushes.

AWS EMR with medium-sized input and large output in AWS S3

The last post about AWS EMR and S3 resulted in a few people messaging me directly. To help others, let me add something about how I approach a specific problem.

As mentioned previously, when dealing with large amounts of data some trade-offs need to be made. There isn’t a solution which fits all computation problems (obviously), but that doesn’t mean there aren’t better starting points.

In case the input data is relatively small, say less than a terabyte, and the processing is highly parallelizable and produces a larger output, it helps to do everything locally on the cluster. If the input data is in S3, or we want to store the output in S3, then one can copy data with s3-dist-cp. It’s an extended version of dist-cp that understands AWS S3, so it’s rather safe. All EMR instances have it installed by default, making it easy to either execute it through a shell after SSH-ing onto the master node or, preferably, execute it as an EMR job step.

It’s reliable enough that for a given set of problems it was better to write a quick wrapper which converted a single step

spark-submit s3://bucket/path/to/script.py --src=s3://bucket/input/data --dest=s3://bucket/output/data

into three steps, download-process-upload, i.e.

s3-dist-cp --src=s3://bucket/input/data --dest=/hadoop/input/data
spark-submit s3://bucket/path/to/script.py --src=/hadoop/input/data --dest=/hadoop/output/data
s3-dist-cp --src=/hadoop/output/data --dest=s3://bucket/output/data
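
The wrapper itself is nothing fancy. A minimal sketch using boto3 and command-runner.jar (the cluster ID, bucket and paths are placeholders) that submits the three steps above could look like this:

import boto3

emr = boto3.client("emr")


def step(name, args):
    # command-runner.jar is available on every EMR cluster and simply runs the given command
    return {
        "Name": name,
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
    }


emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",
    Steps=[
        step("download", ["s3-dist-cp", "--src=s3://bucket/input/data", "--dest=/hadoop/input/data"]),
        step("process", ["spark-submit", "s3://bucket/path/to/script.py",
                         "--src=/hadoop/input/data", "--dest=/hadoop/output/data"]),
        step("upload", ["s3-dist-cp", "--src=/hadoop/output/data", "--dest=s3://bucket/output/data"]),
    ],
)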

This is great when we have a large number of executors, definitely more than 200. But even then, experiment. Sometimes it’s better to reduce the number of executors and increase the load on each.

Tiny bit about AWS EMR on big data

One of the recent projects I’ve worked on involved processing billions of rows stored in AWS S3, terabytes of data in total. That’s the biggest dataset I’ve worked with so far and, even though I don’t like the term, it broke through the big data barrier. Just handling the data with popular toolkits, such as scikit-learn or MXNet, created so many problems that it was easier to create our own solutions. But the biggest surprises came from the places least expected.

The first surprise came from AWS EMR. With this amount of data there is no other way than to use a large computation cluster, and EMR is quite easy to set up. The web UI is explained rather nicely, and once you know what you want you can use the CLI or SDK to do it programmatically. Cool, right? So what are the problems? There are plenty of things that simply don’t work as they should, or it isn’t mentioned that they work differently. The number of one-click install applications that EMR supports is rather big and you can find some explanation for each of them. However, nowhere is it mentioned that EMR Spark is a fork of Apache Spark and thus slightly different. It comes with different default settings, so the best practices for Apache Spark aren’t the same, and searching for EMR Spark specifics just doesn’t return anything. It took me a while to find out that S3 should be accessed through s3:// (or possibly through s3n://, but that’s deprecated and slow), and that you shouldn’t use s3a://, which, in contrast, is the recommended way with vanilla Apache Spark. Oh, and while I’m on S3…

Another big surprise came from AWS S3 itself. Considering how global and popular the service is, I was surprised to learn that there are actual limitations on the connection. Ok, I wasn’t surprised that there are any, but I thought they were much higher. According to the AWS S3 documentation on Request Rate and Performance Considerations, one shouldn’t exceed 100 PUT/LIST/DELETE or 300 GET requests per second. It is averaged over time, so occasional bursts are Ok, but do it too often and S3 is going to throttle you. Why does this matter? When you are using Spark to save directly to S3, e.g.

    val orders = sparkContext.parallelize(1 to 1000000)
    orders.saveAsTextFile("s3://bucketName/orders")

and you are working with hundreds of executors (processes) on small tasks (batches), then you are going to query S3 a lot. Moreover, by default Spark saves output to a temporary directory and, once the save is done, renames and moves everything. On a file system that’s an almost instantaneous operation, but on object storage such as S3 it’s another big read and write, and it takes a significant amount of time. Yes, this can and should be avoided with proper configuration or by repartitioning to a smaller number of partitions beforehand, but learning about it the hard way is not the nicest experience. Moreover, one would expect the integration between EMR and S3 to be smoother.
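
For reference, a minimal PySpark sketch of the two mitigations mentioned above: switching to version 2 of the output committer to avoid the rename out of the temporary directory, and reducing the number of output partitions so there are fewer S3 requests. The bucket name is a placeholder.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("s3-friendly-writes")
         # commit task output directly instead of renaming it from _temporary at the end
         .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
         .getOrCreate())

orders = spark.sparkContext.parallelize(range(1, 1000001))

# fewer, larger output files mean far fewer S3 requests
orders.coalesce(32).saveAsTextFile("s3://bucketName/orders")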

Having said all of that, I need to highlight that working with Spark and Hadoop on AWS EMR is rather simple. It takes a long time to learn the nuances and the proper configuration per task, but once that’s done life only gets better. One of the features I’m looking forward to is better integration with MXNet. Both MXNet and TensorFlow allow for nice utilization of CPU and GPU clusters, so it should only be a matter of time before EMR supports that out of the box, right? Hopefully. In the meantime, Spark + Hadoop + Ganglia + Zeppelin seems to be all I need for big data processing pipelines.