Bare-bones Tasks Manager in Python

Utilizing multi-core architectures in Python scripts can be challenging, even though a number of libraries already exist for that purpose. For simple cases the following home-brewed solution, consisting of a small TaskManager class, can be a viable option.

Running multi-threaded Python apps is challenging because of the global interpreter lock (GIL) [1]. A simple workaround is to start a number of OS sub-processes, each of which runs its own interpreter. A little bookkeeping is required to manage the tasks that map to these sub-processes. The requirements for a class performing such bookkeeping can be as simple as:

  • initialize the tasks list and update it as needed during calculations;
  • keep a manageable number of sub-processes running at the same time; their number can be specified by the user or, in more advanced OS-dependent solutions, deduced automatically;
  • terminate execution when all sub-processes have finished their calculations and the tasks list is empty.
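As a quick illustration (not part of the class below), the sub-process workaround itself fits in a few lines: each child is a separate interpreter with its own GIL, so the computations can run in parallel on separate cores.

```python
import subprocess
import sys

# Launch two independent Python interpreters; each computes a dummy sum.
procs = [subprocess.Popen([sys.executable, '-c', 'print(sum(range(10**6)))'])
         for _ in range(2)]
# Wait for both children to finish.
for p in procs:
    p.wait()
```

The TaskManager class below is essentially this pattern plus the bookkeeping from the list above.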

Below is a minimal class that can be extended by inheritance to fit particular computational needs.

'''Module task_manager.py
***********************

Provides a simple framework for setting up os subprocess
based computations to utilize multi-core architecture.
'''
import subprocess
import time

class TaskManager:
    '''Holds the tasks list, polls running tasks for completion
    and starts new tasks when task slots are available.'''
    def __init__(self, n_threads, timeouts=None):
        '''Minimal initialization.
        A task descriptor consists of a complete parameter list
        for starting execution as an OS sub-process; i.e. at
        minimum it contains the name of the executable. Other
        required command line parameters must also be included
        in the list, in the proper order, as string values.'''
        timeouts = timeouts if timeouts is not None else {}
        self.proc_poll_dt = timeouts.get('proc_poll_dt', 0.2)
        self.next_free_dt = timeouts.get('next_free_dt', 0.5)
        self.proc_popen_dt = timeouts.get('proc_popen_dt', 0.5)
        self.n_threads = n_threads
        self.procs = [None] * n_threads
        self.task_descriptors = []

    def update_task_descriptors(self):
        '''A stub to override in sub-classes.
        Generates from scratch, loads from disk, or updates
        the list of current task descriptors. Must include
        any additional required timeouts and synchronization.'''
        pass

    def update_tasks_status(self):
        '''Polls running processes and clears
        slots of those that have terminated.'''
        last_avail = None
        for proc_id in range(self.n_threads):
            if self.procs[proc_id] is not None:
                return_val = self.procs[proc_id].poll()
                time.sleep(self.proc_poll_dt)
                if return_val is not None:
                    self.procs[proc_id] = None
                    last_avail = proc_id
            else:
                last_avail = proc_id
        return last_avail

    def run(self):
        '''Main loop. Checks if there is an empty task slot and
        starts new task with an entry from self.task_descriptors.'''
        while True:
            last_avail = self.update_tasks_status()
            if last_avail is None:
                time.sleep(self.next_free_dt)
                continue
            else:
                self.update_task_descriptors()
                if not self.task_descriptors:
                    return
                # Open a new process:
                params = self.task_descriptors.pop()
                self.procs[last_avail] = subprocess.Popen(params)
                time.sleep(self.proc_popen_dt)

A minimal test for the class could look like this. The tasks-list update does not create any tasks, so the test terminates right away.

    def test_trivial():
        tmon = TaskManager(3)
        tmon.run()

Next, a test with the tasks list populated once, consisting of calls to the sleep function. The sleep task code looks like this:

if __name__ == '__main__':
    import time
    import sys
    print(sys.argv[1:])
    time.sleep(10)
    print(__file__, ': Done.')

and it sits in the task_man_test.py module. The test code for the sleep tasks is below:

    import sys

    class SleepTaskManager(TaskManager):
        def __init__(self):
            TaskManager.__init__(self, 2)
            self.n_created = 0
            # sys.executable points at the current interpreter,
            # which is more portable than a hard-coded path.
            self.executable = [sys.executable, 'task_man_test.py']

        def update_task_descriptors(self):
            if self.n_created == 0:
                self.n_created = 10
                for ii in range(self.n_created):
                    next_task = self.executable + [str(ii)]
                    self.task_descriptors.append(next_task)

    def test_sleep():
        tmon = SleepTaskManager()
        tmon.run()
        print('Test sleep completed.')

    test_sleep()
    print(__file__, ': Done.')

Tasks Sharing Using Google Drive

Despite the simplicity of the base implementation, the class can easily be extended to distribute computations across several computers over a LAN or the Internet. This can be done using a Google Drive shared folder in a MapReduce [2] fashion. Before starting computation of a task, a worker checks whether the task has already been started by another computer. Creating an empty output file uniquely linked by its name to a task makes this possible with reasonable reliability: if a task has already been started, the other computers skip it. Maintaining a global tasks list on Google Drive may be less efficient because of latency and the need to open, read and save the tasks-list file; creating a new file has smaller latency by itself. A final task could be generated when all sub-tasks are completed, its purpose being to combine the computation results into a single file.
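A minimal sketch of the claim-by-file-creation check, assuming a hypothetical folder out_dir that is synchronized by the Google Drive client; the function name and marker-file convention are illustrative, not part of the class above:

```python
import os

def try_claim_task(out_dir, task_name):
    '''Try to claim a task by creating its (empty) output file.
    Returns True if this worker claimed the task, False if another
    worker got there first.'''
    marker = os.path.join(out_dir, task_name + '.out')
    try:
        # Exclusive-creation mode 'x' fails if the file already
        # exists, so only one worker succeeds for a given task.
        with open(marker, 'x'):
            pass
        return True
    except FileExistsError:
        return False
```

Note that this is only as reliable as the folder synchronization: two machines may both create the marker locally before Google Drive propagates it, which is why the scheme offers reasonable, not perfect, reliability.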

References

1. Global Interpreter Lock, Python Wiki.
2. MapReduce, Wikipedia.