Python 多线程复制工具脚本
用于替代速度很慢的单线程 cp -r。基础版:
import os
import shutil
from concurrent.futures import ThreadPoolExecutor
def copy_file(src, dest):
    """Copy one file from ``src`` to ``dest`` and report the outcome on stdout.

    The copy is done with ``shutil.copy2``. Errors are reported rather than
    raised so that one bad file does not abort a bulk copy; the boolean
    result lets a caller (or the ``Future`` returned by an executor) still
    see whether the copy worked.

    Args:
        src: Path of the file to copy.
        dest: Destination path.

    Returns:
        True if the file was copied, False if the copy failed.
    """
    try:
        shutil.copy2(src, dest)
    except Exception as e:
        # Deliberately broad: this runs on worker threads, where an uncaught
        # exception would sit unnoticed inside a Future nobody inspects.
        print(f"复制文件 {src} 失败。错误:{e}")
        return False
    print(f"{src} 复制到 {dest} 成功。")
    return True
def process_directory(src_folder, dest_folder, executor):
    """Recursively mirror ``src_folder`` into ``dest_folder``.

    Destination directories are created in the calling thread; every
    non-directory entry is submitted to ``executor`` as a ``copy_file`` task.
    The function returns once all tasks have been submitted, not once the
    copies have finished.

    Args:
        src_folder: Directory to read entries from.
        dest_folder: Directory to create (if needed) and copy into.
        executor: Object with a ``submit(fn, *args)`` method, e.g. a
            ``ThreadPoolExecutor``.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(dest_folder, exist_ok=True)
    for item in os.listdir(src_folder):
        src = os.path.join(src_folder, item)
        dest = os.path.join(dest_folder, item)
        if os.path.isdir(src):
            process_directory(src, dest, executor)
        else:
            executor.submit(copy_file, src, dest)
def multi_threaded_copy(src_folder, dest_folder, threads=5):
    """Copy the contents of ``src_folder`` into ``dest_folder`` with a thread pool.

    The call returns only after every submitted copy has finished, because
    leaving the ``with`` block waits for the executor to shut down.

    Args:
        src_folder: Directory whose contents are copied.
        dest_folder: Directory to copy into; created if it does not exist.
        threads: Maximum number of worker threads.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(dest_folder, exist_ok=True)
    with ThreadPoolExecutor(max_workers=threads) as executor:
        process_directory(src_folder, dest_folder, executor)
if __name__ == "__main__":
    # Example invocation: edit the placeholders below before running.
    source_folder = "目标目录"  # Replace with the directory to copy FROM
    destination_folder = "指定目录"  # Replace with the directory to copy INTO
    num_threads = 256  # Number of worker threads; adjust as needed
    multi_threaded_copy(source_folder, destination_folder, num_threads)
