Luyện Phỏng Vấn IT — 2000+ Câu Hỏi Phỏng Vấn IT Có Đáp Án 2026
Python
Immutable (không thể thay đổi sau khi tạo): int, float, str, tuple, frozenset, bytes.
- Mutable (có thể thay đổi tại chỗ):
list,dict,set,bytearray.
Pitfall: Không dùng mutable làm default argument — def func(lst=[]) sẽ share cùng list giữa các lần gọi.
Dùng list khi cần ordered collection có thể thay đổi.
- Dùng
tuplecho dữ liệu bất biến, làm dict key, trả về nhiều giá trị từ function. - Dùng
setđể loại bỏ duplicate và kiểm tra membership O(1). - Dùng
dictcho key-value lookup O(1).
Pitfall: set và dict không ordered trước Python 3.7; dict từ Python 3.7+ giữ insertion order.
- try — khối code có thể raise exception
- except — xử lý exception
- else — chạy nếu KHÔNG có exception trong try
- finally — LUÔN chạy, dù có exception hay không — dùng để cleanup
try:
result = 10 / divisor
except ZeroDivisionError as e:
print(f"Error: {e}")
except (TypeError, ValueError):
print("Type or value error")
else:
print(f"Success: {result}") # Chỉ khi không có exception
finally:
print("Cleanup") # Luôn chạyDùng f-string (Python 3.6+) — nhanh nhất, dễ đọc nhất, hỗ trợ expression trực tiếp. f"Name: {name!r}, Score: {score:.2f}". .format() khi cần template string tái sử dụng. % formatting là cũ, không nên dùng trong code mới.
Ví dụ: f"2+2={2+2}" in ra "2+2=4" — expression được evaluate ngay trong string. Python 3.12+ (PEP 701): f-string hỗ trợ nested quotes và multi-line expression — f"result: {'pass' if ok else 'fail'}" là hợp lệ.
FastAPI là modern Python web framework dựa trên Starlette (ASGI) + Pydantic.
Ưu điểm:
- Async native — ~20K RPS, Flask/Django ~4-5K RPS
- Auto OpenAPI docs — Swagger UI + ReDoc tự động từ type hints
- Type-based validation — Pydantic validate request/response
- Dependency Injection built-in
- ASGI hỗ trợ WebSocket, HTTP/2. Dùng FastAPI cho: microservices, AI/ML serving, real-time APIs. Django vẫn tốt cho enterprise apps với admin panel và nhiều features built-in
Type hints (PEP 484, Python 3.5+) không enforce lúc runtime nhưng mang lại:
- IDE autocompletion và refactoring tốt hơn
- Static analysis với mypy/pyright bắt lỗi trước runtime
- Documentation tự động — code tự giải thích
- Pydantic/FastAPI dùng type hints để validation
- Ít bugs hơn trong team lớn
Pitfall: Type hints không prevent runtime errors — cần mypy --strict hoặc pyright trong CI pipeline.
Các built-ins quan trọng để xử lý collections hiệu quả:
# enumerate — loop có index
fruits = ["apple", "banana", "cherry"]
for i, fruit in enumerate(fruits, start=1):
print(f"{i}. {fruit}")
# zip — gộp nhiều iterables song song
names = ["Alice", "Bob"]
scores = [95, 87]
for name, score in zip(names, scores):
print(f"{name}: {score}")
# map — transform mỗi element (lazy)
doubled = list(map(lambda x: x * 2, [1, 2, 3])) # [2, 4, 6]
# Prefer list comprehension: [x*2 for x in [1,2,3]]
# filter — lọc elements (lazy)
evens = list(filter(lambda x: x % 2 == 0, range(10)))
# Prefer: [x for x in range(10) if x % 2 == 0]Pitfall: map và filter trả về lazy iterators — cần list() để materialize.
Modern Python ưu tiên list comprehension hơn map/filter vì dễ đọc hơn.
Python dùng hai cơ chế:
- Reference counting — mỗi object đếm số tham chiếu, về 0 thì giải phóng ngay
- Garbage Collector (module
gc) — phát hiện và thu hồi circular references mà reference counting không xử lý được - Memory Pool (PyMalloc) — tối ưu allocation cho objects nhỏ < 512 bytes
Pitfall: Circular reference giữa hai objects sẽ không bao giờ được reference counting giải phóng.
Generator là function dùng yield trả về iterator — tính giá trị lazily, từng phần tử khi được yêu cầu.
Tiết kiệm bộ nhớ so với list vì không nạp toàn bộ dữ liệu vào RAM.
# List — toàn bộ vào RAM
squares_list = [x**2 for x in range(1_000_000)]
# Generator — lazy, từng phần tử khi cần
def squares_gen(n):
for x in range(n):
yield x**2
gen = squares_gen(1_000_000)
print(next(gen)) # 0 — chỉ tính khi gọi next()Dùng khi: xử lý file lớn, streaming data, infinite sequences.
Decorator là higher-order function nhận một function và trả về function mới với hành vi bổ sung — cú pháp @decorator là syntactic sugar.
Dùng @functools.wraps để giữ metadata của function gốc.
import functools, time
def timer(func):
@functools.wraps(func) # Giữ __name__, __doc__
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
print(f"{func.__name__}: {time.perf_counter()-start:.4f}s")
return result
return wrapper
@timer
def slow():
time.sleep(1)Pitfall: Không dùng @functools.wraps → mất __name__, __doc__ của function gốc.
Context manager quản lý tài nguyên tự động qua __enter__ và __exit__. with đảm bảo cleanup chạy dù có exception hay không.
from contextlib import asynccontextmanager
# Dùng @asynccontextmanager cho async code (không phải @contextmanager)
@asynccontextmanager
async def db_transaction(session):
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# Class-based (sync)
class Timer:
def __enter__(self):
import time; self.start = time.perf_counter(); return self
def __exit__(self, *args):
self.elapsed = time.perf_counter() - self.startClosure là nested function "ghi nhớ" biến từ enclosing scope dù outer function đã kết thúc.
Lambda là anonymous function một dòng — thường dùng cho logic đơn giản tức thời.
# Closure — nhớ state
def make_counter(start=0):
count = [start]
def counter():
count[0] += 1
return count[0]
return counter
c = make_counter()
print(c(), c(), c()) # 1, 2, 3
# Lambda — function tức thời
double = lambda x: x * 2Pitfall: Closure trong vòng lặp — mọi closure đều tham chiếu cùng biến vòng lặp, không phải giá trị tại thời điểm tạo.
Shallow copy tạo object mới nhưng elements bên trong vẫn tham chiếu cùng objects gốc — thay đổi mutable elements sẽ ảnh hưởng cả hai bản.
- Deep copy tạo hoàn toàn độc lập, copy đệ quy toàn bộ.
- Dùng
copy.copy()cho shallow vàcopy.deepcopy()cho deep.
Pitfall: list.copy(), list[:], dict.copy() đều là shallow copy.
== so sánh giá trị (gọi __eq__). is so sánh identity — hai biến có trỏ đến cùng object trong memory không.
Pitfall: CPython cache integers nhỏ (-5 đến 256) và string interning — x = 256; y = 256; x is y có thể là True nhưng x = 257; y = 257; x is y có thể là False.
- Chỉ dùng
isđể so sánh vớiNone,True,False.
lru_cache (Least Recently Used cache) là decorator memoize kết quả của function — lần sau gọi cùng args sẽ trả về cache thay vì tính lại.
Function phải có hashable arguments.
from functools import lru_cache
@lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
if n < 2: return n
return fibonacci(n-1) + fibonacci(n-2)
# Không cache (default: cached forever)
@lru_cache(maxsize=None) # Cache không giới hạn
def expensive_lookup(key): ...
fibonacci.cache_info() # hits, misses, maxsize, currsize
fibonacci.cache_clear() # Xóa cacheDùng cho: pure functions với expensive computation, recursive algorithms.
- Encapsulation — đóng gói data + methods, dùng
_(protected) và__(name mangling) - Inheritance —
class Child(Parent), hỗ trợ multiple inheritance, dùngsuper() - Polymorphism — cùng method tên, hành vi khác nhau tùy class; duck typing
- Abstraction — ẩn implementation qua
ABC+@abstractmethod
Pitfall: Python không có private thật sự — __attr chỉ là name mangling, vẫn access được qua _ClassName__attr.
Instance method nhận self — có access vào instance và class. @classmethod nhận cls — access class, thường dùng làm factory. @staticmethod không nhận self hay cls — utility function liên quan đến class.
class User:
_count = 0
def __init__(self, name):
self.name = name
User._count += 1
def greet(self): # Instance method
return f"Hi, {self.name}"
@classmethod
def from_dict(cls, data): # Factory — dùng cls thay vì User
return cls(data['name'])
@staticmethod
def validate_name(name): # Utility — không cần self/cls
return len(name) >= 2@property cho phép define getter/setter/deleter cho attribute, truy cập như attribute thường nhưng có validation logic.
class Temperature:
def __init__(self, celsius: float):
self._celsius = celsius
@property
def celsius(self) -> float:
return self._celsius
@celsius.setter
def celsius(self, value: float):
if value < -273.15:
raise ValueError("Below absolute zero!")
self._celsius = value
@property
def fahrenheit(self) -> float: # Read-only derived property
return self._celsius * 9/5 + 32
t = Temperature(25)
t.celsius = 30 # Gọi setter tự động
print(t.fahrenheit) # 86.0Dùng ABC + @abstractmethod để tạo abstract class.
Class con bắt buộc implement tất cả abstract methods, nếu không sẽ không instantiate được.
from abc import ABC, abstractmethod
class Repository(ABC):
@abstractmethod
async def get_by_id(self, id: int): ...
@abstractmethod
async def create(self, data: dict): ...
def find_all(self): # Concrete method — có thể kế thừa
return self.get_all()
class UserRepository(Repository):
async def get_by_id(self, id): ... # Bắt buộc implement
async def create(self, data): ... # Bắt buộc implement
# Repository() # TypeError — cannot instantiate abstract class@dataclass tự động generate __init__, __repr__, __eq__ — giảm boilerplate đáng kể.
from dataclasses import dataclass, field
from typing import List
@dataclass
class User:
name: str
email: str
age: int = 0
tags: List[str] = field(default_factory=list)
# __init__, __repr__, __eq__ tự động được generate!
@dataclass(frozen=True) # Immutable như tuple
class Point:
x: float
y: float
p = Point(1.0, 2.0)
# p.x = 3.0 # FrozenInstanceErrorƯu điểm so với namedtuple: có default values, methods, mutable (nếu không frozen).
__repr__— developer repr (eval-safe),__str__— user-friendly string__len__,__getitem__,__setitem__— sequence protocol__enter__/__exit__— context manager__eq__,__lt__,__hash__— comparison và hashability__call__— cho phép gọi instance như function__iter__/__next__— iterator protocol
Pitfall: Khi override __eq__, Python tự động set __hash__ = None — phải define __hash__ thủ công nếu muốn dùng object trong set/dict.
GIL (Global Interpreter Lock) là mutex trong CPython đảm bảo chỉ một thread thực thi Python bytecode tại một thời điểm. Ảnh hưởng:
- I/O-bound tasks — threading vẫn hiệu quả vì GIL được release khi chờ I/O (network, file, DB)
- CPU-bound tasks — threading KHÔNG cải thiện do GIL → phải dùng
multiprocessingđể bypass GIL và chạy song song thực sự
Pitfall: GIL không có trong Jython và IronPython.
Python 3.13 ra mắt free-threaded build chính thức (python3.13t, PEP 703) — GIL tắt per-interpreter; đây là opt-in feature ổn định, không còn là "experimental".
Dự kiến broader adoption trong 3.14+.
Dùng asyncio cho I/O-bound với nhiều concurrent operations (web APIs, DB queries) — single thread, cooperative concurrency, overhead thấp nhất.
- Dùng
threadingcho I/O-bound với thư viện blocking không hỗ trợ async. - Dùng
multiprocessingcho CPU-bound (ML training, image processing, data transformation) — bypass GIL, chạy trên nhiều CPU cores thực sự, nhưng overhead cao hơn (IPC, memory).
Pitfall: asyncio không giúp gì cho CPU-bound — dùng ProcessPoolExecutor kết hợp với asyncio.
- Event loop quản lý coroutines.
- Khi coroutine gặp
await, nó tạm dừng và nhường control để event loop chạy coroutine khác. - Đây là cooperative concurrency — không phải parallelism thực sự.
import asyncio, httpx
async def fetch(url: str) -> dict:
async with httpx.AsyncClient() as client:
r = await client.get(url) # Yield control tại đây
return r.json()
async def main():
# Chạy 3 requests đồng thời (concurrent), không phải song song
results = await asyncio.gather(
fetch("/users/1"),
fetch("/users/2"),
fetch("/users/3"),
)
asyncio.run(main())Depends() inject dependencies vào path operations — tự động resolve, supports chaining và cleanup.
from fastapi import Depends
async def get_db():
async with AsyncSessionLocal() as session:
yield session # cleanup sau khi request xong
class Pagination:
def __init__(self, page: int = 1, size: int = 20):
self.offset = (page - 1) * size
self.limit = min(size, 100)
@app.get("/users")
async def list_users(
db = Depends(get_db), # Injected automatically
pg: Pagination = Depends(), # Class-based DI
user = Depends(get_current_user)
):
...Lợi ích: testable (override dependency trong test), reusable, separation of concerns.
FastAPI dùng Pydantic model để tự động validate request body, query params, path params.
Trả về 422 Unprocessable Entity nếu invalid.
from pydantic import BaseModel, EmailStr, Field, field_validator
class UserCreate(BaseModel):
name: str = Field(min_length=2, max_length=50)
email: EmailStr
age: int = Field(ge=0, le=150)
@field_validator('name')
@classmethod
def name_alpha(cls, v: str) -> str:
if not v.replace(' ', '').isalpha():
raise ValueError('Only letters allowed')
return v.strip().title()
model_config = {"from_attributes": True} # Pydantic v2
@app.post("/users", status_code=201)
async def create_user(user: UserCreate): # Auto-validated!
...Middleware chạy trước và sau mỗi request.
Dùng @app.middleware("http") cho custom middleware.
import time, uuid
from fastapi import Request
@app.middleware("http")
async def logging_middleware(request: Request, call_next):
request_id = str(uuid.uuid4())
start = time.perf_counter()
response = await call_next(request)
duration = time.perf_counter() - start
response.headers["X-Request-ID"] = request_id
response.headers["X-Duration"] = f"{duration:.4f}"
return responseCORS, GZip, TrustedHost — dùng built-in middlewares.
Pitfall: Middleware exception handler không bắt HTTP exceptions từ path operations — dùng exception_handler thay.
JWT auth gồm bốn bước:
- Hash password với bcrypt
- Tạo JWT token khi login
- Verify token trong dependency
- Protect routes với
Depends(get_current_user)
Dùng PyJWT (không phải python-jose — đã có CVEs và ít được maintain).
from passlib.context import CryptContext
import jwt # pip install PyJWT
from datetime import datetime, timedelta, timezone
pwd = CryptContext(schemes=["bcrypt"])
SECRET = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_token(user_id: int) -> str:
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
return jwt.encode({"sub": str(user_id), "exp": expire}, SECRET, ALGORITHM)
async def get_current_user(token = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET, algorithms=[ALGORITHM])
uid = payload.get("sub")
if not uid: raise credentials_exception
except jwt.PyJWTError:
raise HTTPException(401, "Invalid token")
return await get_user(int(uid))- Background Tasks: zero config, in-process, chạy sau khi response trả về client.
- Dùng cho: email xác nhận đơn giản, logging, non-critical async work.
- Celery: cần Redis/RabbitMQ broker, distributed workers, retry logic, monitoring với Flower.
- Dùng cho: payment processing, report generation, critical tasks cần retry.
Pitfall: Background Tasks bị mất nếu server restart — Celery persist tasks trong broker.
CORS (Cross-Origin Resource Sharing) là browser security mechanism ngăn requests từ origin khác.
Backend phải whitelist các origins được phép.
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:5173", # Vite dev server
"https://yourapp.com", # Production
],
allow_credentials=True, # Cần cho cookies/auth headers
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["Authorization", "Content-Type"],
)Pitfall: allow_origins=["*"] với allow_credentials=True sẽ bị browser từ chối — phải liệt kê explicit origins.
N+1 problem: 1 query lấy N records, rồi N queries nữa để lấy related data.
Giải quyết bằng eager loading.
# N+1 problem — 1 + N queries
users = await db.execute(select(User))
for user in users.scalars():
posts = await db.execute( # 1 query mỗi user!
select(Post).where(Post.user_id == user.id)
)
# Fix: selectinload — 2 queries total
from sqlalchemy.orm import selectinload
result = await db.execute(
select(User).options(selectinload(User.posts))
)
# 1 query lấy tất cả users + 1 query lấy posts cho tất cả user_ids
# joinedload — 1 query với JOIN (tốt khi result set nhỏ)
from sqlalchemy.orm import joinedloadSQLAlchemy 2.0 async dùng create_async_engine, async_sessionmaker, và ORM style mới với Mapped/mapped_column — fully type-safe.
from sqlalchemy.ext.asyncio import (
create_async_engine, AsyncSession, async_sessionmaker
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=10, max_overflow=20,
)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
class Base(DeclarativeBase): pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(unique=True, index=True)
name: Mapped[str] = mapped_column(nullable=False)
async def get_db():
async with AsyncSessionLocal() as session:
yield sessionDùng TestClient cho synchronous-style test của async endpoints.
Override dependencies để isolate tests.
from fastapi.testclient import TestClient
import pytest
def override_get_db():
yield TestingSessionLocal()
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
def test_create_user():
res = client.post("/users", json={
"name": "Alice", "email": "alice@test.com"
})
assert res.status_code == 201
assert res.json()["email"] == "alice@test.com"
# Async test
@pytest.mark.asyncio
async def test_async():
async with AsyncClient(app=app, base_url="http://test") as ac:
res = await ac.get("/users/1")
assert res.status_code == 200Fixture scope xác định fixture được tạo và destroy khi nào: function (default) — mỗi test; class — mỗi test class; module — mỗi module; session — toàn bộ test session.
@pytest.fixture(scope="session")
def db_engine(): # Tạo 1 lần cho toàn bộ session
engine = create_engine(TEST_URL)
Base.metadata.create_all(engine)
yield engine
Base.metadata.drop_all(engine)
@pytest.fixture # scope="function" — mỗi test
def db(db_engine):
connection = db_engine.connect()
transaction = connection.begin()
session = Session(bind=connection)
yield session
session.close()
transaction.rollback() # Rollback sau mỗi test — isolationDùng mocker fixture từ pytest-mock để mock functions và verify calls.
async def test_register_sends_email(mocker):
# Mock email service
mock_send = mocker.patch(
"app.services.email_service.send",
new_callable=AsyncMock
)
mock_send.return_value = True
await register_user(email="test@test.com")
mock_send.assert_called_once_with(
"test@test.com", "Welcome!"
)
def test_with_side_effect(mocker):
mock_api = mocker.patch("app.services.external_api.call")
mock_api.side_effect = [{"status": "ok"}, ConnectionError()]
assert call_api() == {"status": "ok"}
with pytest.raises(ConnectionError):
call_api()pydantic-settings tự động đọc env vars và validate types.
Hỗ trợ .env file.
from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env", env_file_encoding="utf-8"
)
app_name: str = "My API"
debug: bool = False
database_url: str # Required — không có default
secret_key: str
redis_url: str = "redis://localhost:6379"
access_token_expire_minutes: int = 30
@lru_cache # Singleton — đọc .env một lần
def get_settings() -> Settings:
return Settings()
settings = get_settings()args cho phép nhận bất kỳ số positional arguments — nhóm thành tuple. *kwargs nhận bất kỳ số keyword arguments — nhóm thành dict.
def log(level, *args, **kwargs):
print(f"[{level}]", *args)
for k, v in kwargs.items():
print(f" {k}: {v}")
log("INFO", "Server started", port=8080, debug=True)
# [INFO] Server started
# port: 8080
# debug: True
# Unpacking khi gọi function
def create_user(name, email, age):
...
data = {"name": "Alice", "email": "a@b.com", "age": 25}
create_user(**data) # Unpack dict thành kwargsThứ tự params: def fn(pos, /, normal, args, kw_only, *kwargs).
Pitfall: args và *kwargs không giữ type information — dùng overloads hoặc TypedDict cho strict typing.
Walrus operator (PEP 572, Python 3.8+) cho phép assign và return giá trị trong cùng một expression — giảm lặp code.
# Trước 3.8 — gọi hàm 2 lần hoặc dùng temp variable
data = get_data()
if data:
process(data)
# Với walrus — assign và kiểm tra cùng lúc
if data := get_data():
process(data)
# Đặc biệt hữu ích trong while loop
while chunk := file.read(8192):
process(chunk)
# List comprehension với filter + transform
results = [y for x in data if (y := transform(x)) > 0]
# Tránh gọi transform() 2 lần (1 cho filter, 1 cho value)Pitfall: không lạm dụng — walrus trong nested expression phức tạp làm code khó đọc hơn.
Dùng khi thực sự tránh được duplicate computation.
Python type hints mạnh mẽ hơn chỉ int, str cơ bản:
from typing import Optional, Union, TypeVar, Generic, Protocol
from collections.abc import Sequence
# Optional[X] tương đương Union[X, None]
def find_user(user_id: int) -> Optional[User]:
return db.get(user_id) # Có thể None
# Union — nhiều types
def parse_id(value: Union[str, int]) -> int:
return int(value)
# TypeVar — generic type variable
T = TypeVar('T')
def first(lst: Sequence[T]) -> Optional[T]:
return lst[0] if lst else None
# Generic class
class Stack(Generic[T]):
def __init__(self): self._items: list[T] = []
def push(self, item: T) -> None: self._items.append(item)
def pop(self) -> T: return self._items.pop()
# Protocol — structural subtyping (duck typing + type safety)
class Drawable(Protocol):
def draw(self) -> None: ...
def render(obj: Drawable) -> None:
obj.draw() # Bất kỳ class nào có .draw() đều hợp lệPython 3.10+: dùng X | Y thay vì Union[X, Y], X | None thay vì Optional[X].
Iterator: bất kỳ object nào implement __iter__() và __next__(). Generator: function dùng yield — tự động implement iterator protocol, lazy, stateful.
# Class-based Iterator
class CountDown:
def __init__(self, start): self.current = start
def __iter__(self): return self # Trả về chính nó
def __next__(self):
if self.current <= 0:
raise StopIteration
self.current -= 1
return self.current + 1
list(CountDown(3)) # [3, 2, 1]
# Generator function — ngắn gọn hơn
def countdown(n):
while n > 0:
yield n
n -= 1
# Generator expression — lazy (khác list comprehension)
gen = (x**2 for x in range(1_000_000)) # Không chiếm RAMGenerator với send() — coroutine-lite, nhận value từ caller:
def accumulator():
total = 0
while True:
value = yield total # Nhận value từ .send()
if value is None: break
total += value
acc = accumulator()
next(acc) # Khởi động generator
acc.send(10) # 10
acc.send(20) # 30FastAPI 0.100+ dùng Pydantic v2 mặc định — nhanh hơn 5-50x nhờ Rust core.
Thay đổi chính:
# v1 → v2 migration
# 1. validator → field_validator
# v1:
@validator('email')
def validate_email(cls, v): ...
# v2:
from pydantic import field_validator
@field_validator('email')
@classmethod
def validate_email(cls, v: str) -> str: ...
# 2. orm_mode → model_config
# v1: class Config: orm_mode = True
# v2:
from pydantic import ConfigDict
class UserSchema(BaseModel):
model_config = ConfigDict(from_attributes=True)
# 3. .dict() → .model_dump()
user.model_dump()
user.model_dump(exclude={'password'})
# 4. .json() → .model_dump_json()
user.model_dump_json()
# 5. parse_obj → model_validate
UserSchema.model_validate({"id": 1, "name": "Alice"})
UserSchema.model_validate(orm_object, from_attributes=True)
# 6. BaseSettings bị tách ra pydantic-settings package
from pydantic_settings import BaseSettingsFastAPI chạy trên Starlette (ASGI), hỗ trợ cả sync và async endpoints:
async def: dùng khi endpoint thực hiện I/O async (database queries async, HTTP calls async):
@app.get("/users/{id}")
async def get_user(id: int, db: AsyncSession = Depends(get_db)):
user = await db.get(User, id) # Async I/O
return userdef (sync): khi dùng blocking operations không thể async.
FastAPI tự động chạy sync endpoint trong ThreadPoolExecutor để không block event loop:
@app.get("/compute")
def cpu_task(data: list):
return [x**2 for x in data] # CPU-bound — sync OKKhông được làm: gọi blocking I/O trong async def — block toàn bộ event loop:
@app.get("/wrong")
async def bad_endpoint():
time.sleep(1) # BLOCKING! Sai!
requests.get("...") # BLOCKING! Sai!
# Dùng asyncio.sleep() và httpx.AsyncClient() thay- Rule: I/O async →
async def. - CPU-bound hoặc blocking library →
def. - Không bao giờ dùng blocking I/O trong
async def.
@pytest.mark.parametrize cho phép chạy cùng test với nhiều input/output khác nhau:
import pytest
@pytest.mark.parametrize("email,is_valid", [
("user@example.com", True),
("invalid-email", False),
("@nodomain.com", False),
("user@.com", False),
("", False),
])
def test_validate_email(email: str, is_valid: bool):
assert validate_email(email) == is_valid
# Multiple params
@pytest.mark.parametrize("a,b,expected", [
(1, 2, 3),
(0, 0, 0),
(-1, 1, 0),
(100, 200, 300),
])
def test_add(a, b, expected):
assert add(a, b) == expected
# Nested parametrize — cartesian product
@pytest.mark.parametrize("base", [2, 10])
@pytest.mark.parametrize("exponent", [0, 1, 2])
def test_power(base, exponent):
assert power(base, exponent) == base ** exponent
# 6 test cases: (2,0), (2,1), (2,2), (10,0), (10,1), (10,2)
# Với pytest.param cho custom IDs hoặc marks
@pytest.mark.parametrize("value", [
pytest.param(None, id="null-value"),
pytest.param([], marks=pytest.mark.xfail, id="empty-list"),
])
def test_process(value): ...Mock là base class; MagicMock mở rộng với magic method support (__len__, __iter__, __enter__...); AsyncMock (Python 3.8+) bắt buộc cho coroutine — dùng sai type thì assert_awaited_once sẽ silently pass dù không được await.
from unittest.mock import Mock, MagicMock, AsyncMock, patch
# Mock — basic mock object
mock = Mock()
mock.method() # OK, trả về Mock
mock.method.assert_called_once()
# MagicMock — Mock + magic methods (__len__, __iter__, __enter__ etc.)
magic = MagicMock()
len(magic) # OK, gọi __len__
with magic: # OK, gọi __enter__/__exit__
pass
list(magic) # OK, gọi __iter__
# AsyncMock (Python 3.8+) — cho async functions
async_mock = AsyncMock(return_value={"data": []})
# await async_mock() # OK
# Patch context manager
with patch('mymodule.requests.get') as mock_get:
mock_get.return_value.json.return_value = {'id': 1}
result = fetch_user(1)
mock_get.assert_called_once_with('https://api.com/users/1')
# Patch cho async
with patch('mymodule.send_email', new_callable=AsyncMock) as mock_email:
mock_email.return_value = True
await register_user("test@test.com")
mock_email.assert_awaited_once()
# side_effect — simulate exceptions hay dynamic return values
mock.method.side_effect = ValueError("DB connection failed")
mock.method.side_effect = [1, 2, 3] # Trả về lần lượtKết hợp __slots__ với @dataclass để có cả type safety và memory efficiency:
from dataclasses import dataclass
import sys
@dataclass
class UserNormal:
id: int; name: str; email: str
@dataclass(slots=True) # Python 3.10+
class UserSlots:
id: int; name: str; email: str
print(sys.getsizeof(UserNormal(1, 'Alice', 'a@b.com'))) # ~256 bytes (có __dict__)
print(sys.getsizeof(UserSlots(1, 'Alice', 'a@b.com'))) # ~56 bytes (không có __dict__)
# frozen=True + slots=True — immutable và memory-efficient
@dataclass(frozen=True, slots=True)
class Point:
x: float
y: float
p = Point(1.0, 2.0)
# p.x = 3.0 # FrozenInstanceErrorDùng @dataclass(slots=True) khi có hàng triệu instances — tiết kiệm ~30-60% memory.
MRO xác định thứ tự Python tìm method trong cây kế thừa, dùng thuật toán C3 Linearization.
Giải quyết diamond problem bằng cách đảm bảo mỗi class chỉ xuất hiện một lần.
class A:
def method(self): print("A")
class B(A):
def method(self): super().method(); print("B")
class C(A):
def method(self): super().method(); print("C")
class D(B, C): pass
D().method() # A → C → B (theo MRO)
print(D.__mro__) # D, B, C, A, objectXem MRO: ClassName.__mro__ hoặc ClassName.mro().
__slots__ là gì? Khi nào nên dùng?__slots__ giới hạn attributes của instance, loại bỏ __dict__ — tiết kiệm bộ nhớ ~40-60% khi có nhiều instances.
class PointNormal:
def __init__(self, x, y):
self.x, self.y = x, y
# Mỗi instance có __dict__ → ~200 bytes
class PointSlots:
__slots__ = ('x', 'y')
def __init__(self, x, y):
self.x, self.y = x, y
# Không có __dict__ → ~56 bytes
# p = PointSlots(1, 2)
# p.z = 3 # AttributeError — không thể thêm attribute mớiDùng khi: tạo hàng triệu instances nhỏ (data processing, game objects, ML features).
asyncio.gather vs asyncio.TaskGroup — khác nhau gì?asyncio.gather (Python 3.4+): chạy song song, trả về list kết quả. asyncio.TaskGroup (Python 3.11+) — recommended cách mới: nếu một task fail, các tasks còn lại bị cancel tự động.
# gather — return_exceptions để không propagate lỗi
results = await asyncio.gather(
task1(), task2(), task3(),
return_exceptions=True
)
# TaskGroup — structured concurrency (recommended 3.11+)
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(task1())
t2 = tg.create_task(task2())
# Tất cả tasks xong khi thoát context
# Nếu 1 task raise exception → cancel tasks còn lại
print(t1.result(), t2.result())Race condition xảy ra khi nhiều coroutines đọc-modify-write shared state, kết quả phụ thuộc vào thứ tự thực thi.
Giải quyết bằng asyncio.Lock.
# Race condition
counter = 0
async def increment():
global counter
current = counter
await asyncio.sleep(0) # Yield — có thể bị preempt!
counter = current + 1 # Ghi lại giá trị cũ
# Fix với Lock
lock = asyncio.Lock()
async def safe_increment():
async with lock:
global counter
counter += 1 # Atomic trong lock contextSemaphore: giới hạn số coroutines concurrent. asyncio.Semaphore(10) tối đa 10 concurrent requests.
Dùng @asynccontextmanager với lifespan parameter thay vì @app.on_event (deprecated).
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
await database.connect()
redis_pool = await create_redis_pool()
app.state.redis = redis_pool
yield # App đang chạy
# Shutdown
await database.disconnect()
await redis_pool.aclose()
app = FastAPI(lifespan=lifespan)Cache-aside pattern: application kiểm tra cache trước, nếu miss thì query DB và populate cache.
import json
from functools import wraps
def cache(key_prefix: str, ttl: int = 300):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
key = f"{key_prefix}:{args}:{kwargs}"
cached = await redis.get(key)
if cached:
return json.loads(cached)
result = await func(*args, **kwargs)
await redis.setex(key, ttl, json.dumps(result))
return result
return wrapper
return decorator
@cache("products", ttl=60)
async def get_products():
return await db.query_products()Invalidation: redis.delete(key) khi data thay đổi.
WebSocket cho phép two-way real-time communication.
from fastapi import WebSocket, WebSocketDisconnect
class ConnectionManager:
def __init__(self):
self.active: list[WebSocket] = []
async def connect(self, ws: WebSocket):
await ws.accept(); self.active.append(ws)
def disconnect(self, ws: WebSocket):
self.active.remove(ws)
async def broadcast(self, msg: str):
for ws in self.active:
await ws.send_text(msg)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def ws_endpoint(ws: WebSocket, client_id: str):
await manager.connect(ws)
try:
while True:
data = await ws.receive_text()
await manager.broadcast(f"{client_id}: {data}")
except WebSocketDisconnect:
manager.disconnect(ws)Metaclass là "class của class" — kiểm soát cách class được tạo ra. type là metaclass mặc định của tất cả classes.
# type() tạo class dynamically
MyClass = type('MyClass', (BaseClass,), {'attr': 42, 'method': lambda self: ...})
# Custom metaclass
class SingletonMeta(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]
class Database(metaclass=SingletonMeta):
def __init__(self):
self.connection = None
db1 = Database()
db2 = Database()
assert db1 is db2 # True — cùng instance
# __init_subclass__ — thay thế nhẹ nhàng hơn metaclass
class PluginBase:
_plugins = {}
def __init_subclass__(cls, plugin_name: str, **kwargs):
super().__init_subclass__(**kwargs)
PluginBase._plugins[plugin_name] = cls
class MyPlugin(PluginBase, plugin_name="my_plugin"):
passDùng metaclass cho: Singleton, ORM models (Django Model, SQLAlchemy), plugin systems, API enforcement.
__get__, __set__, __delete__?Descriptor là object định nghĩa cách attribute được truy cập — cơ chế ẩn sau @property, @classmethod, @staticmethod.
class Validator:
"""Non-data descriptor (chỉ __get__)"""
def __set_name__(self, owner, name):
self.name = name
def __get__(self, obj, objtype=None):
if obj is None: return self # Access từ class
return obj.__dict__.get(self.name)
def __set__(self, obj, value):
# Data descriptor — có cả __set__
if not isinstance(value, int) or value < 0:
raise ValueError(f"{self.name} phải là số nguyên không âm")
obj.__dict__[self.name] = value
class Product:
price = Validator() # Descriptor instance làm class attribute
quantity = Validator()
p = Product()
p.price = 100 # Gọi Validator.__set__
p.price = -1 # ValueError!Data descriptor (có __set__): ưu tiên hơn instance __dict__. Non-data descriptor (chỉ __get__): instance __dict__ ưu tiên hơn.
Hiểu Descriptor giải thích tại sao instance.method trả về bound method.
Async equivalents của context managers và generators cho async code:
from contextlib import asynccontextmanager
# Async context manager với @asynccontextmanager
@asynccontextmanager
async def managed_connection(pool):
conn = await pool.acquire()
try:
yield conn
finally:
await pool.release(conn)
async with managed_connection(pool) as conn:
await conn.execute(query)
# Class-based async context manager
class AsyncDBSession:
async def __aenter__(self):
self.session = await create_session()
return self.session
async def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type:
await self.session.rollback()
else:
await self.session.commit()
await self.session.close()
# Async generator
async def fetch_pages(url: str):
page = 1
while True:
data = await httpx.get(f"{url}?page={page}")
if not data: break
yield data
page += 1
async for page_data in fetch_pages("https://api.example.com/users"):
process(page_data)ThreadPoolExecutor và ProcessPoolExecutor với asyncio?Khi cần chạy blocking code hoặc CPU-bound tasks trong async context:
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# ThreadPoolExecutor — cho blocking I/O trong sync libraries
executor = ThreadPoolExecutor(max_workers=10)
async def call_blocking_lib():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
executor,
blocking_function, # Không thể await được
arg1, arg2
)
return result
# ProcessPoolExecutor — cho CPU-bound tasks
cpu_executor = ProcessPoolExecutor(max_workers=4)
def heavy_computation(data): # Pure CPU work
return [x**2 for x in data]
async def process_data(large_list):
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
cpu_executor,
heavy_computation,
large_list
)
return result
# asyncio.to_thread (Python 3.9+) — shortcut cho ThreadPool
async def modern_approach():
result = await asyncio.to_thread(blocking_function, arg)Pitfall: Objects truyền qua ProcessPoolExecutor phải pickle-able.
ThreadPoolExecutor shares memory nhưng bị GIL với CPU-bound tasks.