Seregon/PkgToolBox

Toolbox for analyzing and editing PKG application files for PSP, PS3, PS4 and PS5; it includes the most useful functions you might need.

Python/57.3 KB/No license
packages/edat.py
PkgToolBox / packages / edat.py
1import struct
2import binascii
3from Crypto.Cipher import AES
4from Crypto.Hash import CMAC
5 
# EDAT/NPDRM keys
# EDAT_HASH_KEY and EDAT_KEY are not referenced by the functions in this part of the file.
EDAT_HASH_KEY = bytes([0xEF, 0xFE, 0x5B, 0xD1, 0x65, 0x2E, 0xEB, 0xC1, 0x19, 0x18, 0xCF, 0x7C, 0x04, 0xD4, 0xF0, 0x11])
# 16 zero bytes.
EDAT_IV = bytes(16)
EDAT_KEY = bytes([0xBE, 0x95, 0x9C, 0xA8, 0x30, 0x8D, 0xEF, 0xA2, 0xE5, 0xE1, 0x80, 0xC6, 0x37, 0x12, 0xA9, 0xAE])
# XORed with the devklic to form the CMAC key for the NPD hash stored at 0x60.
NPDRM_OMAC_KEY2 = bytes([0x6B, 0xA5, 0x29, 0x76, 0xEF, 0xDA, 0x16, 0xEF, 0x3C, 0x33, 0x9F, 0xB2, 0x97, 0x1E, 0x25, 0x6B])
# CMAC key for the content-id + file-name hash stored at 0x50.
NPDRM_OMAC_KEY3 = bytes([0x9B, 0x51, 0x5F, 0xEA, 0xCF, 0x75, 0x06, 0x49, 0x81, 0xAA, 0x60, 0x4D, 0x91, 0xA5, 0x4E, 0x97])
# XORed with the NPD dev_hash to derive the key when FLAG_SDAT is set.
SDAT_KEY = bytes([0x0D, 0x65, 0x5E, 0xF8, 0xE6, 0x74, 0xA9, 0x8A, 0xB8, 0x50, 0x5C, 0xFA, 0x7D, 0x01, 0x29, 0x33])

# Bit masks tested against the 32-bit flags word at the start of the EDAT metadata header.
FLAG_COMPRESSED = 1  # per-block metadata is 0x20 bytes and carries offset/length
FLAG_0x02 = 0x2
FLAG_0x10 = 0x10  # IV is derived by a second ECB pass instead of reusing the block key
FLAG_0x20 = 0x20  # per-block metadata is 0x20 bytes
FLAG_KEYENCRYPTED = 0x8
FLAG_DEBUG = 0x80000000
FLAG_SDAT = 0x1000000  # key is derived from dev_hash and SDAT_KEY

# Status codes returned by decrypt_file / encrypt_file (0 means success).
STATUS_OK = 0
STATUS_ERROR_HEADERCHECK = -4
STATUS_ERROR_DECRYPTING = -5
STATUS_ERROR_MISSINGKEY = -3
STATUS_ERROR_INCORRECT_FLAGS = -6
STATUS_ERROR_INCORRECT_VERSION = -7
STATUS_ERROR_HASHDEVKLIC = -2
STATUS_ERROR_HASHTITLEIDNAME = -1

HEADER_MAX_BLOCKSIZE = 0x3C00
32 
33 
def be32(b, off=0):
    """Return the big-endian unsigned 32-bit integer stored in ``b`` at ``off``."""
    (value,) = struct.unpack_from(">I", b, off)
    return value
36 
37 
def be64(b, off=0):
    """Return the big-endian unsigned 64-bit integer stored in ``b`` at ``off``."""
    (value,) = struct.unpack_from(">Q", b, off)
    return value
40 
41 
def xor_bytes(a: bytes, b: bytes) -> bytes:
    """XOR ``a`` and ``b`` byte by byte.

    The result is as long as the shorter input; surplus bytes of the longer
    one are ignored.
    """
    mixed = bytearray()
    for left, right in zip(a, b):
        mixed.append(left ^ right)
    return bytes(mixed)
