Python与Redis的连接教程
今天在写zabbixstormjob监控脚本的时候用到了python的redis模块,之前也有用过,但是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。
在ConnectionPool之前,如果需要连接redis,我都是用StrictRedis这个类,在源码中可以看到这个类的具体解释:
redis.StrictRedisImplementationoftheRedisprotocol.ThisabstractclassprovidesaPythoninterfacetoallRediscommandsandan
implementationoftheRedisprotocol.ConnectionandPipelinederivefromthis,implementinghowthecommandsaresentandreceivedtotheRedisserver
使用的方法:
r=redis.StrictRedis(host=xxxx,port=xxxx,db=xxxx) r.xxxx()
有了ConnectionPool这个类之后,可以使用如下方法
pool=redis.ConnectionPool(host=xxx,port=xxx,db=xxxx) r=redis.Redis(connection_pool=pool)
这里Redis是StrictRedis的子类
简单分析如下:
在StrictRedis类的__init__方法中,可以初始化connection_pool这个参数,其对应的是一个ConnectionPool的对象:
classStrictRedis(object): ........ def__init__(self,host='localhost',port=6379, db=0,password=None,socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None,socket_keepalive_options=None, connection_pool=None,unix_socket_path=None, encoding='utf-8',encoding_errors='strict', charset=None,errors=None, decode_responses=False,retry_on_timeout=False, ssl=False,ssl_keyfile=None,ssl_certfile=None, ssl_cert_reqs=None,ssl_ca_certs=None): ifnotconnection_pool: .......... connection_pool=ConnectionPool(**kwargs) self.connection_pool=connection_pool
在StrictRedis的实例执行具体的命令时会调用execute_command方法,这里可以看到具体实现是从连接池中获取一个具体的连接,然后执行命令,完成后释放连接:
#COMMANDEXECUTIONANDPROTOCOLPARSING defexecute_command(self,*args,**options): "Executeacommandandreturnaparsedresponse" pool=self.connection_pool command_name=args[0] connection=pool.get_connection(command_name,**options)#调用ConnectionPool.get_connection方法获取一个连接 try: connection.send_command(*args)#命令执行,这里为Connection.send_command returnself.parse_response(connection,command_name,**options) except(ConnectionError,TimeoutError)ase: connection.disconnect() ifnotconnection.retry_on_timeoutandisinstance(e,TimeoutError): raise connection.send_command(*args) returnself.parse_response(connection,command_name,**options) finally: pool.release(connection)#调用ConnectionPool.release释放连接
在来看看ConnectionPool类:
classConnectionPool(object): ........... def__init__(self,connection_class=Connection,max_connections=None, **connection_kwargs):#类初始化时调用构造函数 max_connections=max_connectionsor2**31 ifnotisinstance(max_connections,(int,long))ormax_connections<0:#判断输入的max_connections是否合法 raiseValueError('"max_connections"mustbeapositiveinteger') self.connection_class=connection_class#设置对应的参数 self.connection_kwargs=connection_kwargs self.max_connections=max_connections self.reset()#初始化ConnectionPool时的reset操作 defreset(self): self.pid=os.getpid() self._created_connections=0#已经创建的连接的计数器 self._available_connections=[]#声明一个空的数组,用来存放可用的连接 self._in_use_connections=set()#声明一个空的集合,用来存放已经在用的连接 self._check_lock=threading.Lock() ....... defget_connection(self,command_name,*keys,**options):#在连接池中获取连接的方法 "Getaconnectionfromthepool" self._checkpid() try: connection=self._available_connections.pop()#获取并删除代表连接的元素,在第一次获取connectiong时,因为_available_connections是一个空的数组, 会直接调用make_connection方法 exceptIndexError: connection=self.make_connection() self._in_use_connections.add(connection)#向代表正在使用的连接的集合中添加元素 returnconnection defmake_connection(self):#在_available_connections数组为空时获取连接调用的方法 "Createanewconnection" ifself._created_connections>=self.max_connections:#判断创建的连接是否已经达到最大限制,max_connections可以通过参数初始化 raiseConnectionError("Toomanyconnections") self._created_connections+=1#把代表已经创建的连接的数值+1 returnself.connection_class(**self.connection_kwargs)#返回有效的连接,默认为Connection(**self.connection_kwargs) defrelease(self,connection):#释放连接,链接并没有断开,只是存在链接池中 "Releasestheconnectionbacktothepool" self._checkpid() ifconnection.pid!=self.pid: return self._in_use_connections.remove(connection)#从集合中删除元素 self._available_connections.append(connection)#并添加到_available_connections的数组中 defdisconnect(self):#断开所有连接池中的链接 "Disconnectsallconnectionsinthepool" all_conns=chain(self._available_connections, self._in_use_connections) forconnectioninall_conns: connection.disconnect()
execute_command最终调用的是Connection.send_command方法,关闭链接为Connection.disconnect方法,而Connection类的实现:
classConnection(object): "ManagesTCPcommunicationtoandfromaRedisserver" def__del__(self):#对象删除时的操作,调用disconnect释放连接 try: self.disconnect() exceptException: pass
核心的链接建立方法是通过socket模块实现:
def_connect(self): err=None forresinsocket.getaddrinfo(self.host,self.port,0, socket.SOCK_STREAM): family,socktype,proto,canonname,socket_address=res sock=None try: sock=socket.socket(family,socktype,proto) #TCP_NODELAY sock.setsockopt(socket.IPPROTO_TCP,socket.TCP_NODELAY,1) #TCP_KEEPALIVE ifself.socket_keepalive:#构造函数中默认socket_keepalive=False,因此这里默认为短连接 sock.setsockopt(socket.SOL_SOCKET,socket.SO_KEEPALIVE,1) fork,viniteritems(self.socket_keepalive_options): sock.setsockopt(socket.SOL_TCP,k,v) #setthesocket_connect_timeoutbeforeweconnect sock.settimeout(self.socket_connect_timeout)#构造函数中默认socket_connect_timeout=None,即连接为blocking的模式 #connect sock.connect(socket_address) #setthesocket_timeoutnowthatwe'reconnected sock.settimeout(self.socket_timeout)#构造函数中默认socket_timeout=None returnsock exceptsocket.erroras_: err=_ ifsockisnotNone: sock.close() .....
关闭链接的方法:
defdisconnect(self): "DisconnectsfromtheRedisserver" self._parser.on_disconnect() ifself._sockisNone: return try: self._sock.shutdown(socket.SHUT_RDWR)#先shutdown再close self._sock.close() exceptsocket.error: pass self._sock=None
可以小结如下
1)默认情况下每创建一个Redis实例都会构造出一个ConnectionPool实例,每一次访问redis都会从这个连接池得到一个连接,操作完成后会把该连接放回连接池(连接并没有释放),可以构造一个统一的ConnectionPool,在创建Redis实例时,可以将该ConnectionPool传入,那么后续的操作会从给定的ConnectionPool获得连接,不会再重复创建ConnectionPool。
2)默认情况下没有设置keepalive和timeout,建立的连接是blocking模式的短连接。
3)不考虑底层tcp的情况下,连接池中的连接会在ConnectionPool.disconnect中统一销毁。