배경 및 목적
- NAS로 파일을 백업하는 과정에서 파일명이 길어서 수정해야 하는 경우가 너무 많아서, 자동화 코드를 작성
감안한 부분
- 대량의 파일을 처리할 것을 감안해서, 파일 복사 대신 경로만 저장해서 컨트롤한 이후에 최종적으로 필요한 파일만 일괄적으로 처리
- Python의
concurrent.futures
모듈을 사용하여 병렬로 파일을 처리
샘플 코드
import os
import shutil
from concurrent.futures import ThreadPoolExecutor
def process_file(directory, filename, output_directory, delimiter):
"""
파일명을 변경하고 output 디렉토리에 복사하는 작업을 수행하는 함수.
""" # 파일명과 확장자를 분리
file_base, file_ext = os.path.splitext(filename)
# Delimiter가 처음 등장한 위치 찾기
cut_position = file_base.find(delimiter)
# Delimiter가 파일명에 존재하는 경우, 해당 위치까지 자름
if cut_position != -1:
new_filename = file_base[:cut_position] + file_ext
else:
new_filename = filename
# 기존 파일 경로와 새 파일 경로 설정
old_filepath = os.path.join(directory, filename)
new_filepath = os.path.join(output_directory, new_filename)
# 파일을 output 디렉토리로 복사
shutil.copy2(old_filepath, new_filepath)
def trim_filename(directory, extensions, delimiter="_", max_workers=8):
"""
지정된 디렉토리에서 특정 확장자의 파일을 찾고,
Delimiter가 처음 등장한 지점까지 파일명을 잘라서 output 디렉토리에 복사하는 함수.
Parameters: directory (str): 파일을 검색할 디렉토리 경로
extensions (list): 검색할 파일의 확장자 리스트 (예: ['.txt', '.csv'])
delimiter (str): 파일명을 자를 기준이 되는 Delimiter (기본값: "_")
max_workers (int): 동시에 작업할 최대 스레드 수 (기본값: 8)
Returns: None """ # output 디렉토리 경로 생성
output_directory = os.path.join(directory, "output")
os.makedirs(output_directory, exist_ok=True)
# 처리할 파일 리스트 작성
files_to_process = [filename for filename in os.listdir(directory) if any(filename.endswith(ext) for ext in extensions)]
# 스레드를 사용하여 파일 처리
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for filename in files_to_process:
executor.submit(process_file, directory, filename, output_directory, delimiter)