💻 Dev

💻 오늘의 코드 팁 — Temporal로 절대 실패하지 않는 워크플로우 만들기

문제 상황


주문 처리 시스템에서 결제 → 재고 차감 → 배송 요청을 순서대로 실행해야 합니다.
중간에 서버가 죽으면? 재고는 빠졌는데 배송이 안 되는 최악의 상황이 발생합니다.
`try/except`로 롤백 로직을 짜면 코드가 스파게티가 되고, cron + DB 상태 체크는 관리 지옥입니다.
Temporal은 이 문제를 워크플로우 엔진으로 해결합니다. 서버가 죽어도 정확히 멈춘 지점부터 자동 재개됩니다.
---

설치


```bash
pip install temporalio
```
Temporal 서버 (로컬 개발용):
```bash
brew install temporal
temporal server start-dev
```
---

코드 예제


1. Activity 정의 (실제 비즈니스 로직)


```python
# activities.py
from temporalio import activity
from dataclasses import dataclass
@dataclass
class OrderInfo:
order_id: str
amount: int
item_id: str
@activity.defn
async def process_payment(order: OrderInfo) -> str:
"""결제 처리 — 외부 PG 호출"""
print(f"결제 처리 중: {order.order_id}, {order.amount}원")
# 실제로는 PG API 호출
return f"PAY-{order.order_id}"
@activity.defn
async def deduct_inventory(order: OrderInfo) -> bool:
"""재고 차감"""
print(f"재고 차감: {order.item_id}")
# 실제로는 DB 업데이트
return True
@activity.defn
async def request_shipping(order: OrderInfo) -> str:
"""배송 요청"""
print(f"배송 요청: {order.order_id}")
return f"SHIP-{order.order_id}"
@activity.defn
async def refund_payment(payment_id: str) -> bool:
"""결제 취소 (보상 트랜잭션)"""
print(f"환불 처리: {payment_id}")
return True
```

2. Workflow 정의 (오케스트레이션)


```python
# workflows.py
from datetime import timedelta
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
from activities import (
OrderInfo, process_payment, deduct_inventory,
request_shipping, refund_payment,
)
@workflow.defn
class OrderWorkflow:
@workflow.run
async def run(self, order: OrderInfo) -> str:
# 1단계: 결제
payment_id = await workflow.execute_activity(
process_payment,
order,
start_to_close_timeout=timedelta(seconds=30),
)
# 2단계: 재고 차감 (실패 시 자동 환불)
try:
await workflow.execute_activity(
deduct_inventory,
order,
start_to_close_timeout=timedelta(seconds=10),
)
except Exception:
await workflow.execute_activity(
refund_payment,
payment_id,
start_to_close_timeout=timedelta(seconds=30),
)
return "FAILED: 재고 부족 → 환불 완료"
# 3단계: 배송 요청
shipping_id = await workflow.execute_activity(
request_shipping,
order,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=workflow.RetryPolicy(
maximum_attempts=3,
),
)
return f"완료! 결제: {payment_id}, 배송: {shipping_id}"
```

3. Worker + Client 실행


```python
# run_worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflows import OrderWorkflow
from activities import (
process_payment, deduct_inventory,
request_shipping, refund_payment,
)
async def main():
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue="order-queue",
workflows=[OrderWorkflow],
activities=[
process_payment, deduct_inventory,
request_shipping, refund_payment,
],
)
print("Worker 시작! 주문 대기 중...")
await worker.run()
if __name__ == "__main__":
asyncio.run(main())
```
```python
# start_order.py
import asyncio
from temporalio.client import Client
from activities import OrderInfo
from workflows import OrderWorkflow
async def main():
client = await Client.connect("localhost:7233")
result = await client.execute_workflow(
OrderWorkflow.run,
OrderInfo(order_id="ORD-001", amount=29900, item_id="ITEM-42"),
id="order-ORD-001",
task_queue="order-queue",
)
print(f"결과: {result}")
if __name__ == "__main__":
asyncio.run(main())
```
---

핵심 포인트


| 특징 | 설명 |
|------|------|
| 자동 재시도 | Activity 실패 시 RetryPolicy에 따라 자동 재시도 |
| 내구성 | 서버 재시작해도 워크플로우가 멈춘 지점부터 재개 |
| 보상 트랜잭션 | try/except로 자연스럽게 롤백 로직 작성 |
| 가시성 | `localhost:8233` 웹 UI에서 모든 워크플로우 상태 실시간 확인 |
기존 방식(cron + DB 플래그)과 비교하면 코드량이 절반 이하로 줄고, 장애 복구를 프레임워크가 알아서 해줍니다.
---
📚 공식 문서: [Temporal Python SDK](https://docs.temporal.io/develop/python) | [Getting Started](https://learn.temporal.io/getting_started/python/) | [GitHub](https://github.com/temporalio/sdk-python)
💬 0
👁 0 views

Comments (0)

💬

No comments yet.

Be the first to comment!