深入研究sqlalchemy连接池-程序员宅基地

技术标签: python  运维  数据库  

简介:

相对于最新的MySQL5.6,MariaDB在性能、功能、管理、NoSQL扩展方面包含了更丰富的特性。比如微秒的支持、线程池、子查询优化、组提交、进度报告等。

本文就主要探索MariaDB当中连接池的一些特性,配置。来配合我们的sqlalchemy。

一:起因

本来是不会写这个东西的,但是,写好了python--flask程序,使用sqlalchemy+mariadb,部署以后总是出问题,500错误之类的。

使用默认连接参数

engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',)

错误提示是:

sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available. [SQL: 'SELECT public.id AS public_id, public.public_name AS public_public_name, public.public_email AS public_public_email \nFROM public \nWHERE public.public_name = %(public_name_1)s \n LIMIT %(param_1)s'] [parameters: [{}]] (Background on this error at: http://sqlalche.me/e/e3q8)

http://sqlalche.me/e/e3q8:

OperationalError:

Exception raised for errors that are related to the database’s operation andnot necessarily under the control of the programmer, e.g. an unexpecteddisconnect occurs, the data source name is not found, a transaction could notbe processed, a memory allocation error occurred during processing, etc.

This error is aDBAPI Errorand originates fromthe database driver (DBAPI), not SQLAlchemy itself.

TheOperationalErroris the most common (but not the only) error class usedby drivers in the context of the database connection being dropped, or notbeing able to connect to the database. For tips on how to deal with this, seethe sectionDealing with Disconnects.

意思是没有正确断开和数据库的连接。

二:处理断开

http://docs.sqlalchemy.org/en/latest/core/pooling.html#pool-disconnects

官方给了三种方案来解决这个问题:

1.悲观处理

engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)

pool_pre_ping=True

表示每次连接从池中检查,如果有错误,监测为断开的状态,连接将被立即回收。

2.自定义悲观的ping

from sqlalchemy import exc
from sqlalchemy import event
from sqlalchemy import select

some_engine = create_engine(...)

@event.listens_for(some_engine, "engine_connect")
def ping_connection(connection, branch):
    if branch:
        # "branch" refers to a sub-connection of a connection,
        # we don't want to bother pinging on these.
        return

    # turn off "close with result".  This flag is only used with
    # "connectionless" execution, otherwise will be False in any case
    save_should_close_with_result = connection.should_close_with_result
    connection.should_close_with_result = False

    try:
        # run a SELECT 1.   use a core select() so that
        # the SELECT of a scalar value without a table is
        # appropriately formatted for the backend
        connection.scalar(select([1]))
    except exc.DBAPIError as err:
        # catch SQLAlchemy's DBAPIError, which is a wrapper
        # for the DBAPI's exception.  It includes a .connection_invalidated
        # attribute which specifies if this connection is a "disconnect"
        # condition, which is based on inspection of the original exception
        # by the dialect in use.
        if err.connection_invalidated:
            # run the same SELECT again - the connection will re-validate
            # itself and establish a new connection.  The disconnect detection
            # here also causes the whole connection pool to be invalidated
            # so that all stale connections are discarded.
            connection.scalar(select([1]))
        else:
            raise
    finally:
        # restore "close with result"
        connection.should_close_with_result = save_should_close_with_result

说实话,没怎么看明白。

像是try一个select 语句,如果没问题就关闭。

 

3.乐观处理

from sqlalchemy import create_engine, exc
e = create_engine(...)
c = e.connect()

try:
    # suppose the database has been restarted.
    c.execute("SELECT * FROM table")
    c.close()
except exc.DBAPIError, e:
    # an exception is raised, Connection is invalidated.
    if e.connection_invalidated:
        print("Connection was invalidated!")

# after the invalidate event, a new connection
# starts with a new Pool
c = e.connect()
c.execute("SELECT * FROM table")

这个看懂了,try一个select语句,如果无效,就返回Connection was invalidated!,然后开一个新的连接,再去执行select。这个应该写个装饰器,放在每个查询前面。

4.使用连接池回收

from sqlalchemy import create_engine
e = create_engine("mysql://scott:tiger@localhost/test", pool_recycle=3600)

这种方式就比较简单了,在连接参数中写上连接超时时间即可。

5.这是自己看文档找到的方法

from sqlalchemy.pool import QueuePool,NullPool,AssertionPool,StaticPool,SingletonThreadPool,Pool

在sqlalchemy.pool下有已经配置好的连接池,直接使用这些连接池也应该可以。

三:测试

