各位用户为了找寻关于python3实现ftp服务功能(服务端 For Linux)的资料费劲了很多周折。这里教程网为您整理了关于python3实现ftp服务功能(服务端 For Linux)的相关资料,仅供查阅,以下为您介绍关于python3实现ftp服务功能(服务端 For Linux)的详细内容
本文实例为大家分享了python3实现ftp服务功能的具体代码,供大家参考,具体内容如下
功能介绍:
可执行的命令:
ls pwd cd put rm get mkdir
1、用户加密认证
2、允许多用户同时登陆
3、每个用户有自己的家目录,且只可以访问自己的家目录
4、运行在自己家目录下随意切换目录
5、允许上传下载文件,且文件一致
6、传输过程中显示进度条 server main 代码:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221# Author by Andy
# _*_ coding:utf-8 _*_
import
os, sys, json, hashlib, socketserver, time
base_dir
=
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)
from
conf
import
userdb_set
class
Ftp_server(socketserver.BaseRequestHandler):
user_home_dir
=
''
def
auth(
self
,
*
args):
'''验证用户名及密码'''
cmd_dic
=
args[
0
]
username
=
cmd_dic[
"username"
]
password
=
cmd_dic[
"password"
]
f
=
open
(userdb_set.userdb_set(),
'r'
)
user_info
=
json.load(f)
if
username
in
user_info.keys():
if
password
=
=
user_info[username]:
self
.request.send(
'0'
.encode())
os.chdir(
'/home/%s'
%
username)
self
.user_home_dir
=
os.popen(
'pwd'
).read().strip()
data
=
"%s login successed"
%
username
self
.loging(data)
else
:
self
.request.send(
'1'
.encode())
data
=
"%s login failed"
%
username
self
.loging(data)
f.close
else
:
self
.request.send(
'1'
.encode())
data
=
"%s login failed"
%
username
self
.loging(data)
f.close
##########################################
def
get(
self
,
*
args):
'''给客户端传输文件'''
request_code
=
{
'0'
:
'file is ready to get'
,
'1'
:
'file not found!'
}
cmd_dic
=
args[
0
]
self
.loging(json.dumps(cmd_dic))
filename
=
cmd_dic[
"filename"
]
if
os.path.isfile(filename):
self
.request.send(
'0'
.encode(
'utf-8'
))
# 确认文件存在
self
.request.recv(
1024
)
self
.request.send(
str
(os.stat(filename).st_size).encode(
'utf-8'
))
self
.request.recv(
1024
)
m
=
hashlib.md5()
f
=
open
(filename,
'rb'
)
for
line
in
f:
m.update(line)
self
.request.send(line)
self
.request.send(m.hexdigest().encode(
'utf-8'
))
print
(
'From server:Md5 value has been sended!'
)
f.close()
else
:
self
.request.send(
'1'
.encode(
'utf-8'
))
###########################################
def
cd(
self
,
*
args):
'''执行cd命令'''
user_current_dir
=
os.popen(
'pwd'
).read().strip()
cmd_dic
=
args[
0
]
self
.loging(json.dumps(cmd_dic))
path
=
cmd_dic[
'path'
]
if
path.startswith(
'/'
):
if
self
.user_home_dir
in
path:
os.chdir(path)
new_dir
=
os.popen(
'pwd'
).read()
user_current_dir
=
new_dir
self
.request.send(
'Change dir successfully!'
.encode(
"utf-8"
))
data
=
'Change dir successfully!'
self
.loging(data)
elif
os.path.exists(path):
self
.request.send(
'Permission Denied!'
.encode(
"utf-8"
))
data
=
'Permission Denied!'
self
.loging(data)
else
:
self
.request.send(
'Directory not found!'
.encode(
"utf-8"
))
data
=
'Directory not found!'
self
.loging(data)
elif
os.path.exists(path):
os.chdir(path)
new_dir
=
os.popen(
'pwd'
).read().strip()
if
self
.user_home_dir
in
new_dir:
self
.request.send(
'Change dir successfully!'
.encode(
"utf-8"
))
user_current_dir
=
new_dir
data
=
'Change dir successfully!'
self
.loging(data)
else
:
os.chdir(user_current_dir)
self
.request.send(
'Permission Denied!'
.encode(
"utf-8"
))
data
=
'Permission Denied!'
self
.loging(data)
else
:
self
.request.send(
'Directory not found!'
.encode(
"utf-8"
))
data
=
'Directory not found!'
self
.loging(data)
###########################################
def
rm(
self
,
*
args):
request_code
=
{
'0'
:
'file exist,and Please confirm whether to rm'
,
'1'
:
'file not found!'
}
cmd_dic
=
args[
0
]
self
.loging(json.dumps(cmd_dic))
filename
=
cmd_dic[
'filename'
]
if
os.path.exists(filename):
self
.request.send(
'0'
.encode(
"utf-8"
))
# 确认文件存在
client_response
=
self
.request.recv(
1024
).decode()
if
client_response
=
=
'0'
:
os.popen(
'rm -rf %s'
%
filename)
self
.request.send((
'File %s has been deleted!'
%
filename).encode(
"utf-8"
))
self
.loging(
'File %s has been deleted!'
%
filename)
else
:
self
.request.send((
'File %s not deleted!'
%
filename).encode(
"utf-8"
))
self
.loging(
'File %s not deleted!'
%
filename)
else
:
self
.request.send(
'1'
.encode(
"utf-8"
))
########################################
def
pwd(
self
,
*
args):
'''执行pwd命令'''
cmd_dic
=
args[
0
]
self
.loging(json.dumps(cmd_dic))
server_response
=
os.popen(
'pwd'
).read().strip().encode(
"utf-8"
)
self
.request.send(server_response)
#############################################
def
ls(
self
,
*
args):
'''执行ls命名'''
cmd_dic
=
args[
0
]
self
.loging(json.dumps(cmd_dic))
path
=
cmd_dic[
'path'
]
cmd
=
'ls -l %s'
%
path
server_response
=
os.popen(cmd).read().encode(
"utf-8"
)
self
.request.send(server_response)
############################################
def
put(
self
,
*
args):
'''接收客户端文件'''
cmd_dic
=
args[
0
]
self
.loging(json.dumps(cmd_dic))
filename
=
cmd_dic[
"filename"
]
filesize
=
cmd_dic[
"size"
]
if
os.path.isfile(filename):
f
=
open
(filename
+
'.new'
,
'wb'
)
else
:
f
=
open
(filename,
'wb'
)
request_code
=
{
'200'
:
'Ready to recceive data!'
,
'210'
:
'Not ready to received data!'
}
self
.request.send(
'200'
.encode())
receive_size
=
0
while
True
:
if
receive_size < filesize:
data
=
self
.request.recv(
1024
)
f.write(data)
receive_size
+
=
len
(data)
else
:
data
=
"File %s has been uploaded successfully!"
%
filename
self
.loging(data)
print
(data)
break
################################################
def
mkdir(
self
,
*
args):
request_code
=
{
'0'
:
'Directory has been made!'
,
'1'
:
'Directory is aleady exist!'
}
cmd_dic
=
args[
0
]
self
.loging(json.dumps(cmd_dic))
dir_name
=
cmd_dic[
'dir_name'
]
if
os.path.exists(dir_name):
self
.request.send(
'1'
.encode(
"utf-8"
))
else
:
os.popen(
'mkdir %s'
%
dir_name)
self
.request.send(
'0'
.encode(
"utf-8"
))
#############################################
def
loging(
self
, data):
'''日志记录'''
localtime
=
time.asctime(time.localtime(time.time()))
log_file
=
'/root/ftp/ftpserver/log/server.log'
with
open
(log_file,
'a'
, encoding
=
'utf-8'
) as f:
f.write(
'%s-->'
%
localtime
+
data
+
'n'
)
##############################################
def
handle(
self
):
# print("您本次访问使用的IP为:%s" %self.client_address[0])
# localtime = time.asctime( time.localtime(time.time()))
# print(localtime)
while
True
:
try
:
self
.data
=
self
.request.recv(
1024
).decode()
#
# print(self.data)
cmd_dic
=
json.loads(
self
.data)
action
=
cmd_dic[
"action"
]
# print("用户请求%s"%action)
if
hasattr
(
self
, action):
func
=
getattr
(
self
, action)
func(cmd_dic)
except
Exception as e:
self
.loging(
str
(e))
break
def
run():
HOST, PORT
=
'0.0.0.0'
,
6969
print
(
"The server is started,and listenning at port 6969"
)
server
=
socketserver.ThreadingTCPServer((HOST, PORT), Ftp_server)
server.serve_forever()
if
__name__
=
=
'__main__'
:
run()
设置用户口令代码:
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37#Author by Andy
#_*_ coding:utf-8 _*_
import
os,json,hashlib,sys
base_dir
=
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
userdb_file
=
base_dir
+
"datauserdb"
# print(userdb_file)
def
userdb_set():
if
os.path.isfile(userdb_file):
# print(userdb_file)
return
userdb_file
else
:
print
(
'请先为您的服务器创建用户!'
)
user_data
=
{}
dict
=
{}
Exit_flags
=
True
while
Exit_flags:
username
=
input
(
"Please input username:"
)
if
username !
=
'exit'
:
password
=
input
(
"Please input passwod:"
)
if
password !
=
'exit'
:
user_data.update({username:password})
m
=
hashlib.md5()
# m.update('hello')
# print(m.hexdigest())
for
i
in
user_data:
# print(i,user_data[i])
m.update(user_data[i].encode())
dict
.update({i:m.hexdigest()})
else
:
break
else
:
break
f
=
open
(userdb_file,
'w'
)
json.dump(
dict
,f)
f.close()
return
userdb_file
目录结构:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。