From fe5dff356ee37368d53c95fb0bde7c43be1e0918 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Eren=20G=C3=BCven?=
Date: Sun, 27 Dec 2020 02:32:16 +0100
Subject: [PATCH] do part of catchup to upstream

These changes are not functional but visual; they catch the code up to
upstream to ease maintainability.
---
 memcache.py | 666 ++++++++++++++++++++++++++++++----------------------
 1 file changed, 387 insertions(+), 279 deletions(-)

diff --git a/memcache.py b/memcache.py
index b38020e..dc9a926 100644
--- a/memcache.py
+++ b/memcache.py
@@ -1,12 +1,12 @@
 #!/usr/bin/env python3
-"""
-client module for memcached (memory cache daemon)
+"""client module for memcached (memory cache daemon)
 
 Overview
 ========
 
-See U{the MemCached homepage} for more about memcached.
+See U{the MemCached homepage} for more
+about memcached.
 
 Usage summary
 =============
@@ -22,11 +22,12 @@
     mc.set("another_key", 3)
     mc.delete("another_key")
 
-    mc.set("key", "1")  # note that the key used for incr/decr must be a string.
+    mc.set("key", "1")  # note that the key used for incr/decr must be
+                        # a string.
     mc.incr("key")
     mc.decr("key")
 
-The standard way to use memcache with a database is like this:
 
     key = derive_key(obj)
     obj = mc.get(key)
@@ -41,8 +42,10 @@
 ======================
 
 More detailed documentation is available in the L{Client} class.
+
 """
 
+import binascii
 import sys
 import socket
 import time
@@ -52,15 +55,16 @@
 
 from io import StringIO, BytesIO
 
-from binascii import crc32  # zlib version is not cross-platform
+
 def cmemcache_hash(key):
-    return((((crc32(key) & 0xffffffff) >> 16) & 0x7fff) or 1)
+    return (((binascii.crc32(key) & 0xffffffff) >> 16) & 0x7fff) or 1
 serverHashFunction = cmemcache_hash
 
+
 def useOldServerHashFunction():
     """Use the old python-memcache server hash function."""
     global serverHashFunction
-    serverHashFunction = crc32
+    serverHashFunction = binascii.crc32
 
 try:
     from zlib import compress, decompress
@@ -74,17 +78,19 @@ def decompress(val):
 
 invalid_key_characters = ''.join(map(chr, list(range(33)) + [127]))
 
 # Original author: Evan Martin of Danga Interactive
-__author__ = "Sean Reifschneider "
+__author__ = "Sean Reifschneider "
 __version__ = "1.51"
 __copyright__ = "Copyright (C) 2003 Danga Interactive"
 # http://en.wikipedia.org/wiki/Python_Software_Foundation_License
-__license__ = "Python Software Foundation License"
+__license__ = "Python Software Foundation License"
 
 SERVER_MAX_KEY_LENGTH = 250
-# Storing values larger than 1MB requires recompiling memcached.  If you do,
-# this value can be changed by doing "memcache.SERVER_MAX_VALUE_LENGTH = N"
-# after importing this module.
-SERVER_MAX_VALUE_LENGTH = 1024*1024
+# Storing values larger than 1MB requires starting memcached with -I for
+# memcached >= 1.4.2 or recompiling for < 1.4.2. If you do, this value can be
+# changed by doing "memcache.SERVER_MAX_VALUE_LENGTH = N" after importing this
+# module.
+SERVER_MAX_VALUE_LENGTH = 1024 * 1024
+
 
 class _Error(Exception):
     pass
@@ -98,48 +104,56 @@ class _ConnectionDeadError(Exception):
 
 _DEAD_RETRY = 30  # number of seconds before retrying a dead server.
-_SOCKET_TIMEOUT = 3 #  number of seconds before sockets timeout.
+_SOCKET_TIMEOUT = 3  # number of seconds before sockets timeout.
 
 
 class Client(local):
-    """
-    Object representing a pool of memcache servers.
+    """Object representing a pool of memcache servers.
 
     See L{memcache} for an overview.
 
     In all cases where a key is used, the key can be either:
         1. A simple hashable type (string, integer, etc.).
-        2. A tuple of C{(hashvalue, key)}.  This is useful if you want to avoid
-           making this module calculate a hash value.  You may prefer, for
-           example, to keep all of a given user's objects on the same memcache
-           server, so you could use the user's unique id as the hash value.
+        2. A tuple of C{(hashvalue, key)}.  This is useful if you want
+           to avoid making this module calculate a hash value.  You may
+           prefer, for example, to keep all of a given user's objects on
+           the same memcache server, so you could use the user's unique
+           id as the hash value.
+
 
-    @group Setup: __init__, set_servers, forget_dead_hosts, disconnect_all, debuglog
+    @group Setup: __init__, set_servers, forget_dead_hosts,
+        disconnect_all, debuglog
     @group Insertion: set, add, replace, set_multi
     @group Retrieval: get, get_multi
     @group Integers: incr, decr
    @group Removal: delete, delete_multi
-    @sort: __init__, set_servers, forget_dead_hosts, disconnect_all, debuglog,\
-        set, set_multi, add, replace, get, get_multi, incr, decr, delete, delete_multi
+    @sort: __init__, set_servers, forget_dead_hosts, disconnect_all,
+        debuglog,\ set, set_multi, add, replace, get, get_multi,
+        incr, decr, delete, delete_multi
     """
-    _FLAG_PICKLE = 1<<0
-    _FLAG_INTEGER = 1<<1
-    _FLAG_LONG = 1<<2
-    _FLAG_COMPRESSED = 1<<3
+    _FLAG_PICKLE = 1 << 0
+    _FLAG_INTEGER = 1 << 1
+    _FLAG_LONG = 1 << 2
+    _FLAG_COMPRESSED = 1 << 3
 
     _SERVER_RETRIES = 10  # how many times to try finding a free server.
 
     # exceptions for Client
    class MemcachedKeyError(Exception):
        pass
+
    class MemcachedKeyLengthError(MemcachedKeyError):
        pass
+
    class MemcachedKeyCharacterError(MemcachedKeyError):
        pass
+
    class MemcachedKeyNoneError(MemcachedKeyError):
        pass
+
    class MemcachedKeyTypeError(MemcachedKeyError):
        pass
+
    class MemcachedStringEncodingError(Exception):
        pass
 
@@ -149,41 +163,50 @@ def __init__(self, servers, debug=0, pickleProtocol=0,
                  server_max_key_length=SERVER_MAX_KEY_LENGTH,
                  server_max_value_length=SERVER_MAX_VALUE_LENGTH,
                  dead_retry=_DEAD_RETRY, socket_timeout=_SOCKET_TIMEOUT,
-                 cache_cas = False, flush_on_reconnect=0, check_keys=True):
+                 cache_cas=False, flush_on_reconnect=0, check_keys=True):
         """
         Create a new Client object with the given list of servers.
 
         @param servers: C{servers} is passed to L{set_servers}.
-        @param debug: whether to display error messages when a server can't be
-            contacted.
-        @param pickleProtocol: number to mandate protocol used by (c)Pickle.
-        @param pickler: optional override of default Pickler to allow subclassing.
-        @param unpickler: optional override of default Unpickler to allow subclassing.
-        @param pload: optional persistent_load function to call on pickle loading.
-            Useful for cPickle since subclassing isn't allowed.
-        @param pid: optional persistent_id function to call on pickle storing.
-            Useful for cPickle since subclassing isn't allowed.
-        @param dead_retry: number of seconds before retrying a blacklisted
-            server. Default to 30 s.
-        @param socket_timeout: timeout in seconds for all calls to a server. Defaults
-            to 3 seconds.
-        @param cache_cas: (default False) If true, cas operations will be
-            cached.  WARNING: This cache is not expired internally, if you have
-            a long-running process you will need to expire it manually via
-            client.reset_cas(), or the cache can grow unlimited.
+        @param debug: whether to display error messages when a server
+            can't be contacted.
+        @param pickleProtocol: number to mandate protocol used by
+            (c)Pickle.
+        @param pickler: optional override of default Pickler to allow
+            subclassing.
+        @param unpickler: optional override of default Unpickler to
+            allow subclassing.
+        @param pload: optional persistent_load function to call on
+            pickle loading. Useful for cPickle since subclassing isn't
+            allowed.
+        @param pid: optional persistent_id function to call on pickle
+            storing. Useful for cPickle since subclassing isn't allowed.
+        @param dead_retry: number of seconds before retrying a
+            blacklisted server. Defaults to 30 s.
+        @param socket_timeout: timeout in seconds for all calls to a
+            server. Defaults to 3 seconds.
+        @param cache_cas: (default False) If true, cas operations will
+            be cached. WARNING: This cache is not expired internally; if
+            you have a long-running process you will need to expire it
+            manually via client.reset_cas(), or the cache can grow
+            unbounded.
         @param server_max_key_length: (default SERVER_MAX_KEY_LENGTH)
             Data that is larger than this will not be sent to the server.
-        @param server_max_value_length: (default SERVER_MAX_VALUE_LENGTH)
-            Data that is larger than this will not be sent to the server.
-        @param flush_on_reconnect: optional flag which prevents a scenario that
-            can cause stale data to be read: If there's more than one memcached
-            server and the connection to one is interrupted, keys that mapped to
-            that server will get reassigned to another. If the first server comes
-            back, those keys will map to it again. If it still has its data, get()s
-            can read stale data that was overwritten on another server. This flag
-            is off by default for backwards compatibility.
-        @param check_keys: (default True) If True, the key is checked to
-            ensure it is the correct length and composed of the right characters.
+        @param server_max_value_length: (default
+            SERVER_MAX_VALUE_LENGTH) Data that is larger than this will
+            not be sent to the server.
+        @param flush_on_reconnect: optional flag which prevents a
+            scenario that can cause stale data to be read: If there's more
+            than one memcached server and the connection to one is
+            interrupted, keys that mapped to that server will get
+            reassigned to another. If the first server comes back, those
+            keys will map to it again. If it still has its data, get()s
+            can read stale data that was overwritten on another
+            server. This flag is off by default for backwards
+            compatibility.
+        @param check_keys: (default True) If True, the key is checked
+            to ensure it is the correct length and composed of the right
+            characters.
         """
         local.__init__(self)
         self.debug = debug
@@ -208,58 +231,61 @@ def __init__(self, servers, debug=0, pickleProtocol=0,
         # figure out the pickler style
         file = StringIO()
         try:
-            pickler = self.pickler(file, protocol = self.pickleProtocol)
+            pickler = self.pickler(file, protocol=self.pickleProtocol)
             self.picklerIsKeyword = True
         except TypeError:
             self.picklerIsKeyword = False
 
     def reset_cas(self):
-        """
-        Reset the cas cache.  This is only used if the Client() object
-        was created with "cache_cas=True".  If used, this cache does not
-        expire internally, so it can grow unbounded if you do not clear it
+        """Reset the cas cache.
+
+        This is only used if the Client() object was created with
+        "cache_cas=True".  If used, this cache does not expire
+        internally, so it can grow unbounded if you do not clear it
         yourself.
         """
         self.cas_ids = {}
 
-
     def set_servers(self, servers):
-        """
-        Set the pool of servers used by this client.
+        """Set the pool of servers used by this client.
 
         @param servers: an array of servers.
         Servers can be passed in two forms:
-            1. Strings of the form C{"host:port"}, which implies a default weight of 1.
-            2. Tuples of the form C{("host:port", weight)}, where C{weight} is
-               an integer weight value.
+            1. Strings of the form C{"host:port"}, which implies a
+               default weight of 1.
+            2. Tuples of the form C{("host:port", weight)}, where
+               C{weight} is an integer weight value.
+
         """
         self.servers = [_Host(s, self.debug, dead_retry=self.dead_retry,
-            socket_timeout=self.socket_timeout,
-            flush_on_reconnect=self.flush_on_reconnect)
-            for s in servers]
+                              socket_timeout=self.socket_timeout,
+                              flush_on_reconnect=self.flush_on_reconnect)
+                        for s in servers]
         self._init_buckets()
 
-    def get_stats(self, stat_args = None):
-        '''Get statistics from each of the servers.
+    def get_stats(self, stat_args=None):
+        """Get statistics from each of the servers.
 
         @param stat_args: Additional arguments to pass to the memcache
            "stats" command.
 
-        @return: A list of tuples ( server_identifier, stats_dictionary ).
-            The dictionary contains a number of name/value pairs specifying
-            the name of the status field and the string value associated with
-            it.  The values are not converted from strings.
-        '''
+        @return: A list of tuples ( server_identifier,
+            stats_dictionary ).  The dictionary contains a number of
+            name/value pairs specifying the name of the status field
+            and the string value associated with it.  The values are
+            not converted from strings.
+        """
         data = []
         for s in self.servers:
-            if not s.connect(): continue
+            if not s.connect():
+                continue
            if s.family == socket.AF_INET:
-                name = '%s:%s (%s)' % ( s.ip, s.port, s.weight )
+                name = '%s:%s (%s)' % (s.ip, s.port, s.weight)
            elif s.family == socket.AF_INET6:
-                name = '[%s]:%s (%s)' % ( s.ip, s.port, s.weight )
+                name = '[%s]:%s (%s)' % (s.ip, s.port, s.weight)
            else:
-                name = 'unix:%s (%s)' % ( s.address, s.weight )
+                name = 'unix:%s (%s)' % (s.address, s.weight)
            if not stat_args:
                s.send_cmd(b'stats')
            elif isinstance(stat_args, bytes):
@@ -267,37 +293,40 @@
             else:
                 s.send_cmd(b'stats ' + str(stat_args).encode('utf-8'))
             serverData = {}
-            data.append(( name.encode('ascii'), serverData ))
+            data.append((name.encode('ascii'), serverData))
             readline = s.readline
             while 1:
                 line = readline()
-                if not line or line.strip() == b'END': break
+                if not line or line.strip() == b'END':
+                    break
                 stats = line.decode('ascii').split(' ', 2)
                 serverData[stats[1].encode('ascii')] = stats[2].encode('ascii')
 
-        return(data)
+        return data
 
     def get_slabs(self):
         data = []
         for s in self.servers:
-            if not s.connect(): continue
+            if not s.connect():
+                continue
            if s.family == socket.AF_INET:
-                name = '%s:%s (%s)' % ( s.ip, s.port, s.weight )
+                name = '%s:%s (%s)' % (s.ip, s.port, s.weight)
            elif s.family == socket.AF_INET6:
-                name = '[%s]:%s (%s)' % ( s.ip, s.port, s.weight )
+                name = '[%s]:%s (%s)' % (s.ip, s.port, s.weight)
            else:
-                name = 'unix:%s (%s)' % ( s.address, s.weight )
+                name = 'unix:%s (%s)' % (s.address, s.weight)
            serverData = {}
-            data.append(( name, serverData ))
+            data.append((name, serverData))
            s.send_cmd('stats items')
            readline = s.readline
            while 1:
                line = readline()
-                if not line or line.strip() == b'END': break
+                if not line or line.strip() == b'END':
+                    break
                item = line.split(' ', 2)
-                #0 = STAT, 1 = ITEM, 2 = Value
+                # 0 = STAT, 1 = ITEM, 2 = Value
                slab = item[1].split(':', 2)
-                #0 = items, 1 = Slab #, 2 = Name
+                # 0 = items, 1 = Slab #, 2 = Name
                if slab[1] not in serverData:
                    serverData[slab[1]] = {}
                serverData[slab[1]][slab[2]] = item[2]
 
@@ -306,7 +335,8 @@ def get_slabs(self):
     def flush_all(self):
         """Expire all data in memcache servers that are reachable."""
         for s in self.servers:
-            if not s.connect(): continue
+            if not s.connect():
+                continue
             s.flush()
 
     def debuglog(self, str):
@@ -320,9 +350,7 @@ def _statlog(self, func):
         self.stats[func] += 1
 
     def forget_dead_hosts(self):
-        """
-        Reset every host in the pool to an "alive" state.
-        """
+        """Reset every host in the pool to an "alive" state."""
         for s in self.servers:
             s.deaduntil = 0
 
@@ -341,7 +369,7 @@ def _get_server(self, key):
         for i in range(Client._SERVER_RETRIES):
             server = self.buckets[serverhash % len(self.buckets)]
             if server.connect():
-                #print "(using server %s)" % server,
+                # print("(using server %s)" % server,)
                 return server, key
             serverhash = serverHashFunction((str(serverhash) + str(i)).encode("ascii"))
         return None, None
 
@@ -351,35 +379,35 @@ def disconnect_all(self):
             s.close_socket()
 
     def delete_multi(self, keys, time=0, key_prefix=''):
-        '''
-        Delete multiple keys in the memcache doing just one query.
+        """Delete multiple keys in the memcache doing just one query.
 
-        >>> notset_keys = mc.set_multi({'key1' : 'val1', 'key2' : 'val2'})
-        >>> mc.get_multi(['key1', 'key2']) == {'key1' : 'val1', 'key2' : 'val2'}
+        >>> notset_keys = mc.set_multi({'a1' : 'val1', 'a2' : 'val2'})
+        >>> mc.get_multi(['a1', 'a2']) == {'a1' : 'val1','a2' : 'val2'}
         1
         >>> mc.delete_multi(['key1', 'key2'])
         1
         >>> mc.get_multi(['key1', 'key2']) == {}
         1
 
-        This method is recommended over iterated regular L{delete}s as it reduces total latency, since
-        your app doesn't have to wait for each round-trip of L{delete} before sending
-        the next one.
+        This method is recommended over iterated regular L{delete}s as
+        it reduces total latency, since your app doesn't have to wait
+        for each round-trip of L{delete} before sending the next one.
 
         @param keys: An iterable of keys to clear
-        @param time: number of seconds any subsequent set / update commands should fail. Defaults to 0 for no delay.
-        @param key_prefix: Optional string to prepend to each key when sending to memcache.
-            See docs for L{get_multi} and L{set_multi}.
+        @param time: number of seconds any subsequent set / update
+            commands should fail. Defaults to 0 for no delay.
+        @param key_prefix: Optional string to prepend to each key when
+            sending to memcache. See docs for L{get_multi} and
+            L{set_multi}.
         @return: 1 if no failure in communication with any memcacheds.
         @rtype: int
-
-        '''
+        """
         self._statlog('delete_multi')
 
-        server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(keys, key_prefix)
+        server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(
+            keys, key_prefix)
 
         # send out all requests on each server before reading anything
         dead_servers = []
@@ -388,17 +416,18 @@ def delete_multi(self, keys, time=0, key_prefix=''):
         for server in server_keys.keys():
             bigcmd = []
             write = bigcmd.append
-            if time != None:
-                for key in server_keys[server]: # These are mangled keys
-                    write("delete %s %d\r\n" % (key, time))
+            if time is not None:
+                for key in server_keys[server]:  # These are mangled keys
+                    write("delete %s %d\r\n" % (key, time))
             else:
                 for key in server_keys[server]:  # These are mangled keys
-                    write("delete %s\r\n" % key)
+                    write("delete %s\r\n" % key)
             try:
                 server.send_cmds(''.join(bigcmd))
             except socket.error as msg:
                 rc = 0
-                if isinstance(msg, tuple): msg = msg[1]
+                if isinstance(msg, tuple):
+                    msg = msg[1]
                 server.mark_dead(msg)
                 dead_servers.append(server)
 
@@ -411,7 +440,8 @@ def delete_multi(self, keys, time=0, key_prefix=''):
                 for key in keys:
                     server.expect(b"DELETED")
             except socket.error as msg:
-                if isinstance(msg, tuple): msg = msg[1]
+                if isinstance(msg, tuple):
+                    msg = msg[1]
                 server.mark_dead(msg)
                 rc = 0
         return rc
@@ -438,23 +468,26 @@ def delete(self, key, time=0):
         try:
             server.send_cmd(cmd.encode('utf-8'))
             line = server.readline()
-            if line and line.strip() in [b'DELETED', b'NOT_FOUND']: return 1
+            if line and line.strip() in [b'DELETED', b'NOT_FOUND']:
+                return 1
             self.debuglog('Delete expected DELETED or NOT_FOUND, got: %s'
-                    % repr(line))
+                          % repr(line))
         except socket.error as msg:
             if isinstance(msg, tuple): msg = msg[1]
             server.mark_dead(msg)
         return 0
 
     def incr(self, key, delta=1):
-        """
-        Sends a command to the server to atomically increment the value
-        for C{key} by C{delta}, or by 1 if C{delta} is unspecified.
-        Returns None if C{key} doesn't exist on server, otherwise it
-        returns the new value after incrementing.
+        """Increment value for C{key} by C{delta}
 
-        Note that the value for C{key} must already exist in the memcache,
-        and it must be the string representation of an integer.
+        Sends a command to the server to atomically increment the
+        value for C{key} by C{delta}, or by 1 if C{delta} is
+        unspecified.  Returns None if C{key} doesn't exist on server,
+        otherwise it returns the new value after incrementing.
+
+        Note that the value for C{key} must already exist in the
+        memcache, and it must be the string representation of an
+        integer.
 
         >>> mc.set("counter", "20")  # returns 1, indicating success
         1
@@ -463,20 +496,23 @@ def incr(self, key, delta=1):
         >>> mc.incr("counter")
         22
 
-        Overflow on server is not checked.  Be aware of values approaching
-        2**32.  See L{decr}.
+        Overflow on server is not checked.  Be aware of values
+        approaching 2**32.  See L{decr}.
+
+        @param delta: Integer amount to increment by (should be zero
+            or greater).
 
-        @param delta: Integer amount to increment by (should be zero or greater).
         @return: New value after incrementing.
         @rtype: int
         """
         return self._incrdecr("incr", key, delta)
 
     def decr(self, key, delta=1):
-        """
-        Like L{incr}, but decrements.  Unlike L{incr}, underflow is checked and
-        new values are capped at 0.  If server value is 1, a decrement of 2
-        returns 0, not -1.
+        """Decrement value for C{key} by C{delta}
+
+        Like L{incr}, but decrements.  Unlike L{incr}, underflow is
+        checked and new values are capped at 0.  If server value is 1,
+        a decrement of 2 returns 0, not -1.
 
         @param delta: Integer amount to decrement by (should be zero or greater).
         @return: New value after decrementing or None on error.
         @rtype: int
@@ -495,18 +531,20 @@ def _incrdecr(self, cmd, key, delta):
         try:
             server.send_cmd(cmd)
             line = server.readline()
-            if line == None or line.strip() ==b'NOT_FOUND': return None
+            if line is None or line.strip() == b'NOT_FOUND':
+                return None
             return int(line)
         except socket.error as msg:
-            if isinstance(msg, tuple): msg = msg[1]
+            if isinstance(msg, tuple):
+                msg = msg[1]
             server.mark_dead(msg)
             return None
 
-    def add(self, key, val, time = 0, min_compress_len = 0):
-        '''
-        Add new key with value.
+    def add(self, key, val, time=0, min_compress_len=0):
+        '''Add new key with value.
 
-        Like L{set}, but only stores in memcache if the key doesn't already exist.
+        Like L{set}, but only stores in memcache if the key doesn't
+        already exist.
 
         @return: Nonzero on success.
         @rtype: int
@@ -550,66 +588,77 @@ def set(self, key, val, time=0, min_compress_len=0):
         '''Unconditionally sets a key to a given value in the memcache.
 
         The C{key} can optionally be an tuple, with the first element
-        being the server hash value and the second being the key.
-        If you want to avoid making this module calculate a hash value.
-        You may prefer, for example, to keep all of a given user's objects
-        on the same memcache server, so you could use the user's unique
-        id as the hash value.
+        being the server hash value and the second being the key.  This
+        is useful if you want to avoid making this module calculate a
+        hash value.  You may prefer, for example, to keep all of a given
+        user's objects on the same memcache server, so you could use the
+        user's unique id as the hash value.
 
         @return: Nonzero on success.
         @rtype: int
-        @param time: Tells memcached the time which this value should expire, either
-            as a delta number of seconds, or an absolute unix time-since-the-epoch
-            value. See the memcached protocol docs section "Storage Commands"
-            for more info on <exptime>. We default to 0 == cache forever.
-        @param min_compress_len: The threshold length to kick in auto-compression
-            of the value using the zlib.compress() routine. If the value being cached is
-            a string, then the length of the string is measured, else if the value is an
-            object, then the length of the pickle result is measured. If the resulting
-            attempt at compression yeilds a larger string than the input, then it is
-            discarded. For backwards compatability, this parameter defaults to 0,
-            indicating don't ever try to compress.
+
+        @param time: Tells memcached the time which this value should
+            expire, either as a delta number of seconds, or an absolute
+            unix time-since-the-epoch value. See the memcached protocol
+            docs section "Storage Commands" for more info on <exptime>. We
+            default to 0 == cache forever.
+
+        @param min_compress_len: The threshold length to kick in
+            auto-compression of the value using the zlib.compress()
+            routine. If the value being cached is a string, then the
+            length of the string is measured, else if the value is an
+            object, then the length of the pickle result is measured. If
+            the resulting attempt at compression yields a larger string
+            than the input, then it is discarded. For backwards
+            compatibility, this parameter defaults to 0, indicating don't
+            ever try to compress.
         '''
         return self._set("set", key, val, time, min_compress_len)
 
     def cas(self, key, val, time=0, min_compress_len=0):
-        '''Sets a key to a given value in the memcache if it hasn't been
+        '''Check and set (CAS)
+
+        Sets a key to a given value in the memcache if it hasn't been
         altered since last fetched. (See L{gets}).
 
         The C{key} can optionally be an tuple, with the first element
-        being the server hash value and the second being the key.
-        If you want to avoid making this module calculate a hash value.
-        You may prefer, for example, to keep all of a given user's objects
-        on the same memcache server, so you could use the user's unique
-        id as the hash value.
+        being the server hash value and the second being the key.  This
+        is useful if you want to avoid making this module calculate a
+        hash value.  You may prefer, for example, to keep all of a given
+        user's objects on the same memcache server, so you could use the
+        user's unique id as the hash value.
 
         @return: Nonzero on success.
         @rtype: int
-        @param time: Tells memcached the time which this value should expire,
-            either as a delta number of seconds, or an absolute unix
-            time-since-the-epoch value. See the memcached protocol docs section
-            "Storage Commands" for more info on <exptime>. We default to
-            0 == cache forever.
+
+        @param time: Tells memcached the time which this value should
+            expire, either as a delta number of seconds, or an absolute
+            unix time-since-the-epoch value. See the memcached protocol
+            docs section "Storage Commands" for more info on <exptime>. We
+            default to 0 == cache forever.
+
         @param min_compress_len: The threshold length to kick in
-            auto-compression of the value using the zlib.compress() routine. If
-            the value being cached is a string, then the length of the string is
-            measured, else if the value is an object, then the length of the
-            pickle result is measured. If the resulting attempt at compression
-            yeilds a larger string than the input, then it is discarded. For
-            backwards compatability, this parameter defaults to 0, indicating
-            don't ever try to compress.
+            auto-compression of the value using the zlib.compress()
+            routine. If the value being cached is a string, then the
+            length of the string is measured, else if the value is an
+            object, then the length of the pickle result is measured. If
+            the resulting attempt at compression yields a larger string
+            than the input, then it is discarded. For backwards
+            compatibility, this parameter defaults to 0, indicating don't
+            ever try to compress.
         '''
         return self._set("cas", key, val, time, min_compress_len)
 
     def _map_and_prefix_keys(self, key_iterable, key_prefix):
-        """Compute the mapping of server (_Host instance) -> list of keys to stuff onto that server, as well as the mapping of
-        prefixed key -> original key.
-
+        """Map keys to the servers they will reside on.
 
+        Compute the mapping of server (_Host instance) -> list of keys to
+        stuff onto that server, as well as the mapping of prefixed key
+        -> original key.
         """
         # Check it just once ...
-        key_extra_len=len(key_prefix)
+        key_extra_len = len(key_prefix)
         if key_prefix and self.do_check_key:
             self.check_key(key_prefix)
 
@@ -620,7 +669,8 @@ def _map_and_prefix_keys(self, key_iterable, key_prefix):
         # build up a list for each server of all the keys we want.
         for orig_key in key_iterable:
             if isinstance(orig_key, tuple):
-                # Tuple of hashvalue, key ala _get_server(). Caller is essentially telling us what server to stuff this on.
+                # Tuple of hashvalue, key ala _get_server(). Caller is
+                # essentially telling us what server to stuff this on.
                 # Ensure call to _get_server gets a Tuple as well.
                 str_orig_key = str(orig_key[1])
                 server, key = self._get_server((orig_key[0], key_prefix + str_orig_key))  # Gotta pre-mangle key before hashing to a server. Returns the mangled key.
@@ -643,61 +693,79 @@ def _map_and_prefix_keys(self, key_iterable, key_prefix):
         return (server_keys, prefixed_to_orig_key)
 
     def set_multi(self, mapping, time=0, key_prefix='', min_compress_len=0):
-        '''
-        Sets multiple keys in the memcache doing just one query.
+        '''Sets multiple keys in the memcache doing just one query.
 
         >>> notset_keys = mc.set_multi({'key1' : 'val1', 'key2' : 'val2'})
-        >>> mc.get_multi(['key1', 'key2']) == {'key1' : 'val1', 'key2' : 'val2'}
-        1
+        >>> keys = mc.get_multi(['key1', 'key2'])
+        >>> keys == {'key1': 'val1', 'key2': 'val2'}
+        True
 
-        This method is recommended over regular L{set} as it lowers the number of
-        total packets flying around your network, reducing total latency, since
-        your app doesn't have to wait for each round-trip of L{set} before sending
-        the next one.
+        This method is recommended over regular L{set} as it lowers
+        the number of total packets flying around your network,
+        reducing total latency, since your app doesn't have to wait
+        for each round-trip of L{set} before sending the next one.
 
         @param mapping: A dict of key/value pairs to set.
-        @param time: Tells memcached the time which this value should expire, either
-            as a delta number of seconds, or an absolute unix time-since-the-epoch
-            value. See the memcached protocol docs section "Storage Commands"
-            for more info on <exptime>. We default to 0 == cache forever.
-        @param key_prefix: Optional string to prepend to each key when sending to memcache. Allows you to efficiently stuff these keys into a pseudo-namespace in memcache:
-            >>> notset_keys = mc.set_multi({'key1' : 'val1', 'key2' : 'val2'}, key_prefix='subspace_')
+
+        @param time: Tells memcached the time which this value should
+            expire, either as a delta number of seconds, or an
+            absolute unix time-since-the-epoch value. See the
+            memcached protocol docs section "Storage Commands" for
+            more info on <exptime>. We default to 0 == cache forever.
+
+        @param key_prefix: Optional string to prepend to each key when
+            sending to memcache. Allows you to efficiently stuff these
+            keys into a pseudo-namespace in memcache:
+
+            >>> notset_keys = mc.set_multi(
+            ...     {'key1' : 'val1', 'key2' : 'val2'},
+            ...     key_prefix='subspace_')
             >>> len(notset_keys) == 0
             True
-            >>> mc.get_multi(['subspace_key1', 'subspace_key2']) == {'subspace_key1' : 'val1', 'subspace_key2' : 'val2'}
+            >>> keys = mc.get_multi(['subspace_key1', 'subspace_key2'])
+            >>> keys == {'subspace_key1': 'val1', 'subspace_key2': 'val2'}
             True
 
-            Causes key 'subspace_key1' and 'subspace_key2' to be set. Useful in conjunction with a higher-level layer which applies namespaces to data in memcache.
-            In this case, the return result would be the list of notset original keys, prefix not applied.
-
-        @param min_compress_len: The threshold length to kick in auto-compression
-            of the value using the zlib.compress() routine. If the value being cached is
-            a string, then the length of the string is measured, else if the value is an
-            object, then the length of the pickle result is measured. If the resulting
-            attempt at compression yeilds a larger string than the input, then it is
-            discarded. For backwards compatability, this parameter defaults to 0,
-            indicating don't ever try to compress.
-        @return: List of keys which failed to be stored [ memcache out of memory, etc. ].
-        @rtype: list
+            Causes keys 'subspace_key1' and 'subspace_key2' to be
+            set. Useful in conjunction with a higher-level layer which
+            applies namespaces to data in memcache. In this case, the
+            return result would be the list of notset original keys,
+            prefix not applied.
 
-        '''
+        @param min_compress_len: The threshold length to kick in
+            auto-compression of the value using the zlib.compress()
+            routine. If the value being cached is a string, then the
+            length of the string is measured, else if the value is an
+            object, then the length of the pickle result is
+            measured. If the resulting attempt at compression yields a
+            larger string than the input, then it is discarded. For
+            backwards compatibility, this parameter defaults to 0,
+            indicating don't ever try to compress.
+
+        @return: List of keys which failed to be stored [ memcache out of
+            memory, etc. ].
+        @rtype: list
+        '''
         self._statlog('set_multi')
 
-        server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(iter(mapping.keys()), key_prefix)
+        server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(
+            iter(mapping.keys()), key_prefix)
 
         # send out all requests on each server before reading anything
         dead_servers = []
-        notstored = [] # original keys.
+        notstored = []  # original keys.
 
         for server in server_keys.keys():
             bigcmd = bytearray()
             write = bigcmd.extend
             try:
                 newline = b"\r\n"
-                for key in server_keys[server]: # These are mangled keys
-                    store_info = self._val_to_store_info(mapping[prefixed_to_orig_key[key]], min_compress_len)
+                for key in server_keys[server]:  # These are mangled keys
+                    store_info = self._val_to_store_info(
+                        mapping[prefixed_to_orig_key[key]],
+                        min_compress_len)
                     if store_info:
                         cmd = bytearray(("set %s %d %d %d\r\n" % (key, store_info[0], time, store_info[1])).encode('utf-8'))
                         # now write to bigcmd: cmd + val + newline
@@ -708,7 +776,8 @@ def set_multi(self, mapping, time=0, key_prefix='', min_compress_len=0):
                         notstored.append(prefixed_to_orig_key[key])
                 server.send_cmds(bytes(bigcmd))
             except socket.error as msg:
-                if isinstance(msg, tuple): msg = msg[1]
+                if isinstance(msg, tuple):
+                    msg = msg[1]
                 server.mark_dead(msg)
                 dead_servers.append(server)
 
@@ -717,33 +786,37 @@ def set_multi(self, mapping, time=0, key_prefix='', min_compress_len=0):
             del server_keys[server]
 
         # short-circuit if there are no servers, just return all keys
-        if not server_keys: return(list(mapping.keys()))
+        if not server_keys:
+            return list(mapping.keys())
 
         for server, keys in server_keys.items():
             try:
                 for key in keys:
-                    line = server.readline()
-                    if line == b'STORED':
+                    if server.readline() == b'STORED':
                         continue
                     else:
-                        notstored.append(prefixed_to_orig_key[key]) #un-mangle.
+                        # un-mangle.
+                        notstored.append(prefixed_to_orig_key[key])
            except (_Error, socket.error) as msg:
-                if isinstance(msg, tuple): msg = msg[1]
+                if isinstance(msg, tuple):
+                    msg = msg[1]
                server.mark_dead(msg)
        return notstored
 
     def _val_to_store_info(self, val, min_compress_len):
-        """
-        Transform val to a storable representation, returning a tuple of the flags, the length of the new value, and the new value itself.
+        """Transform val to a storable representation.
+
+        Returns a tuple of the flags, the length of the new value, and
+        the new value itself.
         """
         flags = 0
-
-        # check types exactly rather than using isinstance, or subclasses
-        # will be deserialized into instances of the parent class
-        # (most blatantly, bool --> int)
-        if type(val) == str:
+        # Check against the exact type, rather than using isinstance(), so that
+        # subclasses of native types (such as markup-safe strings) are pickled
+        # and restored as instances of the correct class.
+        val_type = type(val)
+        if val_type == str:
             val = val.encode('utf-8')
-        elif type(val) == int:
+        elif val_type == int:
             flags |= Client._FLAG_INTEGER
             val = str(val).encode('ascii')
             # force no attempt to compress this silly string.
@@ -752,7 +825,7 @@ def _val_to_store_info(self, val, min_compress_len):
             flags |= Client._FLAG_PICKLE
             file = BytesIO()
             if self.picklerIsKeyword:
-                pickler = self.pickler(file, protocol = self.pickleProtocol)
+                pickler = self.pickler(file, protocol=self.pickleProtocol)
             else:
                 pickler = self.pickler(file, self.pickleProtocol)
             if self.persistent_id:
@@ -772,8 +845,9 @@ def _val_to_store_info(self, val, min_compress_len):
                 val = comp_val
 
         # silently do not store if value length exceeds maximum
-        if self.server_max_value_length != 0 and \
-           len(val) > self.server_max_value_length: return(0)
+        if (self.server_max_value_length != 0 and
+                len(val) > self.server_max_value_length):
+            return 0
 
         return (flags, len(val), val)
 
@@ -793,7 +867,7 @@ def _cmd_builder(self, cmd, key, time, store_info):
             raise _Error("_cmd_builder: unknown data type (%s)"
                          % type(store_info[2]))
 
-    def _set(self, cmd, key, val, time, min_compress_len = 0):
+    def _set(self, cmd, key, val, time, min_compress_len=0):
         if self.do_check_key:
             self.check_key(key)
         server, key = self._get_server(key)
@@ -804,7 +878,8 @@ def _unsafe_set():
             self._statlog(cmd)
 
             store_info = self._val_to_store_info(val, min_compress_len)
-            if not store_info: return(0)
+            if not store_info:
+                return 0
 
             if cmd == 'cas':
                 if key not in self.cas_ids:
@@ -817,7 +892,8 @@ def _unsafe_set():
                 return(server.expect(b"STORED", raise_exception=True)
                        == b"STORED")
             except socket.error as msg:
-                if isinstance(msg, tuple): msg = msg[1]
+                if isinstance(msg, tuple):
+                    msg = msg[1]
                 server.mark_dead(msg)
             return 0
 
@@ -847,13 +923,15 @@ def _unsafe_get():
 
             rkey = flags = rlen = cas_id = None
             if cmd == 'gets':
-                rkey, flags, rlen, cas_id, = self._expect_cas_value(server,
-                        raise_exception=True)
+                rkey, flags, rlen, cas_id, = self._expect_cas_value(
+                    server, raise_exception=True
+                )
                 if rkey and self.cache_cas:
                     self.cas_ids[rkey] = cas_id
             else:
-                rkey, flags, rlen, = self._expectvalue(server,
-                        raise_exception=True)
+                rkey, flags, rlen, = self._expectvalue(
+                    server, raise_exception=True
+                )
 
             if not rkey:
                 return None
@@ -862,7 +940,8 @@ def _unsafe_get():
             finally:
                 server.expect(b"END", raise_exception=True)
         except (_Error, socket.error) as msg:
-            if isinstance(msg, tuple): msg = msg[1]
+            if isinstance(msg, tuple):
+                msg = msg[1]
             server.mark_dead(msg)
             return None
 
@@ -895,46 +974,64 @@ def gets(self, key):
         return self._get('gets', key)
 
     def get_multi(self, keys, key_prefix=''):
-        '''
-        Retrieves multiple keys from the memcache doing just one query.
+        '''Retrieves multiple keys from the memcache doing just one query.
 
         >>> success = mc.set("foo", "bar")
         >>> success = mc.set("baz", 42)
-        >>> mc.get_multi(["foo", "baz", "foobar"]) == {"foo": "bar", "baz": 42}
+        >>> mc.get_multi(["foo", "baz", "foobar"]) == {
+        ...     "foo": "bar", "baz": 42
+        ... }
         1
         >>> mc.set_multi({'k1' : 1, 'k2' : 2}, key_prefix='pfx_') == []
         1
 
-        This looks up keys 'pfx_k1', 'pfx_k2', ... . Returned dict will just have unprefixed keys 'k1', 'k2'.
-        >>> mc.get_multi(['k1', 'k2', 'nonexist'], key_prefix='pfx_') == {'k1' : 1, 'k2' : 2}
+        This looks up keys 'pfx_k1', 'pfx_k2', ... .  Returned dict
+        will just have unprefixed keys 'k1', 'k2'.
+
+        >>> mc.get_multi(['k1', 'k2', 'nonexist'],
+        ...              key_prefix='pfx_') == {'k1' : 1, 'k2' : 2}
         1
 
-        get_mult [ and L{set_multi} ] can take str()-ables like ints / longs as keys too. Such as your db pri key fields.
-        They're rotored through str() before being passed off to memcache, with or without the use of a key_prefix.
-        In this mode, the key_prefix could be a table name, and the key itself a db primary key number.
+        get_multi [ and L{set_multi} ] can take str()-ables like ints /
+        longs as keys too. Such as your db pri key fields.  They're
+        rotored through str() before being passed off to memcache,
+        with or without the use of a key_prefix. In this mode, the
+        key_prefix could be a table name, and the key itself a db
+        primary key number.
 
-        >>> mc.set_multi({42: 'douglass adams', 46 : 'and 2 just ahead of me'}, key_prefix='numkeys_') == []
+        >>> mc.set_multi({42: 'douglass adams',
        ...               46: 'and 2 just ahead of me'},
+        ...              key_prefix='numkeys_') == []
         1
-        >>> mc.get_multi([46, 42], key_prefix='numkeys_') == {42: 'douglass adams', 46 : 'and 2 just ahead of me'}
+        >>> mc.get_multi([46, 42], key_prefix='numkeys_') == {
+        ...     42: 'douglass adams',
+        ...     46: 'and 2 just ahead of me'
+        ... }
         1
 
-        This method is recommended over regular L{get} as it lowers the number of
-        total packets flying around your network, reducing total latency, since
-        your app doesn't have to wait for each round-trip of L{get} before sending
-        the next one.
+        This method is recommended over regular L{get} as it lowers
+        the number of total packets flying around your network,
+        reducing total latency, since your app doesn't have to wait
+        for each round-trip of L{get} before sending the next one.
 
         See also L{set_multi}.
 
         @param keys: An array of keys.
-        @param key_prefix: A string to prefix each key when we communicate with memcache.
-            Facilitates pseudo-namespaces within memcache. Returned dictionary keys will not have this prefix.
-        @return: A dictionary of key/value pairs that were available. If key_prefix was provided, the keys in the retured dictionary will not have it present.
+
+        @param key_prefix: A string to prefix each key when we
+            communicate with memcache.  Facilitates pseudo-namespaces
+            within memcache. Returned dictionary keys will not have this
+            prefix.
+
+        @return: A dictionary of key/value pairs that were
+            available. If key_prefix was provided, the keys in the returned
+            dictionary will not have it present.
         '''
         self._statlog('get_multi')
 
-        server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(keys, key_prefix)
+        server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(
+            keys, key_prefix)
 
         # send out all requests on each server before reading anything
         dead_servers = []
@@ -942,7 +1039,8 @@ def get_multi(self, keys, key_prefix=''):
             try:
                 server.send_cmd("get %s" % " ".join(server_keys[server]))
             except socket.error as msg:
-                if isinstance(msg, tuple): msg = msg[1]
+                if isinstance(msg, tuple):
+                    msg = msg[1]
                 server.mark_dead(msg)
                 dead_servers.append(server)
 
@@ -951,7 +1049,6 @@ def get_multi(self, keys, key_prefix=''):
             del server_keys[server]
 
         retvals = {}
-
         for server in server_keys.keys():
             try:
                 line = server.readline()
@@ -962,10 +1059,12 @@ def get_multi(self, keys, key_prefix=''):
                     if isinstance(rkey,bytes):
                         rkey = rkey.decode()
                     val = self._recv_value(server, flags, rlen)
-                    retvals[prefixed_to_orig_key[rkey]] = val  # un-prefix returned key.
+                    # un-prefix returned key.
+                    retvals[prefixed_to_orig_key[rkey]] = val
                     line = server.readline()
            except (_Error, socket.error) as msg:
-                if isinstance(msg, tuple): msg = msg[1]
+                if isinstance(msg, tuple):
+                    msg = msg[1]
                server.mark_dead(msg)
        return retvals
 
@@ -992,10 +1091,11 @@ def _expectvalue(self, server, line=None, raise_exception=False):
             return (None, None, None)
 
     def _recv_value(self, server, flags, rlen):
-        rlen += 2 # include \r\n
+        rlen += 2  # include \r\n
         buf = server.recv(rlen)
         if len(buf) != rlen:
-            raise _Error("received %d bytes when expecting %d" % (len(buf), rlen))
+            raise _Error("received %d bytes when expecting %d"
+                         % (len(buf), rlen))
 
         if len(buf) == rlen:
             buf = buf[:-2]  # strip \r\n
@@ -1026,13 +1126,17 @@ def _recv_value(self, server, flags, rlen):
         return val
 
     def check_key(self, key, key_extra_len=0):
-        """Checks sanity of key.  Fails if:
+        """Checks sanity of key.
+
+        Fails if:
+
         Key length is > SERVER_MAX_KEY_LENGTH (Raises MemcachedKeyLength).
         Contains control characters (Raises MemcachedKeyCharacterError).
         Is not a string (Raises MemcachedKeyError)
         Is None (Raises MemcachedKeyError)
         """
-        if isinstance(key, tuple): key = key[1]
+        if isinstance(key, tuple):
+            key = key[1]
         if not key:
             raise Client.MemcachedKeyNoneError("Key is None")
         if not isinstance(key, str):
@@ -1070,11 +1174,12 @@ def __init__(self, host, debug=0, dead_retry=_DEAD_RETRY,
         m = re.match(r'^(?P<proto>unix):(?P<path>.*)$', host)
         if not m:
             m = re.match(r'^(?P<proto>inet6):'
-                    r'\[(?P<host>[^\[\]]+)\](:(?P<port>[0-9]+))?$', host)
+                         r'\[(?P<host>[^\[\]]+)\](:(?P<port>[0-9]+))?$', host)
         if not m:
             m = re.match(r'^(?P<proto>inet):'
-                    r'(?P<host>[^:]+)(:(?P<port>[0-9]+))?$', host)
-        if not m: m = re.match(r'^(?P<host>[^:]+)(:(?P<port>[0-9]+))?$', host)
+                         r'(?P<host>[^:]+)(:(?P<port>[0-9]+))?$', host)
+        if not m:
+            m = re.match(r'^(?P<host>[^:]+)(:(?P<port>[0-9]+))?$', host)
 
         if not m:
             raise ValueError('Unable to parse connection string: "%s"' % host)
@@ -1086,12 +1191,12 @@ def __init__(self, host, debug=0, dead_retry=_DEAD_RETRY,
             self.family = socket.AF_INET6
             self.ip = hostData['host']
             self.port = int(hostData.get('port') or 11211)
-            self.address = ( self.ip, self.port )
+            self.address = (self.ip, self.port)
         else:
             self.family = socket.AF_INET
             self.ip = hostData['host']
             self.port = int(hostData.get('port') or 11211)
-            self.address = ( self.ip, self.port )
+            self.address = (self.ip, self.port)
 
         self.deaduntil = 0
         self.socket = None
@@ -1127,15 +1232,16 @@ def _get_socket(self):
         if self.socket:
             return self.socket
         s = socket.socket(self.family, socket.SOCK_STREAM)
-
-        if hasattr(s, 'settimeout'): s.settimeout(self.socket_timeout)
+        if hasattr(s, 'settimeout'):
+            s.settimeout(self.socket_timeout)
         try:
             s.connect(self.address)
         except socket.timeout as msg:
             self.mark_dead("connect: %s" % msg)
             return None
        except socket.error as msg:
-            if isinstance(msg, tuple): msg = msg[1]
+            if isinstance(msg, tuple):
+                msg = msg[1]
            self.mark_dead("connect: %s" % msg)
            return None
        self.socket = s
@@ -1158,16 +1264,17 @@ def send_cmd(self, cmd):
 
     def send_cmds(self, cmds):
-        """ cmds already has trailing \r\n's applied """
+        """cmds already has trailing \r\n's applied."""
         if not isinstance(cmds, bytes):
             self.socket.sendall(cmds.encode('ascii'))
         else:
             self.socket.sendall(cmds)
 
     def readline(self, raise_exception=False):
-        """Read a line and return it. If "raise_exception" is set,
-        raise _ConnectionDeadError if the read fails, otherwise return
-        an empty string.
+        """Read a line and return it.
+
+        If "raise_exception" is set, raise _ConnectionDeadError if the
+        read fails, otherwise return an empty string.
         """
         buf = self.buffer
         recv = self.socket.recv
@@ -1185,7 +1292,7 @@ def readline(self, raise_exception=False):
                 return b''
             buf += data
 
-        self.buffer = buf[index+2:]
+        self.buffer = buf[index + 2:]
         return buf[:index]
 
     def expect(self, text, raise_exception=False):
@@ -1202,8 +1309,8 @@ def recv(self, rlen):
             foo = self_socket_recv(max(rlen - len(buf), 4096))
             buf += foo
             if not foo:
-                raise _Error( 'Read %d bytes, expecting %d, '
-                        'read returned 0 length bytes' % ( len(buf), rlen ))
+                raise _Error('Read %d bytes, expecting %d, '
+                             'read returned 0 length bytes' % (len(buf), rlen))
         self.buffer = buf[rlen:]
         return buf[:rlen]
 
@@ -1225,9 +1332,10 @@ def __str__(self):
 
 
 def _doctest():
-    import doctest, memcache
+    import doctest
+    import memcache
     servers = ["127.0.0.1:11211"]
-    mc = Client(servers, debug=1)
+    mc = memcache.Client(servers, debug=1)
    globs = {"mc": mc}
    return doctest.testmod(memcache, globs=globs)
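
For reference, here is a minimal usage sketch of the client API this patch
reformats. It is an illustration only, not part of the diff; it assumes a
memcached instance listening on 127.0.0.1:11211 and this module being
importable as "memcache". All calls shown are documented in the docstrings
above.

    import memcache

    # One Client per pool of servers; keys are hashed across the pool.
    mc = memcache.Client(["127.0.0.1:11211"], debug=1)

    mc.set("some_key", "bar")      # returns 1 on success
    print(mc.get("some_key"))      # 'bar'

    # incr/decr require the stored value to be the string form of an
    # integer; decr is capped at 0 while incr overflow is unchecked.
    mc.set("counter", "20")
    print(mc.incr("counter"))      # 21

    # set_multi/get_multi batch many keys into one round-trip; key_prefix
    # acts as a pseudo-namespace ('ns_k1' on the wire, 'k1' returned).
    failed = mc.set_multi({"k1": 1, "k2": 2}, key_prefix="ns_")
    print(mc.get_multi(["k1", "k2"], key_prefix="ns_"))

    # A (hashvalue, key) tuple pins the key to a caller-chosen bucket,
    # e.g. to keep one user's objects on the same server.
    mc.set((42, "user_42_profile"), {"name": "x"})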