locking - Race condition in ZooKeeper and Python based message queue
I've been evaluating ZooKeeper as a simple message queue, and I've written two simple scripts: an MQ feeder and an MQ consumer. The feeder, below, only pushes 20 jobs to the queue and then monitors the queue status (jobs being consumed):
    from kazoo.client import KazooClient

    zk = KazooClient(hosts='xxx')
    zk.start()

    # Push 20 jobs, one znode per job
    for i in xrange(20):
        zk.create("/queue/%s" % i, b"%s" % i)

    # Keep printing the jobs that are still queued
    while 1:
        print zk.get_children('/queue')
The consumer, below, is launched a few times (up to 3 concurrent processes in my tests). It takes the job list, iterates over it to find an unlocked job, processes it (sleeps a random number of seconds to simulate work) and, once done, deletes the job and then deletes the lock:
    from kazoo.client import KazooClient
    from kazoo.exceptions import NodeExistsError
    from time import sleep
    import random

    zk = KazooClient(hosts='xxx')
    zk.start()

    zk.ensure_path("/locks")
    zk.ensure_path("/queue")

    while 1:
        # Snapshot of the job list; it can be stale by the time a job is handled
        jobs = sorted(zk.get_children('/queue'))
        if jobs:
            for i in jobs:
                print "checking job: %s" % i
                try:
                    # Creating the lock node fails if another consumer holds it
                    zk.create("/locks/%s" % i)
                except NodeExistsError:
                    print "job locked, skipping!"
                    pass
                else:
                    print "job unlocked, processing."
                    sleep(random.randrange(5))
                    zk.delete("/queue/%s" % i)
                    print "deleted processed job, deleting lock."
                    zk.delete("/locks/%s" % i)
                    pass
        else:
            print "there's no locks in queue."
            pass
The problem I'm seeing, and am unable to track down, is that some consumer processes exit with:
    Traceback (most recent call last):
      File "zk_consumer.py", line 24, in <module>
        zk.delete("/queue/%s" % i)
      File "/Library/Python/2.7/site-packages/kazoo/client.py", line 1055, in delete
        return self.delete_async(path, version).get()
      File "/Library/Python/2.7/site-packages/kazoo/handlers/threading.py", line 107, in get
        raise self._exception
    kazoo.exceptions.NoNodeError: ((), {})
Meanwhile, the last process remains forever checking a single job, which stays in the queue, locked. Obviously I have a logical error here that I think leads to a race condition, but I've spent some time on it and can't seem to spot it. What am I doing wrong here, or is ZooKeeper not a viable solution for simple job queues?
Your code is racy. Consider this sequence:
    T1                  T2
    read queue/1
                        read queue/1
    write lock/1
    delete queue/1
    delete lock/1
                        write lock/1
                        delete queue/1 (fail, no node!)
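The interleaving above can be replayed deterministically without a ZooKeeper server. The following is only a rough sketch in Python 3 (the scripts in the question are Python 2): the `queue` and `locks` sets, the `take_lock` and `delete_job` helpers, and the local `NoNodeError` class are hypothetical stand-ins for the `/queue` and `/locks` znodes and for `kazoo.exceptions.NoNodeError`, not part of kazoo.

```python
class NoNodeError(Exception):
    """Local stand-in for kazoo.exceptions.NoNodeError (assumption)."""


queue = {"1"}   # models the children of /queue
locks = set()   # models the children of /locks


def take_lock(job):
    """Model zk.create('/locks/<job>'): succeeds only if no lock node exists."""
    if job in locks:
        return False
    locks.add(job)
    return True


def delete_job(job):
    """Model zk.delete('/queue/<job>'): fails if the job node is already gone."""
    if job not in queue:
        raise NoNodeError(job)
    queue.remove(job)


# Both consumers read the job list before either of them acts on it.
t1_jobs = sorted(queue)
t2_jobs = sorted(queue)

# T1 locks the job, processes and deletes it, then releases the lock.
t1_got_lock = take_lock(t1_jobs[0])
delete_job(t1_jobs[0])
locks.discard(t1_jobs[0])

# T2 continues with its stale list: the lock is free again, so it succeeds...
t2_got_lock = take_lock(t2_jobs[0])
try:
    # ...but the job itself was already deleted by T1.
    delete_job(t2_jobs[0])
    t2_outcome = "deleted"
except NoNodeError:
    t2_outcome = "NoNodeError"

print(t1_got_lock, t2_got_lock, t2_outcome)
```

In this model T2 fails before it reaches its lock cleanup, so `locks` still contains the job afterwards, just as a consumer that exits on the exception never gets to delete its lock node.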
After taking the lock, you need to read the queue again to make sure no one else has already deleted queue/1.
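A minimal sketch of that re-check, again in Python 3, might look like the following. To keep it runnable without a server it uses a hypothetical in-memory `FakeZk` class and locally defined `NodeExistsError` and `NoNodeError` classes; with a real client you would presumably pass a `KazooClient` instead and import the two exceptions from `kazoo.exceptions`. The `try_process` function and its `work` parameter are names made up for this example.

```python
class NodeExistsError(Exception):
    """Local stand-in for kazoo.exceptions.NodeExistsError (assumption)."""


class NoNodeError(Exception):
    """Local stand-in for kazoo.exceptions.NoNodeError (assumption)."""


class FakeZk:
    """Hypothetical in-memory stand-in offering only create/exists/delete."""

    def __init__(self):
        self.nodes = set()

    def create(self, path):
        if path in self.nodes:
            raise NodeExistsError(path)
        self.nodes.add(path)

    def exists(self, path):
        return path in self.nodes

    def delete(self, path):
        if path not in self.nodes:
            raise NoNodeError(path)
        self.nodes.remove(path)


def try_process(zk, job, work=lambda job: None):
    """Return True if this call processed the job, False if it was skipped."""
    try:
        zk.create("/locks/%s" % job)
    except NodeExistsError:
        return False  # another consumer holds the lock
    try:
        # Re-check under the lock: the job list we iterated may be stale.
        if not zk.exists("/queue/%s" % job):
            return False  # someone else already finished this job
        work(job)
        zk.delete("/queue/%s" % job)
        return True
    finally:
        # Release the lock on every path, including the skip above.
        zk.delete("/locks/%s" % job)


zk = FakeZk()
zk.create("/queue/1")
first = try_process(zk, "1")   # processes and deletes the job
second = try_process(zk, "1")  # stale attempt: lock is free, job is gone
print(first, second)
```

The second call models T2 from the sequence above: it gets the lock, sees that the job no longer exists, and backs off instead of raising. Releasing the lock in `finally` is a design choice of this sketch so that a skipped job does not leave its lock node behind.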