docker run  --restart=always --privileged --name My_mariadb_01 -p 3301:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_02 -p 3302:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_03 -p 3303:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_04 -p 3304:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_05 -p 3305:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13

为避免因数据库交叉连接,首先开启5个MARIADB

Flask_Plan_01   8801       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',)
Flask_Plan_02   8802       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', pool_pre_ping=True)
Flask_Plan_03   8803       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=QueuePool)
Flask_Plan_04   8804       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=NullPool)
Flask_Plan_05   8805       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', pool_recycle=3600)

用这5种连接参数进行连接测试。

如果你愿意,也可以继续开,QueuePool,NullPool,AssertionPool,StaticPool,SingletonThreadPool,Pool,把这几种都测试一下。

 

8801 8805 均会不同程度的出现500错误,8801频率还高点。

sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available. [SQL: 'SELECT public.id AS public_id, public.public_name AS public_public_name, public.public_email AS public_public_email \nFROM public \nWHERE public.public_name = %(public_name_1)s \n LIMIT %(param_1)s'] [parameters: [{}]] (Background on this error at: http://sqlalche.me/e/e3q8)
sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available. [SQL: 'SELECT public.id AS public_id, public.public_name AS public_public_name, public.public_email AS public_public_email \nFROM public \nWHERE public.public_name = %(public_name_1)s \n LIMIT %(param_1)s'] [parameters: [{}]] (Background on this error at: http://sqlalche.me/e/e3q8)


 

Internal Server Error

The server encountered an internal error and was unable to complete your request. Either the server is overloaded or there is an error in the application.

等会儿看看8802  8803 8804如何。

四:深入研究sqlalchemy源码

VENV\Flask_Base\Lib\site-packages\sqlalchemy\engine\__init__.py

看起来,没有默认值。所以engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',)报错频率比较高。

五:研究pool源码

VENV\Flask_Base\Lib\site-packages\sqlalchemy\pool.py

看来poolclass的类型都定义在这里了。

1.SingletonThreadPool

A Pool that maintains one connection per thread

每个线程维护一个连接的池。

2.QueuePool

A :class:`.Pool` that imposes a limit on the number of open connections.

这种方式限制了连接数量,QueuePool是默认的连接池方式,除非使用了方言,也就是第三方链接库。

难怪我使用MySQL-connector-python时老出错呢,没打开连接池啊。

3.NullPool

A Pool which does not pool connections...

不使用连接池

4.StaticPool

A Pool of exactly one connection, used for all requests.

一个完整的连接池,用于所有的连接。

5.AssertionPool

A :class:`.Pool` that allows at most one checked out connection at any given time.

任何时间只给一个签出连接?为了debug模式?不懂了。

看的官方说明也没这么详细。

这么看来,如果我使用默认链接库,可以不加参数试试。

mysql-python是sqlalchemy默认的mysql链接库,我在windows下装不上。放弃测试默认链接库,手动指定连接池为QueuePool。

或者指定连接池类型为:QueuePool   StaticPool   SingletonThreadPool(多线程的时候)

六:连接池类型测试

修改测试docker

docker run  --restart=always --privileged --name My_mariadb_01 -p 3301:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_02 -p 3302:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_03 -p 3303:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_04 -p 3304:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_05 -p 3305:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_06 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13

Flask_Plan_01   8801       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', pool_pre_ping=True))
Flask_Plan_02   8802       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=SingletonThreadPool)
Flask_Plan_03   8803       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=QueuePool)
Flask_Plan_04   8804       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=NullPool)
Flask_Plan_05   8805       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=StaticPool)
Flask_Plan_06   8806       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=AssertionPool)

七:编写测试脚本

 

import requests
import time
i = 1
while True:
    try:
        r=requests.get('http://192.168.0.104:8801',timeout=5)
        if  r.status_code==200:
            print(time.strftime('%Y-%m-%d %H:%M:%S')+'---'+str(i)+'---'+str(r.status_code)+'---ok')
        else:
            print(time.strftime('%Y-%m-%d %H:%M:%S') + '---' + str(i) + '---' + str(r.status_code) + '-----------badr')
            break
        time.sleep(1)
        i+=1
    except:
        print('except')
        print(time.strftime('%Y-%m-%d %H:%M:%S') +'---'+str(i)+'-----------bad')
        break

修改地址,把几个测试服务都开始跑。

出错就会停了。

代码很烂,凑活测试而已。

从晚上22:30睡觉到早上6:10起床,pool_pre_ping=True,SingletonThreadPool,QueuePool,NullPool,StaticPool,AssertionPool,都很稳定,访问代码都是200

