diff --git a/README b/README deleted file mode 100644 index 5db66a7..0000000 --- a/README +++ /dev/null @@ -1 +0,0 @@ -See the source for help: https://github.com/dpnova/tornado-memcache/blob/master/tornadoasyncmemcache.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..a39c298 --- /dev/null +++ b/README.md @@ -0,0 +1,45 @@ +Memcached 异步客户端 for tornado +=================================== + +### demo: + +```python +import tornado.ioloop +import tornado.web +import tornado.gen as gen +import tornadoasyncmemcache as memcache +import time + +ccs = memcache.ClientPool(['127.0.0.1:11211'], maxclients=100) + + +class MainHandler(tornado.web.RequestHandler): + + @tornado.web.asynchronous + @gen.engine + def get(self): + test_data = yield gen.Task(ccs.get, 'test_data') + if not test_data: + time_str = time.strftime('%Y-%m-%d %H:%M:%S') + + yield gen.Task(ccs.set, 'test_data', 'Hello world @ %s' % time_str) + test_data = yield gen.Task(ccs.get, 'test_data') + + self.write(test_data) + self.finish() + + +application = tornado.web.Application([ + (r"/", MainHandler), +], debug=False) + +if __name__ == "__main__": + application.listen(8888) + tornado.ioloop.IOLoop.instance().start() +``` + +### 更新记录: + +- 加入任务队列,在 100 连接数的情况下,能支持大于 100 的并发 + +> `ab -n 3000 -c 1000 http://127.0.0.1:8888/` 测试通过 diff --git a/test.py b/test.py new file mode 100644 index 0000000..8a7f55e --- /dev/null +++ b/test.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Date: 2013-10-12 17:39:33 +# @Author: vfasky (vfasky@gmail.com) +# @Link: http://vfasky.com +# @Version: $Id$ + +import tornado.ioloop +import tornado.web +import tornado.gen as gen +import tornadoasyncmemcache as memcache +import time + +ccs = memcache.ClientPool(['127.0.0.1:11211'], maxclients=100) + + +class MainHandler(tornado.web.RequestHandler): + + @tornado.web.asynchronous + @gen.engine + def get(self): + test_data = yield gen.Task(ccs.get, 'test_data') + if not test_data: + time_str = time.strftime('%Y-%m-%d %H:%M:%S') + + yield gen.Task(ccs.set, 'test_data', 'Hello world @ %s' % time_str) + test_data = yield gen.Task(ccs.get, 'test_data') + + self.write(test_data) + self.finish() + + +application = tornado.web.Application([ + (r"/", MainHandler), +], debug=False) + +if __name__ == "__main__": + application.listen(8888) + tornado.ioloop.IOLoop.instance().start() diff --git a/tornadoasyncmemcache.py b/tornadoasyncmemcache.py index 9d5c582..76fa898 100644 --- a/tornadoasyncmemcache.py +++ b/tornadoasyncmemcache.py @@ -1,5 +1,5 @@ #!/usr/bin/env python - +# -*- coding: utf-8 -*- """ Example using ClientPool ======== @@ -8,27 +8,27 @@ import tornado.web import tornadoasyncmemcache as memcache import time - + ccs = memcache.ClientPool(['127.0.0.1:11211'], maxclients=100) - + class MainHandler(tornado.web.RequestHandler): @tornado.web.asynchronous def get(self): time_str = time.strftime('%Y-%m-%d %H:%M:%S') ccs.set('test_data', 'Hello world @ %s' % time_str, callback=self._get_start) - + def _get_start(self, data): ccs.get('test_data', callback=self._get_end) - + def _get_end(self, data): self.write(data) self.finish() - + application = tornado.web.Application([ (r"/", MainHandler), ]) - + if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start() @@ -41,29 +41,32 @@ def _get_end(self, data): import types from tornado import iostream, ioloop from functools import partial -import collections +#import collections try: import cPickle as pickle except ImportError: import pickle -__author__ = "Tornadoified: David Novakovic dpn@dpn.name, original code: Evan Martin " -__version__ = "1.0" +__author__ = "Tornadoified: David Novakovic dpn@dpn.name, original code: Evan Martin " +__version__ = "1.0" __copyright__ = "Copyright (C) 2003 Danga Interactive" -__license__ = "Python" +__license__ = "Python" + class TooManyClients(Exception): pass + class ClientPool(object): CMDS = ('get', 'replace', 'set', 'decr', 'incr', 'delete') + _RUQ_ID = 0 def __init__(self, servers, - mincached = 0, - maxcached = 0, - maxclients = 0, + mincached=0, + maxcached=0, + maxclients=0, *args, **kwargs): assert isinstance(mincached, int) @@ -77,52 +80,115 @@ def __init__(self, self._servers = servers self._args, self._kwargs = args, kwargs - self._used = collections.deque() + #self._used = collections.deque() self._maxclients = maxclients self._mincached = mincached self._maxcached = maxcached - self._clients = collections.deque(self._create_clients(mincached)) + #self._clients = collections.deque(self._create_clients(mincached)) + self._clients = self._create_clients(maxclients) + self._task_que = self._init_task_que(maxclients) def _create_clients(self, n): assert n >= 0 return [Client(self._servers, *self._args, **self._kwargs) for x in xrange(n)] + # 初始化执行队列 + def _init_task_que(self, n): + assert n >= 0 + que = [] + for x in xrange(n): + que.append({ + 'ix': x, + 'list': [] + }) + return que + + # 取最小任务的队列id + def get_min_task_ix(self): + min_que = None + + for v in self._task_que: + count = len(v['list']) + if count == 0: + return v['ix'] + elif None == min_que or min_que['count'] > count: + min_que = { + 'ix': v['ix'], + 'count': count + } + return min_que['ix'] + + # 添加任务到队列 + def add_task(self, cmd, args, kwargs): + ix = self.get_min_task_ix() + self._RUQ_ID = self._RUQ_ID + 1 + + self._task_que[ix]['list'].append({ + 'id': self._RUQ_ID, + 'cmd': cmd, + 'args': args, + 'kwargs': kwargs, + 'run': False, + }) + # 马上执行队列 + if len(self._task_que[ix]['list']) == 1: + self.call_client(ix) + + return ix + + # 从指定队列中队任务 + def get_task(self, ix): + for v in self._task_que[ix]['list']: + if False == v['run']: + v['run'] = True + return v + return False + + # 调度客户端执行队列 + def call_client(self, ix): + task = self.get_task(ix) + if False == task: + return + + kwargs = task['kwargs'] + args = task['args'] + client = self._clients[ix] + kwargs['callback'] = partial(self._que_cb, + ix=ix, + task=task, + callback=kwargs['callback']) + getattr(client, task['cmd'])(*args, **kwargs) + + # 执行完任务队列回调 + def _que_cb(self, response, ix, task, callback, *args, **kwargs): + que = self._task_que[ix] + que['list'].remove(task) + callback(response, *args, **kwargs) + # 继续执行 + if len(que['list']) > 0: + self.call_client(ix) + def _do(self, cmd, *args, **kwargs): - if not self._clients: - if self._maxclients > 0 and (len(self._clients) - + len(self._used) >= self._maxclients): - raise TooManyClients("Max of %d clients is already reached" - % self._maxclients) - self._clients.append(self._create_clients(1)[0]) - c = self._clients.popleft() - kwargs['callback'] = partial(self._gen_cb, c=c, _cb=kwargs['callback']) - self._used.append(c) - getattr(c, cmd)(*args, **kwargs) + self.add_task(cmd, args, kwargs) def __getattr__(self, name): if name in self.CMDS: return partial(self._do, name) raise AttributeError("'%s' object has no attribute '%s'" % - (self.__class__.__name__, name)) - - def _gen_cb(self, response, c, _cb, *args, **kwargs): - self._used.remove(c) - if self._maxcached == 0 or self._maxcached > len(self._clients): - self._clients.append(c) - else: - c.disconnect_all() - _cb(response, *args, **kwargs) - + (self.__class__.__name__, name)) + class _Error(Exception): pass + class Client(object): + """ Object representing a pool of memcache servers. - + See L{memcache} for an overview. In all cases where a key is used, the key can be either: @@ -140,17 +206,16 @@ class Client(object): @sort: __init__, set_servers, forget_dead_hosts, disconnect_all, debuglog,\ set, add, replace, get, get_multi, incr, decr, delete """ - _FLAG_PICKLE = 1<<0 - _FLAG_INTEGER = 1<<1 - _FLAG_LONG = 1<<2 + _FLAG_PICKLE = 1 << 0 + _FLAG_INTEGER = 1 << 1 + _FLAG_LONG = 1 << 2 _SERVER_RETRIES = 10 # how many times to try finding a free server. - - + _ASYNC_CLIENTS = weakref.WeakKeyDictionary() # def __new__(cls, servers, max_connections=1000, debug=0, io_loop=None): -# # There is one client per IOLoop since they share curl instances +# There is one client per IOLoop since they share curl instances # io_loop = io_loop or ioloop.IOLoop.instance() # if io_loop in cls._ASYNC_CLIENTS: # return cls._ASYNC_CLIENTS[io_loop] @@ -163,7 +228,7 @@ class Client(object): # instance.servers # cls._ASYNC_CLIENTS[io_loop] = instance # return instance - + def __init__(self, servers, debug=0, io_loop=None): io_loop = io_loop or ioloop.IOLoop.instance() self.io_loop = io_loop @@ -184,7 +249,7 @@ def __init__(self, servers, debug=0, io_loop=None): # self.set_servers(servers) # self.debug = debug # self.stats = {} - + def set_servers(self, servers): """ Set the pool of servers used by this client. @@ -203,7 +268,7 @@ def debuglog(self, str): sys.stderr.write("MemCached: %s\n" % str) def _statlog(self, func): - if not self.stats.has_key(func): + if func not in self.stats: self.stats[func] = 1 else: self.stats[func] += 1 @@ -222,7 +287,7 @@ def _init_buckets(self): self.buckets.append(server) def _get_server(self, key): - if type(key) == types.TupleType: + if isinstance(key, types.TupleType): serverhash = key[0] key = key[1] else: @@ -239,33 +304,40 @@ def _get_server(self, key): def disconnect_all(self): for s in self.servers: s.close_socket() - + def delete(self, key, time=0, callback=None): '''Deletes a key from the memcache. - + @return: Nonzero on success. @rtype: int ''' server, key = self._get_server(key) if not server: - self.finish(partial(callback,0)) + self.finish(partial(callback, 0)) self._statlog('delete') if time: cmd = "delete %s %d" % (key, time) else: cmd = "delete %s" % key - server.send_cmd(cmd, callback=partial(self._delete_send_cb,server, callback)) - + server.send_cmd( + cmd, + callback=partial( + self._delete_send_cb, + server, + callback)) + def _delete_send_cb(self, server, callback): - server.expect("DELETED",callback=partial(self._expect_cb, callback=callback)) - - + server.expect( + "DELETED", + callback=partial( + self._expect_cb, + callback=callback)) + # except socket.error, msg: # server.mark_dead(msg[1]) # return 0 # return 1 - def incr(self, key, delta=1, callback=None): """ Sends a command to the server to atomically increment the value for C{key} by @@ -310,14 +382,22 @@ def _incrdecr(self, cmd, key, delta, callback): self._statlog(cmd) cmd = "%s %s %d" % (cmd, key, delta) - server.send_cmd(cmd, callback=partial(self._incrdecr_send_cb,server, callback)) - + server.send_cmd( + cmd, + callback=partial( + self._incrdecr_send_cb, + server, + callback)) + def _send_incrdecr_cb(self, server, callback): - server.readline(callback=partial(self._send_incrdecr_check_cb, callback=callback)) - + server.readline( + callback=partial( + self._send_incrdecr_check_cb, + callback=callback)) + def _send_incrdecr_check_cb(self, line, callback): - self.finish(partial(callback,int(line))) - + self.finish(partial(callback, int(line))) + # except socket.error, msg: # server.mark_dead(msg[1]) # return None @@ -325,23 +405,25 @@ def _send_incrdecr_check_cb(self, line, callback): def add(self, key, val, time=0, callback=None): ''' Add new key with value. - + Like L{set}, but only stores in memcache if the key doesn't already exist. @return: Nonzero on success. @rtype: int ''' self._set("add", key, val, time, callback) + def replace(self, key, val, time=0, callback=None): '''Replace existing key with value. - - Like L{set}, but only stores in memcache if the key already exists. + + Like L{set}, but only stores in memcache if the key already exists. The opposite of L{add}. @return: Nonzero on success. @rtype: int ''' self._set("replace", key, val, time, callback) + def set(self, key, val, time=0, callback=None): '''Unconditionally sets a key to a given value in the memcache. @@ -355,11 +437,11 @@ def set(self, key, val, time=0, callback=None): @rtype: int ''' self._set("set", key, val, time, callback) - + def _set(self, cmd, key, val, time, callback): server, key = self._get_server(key) if not server: - self.finish(partial(callback,0)) + self.finish(partial(callback, 0)) self._statlog(cmd) @@ -375,13 +457,24 @@ def _set(self, cmd, key, val, time, callback): else: flags |= Client._FLAG_PICKLE val = pickle.dumps(val, 2) - - fullcmd = "%s %s %d %d %d\r\n%s" % (cmd, key, flags, time, len(val), val) - - server.send_cmd(fullcmd, callback=partial(self._set_send_cb, server=server, callback=callback)) - + + fullcmd = "%s %s %d %d %d\r\n%s" % ( + cmd, key, flags, time, len(val), val) + + server.send_cmd( + fullcmd, + callback=partial( + self._set_send_cb, + server=server, + callback=callback)) + def _set_send_cb(self, server, callback): - server.expect("STORED", callback=partial(self._expect_cb, value=None, callback=callback)) + server.expect( + "STORED", + callback=partial( + self._expect_cb, + value=None, + callback=callback)) # except socket.error, msg: # server.mark_dead(msg[1]) # return 0 @@ -389,7 +482,7 @@ def _set_send_cb(self, server, callback): def get(self, key, callback): '''Retrieves a key from the memcache. - + @return: The value or None. ''' server, key = self._get_server(key) @@ -398,23 +491,47 @@ def get(self, key, callback): self._statlog('get') - server.send_cmd("get %s" % key, partial(self._get_send_cb, server=server, callback=callback)) - + server.send_cmd( + "get %s" % + key, + partial( + self._get_send_cb, + server=server, + callback=callback)) + def _get_send_cb(self, server, callback): - self._expectvalue(server, line=None, callback=partial(self._get_expectval_cb, server=server, callback=callback)) - + self._expectvalue( + server, + line=None, + callback=partial( + self._get_expectval_cb, + server=server, + callback=callback)) + def _get_expectval_cb(self, rkey, flags, rlen, server, callback): if not rkey: - self.finish(partial(callback,None)) + self.finish(partial(callback, None)) return - self._recv_value(server, flags, rlen, partial(self._get_recv_cb, server=server, callback=callback)) - + self._recv_value( + server, + flags, + rlen, + partial( + self._get_recv_cb, + server=server, + callback=callback)) + def _get_recv_cb(self, value, server, callback): - server.expect("END", partial(self._expect_cb, value=value, callback=callback)) - + server.expect( + "END", + partial( + self._expect_cb, + value=value, + callback=callback)) + def _expect_cb(self, expected=None, value=None, callback=None): # print "in expect cb" - self.finish(partial(callback,value)) + self.finish(partial(callback, value)) # except (_Error, socket.error), msg: # if type(msg) is types.TupleType: # msg = msg[1] @@ -441,13 +558,20 @@ def _expectvalue_cb(self, line, callback): callback(None, None, None) def _recv_value(self, server, flags, rlen, callback): - rlen += 2 # include \r\n - server.recv(rlen, partial(self._recv_value_cb,rlen=rlen, flags=flags, callback=callback)) - - + rlen += 2 # include \r\n + server.recv( + rlen, + partial( + self._recv_value_cb, + rlen=rlen, + flags=flags, + callback=callback)) + def _recv_value_cb(self, buf, flags, rlen, callback): if len(buf) != rlen: - raise _Error("received %d bytes when expecting %d" % (len(buf), rlen)) + raise _Error( + "received %d bytes when expecting %d" % + (len(buf), rlen)) if len(buf) == rlen: buf = buf[:-2] # strip \r\n @@ -463,13 +587,13 @@ def _recv_value_cb(self, buf, flags, rlen, callback): else: self.debuglog("unknown flags on get: %x\n" % flags) - self.finish(partial(callback,val)) - + self.finish(partial(callback, val)) + def finish(self, callback): callback() # self.disconnect_all() - + class _Host: _DEAD_RETRY = 30 # number of seconds before retrying a dead server. @@ -493,7 +617,7 @@ def __init__(self, host, debugfunc=None): self.deaduntil = 0 self.socket = None self.stream = None - + def _check_dead(self): if self.deaduntil and self.deaduntil > time.time(): return 1 @@ -509,7 +633,7 @@ def mark_dead(self, reason): print "MemCache: %s: %s. Marking dead." % (self, reason) self.deaduntil = time.time() + _Host._DEAD_RETRY self.close_socket() - + def _get_socket(self): if self._check_dead(): return None @@ -519,14 +643,14 @@ def _get_socket(self): # Python 2.3-ism: s.settimeout(1) try: s.connect((self.ip, self.port)) - except socket.error, msg: + except socket.error as msg: self.mark_dead("connect: %s" % msg[1]) return None self.socket = s self.stream = iostream.IOStream(s) - self.stream.debug=True + self.stream.debug = True return s - + def close_socket(self): if self.socket: # self.socket.close() @@ -536,7 +660,7 @@ def close_socket(self): def send_cmd(self, cmd, callback): # print "in sendcmd", repr(cmd), callback - self.stream.write(cmd+"\r\n", callback) + self.stream.write(cmd + "\r\n", callback) #self.socket.sendall(cmd + "\r\n") def readline(self, callback): @@ -544,23 +668,27 @@ def readline(self, callback): def expect(self, text, callback): self.readline(partial(self._expect_cb, text=text, callback=callback)) - + def _expect_cb(self, data, text, callback): if data != text: - self.debuglog("while expecting '%s', got unexpected response '%s'" % (text, data)) + self.debuglog( + "while expecting '%s', got unexpected response '%s'" % + (text, data)) callback(data) - + def recv(self, rlen, callback): self.stream.read_bytes(rlen, callback) - + def __str__(self): d = '' if self.deaduntil: d = " (dead until %d)" % self.deaduntil return "%s:%d%s" % (self.ip, self.port, d) + def _doctest(): - import doctest, memcache + import doctest + import memcache servers = ["127.0.0.1:11211"] mc = Client(servers, debug=1) globs = {"mc": mc} @@ -579,6 +707,7 @@ def to_s(val): if not isinstance(val, types.StringTypes): return "%s (%s)" % (val, type(val)) return "%s" % val + def test_setget(key, val): print "Testing set/get {'%s': %s} ..." % (to_s(key), to_s(val)), mc.set(key, val) @@ -591,18 +720,21 @@ def test_setget(key, val): return 0 class FooStruct: + def __init__(self): self.bar = "baz" + def __str__(self): return "A FooStruct" + def __eq__(self, other): if isinstance(other, FooStruct): return self.bar == other.bar return 0 - + test_setget("a_string", "some random string") test_setget("an_integer", 42) - if test_setget("long", long(1<<30)): + if test_setget("long", long(1 << 30)): print "Testing delete ...", if mc.delete("long"): print "OK"