-->
当前位置:首页 > DayDayUp > 正文内容

python-flask + mysql实现带登录功能的简易文件服务器

Luz2年前 (2023-05-30)DayDayUp2722

需求

登录功能,文件上传功能,上传文件管理(删除)功能,免登录文件下载功能


实现

sql部分

需要两个sql表,分别完成用户信息存放、文件信息存放。因为做演示使用,密码就使用明文存放了。

CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(255) NOT NULL,
    password VARCHAR(255) NOT NULL);
CREATE TABLE files (
    id INT AUTO_INCREMENT PRIMARY KEY,
    filename VARCHAR(255) NOT NULL,
    filepath VARCHAR(255) NOT NULL);

user表存放用户名、密码,files表存放文件名与存储路径。


python部分

mysql连接与cursor获取

# MySQL连接配置
db = mysql.connector.connect(
    host='127.0.0.1',
    user='filemanager',
    password='password',
    database='filemanager'
)

# 创建一个游标对象
cursor = db.cursor()


登录功能

@app.route('/', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        # 在数据库中验证用户名和密码
        cursor.execute("SELECT * FROM users WHERE username=%s AND password=%s", (username, password))
        user = cursor.fetchone()
        if user:
            # 如果用户验证成功,将其存储在会话中
            session['username'] = user[1]
            flash('登录成功!')
            return redirect(url_for('dashboard'))
        else:
            flash('用户名或密码错误!')
    db.close()
    return render_template('login.html')

面板与上传文件功能

@app.route('/dashboard', methods=['GET', 'POST'])
def dashboard():

    if 'username' in session:
        if request.method == 'POST':
            if 'file' not in request.files:
                flash('未选择文件!')
                return redirect(request.url)
            file = request.files['file']
            if file.filename == '':
                flash('未选择文件!')
                return redirect(request.url)
            if file and allowed_file(file.filename):
                filename = secure_filename(file.filename)
                file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
                # 将文件链接存储到数据库中
                cursor.execute("INSERT INTO files (filename, filepath) VALUES (%s, %s)",
                               (filename, os.path.join(app.config['UPLOAD_FOLDER'], filename)))
                db.commit()
                flash('文件上传成功!')

        # 从数据库中获取上传的文件列表
        cursor.execute("SELECT * FROM files")
        files = cursor.fetchall()
        db.close()
        return render_template('dashboard.html', username=session['username'], files=files)
    else:
        db.close()
        return redirect(url_for('login'))

删除文件功能

@app.route('/delete_file/<int:file_id>', methods=['GET'])
def delete_file(file_id):
    if 'username' in session:
        # 从数据库中删除文件记录
        cursor.execute("SELECT * FROM files WHERE id=%s", (file_id,))
        file = cursor.fetchone()
        if file:
            cursor.execute("DELETE FROM files WHERE id=%s", (file_id,))
            db.commit()
		# 删除文件
            file_path = file[2]
            os.remove(file_path)
            flash('文件删除成功!')
        else:
            flash('文件不存在!')
        db.close()
        return redirect(url_for('dashboard'))

文件下载功能

@app.route('/uploads/<filename>', methods=['GET'])
def download(filename):
    if 1:
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        if os.path.exists(file_path):
            return send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=True)
        else:
            flash('文件不存在!')
            return redirect(url_for('dashboard'))

完整代码

run.py

from flask import Flask, render_template, request, redirect, url_for, flash,send_from_directory
from flask import session
from werkzeug.utils import secure_filename
import os
import mysql.connector

app = Flask(__name__)
app.secret_key = 'secret_key'  # 替换为一个随机的密钥

# MySQL连接配置
db = mysql.connector.connect(
    host='127.0.0.1',
    user='filemanager',
    password='l2',
    database='filemanager'
)

# 创建一个游标对象
cursor = db.cursor()

# 设置允许上传的文件类型
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif','zip','rar'}

# 配置文件上传路径
UPLOAD_FOLDER = 'uploads'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['SECRET_KEY'] = 'secret_key'
if not os.path.exists(app.config['UPLOAD_FOLDER']):
    os.makedirs(app.config['UPLOAD_FOLDER'])
    
def allowed_file(filename):
    return '.' in filename and \
           filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


