Toolbox for analyzing and editing pkg application files for psp,ps3, ps4 and ps5, includes the most useful functions you might need.
| 1 | import struct |
| 2 | import os |
| 3 | import json |
| 4 | from .package_base import PackageBase |
| 5 | from .enums import DRMType, ContentType, IROTag |
| 6 | from Utilities import Logger |
| 7 | |
| 8 | class PackagePS5(PackageBase): |
| 9 | MAGIC_PS5 = 0x7F464948 # ?FIH for PS5 |
| 10 | |
| 11 | def __init__(self, file: str): |
| 12 | super().__init__(file) |
| 13 | self.is_ps5 = False |
| 14 | self.pkg_type = None |
| 15 | self.pkg_revision = None |
| 16 | self.pkg_file_count = 0 |
| 17 | self.entry_table_offset = None |
| 18 | self.entry_table_size = None |
| 19 | self.body_offset = None |
| 20 | self.body_size = None |
| 21 | self.promote_size = None |
| 22 | self.version_date = None |
| 23 | self.version_hash = None |
| 24 | self.iro_tag = None |
| 25 | self.package_digest = None |
| 26 | self.pfs_area_digest = None |
| 27 | self.fih_offset = None |
| 28 | self.fih_size = None |
| 29 | self.pfs_offset = None |
| 30 | self.pfs_size = None |
| 31 | self.sc_offset = None |
| 32 | self.sc_size = None |
| 33 | self.si_offset = None |
| 34 | self.si_size = None |
| 35 | self.title_name = None |
| 36 | |
| 37 | with open(file, "rb") as fp: |
| 38 | magic = struct.unpack(">I", fp.read(4))[0] |
| 39 | Logger.log_information(f"Read magic number: {magic:08X}") |
| 40 | if magic == self.MAGIC_PS5: |
| 41 | self.is_ps5 = True |
| 42 | self._load_ps5_pkg(fp) |
| 43 | else: |
| 44 | raise ValueError(f"Unknown PKG format: {magic:08X}") |
| 45 | |
| 46 | def _load_ps5_pkg(self, fp): |
| 47 | try: |
| 48 | header_format = ">4s2sH4sQ4I4Q2I14s16s16s16s16s16s16s16s16s16s16s16s16s16s16s16s" |
| 49 | fp.seek(0) |
| 50 | data = fp.read(struct.calcsize(header_format)) |
| 51 | unpacked_data = struct.unpack(header_format, data) |
| 52 | |
| 53 | Logger.log_information(f"Number of unpacked values: {len(unpacked_data)}") |
| 54 | |
| 55 | self.magic = unpacked_data[0] |
| 56 | self.pkg_type = unpacked_data[1] |
| 57 | self.pkg_revision = unpacked_data[2] |
| 58 | self.pkg_0x008 = unpacked_data[3] |
| 59 | self.pkg_file_count = unpacked_data[4] |
| 60 | self.entry_table_offset, self.entry_table_size = unpacked_data[5:7] |
| 61 | self.body_offset, self.body_size = unpacked_data[7:9] |
| 62 | self.content_id = self._safe_decode(unpacked_data[9]) |
| 63 | self.drm_type, self.content_type = unpacked_data[10:12] |
| 64 | self.content_flags, self.promote_size = unpacked_data[12:14] |
| 65 | self.version_date = unpacked_data[14] |
| 66 | self.version_hash = unpacked_data[15] |
| 67 | self.iro_tag = unpacked_data[16] |
| 68 | |
| 69 | self._initialize_ps5_fields() |
| 70 | |
| 71 | if self.entry_table_offset == 0 or self.entry_table_size == 0: |
| 72 | Logger.log_warning("Invalid entry table offset or size, package might be encrypted") |
| 73 | self.files = {} |
| 74 | else: |
| 75 | self.__load_ps5_files(fp) |
| 76 | |
| 77 | param_json = self._find_file_by_name("sce_sys/param.json") |
| 78 | if param_json: |
| 79 | self._parse_param_json(fp, param_json) |
| 80 | else: |
| 81 | Logger.log_warning("param.json not found in the package") |
| 82 | |
| 83 | self._read_digests_and_layout(fp) |
| 84 | self.files = self.files if hasattr(self, 'files') else {} |
| 85 | |
| 86 | self._find_important_files() |
| 87 | |
| 88 | except Exception as e: |
| 89 | Logger.log_error(f"Error loading PS5 PKG file: {str(e)}") |
| 90 | self.files = {} |
| 91 | raise ValueError(f"Error loading PS5 PKG file: {str(e)}") |
| 92 | |
| 93 | def _initialize_ps5_fields(self): |
| 94 | self.title_id = None |
| 95 | self.content_version = None |
| 96 | self.required_system_software_version = None |
| 97 | self.application_category_type = None |
| 98 | self.application_drm_type = None |
| 99 | self.default_language = None |
| 100 | self.title_names = {} |
| 101 | self.sdk_version = None |
| 102 | self.master_version = None |
| 103 | self.target_content_version = None |
| 104 | self.origin_content_version = None |
| 105 | self.pubtools = {} |
| 106 | self.creation_date = None |
| 107 | self.publishing_tools_version = None |
| 108 | self.attribute = None |
| 109 | self.attribute2 = None |
| 110 | self.attribute3 = None |
| 111 | self.content_badge_type = None |
| 112 | self.download_data_size = None |
| 113 | self.mass_size = None |
| 114 | self.flexible_memory_size = None |
| 115 | self.age_levels = {} |
| 116 | self.game_intents = [] |
| 117 | self.deeplink_uri = None |
| 118 | self.version_file_uri = None |
| 119 | |
| 120 | def _find_file_by_name(self, name): |
| 121 | """Find a file by name with tolerant matching for PS5 packages. |
| 122 | |
| 123 | Normalizes path separators and allows suffix-based matches to account for |
| 124 | cases where file names are stored without a stable root or with variant separators. |
| 125 | """ |
| 126 | try: |
| 127 | def _norm(p: str) -> str: |
| 128 | if not p: |
| 129 | return "" |
| 130 | p = p.replace("\\", "/") |
| 131 | # strip common leading prefixes |
| 132 | while p.startswith("./"): |
| 133 | p = p[2:] |
| 134 | if p.startswith("/"): |
| 135 | p = p[1:] |
| 136 | # Heuristic: fix inverted directory name 'sys_sce' -> 'sce_sys' |
| 137 | p = p.replace("sys_sce/", "sce_sys/") |
| 138 | p = p.replace("/sys_sce/", "/sce_sys/") |
| 139 | return p |
| 140 | |
| 141 | target = _norm(name).lower() |
| 142 | for file in self.files.values(): |
| 143 | n = _norm(file.get("name", "")).lower() |
| 144 | if not n: |
| 145 | continue |
| 146 | if n == target or n.endswith(target): |
| 147 | return file |
| 148 | return None |
| 149 | except Exception: |
| 150 | # Fallback to exact match if anything goes wrong |
| 151 | return next((file for file in self.files.values() if file.get("name") == name), None) |
| 152 | |
| 153 | def _parse_param_json(self, fp, param_json): |
| 154 | try: |
| 155 | fp.seek(param_json["offset"]) |
| 156 | json_data = fp.read(param_json["size"]) |
| 157 | json_content = json.loads(json_data) |
| 158 | |
| 159 | self.title_id = json_content.get("titleId") |
| 160 | self.content_id = json_content.get("contentId") |
| 161 | self.content_version = json_content.get("contentVersion") |
| 162 | self.required_system_software_version = json_content.get("requiredSystemSoftwareVersion") |
| 163 | self.application_category_type = json_content.get("applicationCategoryType") |
| 164 | self.application_drm_type = json_content.get("applicationDrmType") |
| 165 | |
| 166 | localized_params = json_content.get("localizedParameters", {}) |
| 167 | self.default_language = localized_params.get("defaultLanguage") |
| 168 | self.title_names = {} |
| 169 | for lang, data in localized_params.items(): |
| 170 | if isinstance(data, dict) and "titleName" in data: |
| 171 | self.title_names[lang] = data["titleName"] |
| 172 | |
| 173 | self.sdk_version = json_content.get("sdkVersion") |
| 174 | self.master_version = json_content.get("masterVersion") |
| 175 | self.target_content_version = json_content.get("targetContentVersion") |
| 176 | self.origin_content_version = json_content.get("originContentVersion") |
| 177 | |
| 178 | self.pubtools = json_content.get("pubtools", {}) |
| 179 | self.creation_date = self.pubtools.get("creationDate") |
| 180 | self.publishing_tools_version = self.pubtools.get("toolVersion") |
| 181 | |
| 182 | self.attribute = json_content.get("attribute") |
| 183 | self.attribute2 = json_content.get("attribute2") |
| 184 | self.attribute3 = json_content.get("attribute3") |
| 185 | self.content_badge_type = json_content.get("contentBadgeType") |
| 186 | |
| 187 | self.download_data_size = json_content.get("downloadDataSize") |
| 188 | self.mass_size = json_content.get("massSize") |
| 189 | |
| 190 | kernel_info = json_content.get("kernel", {}) |
| 191 | self.flexible_memory_size = kernel_info.get("flexibleMemorySize") |
| 192 | |
| 193 | self.age_levels = json_content.get("ageLevel", {}) |
| 194 | |
| 195 | game_intents = json_content.get("gameIntent", {}).get("permittedIntents", []) |
| 196 | self.game_intents = [intent.get("intentType") for intent in game_intents if "intentType" in intent] |
| 197 | |
| 198 | self.deeplink_uri = json_content.get("deeplinkUri") |
| 199 | self.version_file_uri = json_content.get("versionFileUri") |
| 200 | |
| 201 | Logger.log_information(f"Parsed param.json: Title ID: {self.title_id}, Content ID: {self.content_id}, Default Title: {self.title_names.get(self.default_language)}") |
| 202 | except Exception as e: |
| 203 | Logger.log_error(f"Error parsing param.json: {str(e)}") |
| 204 | |
| 205 | def _find_important_files(self): |
| 206 | # If files table is empty or missing, skip noisy warnings (likely encrypted/corrupted) |
| 207 | if not getattr(self, 'files', None): |
| 208 | Logger.log_information("Skipping important files check: no file index available (package may be encrypted)") |
| 209 | return |
| 210 | important_files = [ |
| 211 | "eboot.bin", |
| 212 | "sce_sys/icon0.png", |
| 213 | "sce_sys/pic0.png", |
| 214 | "sce_sys/pic1.png", |
| 215 | "sce_sys/playgo-chunk.dat", |
| 216 | "sce_sys/playgo-manifest.xml", |
| 217 | "sce_sys/trophy/trophy00.trp" |
| 218 | ] |
| 219 | |
| 220 | for file_name in important_files: |
| 221 | file_info = self._find_file_by_name(file_name) |
| 222 | if file_info: |
| 223 | Logger.log_information(f"Found important file: {file_name}") |
| 224 | else: |
| 225 | Logger.log_warning(f"Important file not found: {file_name}") |
| 226 | |
| 227 | def _read_digests_and_layout(self, fp): |
| 228 | try: |
| 229 | fp.seek(0x100) |
| 230 | self.package_digest = fp.read(32).hex() |
| 231 | self.pfs_area_digest = fp.read(32).hex() |
| 232 | |
| 233 | if self.package_digest == '0' * 64 and self.pfs_area_digest == '0' * 64: |
| 234 | Logger.log_warning("Digests are all zeros, package might be corrupted or encrypted") |
| 235 | |
| 236 | fp.seek(0x400) |
| 237 | layout_data = struct.unpack(">QQQQQQQQ", fp.read(64)) |
| 238 | self.fih_offset, self.fih_size = layout_data[0:2] |
| 239 | self.pfs_offset, self.pfs_size = layout_data[2:4] |
| 240 | self.sc_offset, self.sc_size = layout_data[4:6] |
| 241 | self.si_offset, self.si_size = layout_data[6:8] |
| 242 | |
| 243 | if all(v == 0 for v in layout_data): |
| 244 | Logger.log_warning("All layout values are zero, package might be corrupted or encrypted") |
| 245 | |
| 246 | Logger.log_information(f"Package digest: {self.package_digest}") |
| 247 | Logger.log_information(f"PFS area digest: {self.pfs_area_digest}") |
| 248 | |
| 249 | Logger.log_information(f"FIH: offset 0x{self.fih_offset:X}, size 0x{self.fih_size:X}") |
| 250 | Logger.log_information(f"PFS: offset 0x{self.pfs_offset:X}, size 0x{self.pfs_size:X}") |
| 251 | Logger.log_information(f"SC: offset 0x{self.sc_offset:X}, size 0x{self.sc_size:X}") |
| 252 | Logger.log_information(f"SI: offset 0x{self.si_offset:X}, size 0x{self.si_size:X}") |
| 253 | |
| 254 | except Exception as e: |
| 255 | Logger.log_error(f"Error reading digests and layout: {str(e)}") |
| 256 | raise |
| 257 | |
| 258 | def __load_ps5_files(self, fp): |
| 259 | try: |
| 260 | Logger.log_information(f"Loading PS5 files. Entry table offset: 0x{self.entry_table_offset:X}, size: 0x{self.entry_table_size:X}") |
| 261 | |
| 262 | fp.seek(self.entry_table_offset) |
| 263 | entry_count = min(self.entry_table_size // 32, self.pkg_file_count, 10000) |
| 264 | |
| 265 | entry_format = ">IIQQII" |
| 266 | self.files = {} |
| 267 | for i in range(entry_count): |
| 268 | try: |
| 269 | entry_data = fp.read(32) |
| 270 | if len(entry_data) < 32: |
| 271 | Logger.log_warning(f"Reached end of file while reading entries. Processed {i} entries.") |
| 272 | break |
| 273 | file_id, file_type, file_offset, file_size, padding1, padding2 = struct.unpack(entry_format, entry_data) |
| 274 | |
| 275 | file_end = os.path.getsize(self.original_file) |
| 276 | if file_offset >= file_end or file_size > file_end - file_offset: |
| 277 | Logger.log_warning(f"File with unreasonable offset or size: ID {file_id}, offset 0x{file_offset:X}, size 0x{file_size:X}") |
| 278 | continue |
| 279 | |
| 280 | self.files[file_id] = { |
| 281 | "id": file_id, |
| 282 | "type": file_type, |
| 283 | "offset": file_offset, |
| 284 | "size": file_size, |
| 285 | "encrypted": (file_type & PackageBase.FLAG_ENCRYPTED) == PackageBase.FLAG_ENCRYPTED |
| 286 | } |
| 287 | except struct.error as e: |
| 288 | Logger.log_warning(f"Error unpacking file entry {i}: {str(e)}") |
| 289 | break |
| 290 | |
| 291 | if not self.files: |
| 292 | Logger.log_error("No valid files found in the package") |
| 293 | |
| 294 | for key, file in self.files.items(): |
| 295 | try: |
| 296 | fp.seek(file["offset"]) |
| 297 | fn = fp.read(256).split(b'\x00')[0] |
| 298 | if fn: |
| 299 | self.files[key]["name"] = self._safe_decode(fn) |
| 300 | except (OverflowError, OSError) as e: |
| 301 | Logger.log_warning(f"Error reading filename for file ID {key}: {e}") |
| 302 | self.files[key]["name"] = f"file_{key}" |
| 303 | |
| 304 | Logger.log_information(f"Loaded {len(self.files)} files from PS5 PKG") |
| 305 | except Exception as e: |
| 306 | Logger.log_error(f"Error loading PS5 file entries: {str(e)}") |
| 307 | raise ValueError(f"Error loading PS5 file entries: {str(e)}") |
| 308 | |
| 309 | def get_info(self): |
| 310 | info = super().get_info() |
| 311 | if self.is_ps5: |
| 312 | info.update({ |
| 313 | "pkg_magic": self.magic.hex() if isinstance(self.magic, bytes) else str(self.magic), |
| 314 | "pkg_type": self.pkg_type.hex() if isinstance(self.pkg_type, bytes) else str(self.pkg_type), |
| 315 | "pkg_revision": self.pkg_revision, |
| 316 | "pkg_file_count": self.pkg_file_count, |
| 317 | "content_id": self.content_id, |
| 318 | "title_id": self.title_id, |
| 319 | "title_name": self.title_name, |
| 320 | "content_version": self.content_version, |
| 321 | "required_system_software_version": self.required_system_software_version, |
| 322 | "application_category_type": self.application_category_type, |
| 323 | "application_drm_type": self.application_drm_type, |
| 324 | "sdk_version": self.sdk_version, |
| 325 | "master_version": self.master_version, |
| 326 | "creation_date": self.creation_date, |
| 327 | "publishing_tools_version": self.publishing_tools_version, |
| 328 | "drm_type": f"0x{self.drm_type:X}" if isinstance(self.drm_type, int) else str(self.drm_type), |
| 329 | "content_type": f"0x{self.content_type:X}" if isinstance(self.content_type, int) else str(self.content_type), |
| 330 | "content_flags": f"0x{self.content_flags:X}" if isinstance(self.content_flags, int) else str(self.content_flags), |
| 331 | "package_digest": self.package_digest, |
| 332 | "pfs_area_digest": self.pfs_area_digest, |
| 333 | "fih_offset": f"0x{self.fih_offset:X}" if isinstance(self.fih_offset, int) else str(self.fih_offset), |
| 334 | "fih_size": f"0x{self.fih_size:X}" if isinstance(self.fih_size, int) else str(self.fih_size), |
| 335 | "pfs_offset": f"0x{self.pfs_offset:X}" if isinstance(self.pfs_offset, int) else str(self.pfs_offset), |
| 336 | "pfs_size": f"0x{self.pfs_size:X}" if isinstance(self.pfs_size, int) else str(self.pfs_size), |
| 337 | "sc_offset": f"0x{self.sc_offset:X}" if isinstance(self.sc_offset, int) else str(self.sc_offset), |
| 338 | "sc_size": f"0x{self.sc_size:X}" if isinstance(self.sc_size, int) else str(self.sc_size), |
| 339 | "si_offset": f"0x{self.si_offset:X}" if isinstance(self.si_offset, int) else str(self.si_offset), |
| 340 | "si_size": f"0x{self.si_size:X}" if isinstance(self.si_size, int) else str(self.si_size), |
| 341 | }) |
| 342 | return info |