Toolbox for analyzing and editing PKG application files for PSP, PS3, PS4 and PS5; it includes the most useful functions you might need.
| 1 | import struct |
| 2 | import binascii |
| 3 | from Crypto.Cipher import AES |
| 4 | from Crypto.Hash import CMAC |
| 5 | |
# EDAT / NPDRM keys (each one is 16 bytes, written as hex for readability).
EDAT_HASH_KEY = bytes.fromhex("EFFE5BD1652EEBC11918CF7C04D4F011")
EDAT_IV = b"\x00" * 16
EDAT_KEY = bytes.fromhex("BE959CA8308DEFA2E5E180C63712A9AE")
NPDRM_OMAC_KEY2 = bytes.fromhex("6BA52976EFDA16EF3C339FB2971E256B")
NPDRM_OMAC_KEY3 = bytes.fromhex("9B515FEACF75064981AA604D91A54E97")
SDAT_KEY = bytes.fromhex("0D655EF8E674A98AB8505CFA7D012933")
| 13 | |
# Bit masks tested against the 32-bit flags word of the EDAT metadata header.
FLAG_COMPRESSED = 0x00000001
FLAG_0x02 = 0x00000002
FLAG_KEYENCRYPTED = 0x00000008
FLAG_0x10 = 0x00000010
FLAG_0x20 = 0x00000020
FLAG_SDAT = 0x01000000
FLAG_DEBUG = 0x80000000

# Status codes: zero means success, negative values identify the failure.
STATUS_OK = 0
STATUS_ERROR_HASHTITLEIDNAME = -1
STATUS_ERROR_HASHDEVKLIC = -2
STATUS_ERROR_MISSINGKEY = -3
STATUS_ERROR_HEADERCHECK = -4
STATUS_ERROR_DECRYPTING = -5
STATUS_ERROR_INCORRECT_FLAGS = -6
STATUS_ERROR_INCORRECT_VERSION = -7

# NOTE(review): presumably the largest block size accepted in a header;
# nothing in the visible code reads this constant -- confirm before relying on it.
HEADER_MAX_BLOCKSIZE = 0x3C00
| 32 | |
| 33 | |
def be32(b, off=0):
    """Read an unsigned 32-bit big-endian integer from ``b`` at offset ``off``.

    Raises ``struct.error`` when fewer than four bytes are available at ``off``.
    """
    (value,) = struct.unpack_from(">I", b, off)
    return value
| 36 | |
| 37 | |
def be64(b, off=0):
    """Read an unsigned 64-bit big-endian integer from ``b`` at offset ``off``.

    Raises ``struct.error`` when fewer than eight bytes are available at ``off``.
    """
    (value,) = struct.unpack_from(">Q", b, off)
    return value
| 40 | |
| 41 | |
def xor_bytes(a: bytes, b: bytes) -> bytes:
    """Return the byte-wise XOR of ``a`` and ``b``.

    The result is as long as the shorter input; surplus bytes of the longer
    one are ignored.
    """
    mixed = bytearray()
    for left, right in zip(a, b):
        mixed.append(left ^ right)
    return bytes(mixed)
| 44 | |
| 45 | |
def arraycopy(src, src_off, dst, dst_off, length):
    """Copy ``length`` items from ``src[src_off:]`` into ``dst`` at ``dst_off``.

    ``dst`` is modified in place through slice assignment, so it must be a
    mutable sequence such as ``bytearray``. If ``src`` holds fewer than
    ``length`` items past ``src_off``, the slice assignment changes the length
    of ``dst`` rather than raising.
    """
    piece = src[src_off:src_off + length]
    dst[dst_off:dst_off + length] = piece
| 48 | |
| 49 | |
def cmac_aes(key: bytes, data: bytes) -> bytes:
    """Return the AES-CMAC tag of ``data`` computed under ``key``."""
    mac = CMAC.new(key, msg=data, ciphermod=AES)
    return mac.digest()
