python-flask + mysql实现带登录功能的简易文件服务器
需求
登录功能,文件上传功能,上传文件管理(删除)功能,免登录文件下载功能
实现
sql部分
需要两个sql表,分别完成用户信息存放、文件信息存放。因为做演示使用,密码就使用明文存放了。
CREATE TABLE users ( id INT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(255) NOT NULL, password VARCHAR(255) NOT NULL); CREATE TABLE files ( id INT AUTO_INCREMENT PRIMARY KEY, filename VARCHAR(255) NOT NULL, filepath VARCHAR(255) NOT NULL);
user表存放用户名、密码,files表存放文件名与存储路径。
python部分
mysql连接与cursor获取
# MySQL连接配置 db = mysql.connector.connect( host='127.0.0.1', user='filemanager', password='password', database='filemanager' ) # 创建一个游标对象 cursor = db.cursor()
登录功能
@app.route('/', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] # 在数据库中验证用户名和密码 cursor.execute("SELECT * FROM users WHERE username=%s AND password=%s", (username, password)) user = cursor.fetchone() if user: # 如果用户验证成功,将其存储在会话中 session['username'] = user[1] flash('登录成功!') return redirect(url_for('dashboard')) else: flash('用户名或密码错误!') db.close() return render_template('login.html')
面板与上传文件功能
@app.route('/dashboard', methods=['GET', 'POST']) def dashboard(): if 'username' in session: if request.method == 'POST': if 'file' not in request.files: flash('未选择文件!') return redirect(request.url) file = request.files['file'] if file.filename == '': flash('未选择文件!') return redirect(request.url) if file and allowed_file(file.filename): filename = secure_filename(file.filename) file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename)) # 将文件链接存储到数据库中 cursor.execute("INSERT INTO files (filename, filepath) VALUES (%s, %s)", (filename, os.path.join(app.config['UPLOAD_FOLDER'], filename))) db.commit() flash('文件上传成功!') # 从数据库中获取上传的文件列表 cursor.execute("SELECT * FROM files") files = cursor.fetchall() db.close() return render_template('dashboard.html', username=session['username'], files=files) else: db.close() return redirect(url_for('login'))
删除文件功能
@app.route('/delete_file/<int:file_id>', methods=['GET']) def delete_file(file_id): if 'username' in session: # 从数据库中删除文件记录 cursor.execute("SELECT * FROM files WHERE id=%s", (file_id,)) file = cursor.fetchone() if file: cursor.execute("DELETE FROM files WHERE id=%s", (file_id,)) db.commit() # 删除文件 file_path = file[2] os.remove(file_path) flash('文件删除成功!') else: flash('文件不存在!') db.close() return redirect(url_for('dashboard'))
文件下载功能
@app.route('/uploads/<filename>', methods=['GET']) def download(filename): if 1: file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) if os.path.exists(file_path): return send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=True) else: flash('文件不存在!') return redirect(url_for('dashboard'))
完整代码
run.py
from flask import Flask, render_template, request, redirect, url_for, flash,send_from_directory from flask import session from werkzeug.utils import secure_filename import os import mysql.connector app = Flask(__name__) app.secret_key = 'secret_key' # 替换为一个随机的密钥 # MySQL连接配置 db = mysql.connector.connect( host='127.0.0.1', user='filemanager', password='l2', database='filemanager' ) # 创建一个游标对象 cursor = db.cursor() # 设置允许上传的文件类型 ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif','zip','rar'} # 配置文件上传路径 UPLOAD_FOLDER = 'uploads' app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER app.config['SECRET_KEY'] = 'secret_key' if not os.path.exists(app.config['UPLOAD_FOLDER']): os.makedirs(app.config['UPLOAD_FOLDER']) def allowed_file(filename): return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS @app.route('/', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] # 在数据库中验证用户名和密码 cursor.execute("SELECT * FROM users WHERE username=%s AND password=%s", (username, password)) user = cursor.fetchone() if user: # 如果用户验证成功,将其存储在会话中 session['username'] = user[1] flash('登录成功!') return redirect(url_for('dashboard')) else: flash('用户名或密码错误!') db.close() return render_template('login.html') @app.route('/dashboard', methods=['GET', 'POST']) def dashboard(): if 'username' in session: if request.method == 'POST': if 'file' not in request.files: flash('未选择文件!') return redirect(request.url) file = request.files['file'] if file.filename == '': flash('未选择文件!') return redirect(request.url) if file and allowed_file(file.filename): filename = secure_filename(file.filename) file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename)) # 将文件链接存储到数据库中 cursor.execute("INSERT INTO files (filename, filepath) VALUES (%s, %s)", (filename, os.path.join(app.config['UPLOAD_FOLDER'], filename))) db.commit() flash('文件上传成功!') # 从数据库中获取上传的文件列表 cursor.execute("SELECT * FROM files") files = cursor.fetchall() db.close() return render_template('dashboard.html', username=session['username'], files=files) else: db.close() return redirect(url_for('login')) @app.route('/delete_file/<int:file_id>', methods=['GET']) def delete_file(file_id): if 'username' in session: # 从数据库中删除文件记录 cursor.execute("SELECT * FROM files WHERE id=%s", (file_id,)) file = cursor.fetchone() if file: cursor.execute("DELETE FROM files WHERE id=%s", (file_id,)) db.commit() # 删除文件 file_path = file[2] os.remove(file_path) flash('文件删除成功!') else: flash('文件不存在!') db.close() return redirect(url_for('dashboard')) @app.route('/uploads/<filename>', methods=['GET']) def download(filename): if 1: file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) if os.path.exists(file_path): return send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=True) else: flash('文件不存在!') return redirect(url_for('dashboard')) if __name__ == '__main__': app.run(host='0.0.0.0',port=55555,debug=False)
dashboard.html
<!DOCTYPE html> <html> <head> <title>Dashboard</title> </head> <body> <h2>Welcome, {{ username }}!</h2> <h3>允许上传的文件类型:txt,pdf,png,jpg,jpeg,gif,zip,rar</h3> <form method="POST" action="{{ url_for('dashboard') }}" enctype="multipart/form-data"> <div> <input type="file" name="file"> <input type="submit" value="Upload"> </div> </form> <hr> {% with messages = get_flashed_messages() %} {% if messages %} <ul class="messages"> {% for message in messages %} <li>{{ message }}</li> {% endfor %} </ul> {% endif %} {% endwith %} <h3>Uploaded Files:</h3> <ul> {% for file in files %} <li> <a href="{{ file[2] }}">{{ file[1] }}</a> <a href="{{ url_for('delete_file', file_id=file[0]) }}">Delete</a> </li> {% endfor %} </ul> <a href="{{ url_for('login') }}">Logout</a> </body> </html>
login.html
<!DOCTYPE html> <html> <head> <title>Login</title> </head> <body> {% with messages = get_flashed_messages() %} {% if messages %} <ul class="messages"> {% for message in messages %} <li>{{ message }}</li> {% endfor %} </ul> {% endif %} {% endwith %} <h2>Login</h2> <form method="POST" action="{{ url_for('login') }}"> <div> <label>Username:</label> <input type="text" name="username"> </div> <div> <label>Password:</label> <input type="password" name="password"> </div> <div> <input type="submit" value="Login"> </div> </form> </body> </html>