各位用户为了找寻关于python3实现ftp服务功能(服务端 For Linux)的资料费劲了很多周折。这里教程网为您整理了关于python3实现ftp服务功能(服务端 For Linux)的相关资料,仅供查阅,以下为您介绍关于python3实现ftp服务功能(服务端 For Linux)的详细内容

本文实例为大家分享了python3实现ftp服务功能的具体代码,供大家参考,具体内容如下

功能介绍:

可执行的命令:

ls pwd cd put rm get mkdir

1、用户加密认证

2、允许多用户同时登陆

3、每个用户有自己的家目录,且只可以访问自己的家目录

4、运行在自己家目录下随意切换目录

5、允许上传下载文件,且文件一致

6、传输过程中显示进度条 server main 代码:

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 # Author by Andy # _*_ coding:utf-8 _*_ import os, sys, json, hashlib, socketserver, time   base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(base_dir) from conf import userdb_set class Ftp_server(socketserver.BaseRequestHandler):  user_home_dir = ''  def auth(self, *args):   '''验证用户名及密码'''   cmd_dic = args[0]   username = cmd_dic["username"]   password = cmd_dic["password"]   f = open(userdb_set.userdb_set(), 'r')   user_info = json.load(f)   if username in user_info.keys():    if password == user_info[username]:     self.request.send('0'.encode())     os.chdir('/home/%s' % username)     self.user_home_dir = os.popen('pwd').read().strip()     data = "%s login successed" % username     self.loging(data)    else:     self.request.send('1'.encode())     data = "%s login failed" % username     self.loging(data)     f.close   else:    self.request.send('1'.encode())    data = "%s login failed" % username    self.loging(data)    f.close    ##########################################    def get(self, *args):   '''给客户端传输文件'''   request_code = {    '0': 'file is ready to get',    '1': 'file not found!'   }   cmd_dic = args[0]   self.loging(json.dumps(cmd_dic))   filename = cmd_dic["filename"]   if os.path.isfile(filename):    self.request.send('0'.encode('utf-8')) # 确认文件存在    self.request.recv(1024)    self.request.send(str(os.stat(filename).st_size).encode('utf-8'))    self.request.recv(1024)    m = hashlib.md5()    f = open(filename, 'rb')    for line in f:     m.update(line)     self.request.send(line)    self.request.send(m.hexdigest().encode('utf-8'))    print('From server:Md5 value has been sended!')    f.close()   else:    self.request.send('1'.encode('utf-8'))    ###########################################    def cd(self, *args):   '''执行cd命令'''   user_current_dir = os.popen('pwd').read().strip()   cmd_dic = args[0]   self.loging(json.dumps(cmd_dic))   path = cmd_dic['path']   if path.startswith('/'):    if self.user_home_dir in path:     os.chdir(path)     new_dir = os.popen('pwd').read()     user_current_dir = new_dir     self.request.send('Change dir successfully!'.encode("utf-8"))     data = 'Change dir successfully!'     self.loging(data)    elif os.path.exists(path):     self.request.send('Permission Denied!'.encode("utf-8"))     data = 'Permission Denied!'     self.loging(data)    else:     self.request.send('Directory not found!'.encode("utf-8"))     data = 'Directory not found!'     self.loging(data)   elif os.path.exists(path):    os.chdir(path)    new_dir = os.popen('pwd').read().strip()    if self.user_home_dir in new_dir:     self.request.send('Change dir successfully!'.encode("utf-8"))     user_current_dir = new_dir     data = 'Change dir successfully!'     self.loging(data)    else:     os.chdir(user_current_dir)     self.request.send('Permission Denied!'.encode("utf-8"))     data = 'Permission Denied!'     self.loging(data)   else:    self.request.send('Directory not found!'.encode("utf-8"))    data = 'Directory not found!'    self.loging(data)    ###########################################    def rm(self, *args):   request_code = {    '0': 'file exist,and Please confirm whether to rm',    '1': 'file not found!'   }   cmd_dic = args[0]   self.loging(json.dumps(cmd_dic))   filename = cmd_dic['filename']   if os.path.exists(filename):    self.request.send('0'.encode("utf-8")) # 确认文件存在    client_response = self.request.recv(1024).decode()    if client_response == '0':     os.popen('rm -rf %s' % filename)     self.request.send(('File %s has been deleted!' % filename).encode("utf-8"))     self.loging('File %s has been deleted!' % filename)    else:     self.request.send(('File %s not deleted!' % filename).encode("utf-8"))     self.loging('File %s not deleted!' % filename)   else:    self.request.send('1'.encode("utf-8"))    ########################################    def pwd(self, *args):   '''执行pwd命令'''   cmd_dic = args[0]   self.loging(json.dumps(cmd_dic))   server_response = os.popen('pwd').read().strip().encode("utf-8")   self.request.send(server_response)    #############################################  def ls(self, *args):   '''执行ls命名'''   cmd_dic = args[0]   self.loging(json.dumps(cmd_dic))   path = cmd_dic['path']   cmd = 'ls -l %s' % path   server_response = os.popen(cmd).read().encode("utf-8")   self.request.send(server_response)    ############################################  def put(self, *args):   '''接收客户端文件'''   cmd_dic = args[0]   self.loging(json.dumps(cmd_dic))   filename = cmd_dic["filename"]   filesize = cmd_dic["size"]   if os.path.isfile(filename):    f = open(filename + '.new', 'wb')   else:    f = open(filename, 'wb')   request_code = {    '200': 'Ready to recceive data!',    '210': 'Not ready to received data!'   }   self.request.send('200'.encode())   receive_size = 0   while True:    if receive_size < filesize:     data = self.request.recv(1024)     f.write(data)     receive_size += len(data)    else:     data = "File %s has been uploaded successfully!" % filename     self.loging(data)     print(data)     break       ################################################    def mkdir(self, *args):   request_code = {    '0': 'Directory has been made!',    '1': 'Directory is aleady exist!'   }   cmd_dic = args[0]   self.loging(json.dumps(cmd_dic))   dir_name = cmd_dic['dir_name']   if os.path.exists(dir_name):    self.request.send('1'.encode("utf-8"))   else:    os.popen('mkdir %s' % dir_name)    self.request.send('0'.encode("utf-8"))      #############################################     def loging(self, data):   '''日志记录'''   localtime = time.asctime(time.localtime(time.time()))   log_file = '/root/ftp/ftpserver/log/server.log'   with open(log_file, 'a', encoding='utf-8') as f:    f.write('%s-->' % localtime + data + 'n')    ##############################################    def handle(self):   # print("您本次访问使用的IP为:%s" %self.client_address[0])   # localtime = time.asctime( time.localtime(time.time()))   # print(localtime)     while True:    try:     self.data = self.request.recv(1024).decode() #     # print(self.data)     cmd_dic = json.loads(self.data)     action = cmd_dic["action"]     # print("用户请求%s"%action)     if hasattr(self, action):      func = getattr(self, action)      func(cmd_dic)    except Exception as e:     self.loging(str(e))     break   def run():  HOST, PORT = '0.0.0.0', 6969  print("The server is started,and listenning at port 6969")  server = socketserver.ThreadingTCPServer((HOST, PORT), Ftp_server)  server.serve_forever() if __name__ == '__main__':  run()

设置用户口令代码:

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 #Author by Andy #_*_ coding:utf-8 _*_ import os,json,hashlib,sys   base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) userdb_file = base_dir+"datauserdb" # print(userdb_file) def userdb_set():  if os.path.isfile(userdb_file):   # print(userdb_file)   return userdb_file  else:   print('请先为您的服务器创建用户!')   user_data = {}   dict={}   Exit_flags = True   while Exit_flags:    username = input("Please input username:")    if username != 'exit':     password = input("Please input passwod:")     if password != 'exit':       user_data.update({username:password})       m = hashlib.md5()       # m.update('hello')       # print(m.hexdigest())       for i in user_data:        # print(i,user_data[i])        m.update(user_data[i].encode())        dict.update({i:m.hexdigest()})     else:      break    else:     break   f = open(userdb_file,'w')   json.dump(dict,f)   f.close()  return userdb_file

目录结构:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。