| 54 | |
| 55 | |
class NPD:
    """Field-by-field view of a 0x80-byte NPD header.

    Construction raises ``ValueError`` when the header does not pass
    :meth:`validate`.
    """

    def __init__(self, raw: bytes):
        self.magic = raw[0x00:0x04]
        # Three consecutive big-endian 32-bit words follow the magic.
        self.version, self.license, self.type = (
            be32(raw, pos) for pos in (0x04, 0x08, 0x0C)
        )
        self.content_id = raw[0x10:0x40]
        self.digest = raw[0x40:0x50]
        self.title_hash = raw[0x50:0x60]
        self.dev_hash = raw[0x60:0x70]
        # The last 16 bytes are read as two 64-bit words that must be zero.
        self.unknown3, self.unknown4 = be64(raw, 0x70), be64(raw, 0x78)
        if self.validate():
            return
        raise ValueError("Invalid NPD header")

    def validate(self):
        """Return True when the magic is ``NPD\\0`` and both trailing words are zero."""
        if self.magic != b"NPD\x00":
            return False
        return self.unknown3 == 0 and self.unknown4 == 0

    @classmethod
    def parse(cls, buf: bytes):
        """Build an ``NPD`` from the first 0x80 bytes of ``buf``.

        Raises ``ValueError`` if ``buf`` is shorter than 0x80 bytes.
        """
        if len(buf) >= 0x80:
            return cls(buf[:0x80])
        raise ValueError("NPD header too short")
| 79 | |
| 80 | |
class EDATData:
    """Values of the 16-byte EDAT metadata header: flags, block size, file length."""

    def __init__(self, flags: int, block_size: int, file_len: int):
        self.flags, self.block_size, self.file_len = flags, block_size, file_len

    @classmethod
    def parse(cls, meta: bytes):
        """Decode ``meta`` as two big-endian 32-bit words followed by one 64-bit word.

        Raises ``ValueError`` if ``meta`` is shorter than 0x10 bytes.
        """
        if len(meta) >= 0x10:
            flags = be32(meta, 0)
            block_size = be32(meta, 4)
            file_len = be64(meta, 8)
            return cls(flags, block_size, file_len)
        raise ValueError("Metadata too short")
| 92 | |
| 93 | |
def decrypt_metadata_section(meta: bytes) -> bytes:
    """De-obfuscate a 0x20-byte metadata entry into 16 bytes.

    Input shorter than 0x20 bytes is zero-padded on the right; bytes past
    0x20 are ignored. Each output byte is the XOR of two bytes from the
    lower half of the entry and one byte from the upper half.
    """
    padded = meta + b"\x00" * max(0, 0x20 - len(meta))
    # Lower-half base offsets of the two operands for each 4-byte output
    # group; the third operand is always the upper-half byte at 0x10 + index.
    operand_bases = ((12, 8), (4, 8), (12, 0), (4, 0))
    plain = bytearray(0x10)
    for group, (first, second) in enumerate(operand_bases):
        for k in range(4):
            idx = 4 * group + k
            plain[idx] = padded[first + k] ^ padded[second + k] ^ padded[0x10 + idx]
    return bytes(plain)
| 114 | |
| 115 | |
def aes_ecb_encrypt(key: bytes, data: bytes) -> bytes:
    """Encrypt ``data`` with AES in ECB mode under ``key``."""
    cipher = AES.new(key, AES.MODE_ECB)
    return cipher.encrypt(data)
| 118 | |
| 119 | |
def aes_cbc_decrypt(key: bytes, iv: bytes, data: bytes) -> bytes:
    """Decrypt ``data`` with AES in CBC mode under ``key`` and ``iv``."""
    cipher = AES.new(key, AES.MODE_CBC, iv=iv)
    return cipher.decrypt(data)
