【博文推荐】Pythonredis链接建立实现分析

今天在写zabbix storm job监控脚本的时候用到了python的redis模块,之前也有用过,但是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。

来安ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为创新互联的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:18982081108(备注:SSL证书合作)期待与您的合作!

在ConnectionPool之前,如果需要连接redis,我都是用StrictRedis这个类,在源码中可以看到这个类的具体解释:

 
 
  1. redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an 
  2. implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server

使用的方法:

1 2  r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx)  r.xxxx()

有了ConnectionPool这个类之后,可以使用如下方法

  
 
  1. r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx)
  2. r.xxxx()

这里Redis是StrictRedis的子类

简单分析如下:

在StrictRedis类的__init__方法中,可以初始化connection_pool这个参数,其对应的是一个ConnectionPool的对象:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class  StrictRedis( object ): ........      def  __init__( self , host = 'localhost' , port = 6379 ,                   db = 0 , password = None , socket_timeout = None ,                   socket_connect_timeout = None ,                   socket_keepalive = None , socket_keepalive_options = None ,                   connection_pool = None , unix_socket_path = None ,                   encoding = 'utf-8' , encoding_errors = 'strict' ,                   charset = None , errors = None ,                   decode_responses = False , retry_on_timeout = False ,                   ssl = False , ssl_keyfile = None , ssl_certfile = None ,                   ssl_cert_reqs = None , ssl_ca_certs = None ):           if  not  connection_pool:               ..........                connection_pool  =  ConnectionPool( * * kwargs)           self .connection_pool  =  connection_pool

在StrictRedis的实例执行具体的命令时会调用execute_command方法,这里可以看到具体实现是从连接池中获取一个具体的连接,然后执行命令,完成后释放连接:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17     # COMMAND EXECUTION AND PROTOCOL PARSING      def  execute_command( self * args,  * * options):          "Execute a command and return a parsed response"          pool  =  self .connection_pool          command_name  =  args[ 0 ]          connection  =  pool.get_connection(command_name,  * * options)   #调用ConnectionPool.get_connection方法获取一个连接          try :              connection.send_command( * args)   #命令执行,这里为Connection.send_command              return  self .parse_response(connection, command_name,  * * options)          except  (ConnectionError, TimeoutError) as e:              connection.disconnect()              if  not  connection.retry_on_timeout  and  isinstance (e, TimeoutError):                  raise              connection.send_command( * args)                return  self .parse_response(connection, command_name,  * * options)          finally :              pool.release(connection)   #调用ConnectionPool.release释放连接

在来看看ConnectionPool类:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47       class  ConnectionPool( object ):           ...........      def  __init__( self , connection_class = Connection, max_connections = None ,                   * * connection_kwargs):    #类初始化时调用构造函数          max_connections  =  max_connections  or  2  * *  31          if  not  isinstance (max_connections, ( int long ))  or  max_connections <  0 :   #判断输入的max_connections是否合法              raise  ValueError( '"max_connections" must be a positive integer' )          self .connection_class  =  connection_class   #设置对应的参数          self .connection_kwargs  =  connection_kwargs          self .max_connections  =  max_connections          self .reset()   #初始化ConnectionPool 时的reset操作      def  reset( self ):          self .pid  =  os.getpid()          self ._created_connections  =  0   #已经创建的连接的计数器          self ._available_connections  =  []    #声明一个空的数组,用来存放可用的连接          self ._in_use_connections  =  set ()   #声明一个空的集合,用来存放已经在用的连接          self ._check_lock  =  threading.Lock() .......      def  get_connection( self , command_name,  * keys,  * * options):   #在连接池中获取连接的方法          "Get a connection from the pool"          self ._checkpid()          try :              connection  =  self ._available_connections.pop()   #获取并删除代表连接的元素,在***次获取connectiong时,因为_available_connections是一个空的数组,              会直接调用make_connection方法          except  IndexError:              connection  =  self .make_connection()          self ._in_use_connections.add(connection)    #向代表正在使用的连接的集合中添加元素          return  connection         def  make_connection( self ):  #在_available_connections数组为空时获取连接调用的方法          "Create a new connection"          if  self ._created_connections > =  self .max_connections:    #判断创建的连接是否已经达到***限制,max_connections可以通过参数初始化              raise  ConnectionError( "Too many connections" )          self ._created_connections  + =  1    #把代表已经创建的连接的数值+1          return  self .connection_class( * * self .connection_kwargs)      #返回有效的连接,默认为Connection(**self.connection_kwargs)      def  release( self , connection):   #释放连接,链接并没有断开,只是存在链接池中          "Releases the connection back to the pool"          self ._checkpid()          if  connection.pid ! =  self .pid:              return          self ._in_use_connections.remove(connection)    #从集合中删除元素          self ._available_connections.append(connection)  #并添加到_available_connections 的数组中      def  disconnect( self ):  #断开所有连接池中的链接          "Disconnects all connections in the pool"          all_conns  =  chain( self ._available_connections,                            self ._in_use_connections)          for  connection  in  all_conns:              connection.disconnect()