进阶版:添加 tqdm 进度条,使用 logging 代替 print,并支持跳过指定路径(文件或目录):
import os
import shutil
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import logging
# Log bare messages (no level or timestamp prefix) at INFO and above.
logging.basicConfig(format="%(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)

# Switches controlling which events get logged:
# skipped paths, failed copies, and successful copies respectively.
print_logger_skip_message = True
print_logger_fail_message = True
print_logger_successful_message = False
def copy_file(src, dest):
    """Copy one file from ``src`` to ``dest``, logging the outcome if enabled.

    The copy is done with ``shutil.copy2``. Errors are logged rather than
    raised so that one bad file does not abort a bulk copy. Whether anything
    is logged depends on the module-level ``print_logger_successful_message``
    and ``print_logger_fail_message`` flags; the boolean result keeps a
    failure visible to the caller even when failure logging is turned off.

    Args:
        src: Path of the file to copy.
        dest: Destination path.

    Returns:
        True if the file was copied, False if the copy failed.
    """
    try:
        shutil.copy2(src, dest)
    except Exception as e:
        # Deliberately broad: this runs on worker threads, where an uncaught
        # exception would sit unnoticed inside a Future nobody inspects.
        if print_logger_fail_message:
            logger.error(f"复制文件 {src} 失败。错误:{e}")
        return False
    if print_logger_successful_message:
        logger.info(f"{src} 复制到 {dest} 成功。")
    return True
def _submit_copy_tasks(src_folder, dest_folder, executor, skip_paths, futures):
    """Walk ``src_folder``, create directories, and queue one copy task per file.

    Every future returned by ``executor.submit`` is appended to ``futures``.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(dest_folder, exist_ok=True)
    for item in os.listdir(src_folder):
        src = os.path.join(src_folder, item)
        # The skip test runs before the directory test, so an entry in
        # skip_paths may name either a file or a whole directory subtree.
        if src in skip_paths:
            if print_logger_skip_message:
                logger.info(f"跳过 {src}")
            continue
        dest = os.path.join(dest_folder, item)
        if os.path.isdir(src):
            _submit_copy_tasks(src, dest, executor, skip_paths, futures)
        else:
            futures.append(executor.submit(copy_file, src, dest))


def process_directory(src_folder, dest_folder, executor, pbar, skip_paths):
    """Recursively copy ``src_folder`` into ``dest_folder``, tracking progress.

    Directories are created in the calling thread and every file is submitted
    to ``executor`` as a ``copy_file`` task. The progress bar advances once
    per *finished* task (not per submitted task), and only from the calling
    thread. As a consequence this function blocks until all submitted copies
    have completed.

    Args:
        src_folder: Directory to read entries from.
        dest_folder: Directory to create (if needed) and copy into.
        executor: Object with a ``submit(fn, *args)`` method returning
            ``concurrent.futures.Future`` objects, e.g. a ``ThreadPoolExecutor``.
        pbar: Progress object with an ``update(n)`` method (e.g. a tqdm bar).
        skip_paths: Iterable of path strings to skip. An entry matches only
            when it equals the path built here with ``os.path.join``.
    """
    # Function-scope import: the file-level import block only brings in
    # ThreadPoolExecutor from concurrent.futures.
    from concurrent.futures import as_completed

    futures = []
    # A frozenset makes each membership test O(1) instead of scanning a list.
    _submit_copy_tasks(
        src_folder, dest_folder, executor, frozenset(skip_paths), futures
    )
    for _ in as_completed(futures):
        pbar.update(1)
def count_files(src_folder, skip_paths):
    """Count the files under ``src_folder`` that will not be skipped.

    The skip rules mirror ``process_directory``: a skipped directory removes
    its entire subtree from the count, a skipped file is not counted, and
    ``src_folder`` itself is never tested against ``skip_paths``.

    Args:
        src_folder: Root directory to scan.
        skip_paths: Iterable of path strings to skip. An entry matches only
            when it equals the path built here with ``os.path.join``.

    Returns:
        Number of non-directory entries found outside the skipped paths.
    """
    skipped = frozenset(skip_paths)
    total_files = 0
    # NOTE: os.walk does not descend into symlinked directories by default,
    # so files reachable only through such links are not counted here.
    for root, dirs, files in os.walk(src_folder):
        # Prune in place so os.walk never enters a skipped subtree.
        dirs[:] = [d for d in dirs if os.path.join(root, d) not in skipped]
        total_files += sum(
            1 for name in files if os.path.join(root, name) not in skipped
        )
    return total_files
def multi_threaded_copy(src_folder, dest_folder, threads=5, skip_paths=None):
    """Copy the contents of ``src_folder`` into ``dest_folder`` with a thread pool.

    Files are counted first so the progress bar has a total, then the tree is
    copied by ``process_directory``. The call returns only after the executor
    has shut down, i.e. after every submitted copy has finished.

    Args:
        src_folder: Directory whose contents are copied.
        dest_folder: Directory to copy into; created if it does not exist.
        threads: Maximum number of worker threads.
        skip_paths: Optional list of file or directory paths to skip.
            ``None`` means nothing is skipped.

    NOTE(review): ``dest_folder`` should not be located inside ``src_folder``
    unless it is listed in ``skip_paths``; otherwise the traversal would keep
    descending into the directory it is creating.
    """
    if skip_paths is None:
        skip_paths = []
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(dest_folder, exist_ok=True)
    total_files = count_files(src_folder, skip_paths)
    # The progress bar is the OUTER context so it stays open until the
    # executor has shut down (all workers finished), not merely until the
    # last task has been submitted.
    with tqdm(total=total_files, desc="复制文件", unit="file") as pbar:
        with ThreadPoolExecutor(max_workers=threads) as executor:
            process_directory(src_folder, dest_folder, executor, pbar, skip_paths)
if __name__ == "__main__":
    # Example invocation: edit the placeholders below before running.
    source_folder = ""  # Replace with the directory to copy FROM
    # Replace with the directory to copy INTO, including its final directory
    # name: the contents of source_folder are placed directly inside it.
    destination_folder = ""
    num_threads = 256  # Number of worker threads; adjust as needed
    print_logger_successful_message = False  # Log successful copies?
    print_logger_fail_message = True  # Log failed copies?
    print_logger_skip_message = True  # Log skipped paths?
    # Add the paths to skip to this list. An entry must equal the path as it
    # is built from source_folder with os.path.join.
    skip_paths = []
    multi_threaded_copy(source_folder, destination_folder, num_threads, skip_paths)