| 122 | |
| 123 | |
def check_npd_hash1(filename: str, npd_raw: bytes) -> bool:
    """Check the title hash stored at 0x50..0x60 of the NPD header.

    The expected value is the AES-CMAC, under ``NPDRM_OMAC_KEY3``, of header
    bytes 0x10..0x40 followed by ``filename`` encoded as ASCII (characters
    that cannot be encoded are dropped).
    """
    name_bytes = filename.encode("ascii", errors="ignore")
    stored = npd_raw[0x50:0x60]
    computed = cmac_aes(NPDRM_OMAC_KEY3, npd_raw[0x10:0x40] + name_bytes)
    return computed == stored
| 129 | |
| 130 | |
def check_npd_hash2(devklic: bytes, npd_raw: bytes) -> bool:
    """Check the dev hash stored at 0x60..0x70 of the NPD header.

    The expected value is the AES-CMAC of the first 0x60 header bytes, keyed
    with ``devklic`` XOR ``NPDRM_OMAC_KEY2``.
    """
    mac_key = xor_bytes(devklic, NPDRM_OMAC_KEY2)
    stored = npd_raw[0x60:0x70]
    computed = cmac_aes(mac_key, npd_raw[:0x60])
    return computed == stored
| 135 | |
| 136 | |
def create_npd_hash1(filename: str, npd_raw: bytearray):
    """Compute the title hash and store it at 0x50..0x60 of ``npd_raw``.

    The hash is the AES-CMAC, under ``NPDRM_OMAC_KEY3``, of header bytes
    0x10..0x40 followed by ``filename`` encoded as ASCII (characters that
    cannot be encoded are dropped). ``npd_raw`` is modified in place and the
    16-byte tag is also returned.
    """
    name_bytes = filename.encode("ascii", errors="ignore")
    tag = cmac_aes(NPDRM_OMAC_KEY3, npd_raw[0x10:0x40] + name_bytes)
    npd_raw[0x50:0x60] = tag[0:0x10]
    return tag
| 143 | |
| 144 | |
def create_npd_hash2(devklic: bytes, npd_raw: bytearray):
    """Compute the dev hash and store it at 0x60..0x70 of ``npd_raw``.

    The hash is the AES-CMAC of the first 0x60 header bytes, keyed with
    ``devklic`` XOR ``NPDRM_OMAC_KEY2``. ``npd_raw`` is modified in place and
    the 16-byte tag is also returned.
    """
    mac_key = xor_bytes(devklic, NPDRM_OMAC_KEY2)
    tag = cmac_aes(mac_key, npd_raw[:0x60])
    npd_raw[0x60:0x70] = tag[0:0x10]
    return tag
| 150 | |
| 151 | |
def calculate_block_key(blk: int, npd: NPD) -> bytes:
    """Build the 16-byte per-block key seed for block number ``blk``.

    The first 12 bytes come from ``npd.dev_hash`` when ``npd.version`` is
    greater than 1 and are zero otherwise; the last 4 bytes hold the low
    32 bits of ``blk`` in big-endian order.
    """
    if npd.version > 1:
        prefix_source = npd.dev_hash
    else:
        prefix_source = bytes(0x10)
    seed = bytearray(0x10)
    seed[0:12] = prefix_source[0:12]
    # Store the block index most-significant byte first.
    for position, shift in zip(range(12, 16), (24, 16, 8, 0)):
        seed[position] = (blk >> shift) & 0xFF
    return bytes(seed)
