各位用户为了找寻关于基于SQLAlchemy实现操作MySQL并执行原生sql语句的资料费劲了很多周折。这里教程网为您整理了关于基于SQLAlchemy实现操作MySQL并执行原生sql语句的相关资料,仅供查阅,以下为您介绍关于基于SQLAlchemy实现操作MySQL并执行原生sql语句的详细内容
场景应用
老大我让爬取内部网站获取数据,插入到新建的表中,并每天进行爬取更新数据(后面做了定时任务)。然后根据该表统计每日的新增数量/更新数量进行制图制表,向上级汇报。
思路构建
选用sqlalchemy+mysqlconnector,连接数据库,创建表,对指定表进行CRUD
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99from
sqlalchemy
import
exists, Column, Integer, String, ForeignKey, DateTime, Text, func
from
sqlalchemy.ext.declarative
import
declarative_base
from
sqlalchemy
import
create_engine
from
sqlalchemy.orm
import
sessionmaker
from
conf.parseConfig
import
parseConf
# 从配置文件中获取数据库信息
host
=
parseConf.get_conf(
'MySQLInfo'
,
'host'
)
port
=
parseConf.get_conf(
'MySQLInfo'
,
'port'
)
dbname
=
parseConf.get_conf(
'MySQLInfo'
,
'dbname'
)
usernm
=
parseConf.get_conf(
'MySQLInfo'
,
'usernm'
)
passwd
=
parseConf.get_conf(
'MySQLInfo'
,
'passwd'
)
# 连接数据库
engine_str
=
"mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}"
.
format
(usernm, passwd, host, port, dbname)
# 创建的数据库引擎
engine
=
create_engine(engine_str, encoding
=
'utf-8'
)
#创建session类型
DBSession
=
sessionmaker(bind
=
engine)
# 创建session对象,进行增删改查:
session
=
DBSession()
# 实例化官宣模型 - Base 就是 ORM 模型
Base
=
declarative_base()
# 创建服务单表 继承Base基类
class
ServiceOrder(Base):
__tablename__
=
'serviceOrderTable'
serviceOrderId
=
Column(String(
32
), primary_key
=
True
, comment
=
'服务单ID'
)
serviceDesc
=
Column(String(
512
), comment
=
'服务说明'
)
transferTimes
=
Column(String(
32
), comment
=
'转派次数'
)
# 创建更新时间,对数据的更新进行记录
updateTime
=
Column(DateTime, server_default
=
func.now(), onupdate
=
func.now())
def
init_db():
Base.metadata.create_all(engine)
def
drop_db():
Base.metadata.drop_all(engine)
if
__name__
=
=
"__main__"
:
# 每次执行时 会判断表的存在性 对于数据库中不存在的表进行创建 已存在的表则可以直接进行增删改查
init_db()
### 首先讲一下使用sqlalchemy执行原生的sql语句###
# 方式一:
res
=
session.execute(
'select * from ServiceOrder'
)
# res是获取的对象
all_res_list
=
res.fetchall()
# all_res_list具体的结果 是列表
print
(all_res_list )
# 结果: [('数据提取',), ('非数据提取',)]
# 方式二:
conn
=
engine.connect()
res
=
conn.execute(
'select * from ServiceOrder'
)
all_res_list
=
res.fetchall()
### 使用创建好的session对象进行增删改查 ###
# 插入单条数据
# 创建新service0rder对象
new_serviceorder
=
ServiceOrder(serviceOrderId
=
'001'
, serviceDesc
=
'ack'
, transferTimes
=
'9'
)
# 添加到session
session.add(new_serviceorder)
# 提交即保存到数据库
session.commit()
# 插入多条数据
serviceorder_list
=
[ServiceOrder(serviceOrderId
=
'002'
, serviceDesc
=
'好的'
, transferTimes
=
'9'
),ServiceOrder(serviceOrderId
=
'003'
, serviceDesc
=
'起床'
, transferTimes
=
'9'
)]
session.add_all(serviceorder_list)
session.commit()
# session.close()
# 查询
# 查询是否存在 结果是布尔值
it_exists
=
session.query(
exists().where(ServiceOrder.serviceOrderId
=
=
'002'
)
).scalar()
# 创建Query查询,filter是where条件
# 调用one() first()返回唯一行,如果调用all()则已列表的形式返回所有行:
server_order
=
session.query(ServiceOrder).
filter
(ServiceOrder.serviceOrderId
=
=
'003'
).first()
print
(server_order.serviceDesc)
serciceorders
=
session.query(ServiceOrder).
filter
(ServiceOrder.serviceDesc
=
=
'好的'
).
all
()
# 改 更新数据
# 数据更新,将值为Mack的serviceDesc修改为Danny
update_obj
=
session.query(ServiceOrder).
filter
(ServiceOrder.serviceDesc
=
=
'Mack'
).update({
"serviceDesc"
:
"Danny"
})
# 或则
update_objp
=
session.query(ServiceOrder).
filter
(ServiceOrder.serviceDesc
=
=
'Mack'
).first()
update_objp.serviceDesc
=
'Danny'
session.commit()
# 删除
update_objk
=
session.query(ServiceOrder).
filter
(ServiceOrder.serviceDesc
=
=
'Mack'
).delete()
# 或则
update_objkp
=
session.query(ServiceOrder).
filter
(ServiceOrder.serviceDesc
=
=
'Mack'
).one()
update_objkp.delete()
session.commit()
session.close()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://www.cnblogs.com/We612/p/12242227.html