Friday, November 2, 2012

Python, Gearman, Eventlet, Oh My!

There are several examples on the Interwebs of using the Python Gearman API, but I didn't really see any of a script acting as both a Gearman worker AND client simultaneously. Since the worker side sits in an infinite loop (via the GearmanWorker.work() method), you need some sort of multitasking for the same script to also act as a Gearman client. You could handle this a few different ways:
Forking processes for the worker and client is a viable option, but is a bit overkill for processes that are most likely I/O bound (communicating with the Gearman job server).

Threads can be unattractive in Python because of the GIL, but I think they would still work well in this case since the Gearman tasks (in my case) are mostly I/O bound.
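To make the threading option concrete, here's a sketch of the same worker/client split using plain threads. No Gearman server is involved: a `queue.Queue` stands in for the job server, and the `worker_loop`/`client_once` names are made up for illustration, not part of any Gearman API.

```python
import threading

try:
    import queue           # Python 3
except ImportError:
    import Queue as queue  # Python 2

# A plain in-process queue stands in for the Gearman job server here.
jobs = queue.Queue()
results = queue.Queue()

def worker_loop():
    """Like GearmanWorker.work(): block forever pulling jobs and echoing them."""
    while True:
        data = jobs.get()
        if data is None:   # sentinel so the example can shut down cleanly
            break
        results.put(data)  # "echo" the job data back

def client_once():
    """Like GearmanClient.submit_job(): hand off one job and wait for the result."""
    jobs.put("Hello, World")
    print(results.get())
    jobs.put(None)         # tell the worker to exit

worker = threading.Thread(target=worker_loop)
client = threading.Thread(target=client_once)
worker.start()
client.start()
client.join()
worker.join()
```

Since both threads spend nearly all their time blocked on queue operations (I/O-bound, as far as the GIL is concerned), the GIL doesn't really hurt here.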

However, I chose to give the eventlet module a try, mostly because I had never really used it before, and some other code that I'm working with is already using it.

Below is a quick script that I hacked out to test it. It's a simple echo server that uses Gearman to deliver the messages to echo. Obviously, it's a bit silly to use Gearman to communicate between functions in the same program; a real-world program would likely have the workers and clients using different Gearman function names. But, hey, it's an example.


#!/usr/bin/env python

import eventlet
# Important to do this before importing gearman
eventlet.monkey_patch()
import gearman
from gearman.constants import JOB_UNKNOWN

def echo(worker, job):
    """ The worker task to execute for each message """
    return job.data

def worker1(servers):
    """ The worker thread """
    worker = gearman.GearmanWorker(servers)
    worker.register_task("echo_func", echo)
    worker.work()

def client1(servers):
    """ The client thread """
    client = gearman.GearmanClient(servers)
    job_request = client.submit_job("echo_func", "Hello, World")
    check_request_status(job_request)

def check_request_status(job_request):
    if job_request.complete:
        print "Job %s finished!  Result: %s - %s" % \
              (job_request.job.unique,
               job_request.state,
               job_request.result)
    elif job_request.timed_out:
        print "Job %s timed out!" % job_request.job.unique
    elif job_request.state == JOB_UNKNOWN:
        print "Job %s connection failed!" % job_request.job.unique

class Server(object):
    def __init__(self):
        self._servers = []

    def add_job_server(self, host, port=4730):
        self._servers.append("%s:%s" % (host, port))

    def main(self, tasks):
        if not self._servers:
            raise Exception("Must add at least one job server.")
        thread_list = []
        for task in tasks:
            thread_list.append(eventlet.spawn(task, self._servers))
        for thd in thread_list:
            thd.wait()

def main():
    server = Server()
    server.add_job_server('127.0.0.1')

    # These are the tasks (functions) we want to execute, each
    # within its own green thread.
    tasks = [ worker1, client1 ]

    server.main(tasks)

if __name__ == "__main__":
    main()

1 comment:

  1. Turns out, this likely blocks on Linux since poll() and eventlet don't mix. I tested this on my Mac, which doesn't support poll(), so select() is used instead, which works.

    ReplyDelete

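For what it's worth, if the poll-based hub is the culprit, eventlet can be told to use its select-based hub explicitly (the hub module is named 'selects' in eventlet) before monkey-patching. A sketch, guarded so it degrades gracefully when eventlet isn't installed:

```python
import select

# Check for poll() support before any monkey-patching touches the select module.
has_poll = hasattr(select, 'poll')

try:
    import eventlet
    import eventlet.hubs
    # Force eventlet's select-based hub; this must happen before any
    # greenthreads are spawned (and ideally before monkey_patch()).
    eventlet.hubs.use_hub('selects')
    eventlet.monkey_patch()
    hub_name = type(eventlet.hubs.get_hub()).__module__
except ImportError:
    eventlet = None
    hub_name = None
```

With the hub pinned this way, the worker/client script above should behave the same on Linux as it does on a Mac.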