| 161 | |
| 162 | |
def decrypt_file(in_path: str, out_path: str, dev_klic: bytes = None, key_from_rif: bytes = None):
    """Decrypt the EDAT/SDAT container at ``in_path`` and write it to ``out_path``.

    Args:
        in_path: Path of the encrypted file. Its base name (either ``\\`` or
            ``/`` is accepted as separator) takes part in the title-hash check.
        out_path: Path of the decrypted output; created only after the header
            checks and key selection succeed.
        dev_klic: Optional 16-byte dev klicensee. When given, the header's dev
            hash is verified with it; it is also the decryption key when the
            header license field equals 3.
        key_from_rif: Key used when the file is neither SDAT nor license 3.

    Returns:
        ``STATUS_OK`` on success, otherwise one of
        ``STATUS_ERROR_HASHTITLEIDNAME``, ``STATUS_ERROR_HASHDEVKLIC``,
        ``STATUS_ERROR_MISSINGKEY``, ``STATUS_ERROR_HEADERCHECK`` (block size
        of zero) or ``STATUS_ERROR_DECRYPTING`` (a data block is truncated;
        the output file then holds only the blocks decrypted so far).

    Raises:
        ValueError: if the NPD header or the metadata header is too short or
            the NPD header is invalid.
    """
    header_size = 0x100
    with open(in_path, "rb") as f:
        npd_raw = f.read(0x80)
        meta_hdr = f.read(0x10)
        npd = NPD.parse(npd_raw)
        # Parse first so a truncated header always surfaces as ValueError.
        data = EDATData.parse(meta_hdr)
        flags = data.flags

        # Validate the header hashes. Accept both path separators so the same
        # base name is hashed regardless of the host platform.
        filename = in_path.replace("\\", "/").rsplit("/", 1)[-1]
        if not check_npd_hash1(filename, npd_raw):
            return STATUS_ERROR_HASHTITLEIDNAME
        if dev_klic and not check_npd_hash2(dev_klic, npd_raw):
            return STATUS_ERROR_HASHDEVKLIC

        # Select the decryption key.
        if flags & FLAG_SDAT:
            rif_key = xor_bytes(npd.dev_hash, SDAT_KEY)
        elif npd.license == 3:
            rif_key = dev_klic
        else:
            rif_key = key_from_rif
        if not rif_key:
            return STATUS_ERROR_MISSINGKEY

        # A zero block size would otherwise divide by zero below.
        if data.block_size == 0:
            return STATUS_ERROR_HEADERCHECK

        num_blocks = (data.file_len + data.block_size - 1) // data.block_size
        meta_stride = 0x20 if (flags & FLAG_COMPRESSED or flags & FLAG_0x20) else 0x10
        # Without compression the data area follows the metadata table and
        # every block but the last one is exactly block_size bytes long.
        data_start = header_size + num_blocks * meta_stride
        last_len = data.file_len % data.block_size or data.block_size

        with open(out_path, "wb") as out:
            for i in range(num_blocks):
                if flags & FLAG_COMPRESSED:
                    # Offset and length of the block come from its
                    # de-obfuscated 0x20-byte metadata entry.
                    f.seek(header_size + i * meta_stride)
                    decm = decrypt_metadata_section(f.read(0x20))
                    data_off = be64(decm, 0)
                    data_len = be32(decm, 8)
                else:
                    data_off = data_start + i * data.block_size
                    data_len = last_len if i == num_blocks - 1 else data.block_size
                # NOTE(review): the per-block hashes stored in the metadata
                # table are not verified, and blocks of a FLAG_COMPRESSED file
                # are written as decrypted, without any decompression step.

                f.seek(data_off)
                padded_len = (data_len + 15) & ~15  # ciphertext is 16-byte aligned
                enc_block = f.read(padded_len)
                if len(enc_block) != padded_len:
                    # Truncated input: report it instead of failing inside AES
                    # or silently writing a short block.
                    return STATUS_ERROR_DECRYPTING

                block_key = calculate_block_key(i, npd)
                ek = aes_ecb_encrypt(rif_key, block_key)
                # With FLAG_0x10 the IV is the block key encrypted once more.
                iv = aes_ecb_encrypt(rif_key, ek) if flags & FLAG_0x10 else ek
                dec = AES.new(ek, AES.MODE_CBC, iv).decrypt(enc_block)
                out.write(dec[:data_len])
    return STATUS_OK
