在处理多层嵌套的压缩文件(如 ZIP 内包含 TAR.GZ,再包含 RAR 等)时,我们需要递归解压并读取最终文件内容。以下是完整实现方案:
核心思路
- 递归解压:逐层解压直到找到非压缩文件
- 格式识别:根据扩展名选择解压方法
- 内存管理:使用临时目录避免内存溢出
- 异常处理:确保程序健壮性
完整代码实现
import os
import shutil
import tempfile
import zipfile
import tarfile
import gzip
import rarfile
import py7zr
from pathlib import Path
def extract_nested_archive(file_path, output_dir=None):
"""
递归解压嵌套压缩文件
:param file_path: 压缩文件路径
:param output_dir: 解压目录(默认创建临时目录)
:return: 解压后的目录路径
"""
# 创建临时目录(如果未指定输出目录)
if output_dir is None:
output_dir = tempfile.mkdtemp(prefix="nested_extract_")
# 确保输出目录存在
Path(output_dir).mkdir(parents=True, exist_ok=True)
file_ext = Path(file_path).suffix.lower()
print(f"处理文件: {file_path} | 格式: {file_ext}")
try:
# ZIP格式处理
if file_ext == '.zip':
with zipfile.ZipFile(file_path, 'r') as zip_ref:
zip_ref.extractall(output_dir)
# 递归处理内部压缩文件
for member in zip_ref.namelist():
member_path = Path(output_dir) / member
if member_path.is_file() and member_path.suffix.lower() in {
'.zip', '.tar', '.gz', '.tgz', '.rar', '.7z'
}:
extract_nested_archive(str(member_path), output_dir)
# TAR格式处理(包括TAR.GZ)
elif file_ext in ('.tar', '.gz', '.tgz'):
mode = 'r'
if file_ext in ('.gz', '.tgz'):
mode = 'r:gz'
with tarfile.open(file_path, mode) as tar_ref:
tar_ref.extractall(output_dir)
# 递归处理内部压缩文件
for member in tar_ref.getnames():
member_path = Path(output_dir) / member
if member_path.is_file() and member_path.suffix.lower() in {
'.zip', '.tar', '.gz', '.tgz', '.rar', '.7z'
}:
extract_nested_archive(str(member_path), output_dir)
# RAR格式处理
elif file_ext == '.rar':
with rarfile.RarFile(file_path) as rar_ref:
rar_ref.extractall(output_dir)
# 递归处理内部压缩文件
for member in rar_ref.namelist():
member_path = Path(output_dir) / member
if member_path.is_file() and member_path.suffix.lower() in {
'.zip', '.tar', '.gz', '.tgz', '.rar', '.7z'
}:
extract_nested_archive(str(member_path), output_dir)
# 7Z格式处理
elif file_ext == '.7z':
with py7zr.SevenZipFile(file_path, mode='r') as sz_ref:
sz_ref.extractall(output_dir)
# 递归处理内部压缩文件
for member in sz_ref.getnames():
member_path = Path(output_dir) / member
if member_path.is_file() and member_path.suffix.lower() in {
'.zip', '.tar', '.gz', '.tgz', '.rar', '.7z'
}:
extract_nested_archive(str(member_path), output_dir)
# GZ单独处理(非TAR.GZ)
elif file_ext == '.gz' and not file_path.lower().endswith('.tar.gz'):
output_file = Path(output_dir) / Path(file_path).stem
with gzip.open(file_path, 'rb') as gz_ref:
with open(output_file, 'wb') as out_ref:
shutil.copyfileobj(gz_ref, out_ref)
except Exception as e:
print(f"解压失败: {file_path} | 错误: {str(e)}")
return output_dir
def find_files_in_dir(directory, extensions=None):
"""
在目录中查找所有文件(可选扩展名过滤)
:param directory: 搜索目录
:param extensions: 允许的文件扩展名列表
:return: 文件路径生成器
"""
for root, _, files in os.walk(directory):
for file in files:
file_path = Path(root) / file
if extensions:
if file_path.suffix.lower() in extensions:
yield file_path
else:
yield file_path
def read_nested_archive_contents(archive_path, target_extensions=None):
"""
读取嵌套压缩包内的文件内容
:param archive_path: 压缩包路径
:param target_extensions: 目标文件扩展名
:return: {文件路径: 内容} 的字典
"""
# 递归解压文件
extracted_dir = extract_nested_archive(archive_path)
# 收集最终文件内容
contents = {}
for file_path in find_files_in_dir(extracted_dir, target_extensions):
try:
with open(file_path, 'r', encoding='utf-8') as f:
contents[str(file_path)] = f.read()
except UnicodeDecodeError:
# 尝试二进制读取
with open(file_path, 'rb') as f:
contents[str(file_path)] = f.read()
# 清理临时目录
shutil.rmtree(extracted_dir, ignore_errors=True)
return contents
# 使用示例 ==============================================
if __name__ == "__main__":
# 示例压缩包路径(支持多层嵌套)
archive_path = "test_archive.zip"
# 指定需要读取的文件类型(可选)
target_extensions = ['.txt', '.csv', '.json', '.xml']
# 读取压缩包内容
results = read_nested_archive_contents(archive_path, target_extensions)
# 输出结果
print(f"\n在 {archive_path} 中找到 {len(results)} 个文件:")
for path, content in results.items():
print(f"\n文件路径: {path}")
print(f"内容预览: {content[:200]}...")
关键技术解析
- 递归解压算法:
- 自动检测文件扩展名(
.zip
,.tar
,.gz
,.rar
,.7z
) - 使用标准库处理常见格式(zipfile/tarfile/gzip)
- 第三方库支持特殊格式(rarfile/py7zr)
- 安全处理机制:
try:
# 解压操作
except (BadZipFile, TarError, RarCannotExec, SevenZipError) as e:
# 格式特定异常处理
except Exception as e:
# 通用异常捕获
- 内存优化设计:
- 使用
tempfile.mkdtemp()
创建临时目录 - 流式处理大文件(
shutil.copyfileobj
) - 自动清理临时文件(
shutil.rmtree
)
- 编码智能处理:
try:
# 尝试UTF-8解码
except UnicodeDecodeError:
# 回退到二进制读取
支持的文件格式
格式 | 扩展名 | 处理库 |
---|---|---|
ZIP | .zip | zipfile |
TAR | .tar | tarfile |
GZIP | .gz | gzip |
TAR.GZ | .tar.gz, .tgz | tarfile |
RAR | .rar | rarfile |
7-Zip | .7z | py7zr |
使用示例
- 读取多层嵌套压缩包:
results = read_nested_archive_contents(
"multi_layer_archive.zip",
target_extensions=['.txt', '.log']
)
- 处理特定类型文件:
# 只读取CSV和JSON文件
csv_json_data = read_nested_archive_contents(
"data_archive.rar",
['.csv', '.json']
)
- 获取所有文件内容:
# 不指定扩展名则读取所有文件
all_contents = read_nested_archive_contents("archive.7z")
性能优化建议
- 大文件处理:
# 使用分块读取代替完全加载
def read_large_file(file_path, chunk_size=8192):
with open(file_path, 'rb') as f:
while chunk := f.read(chunk_size):
yield chunk
- 并行解压:
from concurrent.futures import ThreadPoolExecutor
def parallel_extract(files):
with ThreadPoolExecutor() as executor:
executor.map(extract_nested_archive, files)
- 缓存机制:
from functools import lru_cache
@lru_cache(maxsize=32)
def cached_extract(file_path):
return extract_nested_archive(file_path)
常见问题解决
- 乱码问题:
# 尝试不同编码
encodings = ['utf-8', 'gbk', 'latin-1']
for enc in encodings:
try:
return content.decode(enc)
except UnicodeDecodeError:
continue
- 加密压缩包:
# 添加密码支持(示例:ZIP)
with zipfile.ZipFile(file_path) as zf:
zf.setpassword(b"secret")
zf.extractall(output_dir)
- 损坏文件处理:
# 使用ignore_errors跳过损坏文件
with tarfile.open(file_path, 'r:gz', ignore_zeros=True) as tar:
tar.extractall(path=output_dir, filter='ignore_errors')
此方案能有效处理各种复杂嵌套的压缩文件结构,通过递归解压和智能文件识别,最终读取目标文件内容,同时确保资源管理和错误处理机制完备。
© 版权声明
本站资源来自互联网收集,仅供用于学习和交流,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。敬请谅解!
THE END