import struct import gzip import io import zlib from dataclasses import dataclass from typing import BinaryIO, Iterator, Tuple, Optional MAGIC = b"V2A\0" VERSION = 2 @dataclass class V2AHeader: magic: bytes version: int frame_count: int original_width: int original_height: int fps: float audio_size: int padding: bytes @classmethod def read(cls, f: BinaryIO) -> "V2AHeader": magic = f.read(4) if magic != MAGIC: raise ValueError(f"Invalid magic: {magic!r}") version = struct.unpack(" None: f.write(self.magic) f.write(struct.pack(" "V2AFrame": import zlib d = zlib.decompressobj(wbits=31) decompressed = bytearray() chunk_size = 4096 while True: chunk = f.read(chunk_size) if not chunk: raise EOFError("End of file while reading gzip stream") try: decompressed.extend(d.decompress(chunk)) except zlib.error as e: raise ValueError(f"zlib decompression error: {e}") if d.eof: unused_data = d.unused_data if unused_data: f.seek(-len(unused_data), 1) if len(decompressed) < 4: raise ValueError(f"Decompressed data too short: {len(decompressed)}") width = struct.unpack(" 8192 * 1024: raise ValueError(f"Decompressed data too large ({len(decompressed)} > 8MB), likely corrupted data") def write_compressed(self, f: BinaryIO) -> None: with gzip.GzipFile(fileobj=f, mode='wb') as gz: gz.write(struct.pack(" Optional[V2AFrame]: if self.current_frame >= self.header.frame_count: return None try: frame = V2AFrame.read_compressed(self.file) self.current_frame += 1 return frame except EOFError: return None def frames(self) -> Iterator[V2AFrame]: while True: frame = self.read_frame() if frame is None: break yield frame def reset(self): self.file.seek(32 + self.header.audio_size) self.current_frame = 0 @property def frame_rate(self) -> float: return self.header.fps @property def original_dimensions(self) -> Tuple[int, int]: return (self.header.original_width, self.header.original_height) @property def frame_dimensions(self) -> Tuple[int, int]: pos = self.file.tell() self.file.seek(32 + self.header.audio_size) try: frame = V2AFrame.read_compressed(self.file) self.file.seek(pos) return (frame.width, frame.height) except Exception: self.file.seek(pos) raise @property def audio(self) -> bytes: return self.audio_data