| 229 | |
| 230 | |
def encrypt_file(in_path: str, out_path: str, dev_klic: bytes, key_from_rif: bytes, content_id: bytes, flags: bytes, version: bytes, type_byte: bytes):
    """Encrypt ``in_path`` into an EDAT-style container at ``out_path``.

    Minimal port: builds the NPD header with license 3 and encrypts the
    payload in 0x4000-byte blocks keyed from ``dev_klic``.

    Args:
        in_path: Path of the plaintext input file.
        out_path: Path of the container to write. Its base name (either ``\\``
            or ``/`` is accepted as separator) is hashed into the header.
        dev_klic: 16-byte dev klicensee used for the header hash, the block
            keys and the per-block CMAC.
        key_from_rif: Accepted for signature compatibility; not used here.
        content_id: Content ID, truncated or zero-padded to 0x30 bytes.
        flags: Big-endian flags word, at most 4 bytes are used; shorter values
            are left-padded with zeros.
        version: Big-endian NPD version, left-padded with zeros to 4 bytes.
        type_byte: Big-endian NPD type, left-padded with zeros to 4 bytes.

    Returns:
        ``STATUS_OK`` on success, ``STATUS_ERROR_MISSINGKEY`` when ``dev_klic``
        is empty or ``None``.

    Layout written: 0x80-byte NPD header, 0x10-byte metadata header, zero
    fill up to 0x100, one 16-byte CMAC per block, the encrypted blocks, and a
    16-byte footer.

    NOTE(review): the block table always uses 0x10-byte entries and the IV is
    always the block key, so ``flags`` values with FLAG_COMPRESSED, FLAG_0x10
    or FLAG_0x20 set produce files that ``decrypt_file`` reads differently.
    """
    if not dev_klic:
        return STATUS_ERROR_MISSINGKEY

    block_size = 0x4000
    header_size = 0x100
    with open(in_path, "rb") as fin, open(out_path, "wb") as fout:
        data_len = fin.seek(0, 2)  # seek() returns the new absolute position
        fin.seek(0)

        # Build the NPD header. Every field is forced to its exact width:
        # a slice assignment of a different length would resize the buffer
        # and shift every later field.
        npd = bytearray(0x80)
        npd[0:4] = b"NPD\x00"
        npd[4:8] = version.rjust(4, b"\x00")[-4:]
        npd[8:12] = b"\x00\x00\x00\x03"  # license devklic by default
        npd[12:16] = type_byte.rjust(4, b"\x00")[-4:]
        npd[0x10:0x40] = content_id[:0x30].ljust(0x30, b"\x00")
        out_name = out_path.replace("\\", "/").rsplit("/", 1)[-1]
        create_npd_hash1(out_name, npd)
        create_npd_hash2(dev_klic, npd)
        fout.write(npd)

        # Metadata header: flags, block size, plaintext length.
        meta = bytearray(0x10)
        meta[0:4] = flags[:4].rjust(4, b"\x00")
        struct.pack_into(">IQ", meta, 4, block_size, data_len)
        fout.write(meta)

        # Remainder of the 0x100-byte header: zeros, except for the bytes
        # 00 00 40 00 at 0x94.
        tail = bytearray(header_size - 0x90)
        tail[4:8] = b"\x00\x00\x40\x00"
        fout.write(tail)

        num = (data_len + block_size - 1) // block_size
        npd_info = NPD.parse(bytes(npd))  # loop-invariant, parsed once
        hashes = bytearray(num * 0x10)
        payload = bytearray()
        for i in range(num):
            chunk = fin.read(block_size)
            # Zero-pad to the AES block size. Only the final chunk can be
            # unaligned; decrypt_file reads (len + 15) & ~15 bytes for it.
            chunk += b"\x00" * (-len(chunk) % 16)
            block_key = calculate_block_key(i, npd_info)
            ek = aes_ecb_encrypt(dev_klic, block_key)
            payload += AES.new(ek, AES.MODE_CBC, ek).encrypt(chunk)
            # Per-block metadata entry: CMAC of the padded plaintext chunk.
            hashes[i * 0x10:(i + 1) * 0x10] = cmac_aes(dev_klic, chunk)

        fout.write(hashes)
        fout.write(payload)
        footer = bytes.fromhex("4D6164652062792052325220546F6F6C")
        fout.write(footer)
    return STATUS_OK
| 287 |