Saturday, 22 April 2017

Distributed Processing: Python celery for 1 M tasks

This post covers distributed processing in Python using Celery, along with some basic profiling. Some notes on Celery are posted at the end.

Celery is a distributed task queue system in Python. While custom multiprocessing code can be written, for well-defined tasks it is better to leverage a framework than to reinvent one. Celery works together with a message broker, from which enqueued tasks are consumed.

Producers of tasks are programs that want one or more tasks done. Workers are the consumers: they execute these tasks. The tasks are submitted to a message broker like RabbitMQ. Workers take the tasks from the broker and execute them. The results of tasks can either be ignored or stored somewhere, usually in memcached or a database.

Versions used from pip in virtual env:


The Ubuntu Server 16.04 machine on which the workers run has 8 cores and 16 GB RAM.
The message broker runs on a VM with 4 cores and ~5 GB RAM.

For this post, 1 million tasks are run on 2 worker processes. Each worker has 3 queues, and each task is routed to a particular queue via configuration. Each task calculates the nth Fibonacci number using a recursive algorithm. Since this algorithm does not use any optimization techniques (such as memoization), it takes a while to run and simulates a CPU-intensive task, thus demanding distribution of the load!
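A minimal sketch of such a CPU-bound task in plain Python is given here. The function name `fib`, the module path `tasks.fib` and the queue name `red` are assumptions made for illustration. In the actual setup the function would be registered as a Celery task, and the dict only mirrors the shape of Celery 3.1's `CELERY_ROUTES` setting.

```python
def fib(n):
    """Return the nth Fibonacci number using naive recursion.

    No memoization is used on purpose, so the call count grows
    exponentially with n and the function keeps a CPU core busy.
    """
    if n < 2:
        return n
    return fib(n - 1) + fib(n - 2)


# Hypothetical routing table in the shape of Celery 3.1's CELERY_ROUTES:
# the task named "tasks.fib" is sent to the queue named "red".
CELERY_ROUTES = {
    "tasks.fib": {"queue": "red"},
}

print([fib(n) for n in range(10)])
```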

- The message broker is a RabbitMQ server and runs on a different virtual machine.
- The workers run as services on another machine and are named w1 and w2.

Concurrency is left at the default, which is the number of processors on the machine, i.e. 8. The processors being used are immediately visible in the screen shown below.

Three queues on the workers are shown below. Celery does not yet have a notion of task priorities, so color-coded queues are used here. The names of the queues can be anything, as long as each queue is used for a particular subset of tasks, for example CPU-intensive tasks, I/O-driven tasks and the like.

Flower is a tool used to monitor celery tasks. It shows details per worker and also graphs in a monitor page. The flower status page is shown below after start.

10 different threads are used to post the 1 M Fibonacci tasks. The ids of the submitted tasks are stored, and the results are later retrieved based on these. The same screen after 1 M tasks have finished is shown below.
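The posting pattern can be sketched as follows. This is not the original client code: `submit_task` and `results_store` are hypothetical in-memory stand-ins so that the example runs without a broker. With Celery, the submit step would be a call such as `fib.delay(n)`, whose returned `AsyncResult` carries the task id, and the results would come from the configured result backend.

```python
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor

TOTAL_TASKS = 1000   # scaled down from 1 M for the sketch
THREADS = 10

results_store = {}   # hypothetical stand-in for a result backend
store_lock = threading.Lock()


def submit_task(n):
    """Stand-in for enqueueing a task: store a fake result, return a task id."""
    task_id = str(uuid.uuid4())
    with store_lock:
        results_store[task_id] = n * n   # pretend a worker produced this
    return task_id


def post_batch(batch):
    """Post every input in the batch and keep the returned task ids."""
    return [submit_task(n) for n in batch]


# Split the inputs into one batch per posting thread.
batches = [range(i, TOTAL_TASKS, THREADS) for i in range(THREADS)]

with ThreadPoolExecutor(max_workers=THREADS) as pool:
    id_lists = list(pool.map(post_batch, batches))

# Flatten the stored ids and fetch the results by id afterwards.
task_ids = [task_id for ids in id_lists for task_id in ids]
results = [results_store[task_id] for task_id in task_ids]
print(len(task_ids), len(results))
```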

At the point where all the tasks had succeeded, a cProfile run at the client shows

The profiled calls also include the calls that retrieve most, but not all, of the results.


1. Celery version 4.0.2 has an issue which causes the workers to crash after a restart when there are pending messages in the message queue. The 4.0.2 release from PyPI exhibited this issue, so 3.1.24 was used instead. There are a number of related issues reported too. A fix has been merged, but it was not yet available from PyPI at the time of this writing.

2. For a source structure <project>/src/<celery package with the Celery app>, the worker or the service needs to be started from the src folder (in this case).

3. Each worker can be configured to take tasks from a particular queue. 

4. By running workers on multiple virtual machines, the solution becomes more distributed.

Celery project links:

Sunday, 25 October 2015

coils: A python data structure library

Follow Hari's python datastructure library on BitBucket

Wednesday, 2 September 2015

Coding lessons from Hollywood: Readable Python conditionals with visitor pattern

Programs have conditional statements. They read like the following

if condition do action 
else do something else. 

These can get long, hairy and will read like

if condition_1 do this 
else if condition_2 do that 
else if condition_3 do that 
finally if nothing matches our expected conditions do something else. 

Switch-case statements which allow you to code this in a more reader friendly manner are not present in Python which means that we do it with chained conditional statements. Chained conditional statements can be ok when there are small operations to be done with each condition. For example, 
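A short chain of this kind is perfectly readable. The sketch below uses hypothetical HTTP-style status codes purely as an illustration of a small, acceptable chain.

```python
def describe_status(code):
    """Map a few status codes to a label with a short if/elif chain."""
    if code == 200:
        return "OK"
    elif code == 404:
        return "Not Found"
    elif code == 500:
        return "Server Error"
    else:
        # The equivalent of the default branch of a switch-case.
        return "Unknown"


print(describe_status(404))
```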

However, as the conditions, the context and the operations to be performed grow in number and complexity, the code can become unwieldy, difficult to understand and thus non-maintainable. The code architecture of an application can go south.

On the lighter side, if you need Hollywood to convince you of this, have a look at this guy reading his colleague's code. Your life, or someone else's, may depend on it. :))

Chains of code like that will qualify as a hack to fall asleep in less than 30 seconds. For example, suppose we are reading data off a network and need to perform different operations depending on the packet type. Imagine 20 different packet types. Something like the following can be cumbersome to read 3-6 months after the initial release. The following is an example of using a long conditional to do the job.
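As an illustration of how such a chain grows, here is a sketch with only six packet types. The packet layout (a dict with `type` and `payload` keys) and the operations are assumptions made for the example.

```python
def process_packet(packet):
    """Pick an operation for a packet with one long conditional chain."""
    packet_type = packet["type"]
    payload = packet["payload"]
    if packet_type == "ARP":
        return "resolve address " + payload
    elif packet_type == "ICMP":
        return "reply to ping " + payload
    elif packet_type == "TCP":
        return "reassemble stream " + payload
    elif packet_type == "UDP":
        return "deliver datagram " + payload
    elif packet_type == "DNS":
        return "look up name " + payload
    elif packet_type == "DHCP":
        return "offer lease " + payload
    else:
        # Nothing matched the expected packet types.
        return "drop unknown packet"


print(process_packet({"type": "DNS", "payload": "example.org"}))
```

With 20 packet types and real work in each branch, this function would span several screens.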

Whereas the following

looks better, reads better and moves the operation itself to another place (decoupled). Even better, we can code the handler to take the packet itself and figure things out inside the handler. This can be done in Python using the visitor pattern. This pattern usually comes up with data structures but serves well in this context too. The approach has been around for quite some time in Python, but it is less frequently used for a long switch-case scenario.

So we are given a number of options and corresponding operations to be performed. We can use different methods of a Python class to handle the different options. This means that we need to know which method handles which option. Python makes this very easy, as methods are also attributes and are stored as key-value pairs in the class's dynamic __dict__, i.e. we can look up a method on an object, add a method at run time or even change it for a class.

Finding out which method handles which option can be done as shown in the following code.

We take the option and check if there is a method tied to that option in __dict__ attribute set. If we find one we use that. If no method is found we just do the default operation corresponding to default switch-case statement. Also we can use Python's handy *args and **kwargs to pass data into the handlers.

A class that implements the different methods for the various options can be like the following. Here we do some work on numeric options and some operations based on country codes.

These can be called as follows
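The lookup, the handler class and the calls described above can be sketched as follows. The class names, the `handle_<option>` naming convention and the individual handlers are assumptions for illustration. The sketch also uses `getattr` rather than reading `__dict__` directly, since `getattr` finds inherited methods as well.

```python
class OptionHandler(object):
    """Base class: routes an option to a method named handle_<option>."""

    def handle(self, option, *args, **kwargs):
        method = getattr(self, "handle_%s" % option, None)
        if callable(method):
            return method(*args, **kwargs)
        # No method is tied to this option: act like a switch-case default.
        return self.default(option, *args, **kwargs)

    def default(self, option, *args, **kwargs):
        return "no handler for %r" % (option,)


class Handlers(OptionHandler):
    # Numeric options
    def handle_1(self, value):
        return value + 1

    def handle_2(self, value):
        return value * 2

    # Country-code options
    def handle_IN(self, greeting="Namaste"):
        return greeting + " from India"

    def handle_US(self, greeting="Hello"):
        return greeting + " from the USA"


handlers = Handlers()
print(handlers.handle(1, 10))
print(handlers.handle(2, 10))
print(handlers.handle("IN"))
print(handlers.handle("US", greeting="Howdy"))
print(handlers.handle("FR"))
```

Adding a new option means adding one method; the dispatch code in the base class does not change.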

To make things better, the Base class can look for functions named after the PacketTypeClass too, if we have different packet classes like IPv4Packet, IPv6Packet etc. i.e handler methods are named after the object type that they handle. 
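A sketch of this type-based variant, which is the classic visitor arrangement, is below. The packet classes are bare stand-ins and the `visit_<ClassName>` naming convention is an assumption.

```python
class IPv4Packet(object):
    def __init__(self, source):
        self.source = source


class IPv6Packet(object):
    def __init__(self, source):
        self.source = source


class ARPPacket(object):
    def __init__(self, source):
        self.source = source


class PacketVisitor(object):
    """Routes a packet to the method named after the packet's class."""

    def visit(self, packet, *args, **kwargs):
        name = "visit_" + type(packet).__name__
        method = getattr(self, name, self.visit_default)
        return method(packet, *args, **kwargs)

    def visit_default(self, packet, *args, **kwargs):
        return "unhandled " + type(packet).__name__

    def visit_IPv4Packet(self, packet):
        return "IPv4 packet from " + packet.source

    def visit_IPv6Packet(self, packet):
        return "IPv6 packet from " + packet.source


visitor = PacketVisitor()
for packet in (IPv4Packet("10.0.0.1"), IPv6Packet("::1"), ARPPacket("aa:bb")):
    print(visitor.visit(packet))
```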

It is arguable that introducing a class and an object everywhere to handle switch-case statements or long conditionals is not always helpful. If the conditional chains are small and the context is not that complicated, it is acceptable to write the conditionals as such. A simple dict with options as keys and method names as values will also suffice. But that is exactly what a class in Python does with its __dict__, and whichever way is chosen, the handler code needs to go somewhere in the code structure. Better that it is done in a readable way.

Thursday, 13 August 2015

Using *args and **kwargs : Python code post

In a previous post we looked at *args and **kwargs in different function calls. This post is about a practical use of these two in object-oriented programming and inheritance. One use is to avoid having to write __init__ for every class that you create in a large set of small classes.

To avoid having to write __init__ for every class in the hierarchy, a parent class can implement __init__ based on *args and **kwargs. Each child class that inherits from this class can then be written without an __init__. Note that this makes the code, and the way it is called, less intelligible for API users. For example,

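class Parent(object):
    _expected_attributes = []  # names for positional args, in order

    def __init__(self, *args, **kwargs):
        # positional args need attribute names; kwargs carry their own
        self.__dict__.update(zip(self._expected_attributes, args))
        self.__dict__.update(kwargs)

class Child(Parent):
    pass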

We can call the child as c = Child(name = 'Jason', age = 10) and so on. Notice that we cannot see what the Child takes in its constructor. To avoid this, use the technique discussed below with named parameters for __init__.

Within a class we can define an __init__, which is the equivalent of a constructor in Python. For a basic class which has a fixed set of arguments and is not part of a hierarchy, this is straightforward. In a hierarchy of classes, a child class's __init__ could be called with a number of arguments, some of which are attributes inherited from the parent class. In such a scenario the __init__ can use named parameters for its own attributes and then use *args or **kwargs to pass on the rest of the attributes to the parent class. The parent class can build its attributes from these, and if there are any extras it can pass them to the grandparent and so on.

Here we look at an example hierarchy: Item -> RetailItem, RetailItem -> FoodItem, RetailItem -> ElectronicAccessories. The code for the base class is shown below. *args and **kwargs are used to build the instance in __init__. To process args we need to know the names of the attributes to assign to each value in args. We can use a list as shown below.

A RetailItem which derives from Item, and a FoodItem, are shown below.

FoodItem can then be called as shown below. Notice that when we code up a FoodItem we are not in a position to know the attributes, except by looking up _expected_attributes in the class or its parent classes. As an advantage, this saves the time taken to write an __init__.
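One way this hierarchy could look is sketched below. The attribute names are invented for the example, and collecting `_expected_attributes` along the class hierarchy is one possible design, not necessarily the one in the original code.

```python
class Item(object):
    # Names assigned, in order, to positional arguments.
    _expected_attributes = ["name", "price"]

    def __init__(self, *args, **kwargs):
        # Gather the expected names from the base class down to the
        # concrete class, so parent attributes come first.
        names = []
        for klass in reversed(type(self).__mro__):
            names.extend(klass.__dict__.get("_expected_attributes", []))
        for name, value in zip(names, args):
            setattr(self, name, value)
        for name, value in kwargs.items():
            setattr(self, name, value)


class RetailItem(Item):
    _expected_attributes = ["barcode"]


class FoodItem(RetailItem):
    _expected_attributes = ["expiry_date"]


# Neither child class defines an __init__ of its own.
apple = FoodItem("apple", 0.5, barcode="0001", expiry_date="2015-09-01")
print(apple.name, apple.price, apple.barcode, apple.expiry_date)
```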

To make the API more developer friendly we can use named parameters for __init__. The __init__ in the child class then has parameters of its own in addition to *args and **kwargs. To know what else goes into the parent class we still need to look at the parent class documentation. However, the child class __init__ can take a variable number of named parameters, which are then handled by the parent classes as needed. This is a usage scenario for *args and **kwargs. Notice that there is no list of expected attributes.

To make sure developers know how to build the object we can use _expected_attributes. However, the attributes other than specifications are handled by the parent class (thanks to *args and **kwargs).
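A minimal sketch of the named-parameter style follows, again with invented attribute names. Each class names only its own attributes and forwards everything else upward.

```python
class Item(object):
    def __init__(self, name, price):
        self.name = name
        self.price = price


class RetailItem(Item):
    def __init__(self, barcode, *args, **kwargs):
        # Whatever is not ours goes to the parent class.
        super(RetailItem, self).__init__(*args, **kwargs)
        self.barcode = barcode


class FoodItem(RetailItem):
    def __init__(self, expiry_date, *args, **kwargs):
        super(FoodItem, self).__init__(*args, **kwargs)
        self.expiry_date = expiry_date


bread = FoodItem(expiry_date="2015-08-20", barcode="12345",
                 name="bread", price=2.5)
print(bread.name, bread.price, bread.barcode, bread.expiry_date)
```

The signature of FoodItem now documents `expiry_date`, while `barcode`, `name` and `price` are still discovered from the parent classes.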

Code is available here

Sunday, 12 July 2015

Text search 2: Rabin-Karp rolling hash in Python

Searching for sub-strings is a functionality provided by programming language libraries. There are multiple methods to implement this. One algorithm we looked at previously is the "Looking Glass" algorithm, which performs some preliminary operations and creates a structure that speeds up the search. Rabin-Karp, by contrast, uses hashing for pattern matching. The interesting part is in how the hash is continuously reused to find a match.

For a given string "the", a hash can be calculated once. Say it is H1. We need to find where this string occurs in a larger string say "Three things cannot be hidden for long: the sun, the moon and the truth". The length of the string to search is 3 here and we know the hash of this search string as H1. We just need to go through the larger string and find where all this hash matches. So we need to hash 3 characters of the larger string dropping a character and taking one character at a time. The hashes hash("Thr"), hash("hre"), hash("ree") etc all the way to the end hash("uth") are compared to H1. This is better shown below

At each shift right we need to compare H1 (calculated once only) to a new hash. Hashes that move through the larger string or the rolling hash are calculated by applying Horner's rule for polynomials. To modify hash by adding a letter we use <new hash = old hash * alphabet_size + letter>. To modify hash by dropping a letter we use <new hash = old hash - letter * power(alphabet_size, length_of_hashed_string - 1) >. This is better explained using numbers as follows.
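As a worked example with numbers, take the decimal digits as the alphabet (alphabet size 10) and slide a window of length 3 over the digits 1 2 3 4 5. The helper names below are chosen for this sketch only; the two functions are direct transcriptions of the two formulas above.

```python
ALPHABET_SIZE = 10   # decimal digits serve as the "alphabet"


def add_letter(old_hash, letter):
    """new hash = old hash * alphabet_size + letter"""
    return old_hash * ALPHABET_SIZE + letter


def drop_letter(old_hash, letter, length):
    """new hash = old hash - letter * alphabet_size ** (length - 1)"""
    return old_hash - letter * ALPHABET_SIZE ** (length - 1)


digits = [1, 2, 3, 4, 5]
window = 3

# Hash of the first window: 1 -> 12 -> 123
current = 0
for digit in digits[:window]:
    current = add_letter(current, digit)
hashes = [current]

# Roll the window: drop the leading digit, then add the next one.
for i in range(window, len(digits)):
    current = drop_letter(current, digits[i - window], window)  # 123 -> 23
    current = add_letter(current, digits[i])                    # 23 -> 234
    hashes.append(current)

print(hashes)  # [123, 234, 345]
```

Each roll costs two arithmetic steps regardless of the window length, which is what makes the rolling hash cheap.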

1) This algorithm is useful when we want to match multiple patterns in the same go. 

2) However, this algorithm does not jump ahead intelligently on a mismatch, i.e. we could move straight to the next "t" and continue the search from there. The Looking Glass method does skip ahead on a mismatch, so a combination of these two approaches is great.

3) Also, hashing leads to collisions. So at each match we have to still compare the search string letters individually to make sure it is not a hash-collision. A hash match becomes an indication of a possible positive match.

A python implementation of the algorithm is shown below.
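A minimal sketch of such an implementation is given here; it is not the original listing. Character codes from `ord` are used as the letters, and the hash is taken modulo a prime to keep the numbers small. Both the base of 256 and the modulus are assumptions of this sketch.

```python
def rabin_karp(text, pattern, base=256, modulus=1000003):
    """Return the start index of every occurrence of pattern in text."""
    n, m = len(text), len(pattern)
    if m == 0 or m > n:
        return []

    # base ** (m - 1), needed when dropping the leading letter.
    high = pow(base, m - 1, modulus)

    pattern_hash = 0
    window_hash = 0
    for i in range(m):
        pattern_hash = (pattern_hash * base + ord(pattern[i])) % modulus
        window_hash = (window_hash * base + ord(text[i])) % modulus

    matches = []
    for start in range(n - m + 1):
        # A hash match is only a hint; compare the letters to rule out
        # a collision.
        if window_hash == pattern_hash and text[start:start + m] == pattern:
            matches.append(start)
        if start + m < n:
            # Roll: drop text[start], then add text[start + m].
            window_hash = (window_hash - ord(text[start]) * high) % modulus
            window_hash = (window_hash * base + ord(text[start + m])) % modulus
    return matches


quote = ("Three things cannot be hidden for long: "
         "the sun, the moon and the truth")
print(rabin_karp(quote, "the"))
```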

4) The expected runtime for this algorithm is O(length of search string + length of larger string), assuming few hash collisions; every collision costs an extra letter-by-letter comparison. Runtime with cProfile for Python is shown below for a main string of length 1024 and 2 different search string lengths. Notice that changing the length of the search string has no effect: the algorithm still goes through the main string one character at a time. The key to the runtime of this algorithm is the hash function.

Wednesday, 22 April 2015

Understand It Yourself Kit: *args and **kwargs for Python

This is a post aimed at Python programmers. Also, this will be useful for engineers who work in other languages like Java and are finding their way in Python.

Programmers are used to argument lists. For example, Java programmers are used to writing the main entry point of their program with an argument list. This looks like

Although other languages can accept a variable list of arguments in the form of maps and lists in any method, this is not a widespread convention in them. In Python, however, it is mainstream, so we will see a lot of *args and **kwargs in method definitions, as in the following.

Programmers who come from other languages where this is not mainstream try to understand this from the caller's environment, whereas it is better the other way around. This kind of understanding also helps in designing flexible Python code. What that means is

If a method has formal arguments, *args and **kwargs, it is saying that whatever positional arguments you pass in addition to the formal arguments will go into *args, and any additional name-value (keyword) arguments will go into **kwargs. So you can call it with quite a lot of arguments in a number of ways. Close to anything goes.

For example the aFunction can be called as follows
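The downloadable code is not reproduced here, so the signature below, two formal arguments followed by *args and **kwargs, is an assumption made for this sketch.

```python
def aFunction(a, b, *args, **kwargs):
    """Return everything received so the assignment is easy to inspect."""
    return a, b, args, kwargs


print(aFunction(1, 2))                    # (1, 2, (), {})
print(aFunction(1, 2, 3, 4))              # (1, 2, (3, 4), {})
print(aFunction(1, 2, 3, colour="red"))   # (1, 2, (3,), {'colour': 'red'})
print(aFunction(b=2, a=1, size=10))       # (1, 2, (), {'size': 10})
```

Extra positional values land in `args` as a tuple, and extra name-value pairs land in `kwargs` as a dict.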

But there are some rules to watch out for. For that, here is an Understand It Yourself Kit, which you run, then read the output and the code and figure things out yourself. Some of the base code was taken from 'Python Epiphanies' by Stuart Williams. The kit also has a lot of additions which bring out interesting parts of the rules.

Download and run the code from here.

How it is useful: Within the code, notice the call and how args and kwargs get assigned from the console. The extra arguments get assigned to args and kwargs.

Now have a look at the next call and its output. aFunction was called with name-value pairs packed into a dict (comes as kwargs). This got unpacked to its formal arguments.

That's where *args and **kwargs are useful. The additional parameters passed to aFunction, even if they are not useful or meaningful to aFunction itself, get assigned properly to another function that aFunction calls. aFunction just has to pass *args and **kwargs along, and if they make sense to the next method, they will be used.
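Both ideas, unpacking a dict into formal arguments and passing the extras along, can be sketched as follows. The function `render` and all argument names are hypothetical, and this `aFunction` has a different signature from the one in the kit.

```python
def render(text, colour="black", size=12):
    """The function that actually understands the extra arguments."""
    return "%s in %s at %dpt" % (text, colour, size)


def aFunction(label, *args, **kwargs):
    # aFunction only uses label; everything else is passed along untouched.
    return label + ": " + render(*args, **kwargs)


# Extra positional and keyword arguments flow through to render.
print(aFunction("title", "Hello", colour="red"))

# A dict unpacked with ** fills the formal argument label first;
# the remaining pairs end up in kwargs and reach render.
options = {"label": "footer", "text": "Bye", "size": 9}
print(aFunction(**options))
```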

This is also useful when writing classes in Python. A child class constructor just passes the extra arguments to the parent's constructor call. An example is shown below
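A compact sketch of that idea, with hypothetical class and attribute names:

```python
class Person(object):
    def __init__(self, name, age=None):
        self.name = name
        self.age = age


class Employee(Person):
    def __init__(self, employee_id, *args, **kwargs):
        # Keep what belongs to Employee, pass the rest to Person.
        super(Employee, self).__init__(*args, **kwargs)
        self.employee_id = employee_id


employee = Employee(42, "Jason", age=10)
print(employee.employee_id, employee.name, employee.age)
```

If Person later gains a new optional argument, Employee accepts it without any change to its own constructor.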

Thursday, 16 October 2014

The Server side: webapp performance architecture with Django, load balancing and caching

Web apps need to be quick, which means keeping latency low while serving client requests. In all applications persistent data lives in a database on disk (some use solid state too) and the web app on the server accesses it. A few concurrent connections are easily handled by issuing database requests, but with thousands or millions of requests this approach fails fast. Scaling up with more powerful hardware buys time until the next growth level of client requests. Even on scaled-up hardware there is room to squeeze more requests through if data caching is part of the design. Accessing data from disk or from another server on the network is slower than accessing it from memory. If the round trip can be avoided, fewer and comparatively low-cost machines can handle considerable load without increasing latency. A profile of response times is in the next post. Once server-side questions such as what the app does, when it acts, and how fast and efficiently it does so are thought out, the data model can be designed for maximum throughput too.

Here we look at deploying multiple application servers horizontally, with a single back-end database that handles these app servers' data requests. Requests from the external world are load balanced across the app servers in round-robin fashion, so each request to the web app is given to a different app server. Sticky sessions can also be enabled, but then requests are no longer round-robined. The objective of round robin is to reduce the latency caused by waiting on a single server. However, in this scenario one database serves all the app servers. If not addressed, it easily becomes a point of contention, taking everything back to square one. On closer analysis of most web apps, sites and services, it is possible to avoid unnecessary database calls with caching, which saves the cost of a network or disk call. (The next level of performance improvement would be database sharding.)

Imagine having a good pizza ready on the kitchen table rather than having to call Domino's or Pizza Hut for home delivery. If you do have to call for delivery, order extra pizzas and keep some ready in the refrigerator in case someone suddenly gets the urge, saving you and them the wait next time. Pizzas become stale, and so does data, so there has to be precise cache invalidation when a database record is updated. The point is that you might have to hit the database, but not every time a request is made.

OS: Opensuse 13.1 or any Linux
Web app: developed on Django framework.
Webserver: Apache2 with mod_wsgi to host python webapp
Caching: Memcached
Database: Postgresql
Load balancing: nginx
# application servers: 3
# loadbalancing node: 1

Django code with sample view that uses the cache:

Wireshark trace showing the round robin requests and responses

Memcached on the three servers, with hit counts showing requests that avoided the database: