django-celery-expert
专注于Django与Celery集成,处理事务安全、ORM模式、测试和请求等任务,提供专业指导。
npx skills add vintasoftware/django-ai-plugins --skill django-celery-expertBefore / After 效果对比
1 组1在没有 Celery 集成时,Django 应用程序中的耗时操作(如发送邮件、处理图片、生成报告)通常会在 Web 请求-响应周期内同步执行。这会导致用户界面卡顿、请求超时,严重影响用户体验和服务器的响应能力。
2```python
3# 同步执行的耗时操作
4def send_welcome_email(user):
5 # 模拟耗时操作
6 time.sleep(5)
7 print(f"Sending email to {user.email}")
8
9def register_user(request):
10 user = User.objects.create(username=request.POST['username'])
11 send_welcome_email(user) # 同步调用
12 return HttpResponse("User registered")
13```1使用 `django-celery-expert` 技能后,可以将耗时操作异步化,将其委托给 Celery 后台任务队列处理。这使得 Web 应用程序能够立即响应用户请求,同时后台任务在不阻塞主线程的情况下执行,显著提升了用户体验和系统吞吐量。该技能还提供了事务安全、ORM 模式和测试等最佳实践。
2```python
3# 异步执行的耗时操作
4from celery import shared_task
5import time
6
7@shared_task
8def send_welcome_email_async(user_id):
9 user = User.objects.get(id=user_id)
10 time.sleep(5)
11 print(f"Sending email to {user.email}")
12
13def register_user(request):
14 user = User.objects.create(username=request.POST['username'])
15 send_welcome_email_async.delay(user.id) # 异步调用
16 return HttpResponse("User registered")
17```description SKILL.md
django-celery-expert
Django Celery Expert Instructions Step 1: Classify the Request Identify the task category from the request: Django integration — transaction safety, ORM patterns, testing, request correlation → read references/django-integration.md Task design — new tasks, calling patterns, chains/groups/chords, idempotency → read references/task-design-patterns.md Configuration — broker setup, result backend, worker settings, queue routing → read references/configuration-guide.md Error handling — retries, backoff, dead letter queues, timeouts → read references/error-handling.md Periodic tasks — Celery Beat, crontab schedules, dynamic schedules, timezone handling → read references/periodic-tasks.md Monitoring — Flower, Prometheus, logging, debugging stuck tasks → read references/monitoring-observability.md Production deployment — scaling, supervision, containers, health checks → read references/production-deployment.md If the request spans multiple categories, read all relevant reference files before continuing. Step 2: Read the Reference File(s) Read each reference file identified in Step 1. Do not proceed to implementation without reading the relevant reference. Step 3: Implement Apply the patterns from the reference file. Before presenting the solution, verify: Task arguments are serializable (pass IDs, not model instances) Tasks with retries enabled are idempotent Errors are logged with context Long-running tasks have timeouts configured Examples Basic Background Task Request: "Send welcome emails in the background after user registration" # tasks.py from celery import shared_task from django.core.mail import send_mail @shared_task(bind=True, max_retries=3) def send_welcome_email(self, user_id): from users.models import User try: user = User.objects.get(id=user_id) send_mail( subject="Welcome!", message=f"Hi {user.name}, welcome to our platform!", from_email="noreply@example.com", recipient_list=[user.email], ) except User.DoesNotExist: pass except Exception as exc: raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries)) # views.py — queue only after the transaction commits from django.db import transaction def register(request): user = User.objects.create(...) transaction.on_commit(lambda: send_welcome_email.delay(user.id)) return redirect("dashboard") Task with Progress Tracking Request: "Process a large CSV import with progress updates" @shared_task(bind=True) def import_csv(self, file_path, total_rows): from myapp.models import Record with open(file_path) as f: reader = csv.DictReader(f) for i, row in enumerate(reader): Record.objects.create(**row) if i % 100 == 0: self.update_state( state="PROGRESS", meta={"current": i, "total": total_rows}, ) return {"status": "complete", "processed": total_rows} # Poll progress result = import_csv.AsyncResult(task_id) if result.state == "PROGRESS": progress = result.info.get("current", 0) / result.info.get("total", 1) Workflow with Chains Request: "Process an order: validate inventory, charge payment, then send confirmation" from celery import chain @shared_task def validate_inventory(order_id): order = Order.objects.get(id=order_id) if not order.items_in_stock(): raise ValueError("Items out of stock") return order_id @shared_task def charge_payment(order_id): order = Order.objects.get(id=order_id) order.charge() return order_id @shared_task def send_confirmation(order_id): Order.objects.get(id=order_id).send_confirmation_email() def process_order(order_id): chain( validate_inventory.s(order_id), charge_payment.s(), send_confirmation.s(), ).delay() Weekly Installs290Repositoryvintasoftware/d…-pluginsGitHub Stars25First SeenJan 21, 2026Security AuditsGen Agent Trust HubPassSocketPassSnykPassInstalled onopencode231codex224gemini-cli224github-copilot215cursor177amp177
forum用户评价 (0)
发表评价
暂无评价,来写第一条吧
统计数据
用户评分
为此 Skill 评分