44 
45 
def arraycopy(src, src_off, dst, dst_off, length):
    """Copy ``length`` bytes from ``src[src_off:]`` into ``dst[dst_off:]`` in place.

    The copy is a slice assignment, so ``dst`` must be mutable. If ``src``
    supplies fewer than ``length`` bytes, ``dst`` changes size accordingly.
    """
    target = slice(dst_off, dst_off + length)
    source = slice(src_off, src_off + length)
    dst[target] = src[source]
48 
49 
def cmac_aes(key: bytes, data: bytes) -> bytes:
    """Return the AES-CMAC digest of ``data`` computed under ``key``."""
    return CMAC.new(key, msg=data, ciphermod=AES).digest()
54 
55 
class NPD:
    """Parsed view of a 0x80-byte NPD header.

    Field layout as read from the raw buffer (integers are big-endian):
    magic at 0x00, version at 0x04, license at 0x08, type at 0x0C,
    content_id at 0x10-0x3F, digest at 0x40-0x4F, title_hash at 0x50-0x5F,
    dev_hash at 0x60-0x6F, and two 64-bit words at 0x70 and 0x78.
    """

    def __init__(self, raw: bytes):
        """Decode ``raw`` and raise ``ValueError`` if the header does not validate."""
        version, license_kind, content_type = struct.unpack_from(">III", raw, 4)
        tail_hi, tail_lo = struct.unpack_from(">QQ", raw, 0x70)

        self.magic = raw[:4]
        self.version = version
        self.license = license_kind
        self.type = content_type
        self.content_id = raw[0x10:0x40]
        self.digest = raw[0x40:0x50]
        self.title_hash = raw[0x50:0x60]
        self.dev_hash = raw[0x60:0x70]
        self.unknown3 = tail_hi
        self.unknown4 = tail_lo

        if self.validate():
            return
        raise ValueError("Invalid NPD header")

    def validate(self):
        """Return True when the magic is ``NPD\\0`` and both trailing words are zero."""
        if self.magic != b"NPD\x00":
            return False
        return self.unknown3 == 0 and self.unknown4 == 0

    @classmethod
    def parse(cls, buf: bytes):
        """Build an ``NPD`` from the first 0x80 bytes of ``buf``.

        Raises:
            ValueError: if ``buf`` is shorter than 0x80 bytes or fails validation.
        """
        if len(buf) >= 0x80:
            return cls(buf[:0x80])
        raise ValueError("NPD header too short")
79 
80 
class EDATData:
    """Values from the 0x10-byte EDAT metadata header: flags, block size, file length."""

    def __init__(self, flags: int, block_size: int, file_len: int):
        self.flags, self.block_size, self.file_len = flags, block_size, file_len

    @classmethod
    def parse(cls, meta: bytes):
        """Decode the header from the first 0x10 bytes of ``meta``.

        Layout (big-endian): 32-bit flags, 32-bit block size, 64-bit file length.

        Raises:
            ValueError: if ``meta`` is shorter than 0x10 bytes.
        """
        if len(meta) >= 0x10:
            return cls(*struct.unpack_from(">IIQ", meta))
        raise ValueError("Metadata too short")
92 
93 
def decrypt_metadata_section(meta: bytes) -> bytes:
    """De-obfuscate a 0x20-byte per-block metadata entry into 16 bytes.

    ``meta`` is zero-padded to 0x20 bytes if shorter. Output byte ``k`` is the
    XOR of input byte ``0x10 + k`` with two bytes from the first half; which
    two depends on the 4-byte word that ``k`` falls in.
    """
    padded = meta + b"\x00" * max(0, 0x20 - len(meta))
    # Base offsets (into the first 16 bytes) mixed into output words 0..3.
    word_sources = ((12, 8), (4, 8), (12, 0), (4, 0))
    plain = bytearray(0x10)
    for word, (first, second) in enumerate(word_sources):
        for lane in range(4):
            pos = 4 * word + lane
            plain[pos] = padded[first + lane] ^ padded[second + lane] ^ padded[0x10 + pos]
    return bytes(plain)
