各位用户为了找寻关于利用Python实现网络测试的脚本分享的资料费劲了很多周折。这里教程网为您整理了关于利用Python实现网络测试的脚本分享的相关资料,仅供查阅,以下为您介绍关于利用Python实现网络测试的脚本分享的详细内容
前言
最近同学让我帮忙写一个测试网络的工具。由于工作上的事情,断断续续地拖了很久才给出一个相对完整的版本。其实,我Python用的比较少,所以基本都是边查资料边写程序。
程序的主要逻辑如下:
读取一个excel文件中的ip列表,然后使用多线程调用ping统计每个ip的网络参数,最后把结果输出到excel文件中。
代码如下所示:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190#! /usr/bin/env python
# -*- coding: UTF-8 -*-
# File: pingtest_test.py
# Date: 2008-09-28
# Author: Michael Field
# Modified By:intheworld
# Date: 2017-4-17
import
sys
import
os
import
getopt
import
commands
import
subprocess
import
re
import
time
import
threading
import
xlrd
import
xlwt
TEST
=
[
'220.181.57.217'
,
'166.111.8.28'
,
'202.114.0.242'
,
'202.117.0.20'
,
'202.112.26.34'
,
'202.203.128.33'
,
'202.115.64.33'
,
'202.201.48.2'
,
'202.114.0.242'
,
'202.116.160.33'
,
'202.202.128.33'
,
]
RESULT
=
{}
def
usage():
print
"USEAGE:"
print
"t%s -n TEST|excel name [-t times of ping] [-c concurrent number(thread nums)]"
%
sys.argv[
0
]
print
"t TEST为简单测试的IP列表"
print
"t-t times 测试次数;默认为1000;"
print
"t-c concurrent number 并行线程数目:默认为10"
print
"t-h|-?, 帮助信息"
print
"t 输出为当前目录文件ping_result.txt 和 ping_result.xls"
print
"for example:"
print
"t./ping_test.py -n TEST -t 1 -c 10"
def
div_list(ls,n):
if
not
isinstance
(ls,
list
)
or
not
isinstance
(n,
int
):
return
[]
ls_len
=
len
(ls)
print
'ls length = %s'
%
ls_len
if
n<
=
0
or
0
=
=
ls_len:
return
[]
if
n > ls_len:
return
[]
elif
n
=
=
ls_len:
return
[[i]
for
i
in
ls]
else
:
j
=
ls_len
/
n
k
=
ls_len
%
n
### j,j,j,...(前面有n-1个j),j+k
#步长j,次数n-1
ls_return
=
[]
for
i
in
xrange
(
0
,(n
-
1
)
*
j,j):
ls_return.append(ls[i:i
+
j])
#算上末尾的j+k
ls_return.append(ls[(n
-
1
)
*
j:])
return
ls_return
def
pin(IP):
try
:
xpin
=
subprocess.check_output(
"ping -n 1 -w 100 %s"
%
IP, shell
=
True
)
except
Exception:
xpin
=
'empty'
ms
=
'=[0-9]+ms'
.decode(
"utf8"
)
print
"%s"
%
ms
print
"%s"
%
xpin
mstime
=
re.search(ms,xpin)
if
not
mstime:
MS
=
'timeout'
return
MS
else
:
MS
=
mstime.group().split(
'='
)[
1
]
return
MS.strip(
'ms'
)
def
count(total_count,I):
global
RESULT
nowsecond
=
int
(time.time())
nums
=
0
oknums
=
0
timeout
=
0
lostpacket
=
0.0
total_ms
=
0.0
avgms
=
0.0
maxms
=
-
1
while
nums < total_count:
nums
+
=
1
MS
=
pin(I)
print
'pin output = %s'
%
MS
if
MS
=
=
'timeout'
:
timeout
+
=
1
lostpacket
=
timeout
*
100.0
/
nums
else
:
oknums
+
=
1
total_ms
=
total_ms
+
float
(MS)
if
oknums
=
=
0
:
oknums
=
1
maxms
=
float
(MS)
avgms
=
total_ms
/
oknums
else
:
avgms
=
total_ms
/
oknums
maxms
=
max
(maxms,
float
(MS))
RESULT[I]
=
(I, avgms, maxms, lostpacket)
def
thread_func(t, ipList):
if
not
isinstance
(ipList,
list
):
return
else
:
for
ip
in
ipList:
count(t, ip)
def
readIpsInFile(excelName):
data
=
xlrd.open_workbook(excelName)
table
=
data.sheets()[
0
]
nrows
=
table.nrows
print
'nrows %s'
%
nrows
ips
=
[]
for
i
in
range
(nrows):
ips.append(table.cell_value(i,
0
))
print
table.cell_value(i,
0
)
return
ips
if
__name__
=
=
'__main__'
:
file
=
'ping_result.txt'
times
=
10
network
=
''
thread_num
=
10
args
=
sys.argv[
1
:]
try
:
(opts, getopts)
=
getopt.getopt(args,
'n:t:c:h?'
)
except
:
print
"nInvalid command line option detected."
usage()
sys.exit(
1
)
for
opt, arg
in
opts:
if
opt
in
(
'-n'
):
network
=
arg
if
opt
in
(
'-h'
,
'-?'
):
usage()
sys.exit(
0
)
if
opt
in
(
'-t'
):
times
=
int
(arg)
if
opt
in
(
'-c'
):
thread_num
=
int
(arg)
f
=
open
(
file
,
'w'
)
workbook
=
xlwt.Workbook()
sheet1
=
workbook.add_sheet(
"sheet1"
, cell_overwrite_ok
=
True
)
if
not
isinstance
(times,
int
):
usage()
sys.exit(
0
)
if
network
not
in
[
'TEST'
]
and
not
os.path.exists(os.path.join(os.path.dirname(__file__), network)):
print
"The network is wrong or excel file does not exist. please check it."
usage()
sys.exit(
0
)
else
:
if
network
=
=
'TEST'
:
ips
=
TEST
else
:
ips
=
readIpsInFile(network)
print
'Starting...'
threads
=
[]
nest_list
=
div_list(ips, thread_num)
loops
=
range
(
len
(nest_list))
print
'Total %s Threads is working...'
%
len
(nest_list)
for
ipList
in
nest_list:
t
=
threading.Thread(target
=
thread_func,args
=
(times,ipList))
threads.append(t)
for
i
in
loops:
threads[i].start()
for
i
in
loops:
threads[i].join()
it
=
0
for
line
in
RESULT:
value
=
RESULT[line]
sheet1.write(it,
0
, line)
sheet1.write(it,
1
,
str
(
'%.2f'
%
value[
1
]))
sheet1.write(it,
2
,
str
(
'%.2f'
%
value[
2
]))
sheet1.write(it,
3
,
str
(
'%.2f'
%
value[
3
]))
it
+
=
1
f.write(line
+
't'
+
str
(
'%.2f'
%
value[
1
])
+
't'
+
str
(
'%.2f'
%
value[
2
])
+
't'
+
str
(
'%.2f'
%
value[
3
])
+
'n'
)
f.close()
workbook.save(
'ping_result.xls'
)
print
'Work Done. please check result %s and ping_result.xls.'
%
file
这段代码参照了别人的实现,虽然不是特别复杂,这里还是简单解释一下。
excel读写使用了xlrd和xlwt,基本都是使用了一些简单的api。 使用了threading实现多线程并发,和POSIX标准接口非常相似。thread_func是线程的处理函数,它的输入包含了一个ip的List,所以在函数内部通过循环处理各个ip。 此外,Python的commands在Windows下并不兼容,所以使用了subprocess模块。到目前为止,我对Python里面字符集的理解还不到位,所以正则表达式匹配的代码并不够强壮,不过目前勉强可以工作,以后有必要再改咯!
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对服务器之家的支持。
原文链接:http://intheworld.win/2017/04/27/使用python进行网络测试/