- Forked processes (e.g., multiprocessing Python module)
- Threads (e.g., threading Python module)
- Coroutines/Green threads (e.g., eventlet Python module)
Forking processes for the worker and client is a viable option, but it is overkill for processes that are most likely I/O bound (they spend most of their time communicating with the Gearman job server).
Threads can be unattractive in Python because of the GIL, but CPython releases the GIL while a thread blocks on I/O, so I think they would still work well in this case: the Gearman tasks (in my case) are mostly I/O bound.
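To illustrate the point about threads and I/O-bound work, here is a minimal sketch using only the standard library. It does not talk to Gearman at all: `fake_io_task` and `run_in_threads` are hypothetical names made up for this example, and `time.sleep` stands in for waiting on the job server. The assumption is that a blocking wait behaves much like a blocking socket read as far as the GIL is concerned.

```python
import threading
import time


def fake_io_task(results, index, delay):
    """Stand-in for a Gearman round trip (hypothetical helper)."""
    # A blocking wait like this releases the GIL, so other threads can run.
    time.sleep(delay)
    results[index] = "done-%d" % index


def run_in_threads(count, delay):
    """Run `count` fake I/O tasks in threads; return (results, elapsed)."""
    results = [None] * count
    threads = [
        threading.Thread(target=fake_io_task, args=(results, i, delay))
        for i in range(count)
    ]
    start = time.time()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results, time.time() - start


if __name__ == "__main__":
    results, elapsed = run_in_threads(4, 0.5)
    print("results: %s" % results)
    print("elapsed: %.2fs" % elapsed)
```

If the waits overlap as expected, the elapsed time should be close to a single delay rather than four times that. A CPU-bound task in place of the sleep would not see the same benefit.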
However, I chose to give the eventlet module a try, mostly because I had never really used it before and some other code that I'm working with already uses it.
Below is a quick script that I hacked out to test it. It's a simple echo server that uses Gearman to deliver the messages to echo. Obviously, it's a bit silly to use Gearman to communicate between functions in the same program. A real-world program would likely have the workers and clients using different Gearman function names. But, hey, it's an example.
#!/usr/bin/env python
import eventlet
# Important to do this before importing gearman
eventlet.monkey_patch()
import gearman
# JOB_UNKNOWN is needed by check_request_status() below
from gearman.constants import JOB_UNKNOWN

def echo(worker, job):
    """ The worker task to execute for each message """
    return job.data

def worker1(servers):
    """ The worker thread """
    worker = gearman.GearmanWorker(servers)
    worker.register_task("echo_func", echo)
    # work() polls for jobs in a loop and does not return on its own
    worker.work()

def client1(servers):
    """ The client thread """
    client = gearman.GearmanClient(servers)
    job_request = client.submit_job("echo_func", "Hello, World")
    check_request_status(job_request)

def check_request_status(job_request):
    """ Print the outcome of a submitted job """
    if job_request.complete:
        print "Job %s finished! Result: %s - %s" % \
            (job_request.job.unique,
             job_request.state,
             job_request.result)
    elif job_request.timed_out:
        print "Job %s timed out!" % job_request.job.unique
    elif job_request.state == JOB_UNKNOWN:
        print "Job %s connection failed!" % job_request.job.unique

class Server(object):
    def __init__(self):
        self._servers = []

    def add_job_server(self, host, port=4730):
        self._servers.append("%s:%s" % (host, port))

    def main(self, tasks):
        if not self._servers:
            raise Exception("Must add at least one job server.")
        thread_list = []
        for task in tasks:
            thread_list.append(eventlet.spawn(task, self._servers))
        for thd in thread_list:
            thd.wait()

def main():
    server = Server()
    server.add_job_server('127.0.0.1')
    # These are the tasks (functions) we want to execute, each
    # within its own green thread.
    tasks = [ worker1, client1 ]
    server.main(tasks)

if __name__ == "__main__":
    main()