Processing large files using python

In the last year or so, and with my increased focus on ribo-seq data, I have come to fully appreciate what the term big data means. The ribo-seq studies in their raw forms can easily reach into hundreds of GBs, which means that processing them in both a timely and efficient manner requires some thought. In this blog post, and hopefully those following, I want to detail some of the methods I have come up (read: pieced together from multiple stack exchange posts), that help me take on data of this magnitude. Specifically I will be detailing methods for python and R, though some of the methods are transferrable to other languages.

My first big data tip for python is learning how to break your files into smaller units (or chunks) in a manner that you can make use of multiple processors. Let’s start with the simplest way to read a file in python.


with open("input.txt") as f:
    data = f.readlines()
    for line in data:
        process(line)

This mistake made above, with regards to big data, is that it reads all the data into RAM before attempting to process it line by line. This is likely the simplest way to cause the memory to overflow and an error raised. Let’s fix this by reading the data in line by line, so that only a single line is stored in the RAM at any given time.


with open("input.txt") as f:
    for line in f:
        process(line)

This is a big improvement, namely it doesn’t crash when fed a big file (though also it’s shorter!). Next we should attempt to speed this up a bit by making use of all these otherwise idle cores.


import multiprocessing as mp

#init objects
pool = mp.Pool(cores)
jobs = []

#create jobs
with open("input.txt") as f:
    for line in f:
        jobs.append( pool.apply_async(process,(line)) )

#wait for all jobs to finish
for job in jobs:
    job.get()

#clean up
pool.close()

Provided the order of which you process the lines don’t matter, the above generates a set (pool) of workers, ideally one for each core, before creating a bunch of tasks (jobs), one for each line, for the workers to do. I tend to use the Pool object provided by the multiprocessing module due to ease of use, however, you can spawn and control individual workers using mp.Process if you want finer control. For mere number crunching, the Pool object is very good.

While the above is now making use of all those cores, it sadly runs into memory problems once again. We specifically use apply_async function so that the pool isn’t blocked while each line processes. However, in doing so, all the data is read into memory once again; this time stored as individual lines associated with each job, waiting inline to be processed. As such, the memory will again overflow. Ideally the method will only read the line into memory when it is its turn to be processed.


import multiprocessing as mp

def process_wrapper(lineID):
    with open("input.txt") as f:
        for i,line in enumerate(f):
            if i != lineID:
                continue
            else:
                process(line)
                break

#init objects
pool = mp.Pool(cores)
jobs = []

#create jobs
with open("input.txt") as f:
    for ID,line in enumerate(f):
        jobs.append( pool.apply_async(process_wrapper,(ID)) )

#wait for all jobs to finish
for job in jobs:
    job.get()

#clean up
pool.close()

Above we’ve now changed the function fed to pool of workers to include opening the file, locating the specified line, reading it into memory, and then processing it. The only input now stored for each job spawned is the line number, thereby preventing the memory overflow. Sadly, the overhead involved in having to locate the line by reading iteratively through the file for each job is untenable, getting progressively more time consuming as you get further into the file. To avoid this we can use the seek function of file objects which skips you to a particular location within a file. Combining with the tell function, which returns the current location within a file, gives:


import multiprocessing as mp

def process_wrapper(lineByte):
    with open("input.txt") as f:
        f.seek(lineByte)
        line = f.readline()
        process(line)

#init objects
pool = mp.Pool(cores)
jobs = []

#create jobs
with open("input.txt") as f:
    nextLineByte = f.tell()
    for line in f:
        jobs.append( pool.apply_async(process_wrapper,(nextLineByte)) )
        nextLineByte = f.tell()

#wait for all jobs to finish
for job in jobs:
    job.get()

#clean up
pool.close()

Using seek we can move directly to the correct part of the file, whereupon we read a line into the memory and process it. We have to be careful to correctly handle the first and last lines, but otherwise this does exactly what we set out, namely using all the cores to process a given file while not overflowing the memory.

I’ll finish this post with a slight upgrade to the above as there is a reasonable amount of overhead associated with opening and closing the file for each individual line. If we process multiple lines of the file at a time as a chunk, we can reduce these operations. The biggest technicality when doing this is noting that when you jump to a location in a file, you are likely not located at the start of a line. For a simple file, as in this example, this just means you need to call readline, which reads to next newline character. More complex file types likely require additional code to locate a suitable location to start/end a chunk.


import multiprocessing as mp,os

def process_wrapper(chunkStart, chunkSize):
    with open("input.txt") as f:
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        for line in lines:
            process(line)

def chunkify(fname,size=1024*1024):
    fileEnd = os.path.getsize(fname)
    with open(fname,'r') as f:
        chunkEnd = f.tell()
    while True:
        chunkStart = chunkEnd
        f.seek(size,1)
        f.readline()
        chunkEnd = f.tell()
        yield chunkStart, chunkEnd - chunkStart
        if chunkEnd > fileEnd:
            break

#init objects
pool = mp.Pool(cores)
jobs = []

#create jobs
for chunkStart,chunkSize in chunkify("input.txt"):
    jobs.append( pool.apply_async(process_wrapper,(chunkStart,chunkSize)) )

#wait for all jobs to finish
for job in jobs:
    job.get()

#clean up
pool.close()

Anyway, I hope that some of the above was either new or even perhaps helpful to you. If you know of a better way to do things (in python), then I’d be very interested to hear about it. In another post coming in the near future, I will expanded on this code, turning it into a parent class from which create multiple children to use with various file types.

Author