Toolbox for analyzing and editing pkg application files for psp,ps3, ps4 and ps5, includes the most useful functions you might need.
| 1 | import os |
| 2 | import hashlib |
| 3 | import struct |
| 4 | import logging |
| 5 | from io import BytesIO |
| 6 | from decimal import Decimal |
| 7 | import re |
| 8 | from pathlib import Path |
| 9 | |
| 10 | logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
| 11 | logger = logging.getLogger(__name__) |
| 12 | |
| 13 | class Archiver: |
| 14 | def __init__(self, index, name, offset, size, bytes_data): |
| 15 | self.index = index |
| 16 | self.name = name |
| 17 | self.offset = offset |
| 18 | self.size = size |
| 19 | self.bytes = bytes_data |
| 20 | |
| 21 | class TRPCreator: |
| 22 | class TRPHeader: |
| 23 | def __init__(self): |
| 24 | self.magic = None |
| 25 | self.version = None |
| 26 | self.file_size = None |
| 27 | self.files_count = None |
| 28 | self.element_size = None |
| 29 | self.dev_flag = None |
| 30 | self.sha1 = None |
| 31 | self.padding = None |
| 32 | |
| 33 | def __init__(self): |
| 34 | self._hdr = self.TRPHeader() |
| 35 | self._trophyList = [] |
| 36 | self._hdrmagic = bytearray([220, 162, 77, 0]) |
| 37 | self._iserror = False |
| 38 | self._setversion = 0 |
| 39 | self._set_title = None # Added set_title attribute |
| 40 | self.header_size = 0x400 |
| 41 | self.trophy_size = 0x200 |
| 42 | self.image_size = 0x12000 |
| 43 | |
| 44 | @property |
| 45 | def SetVersion(self): |
| 46 | return self._setversion |
| 47 | |
| 48 | @SetVersion.setter |
| 49 | def SetVersion(self, value): |
| 50 | self._setversion = value |
| 51 | |
| 52 | @property |
| 53 | def set_title(self): # Added set_title property |
| 54 | return self._set_title |
| 55 | |
| 56 | @set_title.setter |
| 57 | def set_title(self, value): |
| 58 | self._set_title = value |
| 59 | |
| 60 | def Create(self, filename, contents): |
| 61 | try: |
| 62 | if self._setversion < 1 or self._setversion > 3: |
| 63 | raise ValueError("File version must be one of these { 1, 2, 3 }.") |
| 64 | self._trophyList.clear() |
| 65 | contents = self.SortList(contents) |
| 66 | memoryStream = BytesIO() |
| 67 | num1 = 0 |
| 68 | count = len(contents) |
| 69 | num2 = 64 * len(contents) |
| 70 | for m_Index, path in enumerate(contents): |
| 71 | fileName = os.path.basename(path) |
| 72 | with open(path, 'rb') as f: |
| 73 | m_Bytes = f.read() |
| 74 | length = len(m_Bytes) |
| 75 | pads = self.GetPads(length, 16) |
| 76 | num1 += pads |
| 77 | self._trophyList.append(Archiver(m_Index, fileName, int(Decimal(num2) + Decimal(96 if self._setversion == 3 else 64)), length, m_Bytes)) |
| 78 | num2 = int(Decimal(num2) + Decimal(length) + Decimal(pads)) |
| 79 | size = self.GetSize() |
| 80 | header = self.GetHeader(self._setversion, int(Decimal((96 if self._setversion == 3 else 64) + len(self.GetHeaderFiles()) + size + num1)), count, 64, 0, None) |
| 81 | memoryStream.write(header) |
| 82 | headerFiles = self.GetHeaderFiles() |
| 83 | memoryStream.write(headerFiles) |
| 84 | bytes1 = self.GetBytes() |
| 85 | memoryStream.write(bytes1) |
| 86 | if self._setversion > 1: |
| 87 | bytes2 = self.HexStringToBytes(self.CalculateSHA1Hash(memoryStream.getvalue())) |
| 88 | memoryStream.seek(28) |
| 89 | memoryStream.write(bytes2) |
| 90 | with open(filename, 'wb') as f: |
| 91 | f.write(memoryStream.getvalue()) |
| 92 | logger.info(f"File '{filename}' created successfully.") |
| 93 | except Exception as e: |
| 94 | logger.error(f"Error creating file: {e}") |
| 95 | raise |
| 96 | |
| 97 | def CreateFromList(self, filename, contents): |
| 98 | if self._setversion < 1 or self._setversion > 3: |
| 99 | raise Exception("File version must be one of these { 1, 2, 3 }.") |
| 100 | self._trophyList.clear() |
| 101 | memoryStream = BytesIO() |
| 102 | num1 = 0 |
| 103 | count = len(contents) |
| 104 | num2 = 64 * len(contents) |
| 105 | for m_Index, content in enumerate(contents): |
| 106 | name = content.name |
| 107 | bytes_data = content.bytes |
| 108 | size = content.size |
| 109 | pads = self.GetPads(size, 16) |
| 110 | num1 += pads |
| 111 | self._trophyList.append(Archiver(m_Index, name, int(Decimal(num2) + Decimal(96 if self._setversion == 3 else 64)), size, bytes_data)) |
| 112 | num2 = int(Decimal(num2) + Decimal(size) + Decimal(pads)) |
| 113 | size1 = self.GetSize() |
| 114 | header = self.GetHeader(self._setversion, int(Decimal((96 if self._setversion == 3 else 64) + len(self.GetHeaderFiles()) + size1 + num1)), count, 64, 0, None) |
| 115 | memoryStream.write(header) |
| 116 | headerFiles = self.GetHeaderFiles() |
| 117 | memoryStream.write(headerFiles) |
| 118 | bytes1 = self.GetBytes() |
| 119 | memoryStream.write(bytes1) |
| 120 | if self._setversion > 1: |
| 121 | bytes2 = self.HexStringToBytes(self.CalculateSHA1Hash(memoryStream.getvalue())) |
| 122 | memoryStream.seek(28) |
| 123 | memoryStream.write(bytes2) |
| 124 | with open(filename, 'wb') as f: |
| 125 | f.write(memoryStream.getvalue()) |
| 126 | |
| 127 | def SortList(self, alist): |
| 128 | patterns = [ |
| 129 | "TROPCONF.(E?)SFM", |
| 130 | "TROP.(E?)SFM", |
| 131 | "TROP_\\d+.(E?)SFM", |
| 132 | "ICON0.PNG", |
| 133 | "ICON0_\\d+.PNG", |
| 134 | "GR\\d+.PNG", |
| 135 | "GR\\d+_\\d+.PNG", |
| 136 | "TROP\\d+.PNG" |
| 137 | ] |
| 138 | arrayList1, arrayList2, arrayList3, arrayList4, arrayList5 = [], [], [], [], [] |
| 139 | for pattern in patterns: |
| 140 | for item in alist: |
| 141 | if re.match(pattern, os.path.basename(item), re.IGNORECASE): |
| 142 | if os.path.basename(item).upper().startswith("TROPCONF"): |
| 143 | arrayList1.append(item) |
| 144 | elif os.path.basename(item).upper().endswith("SFM"): |
| 145 | arrayList2.append(item) |
| 146 | elif os.path.basename(item).upper().startswith("ICON"): |
| 147 | arrayList3.append(item) |
| 148 | elif os.path.basename(item).upper().startswith("GR"): |
| 149 | arrayList4.append(item) |
| 150 | elif os.path.basename(item).upper().endswith("PNG"): |
| 151 | arrayList5.append(item) |
| 152 | arrayList2.sort() |
| 153 | arrayList3.sort() |
| 154 | arrayList4.sort() |
| 155 | arrayList5.sort() |
| 156 | arrayList1.extend(arrayList2) |
| 157 | arrayList1.extend(arrayList3) |
| 158 | arrayList1.extend(arrayList4) |
| 159 | arrayList1.extend(arrayList5) |
| 160 | return arrayList1 |
| 161 | |
| 162 | def GetHeaderFiles(self): |
| 163 | buffer1 = bytearray([0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) |
| 164 | buffer2 = bytearray([0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) |
| 165 | memoryStream = BytesIO() |
| 166 | for item in self._trophyList: |
| 167 | bytes1 = item.name.encode('ascii') |
| 168 | bytes1 = bytes1.ljust(32, b'\0') |
| 169 | memoryStream.write(bytes1) |
| 170 | bytes2 = item.offset.to_bytes(4, 'big') # Cambiato da 'little' a 'big' |
| 171 | memoryStream.write(bytes2) |
| 172 | bytes3 = item.size.to_bytes(4, 'big') # Cambiato da 'little' a 'big' |
| 173 | memoryStream.write(bytes3) |
| 174 | if item.name.upper().endswith(".SFM"): |
| 175 | memoryStream.write(buffer1) |
| 176 | elif item.name.upper().endswith(".ESFM"): |
| 177 | memoryStream.write(buffer2) |
| 178 | else: |
| 179 | memoryStream.write(bytearray(16)) |
| 180 | return memoryStream.getvalue() |
| 181 | |
| 182 | def GetSize(self): |
| 183 | uint64 = 0 |
| 184 | for item in self._trophyList: |
| 185 | uint64 += item.size |
| 186 | return uint64 |
| 187 | |
| 188 | def GetBytes(self): |
| 189 | memoryStream = BytesIO() |
| 190 | for item in self._trophyList: |
| 191 | memoryStream.write(item.bytes) |
| 192 | pads = self.GetPads(len(item.bytes), 16) |
| 193 | if pads >= 0: |
| 194 | memoryStream.write(bytearray(pads)) |
| 195 | return memoryStream.getvalue() |
| 196 | |
| 197 | def GetHeader(self, version, file_size, files_count, element_size, dev_flag, sha1): |
| 198 | trpHeader = self.TRPHeader() |
| 199 | memoryStream = BytesIO() |
| 200 | trpHeader.magic = self._hdrmagic |
| 201 | memoryStream.write(trpHeader.magic) |
| 202 | trpHeader.version = version.to_bytes(4, 'big') # Cambiato da 'little' a 'big' |
| 203 | memoryStream.write(trpHeader.version) |
| 204 | trpHeader.file_size = file_size.to_bytes(8, 'big') # Cambiato da 'little' a 'big' |
| 205 | memoryStream.write(trpHeader.file_size) |
| 206 | trpHeader.files_count = files_count.to_bytes(4, 'big') # Cambiato da 'little' a 'big' |
| 207 | memoryStream.write(trpHeader.files_count) |
| 208 | trpHeader.element_size = element_size.to_bytes(4, 'big')# Cambiato da 'little' a 'big' |
| 209 | memoryStream.write(trpHeader.element_size) |
| 210 | trpHeader.dev_flag = dev_flag.to_bytes(4, 'big') # Cambiato da 'little' a 'big' |
| 211 | memoryStream.write(trpHeader.dev_flag) |
| 212 | if version in [1, 2]: |
| 213 | memoryStream.write(bytearray(36)) |
| 214 | elif version == 3: |
| 215 | memoryStream.write(bytearray(20)) |
| 216 | memoryStream.write(bytearray([48, 49, 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])) |
| 217 | self._hdr = trpHeader |
| 218 | return memoryStream.getvalue() |
| 219 | |
| 220 | def CalculateSHA1Hash(self, byte_data): |
| 221 | sha1 = hashlib.sha1() |
| 222 | sha1.update(byte_data) |
| 223 | return sha1.hexdigest().upper() |
| 224 | |
| 225 | def HexStringToBytes(self, strInput): |
| 226 | if not self.HexStringIsValid(strInput): |
| 227 | return None |
| 228 | return bytes.fromhex(strInput) |
| 229 | |
| 230 | def BytesToHexString(self, bytes_Input): |
| 231 | return ''.join(f'{b:02X}' for b in bytes_Input) |
| 232 | |
| 233 | def HexStringIsValid(self, Hex): |
| 234 | return all(c in '0123456789ABCDEFabcdef' for c in Hex) |
| 235 | |
| 236 | def GetPads(self, fsize, align=16): |
| 237 | num = 0 |
| 238 | while (fsize + num) % align != 0: |
| 239 | num += 1 |
| 240 | return num |
| 241 | |
| 242 | def create(self, output_path, trophy_files): |
| 243 | """Create TRP file from trophy files""" |
| 244 | try: |
| 245 | # Ordina i file per nome per mantenere l'ordine corretto |
| 246 | trophy_files = sorted(trophy_files, key=lambda x: x.name) |
| 247 | |
| 248 | # Calcola le dimensioni |
| 249 | total_size = ( |
| 250 | self.header_size + # Header |
| 251 | (len(trophy_files) * self.trophy_size) + # Trophy entries |
| 252 | (len([f for f in trophy_files if f.name.upper().endswith('.PNG')]) * self.image_size) # Images |
| 253 | ) |
| 254 | |
| 255 | # Crea il file TRP |
| 256 | with open(output_path, 'wb') as f: |
| 257 | # Scrivi l'header |
| 258 | header = struct.pack('<4sIQII', |
| 259 | b'\xDC\xA2\x4D\x00', # Magic |
| 260 | 1, # Version |
| 261 | total_size, # File size |
| 262 | len(trophy_files), # Number of files |
| 263 | self.trophy_size # Trophy entry size |
| 264 | ) |
| 265 | f.write(header) |
| 266 | |
| 267 | # Padding fino a 0x400 |
| 268 | f.write(b'\x00' * (self.header_size - len(header))) |
| 269 | |
| 270 | # Scrivi i trofei |
| 271 | for trophy in trophy_files: |
| 272 | if not trophy.name.upper().endswith('.PNG'): |
| 273 | continue |
| 274 | |
| 275 | # Leggi i dati del trofeo |
| 276 | with open(trophy.name, 'rb') as tf: |
| 277 | trophy_data = tf.read() |
| 278 | |
| 279 | # Scrivi i dati del trofeo |
| 280 | f.write(trophy_data[:self.trophy_size]) |
| 281 | |
| 282 | # Padding se necessario |
| 283 | if len(trophy_data) < self.trophy_size: |
| 284 | f.write(b'\x00' * (self.trophy_size - len(trophy_data))) |
| 285 | |
| 286 | # Scrivi le immagini |
| 287 | for trophy in trophy_files: |
| 288 | if trophy.name.upper().endswith('.PNG'): |
| 289 | with open(trophy.name, 'rb') as tf: |
| 290 | image_data = tf.read() |
| 291 | |
| 292 | # Scrivi l'immagine |
| 293 | f.write(image_data) |
| 294 | |
| 295 | # Padding se necessario |
| 296 | if len(image_data) < self.image_size: |
| 297 | f.write(b'\x00' * (self.image_size - len(image_data))) |
| 298 | |
| 299 | logging.info(f"TRP file created successfully: {output_path}") |
| 300 | return True |
| 301 | |
| 302 | except Exception as e: |
| 303 | error_msg = f"Error creating TRP file: {str(e)}" |
| 304 | logging.error(error_msg) |
| 305 | raise Exception(error_msg) |
| 306 | |
| 307 | def validate_trophy_files(self, files): |
| 308 | """Validate trophy files before creating TRP""" |
| 309 | try: |
| 310 | # Verifica che ci siano file |
| 311 | if not files: |
| 312 | raise ValueError("No trophy files provided") |
| 313 | |
| 314 | # Verifica che ci siano solo file PNG e dati trofeo |
| 315 | valid_extensions = {'.PNG', '.DAT'} |
| 316 | for f in files: |
| 317 | ext = os.path.splitext(f.name)[1].upper() |
| 318 | if ext not in valid_extensions: |
| 319 | raise ValueError(f"Invalid file type: {f.name}") |
| 320 | |
| 321 | # Verifica che ci sia almeno un'immagine |
| 322 | png_files = [f for f in files if f.name.upper().endswith('.PNG')] |
| 323 | if not png_files: |
| 324 | raise ValueError("No trophy images found") |
| 325 | |
| 326 | return True |
| 327 | |
| 328 | except Exception as e: |
| 329 | error_msg = f"Trophy files validation failed: {str(e)}" |
| 330 | logging.error(error_msg) |
| 331 | raise Exception(error_msg) |