八:继续研究相关代码

http://docs.sqlalchemy.org/en/latest/core/pooling.html?highlight=use_threadlocal#using-connection-pools-with-multiprocessing

使用连接池进行多重处理

http://docs.sqlalchemy.org/en/latest/core/pooling.html?highlight=use_threadlocal#api-documentation-available-pool-implementations

api文档--连接池的实现

classsqlalchemy.pool.Pool(creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)

 

Parameters:    
creator–可调用的函数返回对象。
recycle– 超时回收时间。如果连接超过这个时间,连接就被关闭,换一个新的连接
logging_name - 日志标识名称
echo– 是否打印sql语句
use_threadlocal–是否使用线程,在同一应用程序的线程使用相同的连接对象
reset_on_return–在返回前的操作
    rollback,大概是自动回滚
    True 同为回滚
    commit 大概是自动提交的意思
    None 无操作
    none 无操作
    False 无操作
events– 列表元组,每个表单会传递给listen………………没搞懂
listeners - 弃用,被listen取代
dialect–链接库,使用create_engine时不使用,由引擎创建时处理
pre_ping–是否测试连接

基本上这些参数都在engine-creation-api中

http://docs.sqlalchemy.org/en/rel_1_0/core/engines.html#engine-creation-api

Pool                  (creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)
StaticPool         (creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)
NullPool            (creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)
QueuePool          (creator,pool_size=5,max_overflow=10,timeout=30,**kw)
SingletonThreadPool(creator,pool_size=5,**kw)
AssertionPool      (*args,**kw)

这下清楚了,Pool,StaicPool,NullPool,都一样,直接回收,效率一定低了。

我们就指定默认的QueuePool好了。以后观察着服务器的负载,负载大了以后,调整就好了。

自定义方法如下:

engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',
                       pool_size=5,
                       max_overflow=10,
                       pool_timeout=30,
                       pool_pre_ping=True)

九:总结

曲折的道路,终于找到了解决方案。

sqlalchemy的教程当中,很少有讲如何部署的。很多又是linux开发。可能在linux下很容易装默认链接库,部署的时候就自动使用了QueuePool连接池。所以这种问题很少出现。

我在windows下开发,部署在linux,开发和部署都使用了非默认链接库,导致没有使用默认连接池。

那么随着深入研究,找到了连接池的配置,并掌握这一知识,为以后的开发部署工作,扫除了障碍。

虽然源码里面还有很多看不懂,但是读书百遍其义自见,还是要多读(我是懒蛋,遇到问题,再去解决,下一个问题是什么呢?)。

 

转载于:https://www.cnblogs.com/jackadam/p/8727409.html

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_30772105/article/details/98882352

智能推荐

C# 之 WPF 统计图表开发方案_c# wpflivechart-程序员宅基地

文章浏览阅读9.2k次,点赞8次,收藏72次。C# 之 WPF 统计图表开发文档一、前言二、环境配置1、开发环境2、加载 LiveCharts 库3、添加必须的头文件三、基础图形1、柱状图一、前言本项目的统计图使用LiveCharts 控件集成。LiveCharts, 官网:https://lvcharts.net 是一款简单,灵活,交互式和强大的 DOTNET数据可视化图表控件,内置多种统计图表,可满足本项目的需求。二、环境配置..._c# wpflivechart

DBeaver 快捷键大全_dbeaver快捷键-程序员宅基地

文章浏览阅读5k次,点赞7次,收藏23次。有些快捷键未经验证,如有问题望不吝指正!ctrl + enter 执行sqlctrl + \ 执行sql,保留之前窗口结果ctrl + alt + ↑ 向上复制一行ctrl + alt + ↓ 向下复制一行ctrl + shift + F 对sql语句进行格式化,对于很长的sql语句很有用ctrl + d 删除当前行alt + ↑ 向上选定一条sql语句alt + ↓ 向下选定一条sql语句ctrl + / 行注释ctrl + shift+ / 块注释ctrl + f 查找、替换_dbeaver快捷键

【基础】WebView浏览器组件-程序员宅基地

文章浏览阅读5.4k次。ad_webview

CentOS上面安装Oracle 11GR2_oracle11.2.0需下载什么版本的export-程序员宅基地

文章浏览阅读1.7k次。正常图形化界面安装安装X Windowyum groupinstall "X Window System"yum install unzip.x86_64 vim java-1.8.0-openjdk.x86_64 java-1.8.0-openjdk-devel.x86_64安装依赖软件包yum install binutils compat-libstdc++-33 elfutils-_oracle11.2.0需下载什么版本的export

