Python多线程复制 工具脚本

用于替代超慢的单线程cp -r,基本款:

import os
import shutil
from concurrent.futures import ThreadPoolExecutor

def copy_file(src, dest):
    """Copy a single file from ``src`` to ``dest`` and report the outcome.

    The copy is done with ``shutil.copy2``. Any exception raised while
    copying or reporting success is caught and reported with ``print``
    instead of being propagated, so one failing file does not stop the
    caller.

    Args:
        src: Path of the file to copy.
        dest: Path the file is copied to.
    """
    try:
        shutil.copy2(src, dest)
        outcome = f"{src} 复制到 {dest} 成功。"
        print(outcome)
    except Exception as e:
        # Best-effort: report the failure and carry on.
        outcome = f"复制文件 {src} 失败。错误:{e}"
        print(outcome)

def process_directory(src_folder, dest_folder, executor):
    """Recreate ``src_folder`` under ``dest_folder``, queuing file copies.

    ``dest_folder`` is created when it does not exist. Every entry of
    ``src_folder`` that is a directory is handled by a recursive call;
    every other entry is submitted to ``executor`` as a ``copy_file`` task.

    Args:
        src_folder: Directory whose entries are listed.
        dest_folder: Directory that receives the copies.
        executor: Object whose ``submit`` method schedules the copy tasks.
    """
    if not os.path.exists(dest_folder):
        os.makedirs(dest_folder)

    for name in os.listdir(src_folder):
        source_path = os.path.join(src_folder, name)
        target_path = os.path.join(dest_folder, name)

        # Anything that is not a directory is queued as a file copy.
        if not os.path.isdir(source_path):
            executor.submit(copy_file, source_path, target_path)
            continue

        process_directory(source_path, target_path, executor)

def multi_threaded_copy(src_folder, dest_folder, threads=5):
    """Copy ``src_folder`` into ``dest_folder`` using a thread pool.

    Args:
        src_folder: Directory to copy from.
        dest_folder: Directory to copy into; created if it does not exist.
        threads: Maximum number of worker threads in the pool.
    """
    destination_missing = not os.path.exists(dest_folder)
    if destination_missing:
        os.makedirs(dest_folder)

    pool = ThreadPoolExecutor(max_workers=threads)
    try:
        process_directory(src_folder, dest_folder, pool)
    finally:
        # Block until every queued copy has finished, as leaving a
        # ``with`` block around the executor would.
        pool.shutdown(wait=True)

if __name__ == "__main__":
    # Placeholder settings: edit these before running the script.
    source_folder = "目标目录"  # replace with the real directory to copy from
    destination_folder = "指定目录"  # replace with the real directory to copy into
    num_threads = 256  # number of worker threads; adjust as needed

    multi_threaded_copy(
        src_folder=source_folder,
        dest_folder=destination_folder,
        threads=num_threads,
    )

进阶版:添加 tqdm 进度条、使用 logging 代替 print、支持跳过指定路径(文件或目录):

import os
import shutil
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import logging

