Toolbox for analyzing and editing PKG application files for PSP, PS3, PS4 and PS5. It includes the most useful functions you might need.
| 1 | import os |
| 2 | import struct |
| 3 | import io |
| 4 | import json |
| 5 | import re |
| 6 | import unicodedata |
| 7 | from PIL import Image |
| 8 | from .enums import DRMType, ContentType, IROTag |
| 9 | from Utilities import Logger |
| 10 | |
class PackageBase:
    """Shared state and generic file helpers for PKG package objects.

    The constructor only checks that the package file exists and initialises
    every metadata field to an empty/None placeholder. Nothing in this class
    fills those fields in; presumably subclasses parse the package and
    populate them (verify against the concrete package types).
    """

    TYPE_MASK = 0x0000FFFF
    # NOTE(review): FLAG_RETAIL and FLAG_ENCRYPTED evaluate to the same value
    # (1 << 31 == 0x80000000). Confirm against the PKG format documentation
    # whether one of them was meant to be a different bit.
    FLAG_RETAIL = 1 << 31
    FLAG_ENCRYPTED = 0x80000000

    # Upper bound on the number of bytes held in memory while extracting.
    _COPY_CHUNK_SIZE = 1 << 20

    def __init__(self, file: str):
        """Remember the package path and reset all metadata fields.

        Args:
            file: Path to the PKG file on disk.

        Raises:
            FileNotFoundError: If ``file`` is not an existing regular file.
        """
        if not os.path.isfile(file):
            raise FileNotFoundError(f"The PKG file '{file}' does not exist.")

        self.original_file = file
        self.pkg_info = {}
        # Mapping of file_id -> dict; see extract_all_files for expected keys.
        self.files = {}
        self.content_id = None
        self.drm_type = None
        self.content_type = None
        self.content_flags = None
        self.iro_tag = None
        self.version_date = None
        self.version_hash = None
        self.digest_table_hash = None
        self.entry_table_offset = None
        self.entry_table_size = None

    def _safe_decode(self, data):
        """Return ``data`` as text with trailing NUL characters removed.

        ``str`` values are only stripped. ``bytes`` are decoded as UTF-8 and,
        when they are not valid UTF-8, as latin-1 (which accepts every byte
        value, so decoding cannot fail). Any other value is passed to
        ``str()`` unchanged.
        """
        if isinstance(data, str):
            return data.rstrip('\x00')
        if isinstance(data, bytes):
            try:
                # Strict decoding so that invalid input actually reaches the
                # latin-1 fallback instead of having bytes silently dropped.
                text = data.decode('utf-8')
            except UnicodeDecodeError:
                text = data.decode('latin-1')
            return text.rstrip('\x00')
        return str(data)

    def _read_null_terminated_string(self, fp):
        """Read bytes from ``fp`` up to, and consuming, the next NUL byte.

        Reading stops at a NUL byte, at end of stream, or when the read
        raises ``OverflowError``/``OSError`` (logged as a warning).

        Returns:
            The bytes read before the terminator, without the NUL itself.
        """
        result = bytearray()
        while True:
            try:
                char = fp.read(1)
            except (OverflowError, OSError) as e:
                Logger.log_warning(f"Error reading string: {e}")
                break
            # An empty read means end of stream.
            if not char or char == b'\x00':
                break
            result.extend(char)
        return bytes(result)

    def get_info(self):
        """Return the package metadata fields as a plain dictionary."""
        return {
            "content_id": self.content_id,
            "drm_type": self.drm_type,
            "content_type": self.content_type,
            "content_flags": self.content_flags,
            "iro_tag": self.iro_tag,
            "version_date": self.version_date,
            "version_hash": self.version_hash,
            "digest_table_hash": self.digest_table_hash,
            "entry_table_offset": self.entry_table_offset,
            "entry_table_size": self.entry_table_size,
        }

    def read_file(self, file_id):
        """Return the raw bytes of one entry of ``self.files``.

        Args:
            file_id: Key into ``self.files``; the entry must provide
                ``'offset'`` and ``'size'``.

        Raises:
            ValueError: If ``file_id`` is missing from ``self.files`` or maps
                to an empty entry.
        """
        file_info = self.files.get(file_id)
        if not file_info:
            raise ValueError(f"File with ID {file_id} not found in the package.")

        with open(self.original_file, 'rb') as f:
            f.seek(file_info['offset'])
            return f.read(file_info['size'])

    def _extract_entry(self, src, root, file_id, info):
        """Copy a single ``self.files`` entry from ``src`` into ``root``.

        Args:
            src: Package file opened for binary reading.
            root: Absolute path of the output directory.
            file_id: Key of the entry, used for the default name and logging.
            info: Entry dict with 'offset', 'size' and optional 'name'.

        Raises:
            ValueError: If the size is negative or the resulting path would
                fall outside ``root``.
        """
        offset = info.get('offset')
        size = info.get('size')
        # Validate before touching the filesystem so skipped entries do not
        # leave empty directories behind.
        if offset is None or size is None:
            Logger.log_warning(f"Skipping file_id {file_id}: missing offset/size")
            return
        if size < 0:
            # read() with a negative size would dump everything up to EOF.
            raise ValueError(f"invalid size {size}")

        name = info.get('name', f'file_{file_id}')
        # Normalize any path-like names to avoid traversal
        safe_name = os.path.normpath(name).lstrip(os.sep).replace('..', '_')
        out_path = os.path.abspath(os.path.join(root, safe_name))
        # Lexical containment check: catches names the normalisation above
        # does not neutralise (e.g. drive-qualified paths on Windows).
        if os.path.commonpath([root, out_path]) != root:
            raise ValueError(f"unsafe output path for name {name!r}")
        os.makedirs(os.path.dirname(out_path), exist_ok=True)

        src.seek(offset)
        remaining = size
        with open(out_path, 'wb') as dst:
            # Copy in bounded chunks so large entries are never held in
            # memory all at once.
            while remaining > 0:
                chunk = src.read(min(remaining, self._COPY_CHUNK_SIZE))
                if not chunk:
                    Logger.log_warning(
                        f"file_id {file_id}: package ended {remaining} bytes early"
                    )
                    break
                dst.write(chunk)
                remaining -= len(chunk)
        Logger.log_information(f"Extracted: {safe_name}")

    def extract_all_files(self, output_dir: str):
        """Extract all files listed in self.files to the specified output directory.

        This is a generic implementation used by package types. It expects
        `self.files` to be a mapping of file_id -> dict with at least:
        - 'offset': byte offset in the source package
        - 'size': size in bytes
        - optional 'name': output relative path

        A failure on one entry is logged and does not stop the remaining
        entries. Failures outside the per-entry loop (creating the output
        directory, opening the package) are logged and re-raised.

        Returns:
            A human-readable status message that includes ``output_dir``.
        """
        try:
            os.makedirs(output_dir, exist_ok=True)

            if not isinstance(self.files, dict) or not self.files:
                Logger.log_warning("No files table available for extraction.")
                return f"No files to extract. Output: {output_dir}"

            root = os.path.abspath(output_dir)
            with open(self.original_file, 'rb') as src:
                for file_id, info in self.files.items():
                    try:
                        self._extract_entry(src, root, file_id, info)
                    except Exception as e:
                        # Best effort: keep going with the other entries.
                        Logger.log_error(f"Error extracting file_id {file_id}: {e}")

            Logger.log_information(f"Extraction completed. Output: {output_dir}")
            return f"Extraction completed. Output: {output_dir}"
        except Exception as e:
            Logger.log_error(f"Extraction failed: {e}")
            raise
| 129 |