各位用户为了找寻关于MySQL单表千万级数据处理的思路分享的资料费劲了很多周折。这里教程网为您整理了关于MySQL单表千万级数据处理的思路分享的相关资料,仅供查阅,以下为您介绍关于MySQL单表千万级数据处理的思路分享的详细内容
项目背景
在处理过程中,今天上午需要更新A字段,下午爬虫组完成了规格书或图片的爬取又需要更新图片和规格书字段,由于单表千万级深度翻页会导致处理速度越来越慢。
? 1select
a,b,c
from
db.tb limit 10000 offset 9000000
但是时间是有限的,是否有更好的方法去解决这种问题呢?
改进思路
是否有可以不需要深度翻页也可以进行数据更新的凭据? 是的,利用自增id列
观察数据特征
此单表有自增id列且为主键,根据索引列查询数据和更新数据是最理想的途径。
? 1 2select
a,b, c
from
db.tb
where
id=9999999;
update
db.tb
set
a=x
where
id=9999999;
多进程处理
每个进程处理一定id范围内的数据,这样既避免的深度翻页又可以同时多进程处理数据。 提高数据查询速度的同时也提高了数据处理速度。 下面是我编写的任务分配函数,供参考:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20def
mission_handler(all_missions, worker_mission_size):
"""
根据总任务数和每个worker的任务数计算出任务列表, 任务列表元素为(任务开始id, 任务结束id)。
例: 总任务数100个,每个worker的任务数40, 那么任务列表为:[(1, 40), (41, 80), (81, 100)]
:param all_missions: 总任务数
:param worker_mission_size: 每个worker的最大任务数
:return: [(start_id, end_id), (start_id, end_id), ...]
"""
worker_mission_ids
=
[]
current_id
=
0
while
current_id <
=
all_missions:
start_id
=
all_missions
if
current_id
+
1
>
=
all_missions
else
current_id
+
1
end_id
=
all_missions
if
current_id
+
worker_mission_size >
=
all_missions
else
current_id
+
worker_mission_size
if
start_id
=
=
end_id:
if
worker_mission_ids[
-
1
][
1
]
=
=
start_id:
break
worker_mission_ids.append((start_id, end_id))
current_id
+
=
worker_mission_size
return
worker_mission_ids
假设单表id最大值为100, 然后我们希望每个进程处理20个id,那么任务列表将为:
? 1 2>>> mission_handler(
100
,
40
)
[(
1
,
40
), (
41
,
80
), (
81
,
100
)]
那么, 进程1将只需要处理id between 1 to 40的数据; 进程2将只需要处理id between 41 to 80的数据; 进程3将只需要处理id between 81 to 100的数据。
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19from
concurrent.futures
import
ProcessPoolExecutor
def
main():
# 自增id最大值
max_id
=
30000000
# 单worker处理数据量
worker_mission_size
=
1000000
# 使用多进程进行处理
missions
=
mission_handler(max_id, worker_mission_size)
workers
=
[]
executor
=
ProcessPoolExecutor()
for
idx, mission
in
enumerate
(missions):
start_id, end_id
=
mission
workers.append(executor.submit(data_handler, start_id, end_id, idx))
def
data_handler(start_id, end_id, worker_id):
pass
思路总结
避免深度翻页进而使用自增id进行查询数据和数据 使用多进程处理数据
数据处理技巧
记录处理成功与处理失败的数据id,以便后续跟进处理
? 1 2# 用另外一张表记录处理状态
insert
into
db.tb_handle_status(row_id, success)
values
(999, 0);
循环体内进行异常捕获,避免程序异常退出
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26def
data_handler(start_id, end_id, worker_id):
# 数据连接
conn, cursor
=
mysql()
current_id
=
start_id
try
:
while
current_id <
=
end_id:
try
:
# TODO 数据处理代码
pass
except
Exception as e:
# TODO 记录处理结果
# 数据移动到下一条
current_id
+
=
1
continue
else
:
# 无异常,继续处理下一条数据
current_id
+
=
1
except
Exception as e:
return
'worker_id({}): result({})'
.
format
(worker_id,
False
)
finally
:
# 数据库资源释放
cursor.close()
conn.close()
return
'worker_id({}): result({})'
.
format
(worker_id,
True
)
更新数据库数据尽量使用批量提交
? 1 2 3 4 5 6 7 8sql
=
"""update db.tb set a=%s, b=%s where id=%s"""
values
=
[
(
'a_value'
,
'b_value'
,
9999
),
(
'a_value'
,
'b_value'
,
9998
),
...
]
# 批量提交,减少网络io以及锁获取频率
cursor.executemany(sql, values)
以上就是MySQL单表千万级数据处理的思路分享的详细内容,更多关于MySQL单表千万级数据处理的资料请关注其它相关文章!
原文链接:https://juejin.cn/post/6969527649995587615