# Log records are emitted as the bare message text at INFO level and above.
logging.basicConfig(format="%(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)

# Module-level switches read by the copy functions to decide what to log.
print_logger_successful_message = False  # log each successful copy
print_logger_fail_message = True  # log each failed copy
print_logger_skip_message = True  # log each skipped path

def copy_file(src, dest):
    """Copy a single file from ``src`` to ``dest``, logging per the switches.

    The copy is done with ``shutil.copy2``. A success is logged at INFO
    level only when ``print_logger_successful_message`` is true. Any
    exception is caught rather than propagated, and is logged at ERROR
    level only when ``print_logger_fail_message`` is true.

    Args:
        src: Path of the file to copy.
        dest: Path the file is copied to.
    """
    try:
        shutil.copy2(src, dest)
        if not print_logger_successful_message:
            return
        logger.info(f"{src} 复制到 {dest} 成功。")
    except Exception as e:
        # Best-effort: a failed file must not stop the remaining copies.
        if not print_logger_fail_message:
            return
        logger.error(f"复制文件 {src} 失败。错误:{e}")

def process_directory(src_folder, dest_folder, executor, pbar, skip_paths):
    """Copy the tree under ``src_folder`` and advance ``pbar`` per finished file.

    All file copies are first submitted to ``executor``; the progress bar is
    then advanced once for every copy task that has finished. The function
    therefore returns only after every submitted copy is done, so the bar
    shows completed work rather than queued work.

    Args:
        src_folder: Directory to copy from.
        dest_folder: Directory to copy into; created if it does not exist.
        executor: Executor whose ``submit`` method schedules the copy tasks.
        pbar: Progress object whose ``update(1)`` is called per finished file.
        skip_paths: Paths (files or directories) to leave out. They are
            compared after ``os.path.normpath``, so a trailing separator or
            a redundant ``./`` segment does not prevent a match.
    """
    # Imported here because the file's import block only brings in
    # ThreadPoolExecutor.
    from concurrent.futures import as_completed

    # Normalise once up front instead of on every recursion level.
    skipped = {os.path.normpath(path) for path in (skip_paths or ())}
    futures = []
    _submit_directory(src_folder, dest_folder, executor, skipped, futures)

    # Advance the bar from the calling thread as copies finish. A task whose
    # copy failed still counts as processed.
    for _ in as_completed(futures):
        pbar.update(1)


def _submit_directory(src_folder, dest_folder, executor, skipped, futures):
    """Create ``dest_folder`` and queue a copy for each non-skipped file."""
    if not os.path.exists(dest_folder):
        os.makedirs(dest_folder)

    for item in os.listdir(src_folder):
        src = os.path.join(src_folder, item)
        dest = os.path.join(dest_folder, item)

        if os.path.normpath(src) in skipped:
            if print_logger_skip_message:
                logger.info(f"跳过 {src}")
            continue

        if os.path.isdir(src):
            _submit_directory(src, dest, executor, skipped, futures)
        else:
            futures.append(executor.submit(copy_file, src, dest))

def count_files(src_folder, skip_paths):
    """Count the files below ``src_folder`` that are not excluded by ``skip_paths``.

    A skipped directory excludes everything beneath it, and a skipped file
    excludes just that file. ``src_folder`` itself is never tested against
    ``skip_paths``; only the entries found below it are.

    Args:
        src_folder: Directory to walk.
        skip_paths: Paths (files or directories) to leave out. They are
            compared after ``os.path.normpath``, so a trailing separator
            does not prevent a match.

    Returns:
        The number of non-skipped files found; 0 when ``src_folder`` cannot
        be listed.
    """
    skipped = {os.path.normpath(path) for path in (skip_paths or ())}

    def is_skipped(parent, name):
        return os.path.normpath(os.path.join(parent, name)) in skipped

    total_files = 0
    # followlinks=True makes the walk descend into symlinked directories,
    # the same way an os.path.isdir()-based traversal does.
    for root, dirs, files in os.walk(src_folder, followlinks=True):
        # Prune in place so os.walk never descends into a skipped directory;
        # otherwise files in its sub-directories would still be counted.
        dirs[:] = [name for name in dirs if not is_skipped(root, name)]
        total_files += sum(1 for name in files if not is_skipped(root, name))
    return total_files

def multi_threaded_copy(src_folder, dest_folder, threads=5, skip_paths=None):
    """Copy ``src_folder`` into ``dest_folder`` with a thread pool and a tqdm bar.

    Args:
        src_folder: Directory to copy from.
        dest_folder: Directory to copy into; created if it does not exist.
        threads: Maximum number of worker threads in the pool.
        skip_paths: Paths to leave out of the copy; ``None`` means no paths
            are skipped.
    """
    skip_paths = [] if skip_paths is None else skip_paths

    destination_missing = not os.path.exists(dest_folder)
    if destination_missing:
        os.makedirs(dest_folder)

    # The bar's total is computed before any copy is queued.
    expected = count_files(src_folder, skip_paths)

    # The executor is entered first and exited last, exactly as with two
    # nested ``with`` blocks.
    with ThreadPoolExecutor(max_workers=threads) as executor, \
            tqdm(total=expected, desc="复制文件", unit="file") as pbar:
        process_directory(src_folder, dest_folder, executor, pbar, skip_paths)

if __name__ == "__main__":
    # Placeholder settings: edit these before running the script.
    source_folder = ""  # replace with the real directory to copy from
    destination_folder = ""  # replace with the real directory to copy into, including its final directory name
    num_threads = 256  # number of worker threads; adjust as needed
    skip_paths = []  # add the paths that should be skipped to this list

    # These assignments run at module level, so they rebind the logging
    # switches that the copy functions read.
    print_logger_successful_message = False  # whether to log successful copies
    print_logger_fail_message = True  # whether to log failed copies
    print_logger_skip_message = True  # whether to log skipped paths

    multi_threaded_copy(
        source_folder,
        destination_folder,
        threads=num_threads,
        skip_paths=skip_paths,
    )

发表回复