114 
115 
def aes_ecb_encrypt(key: bytes, data: bytes) -> bytes:
    """Encrypt ``data`` with AES in ECB mode under ``key``."""
    cipher = AES.new(key, AES.MODE_ECB)
    return cipher.encrypt(data)
118 
119 
def aes_cbc_decrypt(key: bytes, iv: bytes, data: bytes) -> bytes:
    """Decrypt ``data`` with AES in CBC mode using ``key`` and ``iv``."""
    cipher = AES.new(key, AES.MODE_CBC, iv)
    return cipher.decrypt(data)
122 
123 
def check_npd_hash1(filename: str, npd_raw: bytes) -> bool:
    """Check the NPD hash stored at 0x50-0x5F.

    The expected value is the AES-CMAC, keyed with ``NPDRM_OMAC_KEY3``, of the
    content id (header bytes 0x10-0x3F) followed by the ASCII file name.
    Non-ASCII characters in ``filename`` are dropped before hashing.
    """
    name_bytes = filename.encode("ascii", errors="ignore")
    expected = cmac_aes(NPDRM_OMAC_KEY3, npd_raw[0x10:0x40] + name_bytes)
    stored = npd_raw[0x50:0x60]
    return expected == stored
129 
130 
def check_npd_hash2(devklic: bytes, npd_raw: bytes) -> bool:
    """Check the NPD hash stored at 0x60-0x6F.

    The expected value is the AES-CMAC of the first 0x60 header bytes, keyed
    with ``devklic`` XOR ``NPDRM_OMAC_KEY2``.
    """
    mac_key = xor_bytes(devklic, NPDRM_OMAC_KEY2)
    expected = cmac_aes(mac_key, npd_raw[:0x60])
    stored = npd_raw[0x60:0x70]
    return expected == stored
135 
136 
def create_npd_hash1(filename: str, npd_raw: bytearray):
    """Compute the content-id + file-name CMAC and store it at 0x50 of ``npd_raw``.

    ``npd_raw`` is modified in place. Returns the 16-byte CMAC that was written.
    Non-ASCII characters in ``filename`` are dropped before hashing.
    """
    name_bytes = filename.encode("ascii", errors="ignore")
    mac = cmac_aes(NPDRM_OMAC_KEY3, npd_raw[0x10:0x40] + name_bytes)
    npd_raw[0x50:0x60] = mac[:0x10]
    return mac
143 
144 
def create_npd_hash2(devklic: bytes, npd_raw: bytearray):
    """Compute the devklic-keyed header CMAC and store it at 0x60 of ``npd_raw``.

    The CMAC covers the first 0x60 header bytes and is keyed with ``devklic``
    XOR ``NPDRM_OMAC_KEY2``. ``npd_raw`` is modified in place. Returns the
    16-byte CMAC that was written.
    """
    mac_key = xor_bytes(devklic, NPDRM_OMAC_KEY2)
    mac = cmac_aes(mac_key, npd_raw[:0x60])
    npd_raw[0x60:0x70] = mac[:0x10]
    return mac
150 
151 
def calculate_block_key(blk: int, npd: NPD) -> bytes:
    """Build the 16-byte per-block key seed for block index ``blk``.

    The first 12 bytes come from ``npd.dev_hash`` when the NPD version is
    greater than 1, otherwise they are zero. The last 4 bytes are the low
    32 bits of ``blk`` in big-endian order.
    """
    seed = bytes(0x10) if npd.version <= 1 else npd.dev_hash
    key = bytearray(0x10)
    key[0:12] = seed[0:12]
    for pos, shift in zip(range(12, 16), (24, 16, 8, 0)):
        key[pos] = (blk >> shift) & 0xFF
    return bytes(key)
