Seregon/PkgToolBox

Toolbox for analyzing and editing PKG application files for PSP, PS3, PS4 and PS5; it includes the most useful functions you might need.

Python/57.3 KB/No license
Utilities/Trophy/TRPReader.py
PkgToolBox / Utilities / Trophy / TRPReader.py
1import os
2import hashlib
3import tempfile
4import shutil
5from io import FileIO, BytesIO
6import struct
7import logging
8 
# Configure logging as soon as this module is imported: DEBUG threshold and a
# "timestamp - logger name - level - message" line format.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger, named after this module, used by the classes below.
logger = logging.getLogger(__name__)
11 
class Archiver:
    """Describe one file located inside a trophy archive.

    Attributes:
        index: Position of the entry in the reader's list of found files.
        name: File name assigned to the entry.
        offset: Byte offset of the entry from the start of the archive.
        size: Length of the entry in bytes.
        bytes_data: Optional in-memory copy of the entry's bytes
            (``None`` when not supplied).
    """

    def __init__(self, index, name, offset, size, bytes_data=None):
        self.index = index
        self.name = name
        self.offset = offset
        self.size = size
        self.bytes_data = bytes_data

    def __repr__(self):
        # The payload is deliberately left out: it can be arbitrarily large
        # and would drown log output.
        return (
            f"{type(self).__name__}(index={self.index!r}, name={self.name!r}, "
            f"offset={self.offset!r}, size={self.size!r})"
        )
