batch-processor
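Efficiently batch-processes hundreds of documents, supporting format conversion, data extraction, and analysis, with parallel execution and progress tracking.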
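npx skills add https://github.com/claude-office-skills/skills --skill batch-processor
Before / After comparison
Before: Open files one at a time, convert formats by hand, copy data, and record the results. Processing 500 documents takes 2 to 3 days.
After: Parallel batch processing, automatic conversion and extraction, and real-time progress tracking finish 500 documents in 4 hours.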
Batch Processor Skill
Overview
This skill enables efficient bulk processing of documents - convert, transform, extract, or analyze hundreds of files with parallel execution and progress tracking.
How to Use
- Describe what you want to accomplish
- Provide any required input data or files
- I'll execute the appropriate operations
Example prompts:
- "Convert 100 PDFs to Word documents"
- "Extract text from all images in a folder"
- "Batch rename and organize files"
- "Mass update document headers/footers"
Domain Knowledge
Batch Processing Patterns
Input: [file1, file2, ..., fileN]
          │
          ▼
   ┌─────────────┐
   │  Parallel   │ ← Process multiple files concurrently
   │  Workers    │
   └─────────────┘
          │
          ▼
Output: [result1, result2, ..., resultN]
Python Implementation
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

from tqdm import tqdm


def process_file(file_path: Path) -> dict:
    """Process a single file."""
    # Your processing logic here
    return {"path": str(file_path), "status": "success"}


def batch_process(input_dir: str, pattern: str = "*.*", max_workers: int = 4):
    """Process all matching files in directory."""
    # Keep regular files only; glob can also match directories
    files = [f for f in Path(input_dir).glob(pattern) if f.is_file()]
    results = []

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its file so failures can be attributed
        futures = {executor.submit(process_file, f): f for f in files}

        for future in tqdm(as_completed(futures), total=len(files)):
            file = futures[future]
            try:
                results.append(future.result())
            except Exception as e:
                results.append({"path": str(file), "status": "error", "error": str(e)})

    return results


# Usage (the __main__ guard is needed where worker processes are spawned, e.g. Windows and macOS)
if __name__ == "__main__":
    results = batch_process("/documents/invoices", "*.pdf", max_workers=8)
    print(f"Processed {len(results)} files")
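The list returned by batch_process mixes successful and failed entries, so a short summary step helps before reporting. The following is a minimal sketch, assuming each result is a dict shaped like the ones above (a "path" key, plus an "error" key on failure); the helper name summarize and the sample data are hypothetical.

```python
def summarize(results: list) -> dict:
    """Count successes and failures in batch_process-style result dicts."""
    # An entry counts as failed when it carries an "error" key
    failures = [r for r in results if "error" in r]
    return {
        "total": len(results),
        "succeeded": len(results) - len(failures),
        "failed": len(failures),
        "failed_paths": [r["path"] for r in failures],
    }


# Hypothetical sample data standing in for real batch_process output
sample = [
    {"path": "a.pdf", "status": "success"},
    {"path": "b.pdf", "error": "corrupt file"},
]
summary = summarize(sample)
print(summary)
```

Keeping the failed paths in the summary makes it easy to feed them into a retry run.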
Error Handling & Resume
import json
from pathlib import Path


class BatchProcessor:
    def __init__(self, checkpoint_file: str = "checkpoint.json"):
        self.checkpoint_file = checkpoint_file
        self.processed = self._load_checkpoint()

    def _load_checkpoint(self) -> dict:
        # Resume from an earlier run if a checkpoint exists
        if Path(self.checkpoint_file).exists():
            with open(self.checkpoint_file) as f:
                return json.load(f)
        return {}

    def _save_checkpoint(self):
        # Use a context manager so the file is flushed and closed every time
        with open(self.checkpoint_file, "w") as f:
            json.dump(self.processed, f)

    def process(self, files: list, processor_func):
        """Run processor_func (which must return a dict) on each unprocessed file."""
        for file in files:
            if str(file) in self.processed:
                continue  # Skip already processed (including earlier failures)
            try:
                result = processor_func(file)
                self.processed[str(file)] = {"status": "success", **result}
            except Exception as e:
                self.processed[str(file)] = {"status": "error", "error": str(e)}
            self._save_checkpoint()  # Resume-safe
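Because the checkpoint is rewritten after every file, a crash in the middle of a write could leave truncated JSON behind. One possible guard, sketched here under the assumption that the checkpoint is a plain JSON dict as above, is to write to a temporary file in the same directory and then swap it into place. The function names save_checkpoint_atomic and load_checkpoint are hypothetical and not part of the skill.

```python
import json
import os
import tempfile
from pathlib import Path


def save_checkpoint_atomic(state: dict, checkpoint_file: str) -> None:
    """Write state to a temp file, then rename it over the checkpoint."""
    target = Path(checkpoint_file)
    # The temp file must live in the same directory for the rename to be atomic
    fd, tmp_name = tempfile.mkstemp(dir=str(target.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            json.dump(state, tmp)
            tmp.flush()
            os.fsync(tmp.fileno())  # push the bytes to disk before the swap
        os.replace(tmp_name, target)  # atomic rename on POSIX when it succeeds
    except BaseException:
        # Remove the leftover temp file if anything failed before the swap
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


def load_checkpoint(checkpoint_file: str) -> dict:
    """Return the saved state, or an empty dict when no checkpoint exists."""
    path = Path(checkpoint_file)
    if not path.exists():
        return {}
    with path.open(encoding="utf-8") as f:
        return json.load(f)


# Demo in a throwaway directory so no real checkpoint is touched
with tempfile.TemporaryDirectory() as demo_dir:
    demo_file = os.path.join(demo_dir, "checkpoint.json")
    save_checkpoint_atomic({"a.pdf": {"status": "success"}}, demo_file)
    print(load_checkpoint(demo_file))
```

With this approach a reader sees either the previous checkpoint or the new one, never a half-written file, at the cost of one extra file operation per save.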
Best Practices
- Use progress bars (tqdm) for user feedback
- Implement checkpointing for long jobs
- Set reasonable worker counts (CPU cores)
- Log failures for later review
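Two of these practices can be sketched in a few lines: clamping the worker count to the available CPU cores, and writing failed items to a log file for later review. This is an illustrative sketch only. The helper names pick_worker_count and write_failure_log, the JSON Lines log format, and the assumption that failed results carry an "error" key are all choices made for the example.

```python
import json
import os
from typing import Optional


def pick_worker_count(requested: Optional[int] = None) -> int:
    """Clamp the worker count to the range [1, number of CPU cores]."""
    cores = os.cpu_count() or 1  # os.cpu_count() can return None
    if requested is None:
        return cores
    return max(1, min(requested, cores))


def write_failure_log(results: list, log_path: str) -> int:
    """Append one JSON line per failed result; return how many were written."""
    failures = [r for r in results if "error" in r]
    with open(log_path, "a", encoding="utf-8") as log:
        for item in failures:
            log.write(json.dumps(item, ensure_ascii=False) + "\n")
    return len(failures)
```

One core per worker is a reasonable starting point for CPU-bound processing. Jobs that mostly wait on disk or network may benefit from a different setting, so treat the clamp as a default, not a rule.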
Installation
# Install required dependencies (tqdm provides the progress bars used in the Python Implementation)
pip install python-docx openpyxl python-pptx reportlab jinja2 tqdm
Resources