161 
162 
def decrypt_file(in_path: str, out_path: str, dev_klic: bytes = None, key_from_rif: bytes = None):
    """Decrypt the EDAT/SDAT file at ``in_path`` and write the result to ``out_path``.

    Args:
        in_path: Path of the encrypted file. Its bare file name is part of the
            NPD title hash, so the name must match the one used at encryption.
        out_path: Path of the plaintext output. It is only created once the
            header hashes and the key have been accepted.
        dev_klic: Key used when the NPD license type is 3; when given, it is
            also used to verify the devklic hash in the header.
        key_from_rif: Key used for other license types.

    Returns:
        ``STATUS_OK`` on success, otherwise one of the ``STATUS_ERROR_*`` codes:
        ``STATUS_ERROR_HASHTITLEIDNAME`` / ``STATUS_ERROR_HASHDEVKLIC`` when a
        header hash does not match, ``STATUS_ERROR_MISSINGKEY`` when no usable
        key was supplied, ``STATUS_ERROR_HEADERCHECK`` when the header declares
        a zero block size, ``STATUS_ERROR_DECRYPTING`` when the input ends
        before a block is complete.

    Raises:
        ValueError: if the NPD header or the metadata header is too short or
            the NPD header fails validation.
    """
    with open(in_path, "rb") as f:
        npd_raw = f.read(0x80)
        meta_hdr = f.read(0x10)
        npd = NPD.parse(npd_raw)
        flags = be32(meta_hdr, 0)
        data = EDATData.parse(meta_hdr)

        # Validate hashes. The title hash covers the bare file name, so strip
        # directories whichever separator style the caller used.
        filename = in_path.replace("\\", "/").rsplit("/", 1)[-1]
        if not check_npd_hash1(filename, npd_raw):
            return STATUS_ERROR_HASHTITLEIDNAME
        if dev_klic and not check_npd_hash2(dev_klic, npd_raw):
            return STATUS_ERROR_HASHDEVKLIC

        # Select the key.
        if flags & FLAG_SDAT:
            rif_key = xor_bytes(npd.dev_hash, SDAT_KEY)
        elif npd.license == 3:
            rif_key = dev_klic
        else:
            rif_key = key_from_rif
        if not rif_key:
            return STATUS_ERROR_MISSINGKEY

        block_size = data.block_size
        if block_size == 0:
            # A zero block size would otherwise surface as ZeroDivisionError.
            return STATUS_ERROR_HEADERCHECK

        # Blocks: a table of per-block metadata starts at 0x100 and, for
        # uncompressed files, the data blocks follow the whole table.
        num_blocks = (data.file_len + block_size - 1) // block_size
        compressed = bool(flags & FLAG_COMPRESSED)
        meta_stride = 0x20 if (compressed or flags & FLAG_0x20) else 0x10
        header_size = 0x100
        data_start = header_size + num_blocks * meta_stride
        last_len = data.file_len % block_size or block_size

        with open(out_path, "wb") as out:
            for i in range(num_blocks):
                if compressed:
                    # Offset and length of each block come from its metadata entry.
                    # NOTE(review): the block is decrypted but not decompressed here.
                    f.seek(header_size + i * meta_stride)
                    decm = decrypt_metadata_section(f.read(0x20))
                    data_off = be64(decm, 0)
                    data_len = be32(decm, 8)
                else:
                    # NOTE(review): the per-block hash in the metadata table is
                    # not verified.
                    data_off = data_start + i * block_size
                    data_len = block_size if i != num_blocks - 1 else last_len

                f.seek(data_off)
                padded_len = (data_len + 15) & ~15  # CBC works on whole 16-byte blocks
                enc_block = f.read(padded_len)
                if len(enc_block) != padded_len:
                    # Truncated input: report it instead of writing short output.
                    return STATUS_ERROR_DECRYPTING

                block_key = calculate_block_key(i, npd)
                ek = aes_ecb_encrypt(rif_key, block_key)
                iv = aes_ecb_encrypt(rif_key, ek) if flags & FLAG_0x10 else ek
                dec = aes_cbc_decrypt(ek, iv, enc_block)
                out.write(dec[:data_len])
    return STATUS_OK