19 
class TRPReader:
    """Locate and extract the files embedded in a trophy archive (TRP) file.

    The reader does not follow the archive's own file table. It parses the
    fixed header for metadata and then scans the whole file for known byte
    signatures (PNG, ``ESFM`` and a four-zero-byte "UCP" marker), recording
    every hit as an ``Archiver`` entry with a generated name.

    NOTE(review): every header integer is decoded as little-endian. Confirm
    this against the TRP format; if the format is big-endian, every version
    would be reported as invalid and replaced by the version-3 fallback.
    """

    # Byte signatures recognised by the content scanner.
    _PNG_SIGNATURE = b'\x89PNG\r\n\x1a\n'
    _PNG_END_MARKER = b'IEND'
    _ESFM_SIGNATURE = b'ESFM'
    _UCP_SIGNATURE = b'\x00\x00\x00\x00'  # Magic number for Trophy00.ucp

    _MIN_HEADER_SIZE = 64       # smallest header layout handled by read_header
    _MAX_HEADER_SIZE = 96       # largest (version 3) layout: 28 + 20 + 48 bytes
    _MAX_FILE_COUNT = 1000000   # header file counts above this are ignored

    class TRPHeader:
        """Raw header fields, kept as the byte strings read from the file."""

        def __init__(self):
            self.magic = None
            self.version = None
            self.file_size = None
            self.files_count = None
            self.element_size = None
            self.dev_flag = None
            self.padding = None
            self.sha1 = None

    def __init__(self, filename=None):
        """Create a reader and, when *filename* is given, load it immediately.

        Raises:
            Exception: If *filename* is given and loading fails (errors are
                thrown by default, see ``throw_error``).
        """
        self._hdr = self.TRPHeader()
        self._trophyList = []
        self._hdrmagic = {
            bytes([220, 162, 77, 0]),     # Original magic number
            bytes([5, 216, 3, 164]),      # New magic number (a403d805 in little-endian)
            bytes([126, 237, 245, 255]),  # New magic number (fff5ed7e in little-endian)
        }
        self._iserror = False
        self._readbytes = False
        self._throwerror = True
        self._error = ""
        self._calculatedsha1 = None
        self._inputfile = filename
        self._title = None
        self._npcommid = None
        self._temp_dir = None
        if filename:
            self.load(filename)

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------

    def load(self, filename=None):
        """Parse the header of the archive and scan it for embedded files.

        Args:
            filename: Path of the archive. When omitted, the path given to
                the constructor (or a previous ``load``) is reused.

        Raises:
            ValueError: If no path is available at all.
            Exception: If loading fails and ``throw_error`` is true. The
                message is the text of the underlying error, which is
                attached as ``__cause__``.
        """
        if filename is None and self._inputfile is None:
            raise ValueError("Filename must be provided either in the constructor or in the load method")

        if filename is not None:
            self._inputfile = filename

        # Start from a clean slate so nothing from a previous load leaks in.
        self._iserror = False
        self._error = ""
        self._calculatedsha1 = None
        self._trophyList = []
        self._hdr = self.TRPHeader()
        failure = None

        try:
            if not os.path.exists(self._inputfile):
                raise FileNotFoundError(f"File not found: {self._inputfile}")

            # Best effort: a header problem is logged there, and the content
            # scan below still runs.
            self.verify_file_structure()

            with open(self._inputfile, 'rb') as fs:
                self.read_content(fs)

            if self._title is None:
                self._title = "Unknown Title"
        except Exception as exc:
            self._iserror = True
            self._error = str(exc)
            failure = exc
            logger.error(f"Error loading trophy file: {self._error}")

        if self._iserror and self._throwerror:
            raise Exception(self._error) from failure

    def read_header(self, fs):
        """Read the header fields from *fs*, starting at its current position.

        The first 28 bytes are common to all versions. Version 1 is followed
        by 36 bytes of padding, version 2 by a 20-byte SHA-1 and 16 bytes of
        padding, version 3 by a 20-byte SHA-1 and 48 bytes of padding.

        Raises:
            ValueError: If the version field is not 1, 2 or 3.
        """
        try:
            self._hdr.magic = fs.read(4)
            self._hdr.version = fs.read(4)
            self._hdr.file_size = fs.read(8)
            self._hdr.files_count = fs.read(4)
            self._hdr.element_size = fs.read(4)
            self._hdr.dev_flag = fs.read(4)

            version = self.bytes_to_int(self._hdr.version, 32)
            file_size = self.bytes_to_int(self._hdr.file_size, 64)
            files_count = self.bytes_to_int(self._hdr.files_count, 32)

            logger.debug(f"Header: magic={self._hdr.magic.hex()}, version={version}, file_size={file_size}, files_count={files_count}")

            if version == 1:
                self._hdr.padding = fs.read(36)
            elif version == 2:
                self._hdr.sha1 = fs.read(20)
                self._hdr.padding = fs.read(16)
            elif version == 3:
                self._hdr.sha1 = fs.read(20)
                self._hdr.padding = fs.read(48)
            else:
                raise ValueError(f"Invalid version: {version}")
        except Exception as e:
            logger.error(f"Error reading header: {e}")
            raise

    def verify_file_structure(self):
        """Check the header of the input file and store its fields.

        Problems are tolerated rather than raised: an unknown magic number is
        only logged, an unknown version is replaced by 3, an implausible file
        count is reset to 0 and the stored file size is always the real size
        on disk. Every header field is stored so that the ``version``,
        ``file_size``, ``file_count`` and ``sha1`` properties work afterwards.

        Returns:
            True if a header was read, False if the file is smaller than the
            minimum header size or could not be read.
        """
        try:
            actual_size = os.path.getsize(self._inputfile)
            if actual_size < self._MIN_HEADER_SIZE:  # Minimum header size
                logger.error(f"File too small: {actual_size} bytes")
                return False

            with open(self._inputfile, 'rb') as fs:
                raw = fs.read(self._MAX_HEADER_SIZE)

            magic = raw[0:4]
            if magic not in self._hdrmagic:
                logger.warning(f"Invalid file magic number: {magic.hex()}, but continuing anyway")
            else:
                logger.info(f"Valid magic number found: {magic.hex()}")

            version_bytes = raw[4:8]
            file_size_bytes = raw[8:16]
            files_count_bytes = raw[16:20]

            version = self.bytes_to_int(version_bytes, 32)
            file_size = self.bytes_to_int(file_size_bytes, 64)
            files_count = self.bytes_to_int(files_count_bytes, 32)

            logger.debug(f"Raw bytes: version={version_bytes.hex()}, file_size={file_size_bytes.hex()}, files_count={files_count_bytes.hex()}")
            logger.debug(f"File structure: version={version}, file_size={file_size}, files_count={files_count}")

            self._hdr.magic = magic
            self._hdr.element_size = raw[20:24]
            self._hdr.dev_flag = raw[24:28]

            if file_size != actual_size:
                logger.warning(f"File size mismatch: expected {file_size}, actual {actual_size}")
            # Equal to the header value when they agree, corrected otherwise.
            self._hdr.file_size = actual_size.to_bytes(8, byteorder='little')

            if version not in (1, 2, 3):
                logger.warning(f"Invalid version: {version}, assuming version 3")
                version = 3
            self._hdr.version = version.to_bytes(4, byteorder='little')

            if files_count <= 0 or files_count > self._MAX_FILE_COUNT:
                logger.warning(f"Invalid file count: {files_count}, will try to extract anyway")
                files_count = 0  # Reset the count and let read_content determine it
            self._hdr.files_count = files_count.to_bytes(4, byteorder='little')

            # Same version-dependent tail as read_header.
            if version == 1:
                self._hdr.sha1 = None
                self._hdr.padding = raw[28:64]
            else:
                self._hdr.sha1 = raw[28:48]
                self._hdr.padding = raw[48:64] if version == 2 else raw[48:96]

            return True
        except Exception as e:
            logger.error(f"Error during file structure verification: {e}")
            return False

    # ------------------------------------------------------------------
    # Content scanning
    # ------------------------------------------------------------------

    def read_content(self, fs):
        """Scan all of *fs* for embedded files and append them to the trophy list.

        Candidates whose size field is unreadable or does not fit in the
        file are skipped with a warning.
        """
        fs.seek(0)
        self._scan_content(fs.read(), warn_invalid=True)

    def read_content_flexible(self, fs):
        """Same scan as ``read_content``, but rejected candidates are skipped silently."""
        fs.seek(0)
        self._scan_content(fs.read(), warn_invalid=False)

    def _scan_content(self, data, warn_invalid):
        """Walk *data* byte by byte and record every recognised signature."""
        total = len(data)
        pos = 0
        while pos < total:
            # startswith(sig, pos) tests in place, without slicing a copy.
            if data.startswith(self._PNG_SIGNATURE, pos):
                size = self._png_size_at(data, pos)
                if size:
                    self._add_entry("TROP", "PNG", "PNG image", pos, size)
                    pos += size
                else:
                    pos += 1
            elif data.startswith(self._ESFM_SIGNATURE, pos):
                pos += self._scan_sized_entry(data, pos, "FILE", "ESFM", warn_invalid)
            elif data.startswith(self._UCP_SIGNATURE, pos):
                pos += self._scan_sized_entry(data, pos, "TROPHY", "UCP", warn_invalid)
            else:
                pos += 1

        if not self._trophyList:
            logger.warning("No files found in the TRP. The file might be corrupted or empty.")
        else:
            logger.info(f"Found {len(self._trophyList)} files")

        # Update the file count in the header
        self._hdr.files_count = len(self._trophyList).to_bytes(4, byteorder='little')

    def _scan_sized_entry(self, data, offset, prefix, kind, warn_invalid):
        """Try to record a signature + big-endian-size entry at *offset*.

        The total size is the 32-bit big-endian value at ``offset + 4`` plus
        the 8 bytes of signature and size field.

        Returns:
            The number of bytes the scanner should advance: the entry size
            when it was recorded, otherwise 1.
        """
        remaining = len(data) - offset
        if remaining < 8:
            if warn_invalid:
                logger.warning(f"Unable to read {kind} size at offset 0x{offset:X}")
            return 1

        size = struct.unpack_from('>I', data, offset + 4)[0] + 8
        # "<=" so that an entry ending exactly at end-of-file is accepted.
        if size <= remaining:
            self._add_entry(prefix, kind, f"{kind} file", offset, size)
            return size

        if warn_invalid:
            logger.warning(f"Invalid {kind} size at offset 0x{offset:X}: {size}")
        return 1

    def _add_entry(self, prefix, extension, label, offset, size):
        """Append a new entry named ``<prefix><index>.<extension>`` and log it."""
        index = len(self._trophyList)
        name = f"{prefix}{index:03d}.{extension}"
        self._trophyList.append(Archiver(index, name, offset, size))
        logger.info(f"Found {label} '{name}' at offset 0x{offset:X}, size {size}")

    @classmethod
    def _png_size_at(cls, data, start):
        """Return the length of the PNG beginning at *start*, or None.

        The end is found by searching for the ``IEND`` chunk type. The match
        points at the 4-byte type field, which is followed only by the 4-byte
        CRC, so the image ends 8 bytes after the match. The result is clamped
        so it never extends past the available data.
        """
        marker = data.find(cls._PNG_END_MARKER, start)
        if marker == -1:
            return None
        return min(marker + 8, len(data)) - start

    def get_png_size(self, data):
        """Return the size in bytes of the PNG at the start of *data*.

        Returns None when no ``IEND`` marker is present.
        """
        return self._png_size_at(data, 0)

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    def _header_int(self, raw, bits):
        """Decode a header field, giving 0 while no header has been parsed."""
        return 0 if raw is None else self.bytes_to_int(raw, bits)

    @property
    def read_bytes(self):
        return self._readbytes

    @read_bytes.setter
    def read_bytes(self, value):
        self._readbytes = value

    @property
    def trophy_list(self):
        """List of ``Archiver`` entries found by the last scan."""
        return self._trophyList

    @property
    def file_size(self):
        """File size stored for the header (0 if no header was parsed)."""
        return self._header_int(self._hdr.file_size, 64)

    @property
    def file_count(self):
        """Number of files stored for the header (0 if no header was parsed)."""
        return self._header_int(self._hdr.files_count, 32)

    @property
    def version(self):
        """Header version (0 if no header was parsed)."""
        return self._header_int(self._hdr.version, 32)

    @property
    def sha1(self):
        """Hex SHA-1 stored in the header, or None when there is none."""
        if self.version <= 1 or self._hdr.sha1 is None:
            return None
        return self.byte_array_to_hex_string(self._hdr.sha1)

    @property
    def calculated_sha1(self):
        """Result of the last ``calculate_sha1_hash`` call, or None."""
        return self._calculatedsha1

    @property
    def is_error(self):
        return self._iserror

    @property
    def throw_error(self):
        """Whether ``load`` raises on failure instead of only recording it."""
        return self._throwerror

    @throw_error.setter
    def throw_error(self, value):
        self._throwerror = value

    @property
    def title(self):
        return self._title

    @title.setter
    def title(self, value):
        self._title = value

    @property
    def np_comm_id(self):
        return self._npcommid

    @np_comm_id.setter
    def np_comm_id(self, value):
        self._npcommid = value

    # ------------------------------------------------------------------
    # Extraction
    # ------------------------------------------------------------------

    def _find_entry(self, filename):
        """Return the first entry whose name starts with *filename* (case-insensitive)."""
        wanted = filename.upper()
        return next((a for a in self._trophyList if a.name.upper().startswith(wanted)), None)

    def extract_file_to_memory(self, filename):
        """Return the bytes of the first entry matching *filename*, or None."""
        archiver = self._find_entry(filename)
        if archiver is None:
            return None

        with open(self._inputfile, 'rb') as fs:
            fs.seek(archiver.offset)
            return fs.read(archiver.size)

    def extract(self):
        """Write every found entry into a temporary directory next to the input file.

        The directory is created on first use and reused by later calls until
        ``cleanup`` removes it.

        Returns:
            The path of the directory holding the extracted files.

        Raises:
            ValueError: If no input file has been set.
        """
        if self._inputfile is None:
            raise ValueError("No input file specified")

        input_dir = os.path.dirname(self._inputfile)
        input_filename = os.path.splitext(os.path.basename(self._inputfile))[0]

        if self._temp_dir is None:
            self._temp_dir = tempfile.mkdtemp(prefix=f"{input_filename}_extracted_", dir=input_dir)

        with open(self._inputfile, 'rb') as fs:
            for archiver in self._trophyList:
                fs.seek(archiver.offset)
                data = fs.read(archiver.size)
                output_file = os.path.join(self._temp_dir, archiver.name)
                with open(output_file, 'wb') as out:
                    out.write(data)
                logger.info(f"Extracted {archiver.name} to {output_file}")

        logger.info(f"All files extracted to: {self._temp_dir}")
        return self._temp_dir

    def cleanup(self):
        """Delete the directory created by ``extract``; failures are only logged."""
        if self._temp_dir and os.path.exists(self._temp_dir):
            try:
                shutil.rmtree(self._temp_dir)
                logger.info(f"Temporary directory {self._temp_dir} has been removed")
            except Exception as e:
                logger.error(f"Error removing temporary directory {self._temp_dir}: {e}")
        self._temp_dir = None

    def extract_file(self, filename, outputpath, custom_name=None):
        """Write the first entry matching *filename* into *outputpath*.

        Does nothing when no entry matches. The file is named *custom_name*
        when given, otherwise it keeps the entry's own name.
        """
        archiver = self._find_entry(filename)
        if archiver is None:
            return
        os.makedirs(outputpath, exist_ok=True)

        with open(self._inputfile, 'rb') as fs:
            fs.seek(archiver.offset)
            data = fs.read(archiver.size)
        output_file = os.path.join(outputpath, custom_name or archiver.name)
        with open(output_file, 'wb') as out:
            out.write(data)

    def get_temp_dir(self):
        """Return the directory used by ``extract`` (None before the first call)."""
        return self._temp_dir

    def decrypt_trp(self, input_file, output_dir):
        """Extract every file found in *input_file* into *output_dir*.

        Despite the name, the entries are copied out unchanged. Entries whose
        name contains "TROP" are written as ``trophy_<index><ext>``; the rest
        keep their own name. A failure on one entry is logged and the
        remaining entries are still processed.

        Returns:
            A fixed success message.

        Raises:
            Exception: If loading the archive, creating the output directory
                or opening the input file fails.
        """
        try:
            # Load the archive unless this exact file is already loaded; a
            # list built from another file would carry the wrong offsets.
            if not self._trophyList or self._inputfile != input_file:
                self.load(input_file)

            # Create the output directory if it does not exist.
            os.makedirs(output_dir, exist_ok=True)

            # The input is opened once and shared by all entries.
            with open(input_file, 'rb') as source:
                for trophy in self._trophyList:
                    try:
                        # The output extension is taken from the entry name.
                        file_ext = os.path.splitext(trophy.name)[1].lower()

                        source.seek(trophy.offset)
                        data = source.read(trophy.size)

                        if "TROP" in trophy.name.upper():
                            output_name = f"trophy_{trophy.index:03d}{file_ext}"
                        else:
                            output_name = trophy.name

                        output_path = os.path.join(output_dir, output_name)
                        with open(output_path, 'wb') as out:
                            out.write(data)

                        logger.info(f"Extracted: {output_name}")
                    except Exception as e:
                        logger.error(f"Error extracting {trophy.name}: {e}")
                        continue

            return "Trophy file decrypted and extracted successfully"

        except Exception as e:
            error_msg = f"Error decrypting TRP: {str(e)}"
            logger.error(error_msg)
            raise Exception(error_msg) from e

    # ------------------------------------------------------------------
    # Verification
    # ------------------------------------------------------------------

    def calculate_sha1_hash(self):
        """Compute the SHA-1 of the input file, skipping the stored digest.

        The first 28 bytes and everything from offset 48 onwards are hashed;
        bytes 28-47, where the header keeps its SHA-1, are left out. The
        result is also made available through ``calculated_sha1``.

        Returns:
            The hex digest, or None for version 1 (or unparsed) headers.
        """
        if self.version <= 1:
            return None

        sha1 = hashlib.sha1()
        with open(self._inputfile, 'rb') as fs:
            sha1.update(fs.read(28))
            fs.seek(48)
            while chunk := fs.read(8192):
                sha1.update(chunk)

        self._calculatedsha1 = sha1.hexdigest()
        return self._calculatedsha1

    def verify_integrity(self):
        """Print SHA-1, file-size and file-count check results.

        Returns:
            The list of entries found by the last scan.
        """
        if self.version > 1 and self.sha1:
            calculated_sha1 = self.calculate_sha1_hash()
            if calculated_sha1.lower() != self.sha1.lower():
                print("Warning: SHA1 mismatch. File may be corrupted.")
                print(f"Calculated: {calculated_sha1}")
                print(f"Expected: {self.sha1}")
            else:
                print("SHA1 verification passed.")

        expected_size = self.file_size
        actual_size = os.path.getsize(self._inputfile)
        if expected_size != actual_size:
            print(f"Warning: File size mismatch. Expected: {expected_size}, Actual: {actual_size}")

        if len(self._trophyList) != self.file_count:
            print(f"Warning: Trophy count mismatch. Expected: {self.file_count}, Actual: {len(self._trophyList)}")

        return self._trophyList

    def verify_trophy_data(self, name, offset, size):
        """Return True if *name*, *offset* and *size* describe a plausible entry.

        The range must be non-negative and lie inside the input file, must
        not be the all-zero ``(0, 0)`` pair, and *name* must not be blank.
        """
        file_size = os.path.getsize(self._inputfile)
        if offset < 0 or size < 0 or offset + size > file_size:
            return False
        if offset == 0 and size == 0:
            return False
        if len(name.strip()) == 0:
            return False
        return True

    def some_method_that_uses_title(self):
        """Log a warning when no title has been set; otherwise do nothing."""
        if self._title is None:
            logger.warning("Title is not set")
            return

    # ------------------------------------------------------------------
    # Byte helpers
    # ------------------------------------------------------------------

    def byte_arrays_equal(self, first, second):
        """Return True if both sequences are equal, comparing element-wise as a fallback."""
        if first == second:
            return True
        if len(first) != len(second):
            return False
        return all(a == b for a, b in zip(first, second))

    @staticmethod
    def byte_array_to_little_endian_int(byte_array):
        """Decode *byte_array* as an unsigned little-endian integer."""
        return int.from_bytes(byte_array, byteorder='little')

    @staticmethod
    def byte_array_to_utf8_string(byte_array, errors='ignore'):
        """Return the printable ASCII characters, tabs, LFs and CRs of *byte_array*.

        All other bytes are dropped. *errors* is accepted but not used.
        """
        return ''.join(chr(b) for b in byte_array if 32 <= b <= 126 or b in (9, 10, 13))

    @staticmethod
    def byte_array_to_hex_string(byte_array):
        """Return *byte_array* as a lowercase hex string."""
        return ''.join(f'{b:02x}' for b in byte_array)

    @staticmethod
    def hex_string_to_long(hex_string):
        """Parse *hex_string* as a base-16 integer."""
        return int(hex_string, 16)

    @staticmethod
    def bytes_to_int(bytes_data, bits=32):
        """Decode *bytes_data* as unsigned little-endian, masked to *bits* bits.

        Raises:
            ValueError: If *bits* is neither 32 nor 64.
        """
        value = int.from_bytes(bytes_data, byteorder='little', signed=False)
        if bits == 32:
            return value & 0xFFFFFFFF
        elif bits == 64:
            return value & 0xFFFFFFFFFFFFFFFF
        else:
            raise ValueError(f"Unsupported bit size: {bits}")