@app.route('/', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        # 在数据库中验证用户名和密码
        cursor.execute("SELECT * FROM users WHERE username=%s AND password=%s", (username, password))
        user = cursor.fetchone()
        if user:
            # 如果用户验证成功,将其存储在会话中
            session['username'] = user[1]
            flash('登录成功!')
            return redirect(url_for('dashboard'))
        else:
            flash('用户名或密码错误!')
    db.close()
    return render_template('login.html')


@app.route('/dashboard', methods=['GET', 'POST'])
def dashboard():

    if 'username' in session:
        if request.method == 'POST':
            if 'file' not in request.files:
                flash('未选择文件!')
                return redirect(request.url)
            file = request.files['file']
            if file.filename == '':
                flash('未选择文件!')
                return redirect(request.url)
            if file and allowed_file(file.filename):
                filename = secure_filename(file.filename)
                file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
                # 将文件链接存储到数据库中
                cursor.execute("INSERT INTO files (filename, filepath) VALUES (%s, %s)",
                               (filename, os.path.join(app.config['UPLOAD_FOLDER'], filename)))
                db.commit()
                flash('文件上传成功!')

        # 从数据库中获取上传的文件列表
        cursor.execute("SELECT * FROM files")
        files = cursor.fetchall()
        db.close()
        return render_template('dashboard.html', username=session['username'], files=files)
    else:
        db.close()
        return redirect(url_for('login'))


@app.route('/delete_file/<int:file_id>', methods=['GET'])
def delete_file(file_id):
    if 'username' in session:
        # 从数据库中删除文件记录
        cursor.execute("SELECT * FROM files WHERE id=%s", (file_id,))
        file = cursor.fetchone()
        if file:
            cursor.execute("DELETE FROM files WHERE id=%s", (file_id,))
            db.commit()
		# 删除文件
            file_path = file[2]
            os.remove(file_path)
            flash('文件删除成功!')
        else:
            flash('文件不存在!')
        db.close()
        return redirect(url_for('dashboard'))
        
@app.route('/uploads/<filename>', methods=['GET'])
def download(filename):
    if 1:
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        if os.path.exists(file_path):
            return send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=True)
        else:
            flash('文件不存在!')
            return redirect(url_for('dashboard'))

if __name__ == '__main__':
    app.run(host='0.0.0.0',port=55555,debug=False)


dashboard.html

<!DOCTYPE html>
<html>
<head>
    <title>Dashboard</title>
</head>
<body>
    <h2>Welcome, {{ username }}!</h2>
    <h3>允许上传的文件类型:txt,pdf,png,jpg,jpeg,gif,zip,rar</h3>
    <form method="POST" action="{{ url_for('dashboard') }}" enctype="multipart/form-data">
        <div>
            <input type="file" name="file">
            <input type="submit" value="Upload">
        </div>
    </form>
    <hr>
    {% with messages = get_flashed_messages() %}
    {% if messages %}
    <ul class="messages">
        {% for message in messages %}
        <li>{{ message }}</li>
        {% endfor %}
    </ul>
    {% endif %}
    {% endwith %}
    <h3>Uploaded Files:</h3>
    <ul>
        {% for file in files %}
        <li>
            <a href="{{ file[2] }}">{{ file[1] }}</a>
            <a href="{{ url_for('delete_file', file_id=file[0]) }}">Delete</a>
        </li>
        {% endfor %}
    </ul>
<a href="{{ url_for('login') }}">Logout</a>
</body>
</html>


login.html


<!DOCTYPE html>
<html>
<head>
    <title>Login</title>
</head>
<body>
    {% with messages = get_flashed_messages() %}
    {% if messages %}
    <ul class="messages">
        {% for message in messages %}
        <li>{{ message }}</li>
        {% endfor %}
    </ul>
    {% endif %}
    {% endwith %}
    <h2>Login</h2>
    <form method="POST" action="{{ url_for('login') }}">
        <div>
            <label>Username:</label>
            <input type="text" name="username">
        </div>
        <div>
            <label>Password:</label>
            <input type="password" name="password">
        </div>
        <div>
            <input type="submit" value="Login">
        </div>
    </form>
</body>
</html>



运行截图

image.png

image.png

image.png


发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。