execute_command最终调用的是Connection.send_command方法,关闭链接为 Connection.disconnect方法,而Connection类的实现:

1 2 3 4 5 6 7 class  Connection( object ):      "Manages TCP communication to and from a Redis server"      def  __del__( self ):    #对象删除时的操作,调用disconnect释放连接          try :              self .disconnect()          except  Exception:              pass

核心的链接建立方法是通过socket模块实现:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27      def  _connect( self ):          err  =  None          for  res  in  socket.getaddrinfo( self .host,  self .port,  0 ,                                        socket.SOCK_STREAM):              family, socktype, proto, canonname, socket_address  =  res              sock  =  None              try :                  sock  =  socket.socket(family, socktype, proto)                  # TCP_NODELAY                  sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY,  1 )                  # TCP_KEEPALIVE                  if  self .socket_keepalive:    #构造函数中默认 socket_keepalive=False,因此这里默认为短连接                      sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE,  1 )                      for  k, v  in  iteritems( self .socket_keepalive_options):                          sock.setsockopt(socket.SOL_TCP, k, v)                  # set the socket_connect_timeout before we connect                  sock.settimeout( self .socket_connect_timeout)   #构造函数中默认socket_connect_timeout=None,即连接为blocking的模式                  # connect                  sock.connect(socket_address)                  # set the socket_timeout now that we're connected                  sock.settimeout( self .socket_timeout)   #构造函数中默认socket_timeout=None                  return  sock              except  socket.error as _:                  err  =  _                  if  sock  is  not  None :                      sock.close() .....

关闭链接的方法:

1 2 3 4 5 6 7 8 9 10 11      def  disconnect( self ):          "Disconnects from the Redis server"          self ._parser.on_disconnect()          if  self ._sock  is  None :              return          try :              self ._sock.shutdown(socket.SHUT_RDWR)   #先shutdown再close              self ._sock.close()          except  socket.error:              pass          self ._sock  =  None    

可以小结如下

1)默认情况下每创建一个Redis实例都会构造出一个ConnectionPool实例,每一次访问redis都会从这个连接池得到一个连接,操作完成后会把该连接放回连接池(连接并没有释放),可以构造一个统一的ConnectionPool,在创建Redis实例时,可以将该ConnectionPool传入,那么后续的操作会从给定的ConnectionPool获得连接,不会再重复创建ConnectionPool。

2)默认情况下没有设置keepalive和timeout,建立的连接是blocking模式的短连接。

3)不考虑底层tcp的情况下,连接池中的连接会在ConnectionPool.disconnect中统一销毁。

本文出自 “菜光光的博客” 博客,请务必保留此出处http://caiguangguang.blog./1652935/1583541

网站名称:【博文推荐】Pythonredis链接建立实现分析
转载注明:http://www.shufengxianlan.com/qtweb/news27/188877.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联