各位用户为了找寻关于基于sqlalchemy对mysql实现增删改查操作的资料费劲了很多周折。这里教程网为您整理了关于基于sqlalchemy对mysql实现增删改查操作的相关资料,仅供查阅,以下为您介绍关于基于sqlalchemy对mysql实现增删改查操作的详细内容

需求场景:

老大让我利用爬虫爬取的数据写到或更新到mysql数据库中,百度了两种方法

1 是使用pymysql连接mysql,通过操作原生的sql语句进行增删改查数据;

2 是使用sqlalchemy连接mysql,通过ORM模型建表并操作数据库,不需要写原生的sql语句,相对简单些;

以下就是本次使用sqlalchemy的经验之谈。

实现流程:连接数据库》通过模型类创建表》建立会话》执行创建表语句》通过会话进行增删改查

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 from sqlalchemy import exists, Column, Integer, String, ForeignKey, exists from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker   # 创建的数据库引擎 engine = create_engine("mysql+pymysql://user:pwd@ip/数据库名?charset=utf8")   #创建session类型 DBSession = sessionmaker(bind=engine)   # 实例化官宣模型 - Base 就是 ORM 模型 Base = declarative_base()     # 创建服务单表 class ServiceOrder(Base):   __tablename__ = 'serviceOrderTable'   id = Column(Integer, primary_key=True, autoincrement=True)   serviceOrderId = Column(String(32), nullable=False, index=True, comment='服务单ID')   serviceDesc = Column(String(268), comment='服务说明')   oneLevelName = Column(String(32), comment='C类别')   twoLevelName = Column(String(32), comment='T子类')   threeLevelName = Column(String(32), comment='I项目')   fourLevelName = Column(String(32), comment='S子项')   transferTimes = Column(String(32), comment='转派次数')   overDueStatus = Column(String(32), comment='过期状态')   serviceTimeLimit = Column(String(32), comment='服务时限')   serTimeLimitTypeName = Column(String(16), comment='时限类型'   # 一对多:   # serviceWorkOrder = relationship("ServiceWorkOrder", backref="serviceorder")     # 多对一:多个服务工单可以属于服务单 class ServiceWorkOrder(Base):   __tablename__ = 'serviceWorkOrderTable'   id = Column(Integer, primary_key=True, autoincrement=True)   serviceWorkOrderId = Column(String(32), nullable=False, index=True, comment='服务工单ID')   workOrderName = Column(String(268), comment='工单名称')   fromId = Column(String(32), comment='服务单ID')   createUserSectionName = Column(String(32), comment='创建人室')   createUserName = Column(String(32), comment='创建人')   handlerName = Column(String(32), comment='处理人')   statusName = Column(String(32), comment='工单状态')   createTime = Column(String(32), comment='创建时间')   # “多”的一方的book表是通过外键关联到user表的:   # serviceOrder_id = Column(Integer, ForeignKey('serviceOrderTable.id'))   # 创建数据库 如果数据库已存在 则不会创建 会根据库名直接连接已有的库 def init_db():   Base.metadata.create_all(engine)   def drop_db():   Base.metadata.drop_all(engine)   def insert_update():   # all_needed_data_lists 是需要插入数据库的数据 格式[{key: value, ... }, { }, { }...]   for item in all_needed_data_lists:     ServiceOrderRow = ServiceOrder(serviceOrderId=item['serviceOrderId'],                     serviceDesc=item['serviceDesc'],                     oneLevelName=item['oneLevelName'],                     twoLevelName=item['twoLevelName'],                     threeLevelName=item['threeLevelName'],                     fourLevelName=item['fourLevelName'],                     transferTimes=item['transferTimes'],                     overDueStatus=item['overDueStatus'],                     serviceTimeLimit=item['serviceTimeLimit'],                     serTimeLimitTypeName=item['serTimeLimitTypeName'],                     )     try:       # 利用exists判断目标对象是否存在,返回True或Faults       it_exists = session.query(           exists().where(ServiceOrder.serviceOrderId == item['serviceOrderId'] )         ).scalar()     except Exception as e:       self.log.error(e)       break     try:       # 如果不存在,进行新增;存在的话就更新现存的数据       if not it_exists:         session.add(ServiceOrderRow)       else:         session.query(ServiceOrder).filter(ServiceOrder.serviceOrderId == item['serviceOrderId'])           .update(item)     except Exception as e:       self.log.error(e)       break   try:     session.commit()     self.log.info('数据更新成功!')   except:     session.rollback()     self.log.info('数据更新失败!')   if __name__ == "__main__":   # 创建数据库 如果数据库已存在 则不会创建 会根据库名直接连接已有的库   init_db()   # 创建session对象,进行增删改查:   session = DBSession()   # 利用session 增 改数据 记得提交   insert_update()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

原文链接:https://www.cnblogs.com/We612/p/12105135.html