229 
230 
def encrypt_file(in_path: str, out_path: str, dev_klic: bytes, key_from_rif: bytes, content_id: bytes, flags: bytes, version: bytes, type_byte: bytes):
    """Encrypt ``in_path`` into an EDAT-style file at ``out_path``.

    Minimal port: builds an NPD header with license type 3 (devklic) and
    encrypts the data in 0x4000-byte blocks keyed from ``dev_klic``.

    Output layout: 0x80-byte NPD header, 0x10-byte metadata header, zero fill
    up to 0x100, one 16-byte CMAC per block, the encrypted blocks, and a
    16-byte footer.

    Args:
        in_path: Path of the plaintext input.
        out_path: Path of the encrypted output; its bare file name is hashed
            into the NPD header.
        dev_klic: Key used for the header hash, the block keys and the
            per-block CMACs.
        key_from_rif: Accepted for interface compatibility; not used, because
            the header is always written with license type 3.
        content_id: Content id; truncated or zero-padded to 0x30 bytes.
        flags: Flags word written verbatim to the metadata header (first
            4 bytes, left-padded with zeros if shorter). The block layout
            written here does not change with the flag bits.
        version: NPD version, big-endian, left-padded with zeros to 4 bytes.
        type_byte: NPD type, big-endian, left-padded with zeros to 4 bytes.

    Returns:
        ``STATUS_OK``.
    """
    block_size = 0x4000
    with open(in_path, "rb") as fin, open(out_path, "wb") as fout:
        fin.seek(0, 2)
        data_len = fin.tell()
        fin.seek(0)

        # Build the NPD header. Every field is forced to its exact width: a
        # slice assignment with a shorter or longer value would resize the
        # bytearray and shift all following fields.
        npd = bytearray(0x80)
        npd[0:4] = b"NPD\x00"
        npd[4:8] = bytes(version[-4:]).rjust(4, b"\x00")
        npd[8:12] = b"\x00\x00\x00\x03"  # license devklic by default
        npd[12:16] = bytes(type_byte[-4:]).rjust(4, b"\x00")
        npd[0x10:0x40] = bytes(content_id[:0x30]).ljust(0x30, b"\x00")
        # The title hash covers the bare file name, whichever separator is used.
        create_npd_hash1(out_path.replace("\\", "/").rsplit("/", 1)[-1], npd)
        create_npd_hash2(dev_klic, npd)
        fout.write(npd)

        # Metadata header: flags, block size, plaintext length.
        meta = bytearray(0x10)
        meta[0:4] = bytes(flags[:4]).rjust(4, b"\x00")
        meta[4:8] = struct.pack(">I", block_size)
        meta[8:16] = struct.pack(">Q", data_len)
        fout.write(meta)
        # Remaining header words (zero, 0x00004000, zero), then zero fill to 0x100.
        fout.write(b"\x00\x00\x00\x00" + b"\x00\x00\x40\x00" + b"\x00" * 8)
        fout.write(b"\x00" * max(0, 0x100 - fout.tell()))

        num = (data_len + block_size - 1) // block_size
        parsed_npd = NPD.parse(bytes(npd))  # loop-invariant, parse once

        # The hash table precedes the data, so the ciphertext is buffered
        # until all hashes are known.
        hashes = bytearray(num * 0x10)
        payload = bytearray()
        for i in range(num):
            fin.seek(i * block_size)
            chunk = fin.read(block_size)
            if len(chunk) < block_size:
                # Zero-pad a short (final) chunk to a full block so its length
                # is a multiple of the 16-byte AES block size.
                chunk += b"\x00" * (block_size - len(chunk))
            block_key = calculate_block_key(i, parsed_npd)
            ek = aes_ecb_encrypt(dev_klic, block_key)
            # The derived block key doubles as the CBC IV.
            payload += AES.new(ek, AES.MODE_CBC, ek).encrypt(chunk)
            # Per-block metadata hash: CMAC of the padded plaintext chunk.
            hashes[i * 0x10:(i + 1) * 0x10] = cmac_aes(dev_klic, chunk)
        fout.write(hashes)
        fout.write(payload)
        # Footer: the ASCII bytes "Made by R2R Tool".
        footer = bytes.fromhex("4D6164652062792052325220546F6F6C")
        fout.write(footer)
    return STATUS_OK
287