各位用户为了找寻关于基于SQLAlchemy实现操作MySQL并执行原生sql语句的资料费劲了很多周折。这里教程网为您整理了关于基于SQLAlchemy实现操作MySQL并执行原生sql语句的相关资料,仅供查阅,以下为您介绍关于基于SQLAlchemy实现操作MySQL并执行原生sql语句的详细内容

场景应用

老大我让爬取内部网站获取数据,插入到新建的表中,并每天进行爬取更新数据(后面做了定时任务)。然后根据该表统计每日的新增数量/更新数量进行制图制表,向上级汇报。

思路构建

选用sqlalchemy+mysqlconnector,连接数据库,创建表,对指定表进行CRUD

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 from sqlalchemy import exists, Column, Integer, String, ForeignKey, DateTime, Text, func from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from conf.parseConfig import parseConf   # 从配置文件中获取数据库信息 host = parseConf.get_conf('MySQLInfo', 'host') port = parseConf.get_conf('MySQLInfo', 'port') dbname = parseConf.get_conf('MySQLInfo', 'dbname') usernm = parseConf.get_conf('MySQLInfo', 'usernm') passwd = parseConf.get_conf('MySQLInfo', 'passwd') # 连接数据库 engine_str = "mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}".format(usernm, passwd, host, port, dbname) # 创建的数据库引擎 engine = create_engine(engine_str, encoding='utf-8')   #创建session类型 DBSession = sessionmaker(bind=engine) # 创建session对象,进行增删改查: session = DBSession()   # 实例化官宣模型 - Base 就是 ORM 模型 Base = declarative_base()     # 创建服务单表 继承Base基类 class ServiceOrder(Base):   __tablename__ = 'serviceOrderTable'   serviceOrderId = Column(String(32), primary_key=True, comment='服务单ID')   serviceDesc = Column(String(512), comment='服务说明')   transferTimes = Column(String(32), comment='转派次数')   # 创建更新时间,对数据的更新进行记录   updateTime = Column(DateTime, server_default=func.now(), onupdate=func.now())     def init_db():   Base.metadata.create_all(engine)     def drop_db():   Base.metadata.drop_all(engine)     if __name__ == "__main__":   # 每次执行时 会判断表的存在性 对于数据库中不存在的表进行创建 已存在的表则可以直接进行增删改查   init_db()     ### 首先讲一下使用sqlalchemy执行原生的sql语句###   # 方式一:   res = session.execute('select * from ServiceOrder') # res是获取的对象   all_res_list = res.fetchall() # all_res_list具体的结果 是列表   print(all_res_list ) # 结果: [('数据提取',), ('非数据提取',)]   # 方式二:   conn = engine.connect()   res = conn.execute('select * from ServiceOrder')   all_res_list = res.fetchall()     ### 使用创建好的session对象进行增删改查 ###   # 插入单条数据   # 创建新service0rder对象   new_serviceorder = ServiceOrder(serviceOrderId='001', serviceDesc='ack', transferTimes='9')   # 添加到session   session.add(new_serviceorder)   # 提交即保存到数据库   session.commit()     # 插入多条数据   serviceorder_list = [ServiceOrder(serviceOrderId='002', serviceDesc='好的', transferTimes='9'),ServiceOrder(serviceOrderId='003', serviceDesc='起床', transferTimes='9')]   session.add_all(serviceorder_list)   session.commit()   # session.close()     # 查询   # 查询是否存在 结果是布尔值   it_exists = session.query(     exists().where(ServiceOrder.serviceOrderId == '002')   ).scalar()   # 创建Query查询,filter是where条件   # 调用one() first()返回唯一行,如果调用all()则已列表的形式返回所有行:   server_order = session.query(ServiceOrder).filter(ServiceOrder.serviceOrderId == '003').first()   print(server_order.serviceDesc)   serciceorders = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == '好的').all()     # 改 更新数据   # 数据更新,将值为Mack的serviceDesc修改为Danny   update_obj = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').update({"serviceDesc": "Danny"})   # 或则   update_objp = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').first()   update_objp.serviceDesc = 'Danny'   session.commit()     # 删除   update_objk = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').delete()   # 或则   update_objkp = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').one()   update_objkp.delete()   session.commit()   session.close()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://www.cnblogs.com/We612/p/12242227.html