无中继的DHCP配置-ZTE中兴路由器_中兴1800路由器dhcp-程序员宅基地

文章浏览阅读4.6k次。无中继的DHCP配置DHCP的作用: 动态配置IP地址,整个配置过程自动实现,终端无需设置 所有配置信息统一管理,不仅能够分配IP地址,还可以配置其他信息DHCP的优点: 提高网络配置效率,减少配置工作量,较少IP冲突的可能性DHCP server: 集中存放配置信息,响应客户端的请求与之交互并完成主机配置信息的分..._中兴1800路由器dhcp

计算机网络原理 谢希仁(第8版)第四章习题答案_计算机网络第八版谢希仁课后答案-程序员宅基地

文章浏览阅读9.5w次,点赞368次,收藏1.8k次。第四章网络层习题答案_计算机网络第八版谢希仁课后答案

随便推点

解决jQuery UI API - 图标(Icons)_jquery ui icons.png-程序员宅基地

文章浏览阅读3.9k次。首先 在网上下载jQuery UI APIhttps://jqueryui.com/download/我用的jQuery1.7.1在静态页面上加载<link rel="stylesheet" type="text/css" href="/static/css/jquery-ui.css"/>在jquery-ui.css 中 把图片的地址 用_jquery ui icons.png

Linux安装完成后添加新网卡_linux添加网卡-程序员宅基地

文章浏览阅读4.1k次,点赞2次,收藏19次。在安装完linux后,在系统里添加了一块网卡后,在/etc/sysconfig/network-scripts/目录下没有相应的配置文件ifcfg-eth1。在这种情况下,linux不会主动去添加配置文件ifcfg-eth1的。如果需要使用这块网卡,有二种方法:方法一:使用命令临时指派一个IP给这块新添加的网卡ifconfig eth1 192.168.0.1 netmask 255.255.255.0 up方法二:1.手工添加ifcfg-eth1这个配置文件,然后重启网络。先复制一份ifcf_linux添加网卡

ASP.NET;存储过程_asp.net更新存储过程-程序员宅基地

文章浏览阅读567次。摘要 存储过程的调用在B/S系统中用的很多。传统的调用方法不仅速度慢,而且代码会随着存储过程的增多不断膨胀,难以维护。新的方法在一定程度上解决了这些问题。   关键词 ASP.NET;存储过程     在使用.NET的过程中,数据库访问是一个很重要的部分,特别是在B/S系统的构建过程中,数据库操作几乎成为了一个必不可少的操作。调用存储过程实现数据库操作使很多程序员使用的方法,而且大多数的程序员都是_asp.net更新存储过程

获取apk证书MD5值的方法_certmd5-程序员宅基地

文章浏览阅读2.1w次,点赞4次,收藏10次。1.先把apk解压2.在META_INF目录下找到xxx.RSA文件3.确保安装了jdk并且正确配置了环境变量4.cmd中执行keytool -printcert -file xxxx.RSA目录运行效果如下:这样在查找签名文件冲突的时候就可以查看不同的apk对应的签名文件是否一样。_certmd5

Nodejs最佳实践,空闲的时候翻翻_node 获取空闲进程-程序员宅基地

文章浏览阅读522次。今天看到一篇很不错的文章,虽然很长,但对于Nodejs开发人员来说很有必要收藏。1. 项目结构实践 1.1 组件式构建你的解决方案TL;DR: 大型项目的最坏的隐患就是维护一个庞大的,含有几百个依赖的代码库 - 当开发人员准备整合新的需求的时候,这样一个庞然大物势必减缓了开发效率。反之,把您的代码拆分成组件,每一个组件有它自己的文件夹和代码库,并且确保每一个组件小而简单。查看正确的项目结构的例子请访问下面的 ‘更多’ 链接。否则: 当编写新需求的开发人员逐步意识到他所做改变的影响,并担心会破坏_node 获取空闲进程

微信小程序-JavaScript 3DES对称加密算法加密使用_js 3des-程序员宅基地

文章浏览阅读8.1k次,点赞6次,收藏22次。一、前言:1. 最近又被领导叫去谈话,公司最近有个二维码模块项目要开发,要求使用微信小程序,说是方面和快捷,不用安装手机APP。o(╥﹏╥)o真是无语,老子在公司的职位是Windwos 开发,现在他们竟然为了省钱,叫我去做微信小程序,碍于今年疫情严重,没有办法,只能重新拾起微信小程序。2. 因公司做的产品为门禁读卡设备,所以一般数据安全性有要求,并且与13.56MhHZ ISO14443A..._js 3des