mirror of
https://github.com/immich-app/immich.git
synced 2025-01-21 00:52:43 -05:00
141 lines
3.9 KiB
Python
141 lines
3.9 KiB
Python
import asyncio
|
|
import os
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from io import BytesIO
|
|
from typing import Any
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import uvicorn
|
|
from fastapi import Body, Depends, FastAPI
|
|
from PIL import Image
|
|
|
|
from app.models.base import InferenceModel
|
|
|
|
from .config import settings
|
|
from .models.cache import ModelCache
|
|
from .schemas import (
|
|
EmbeddingResponse,
|
|
FaceResponse,
|
|
MessageResponse,
|
|
ModelType,
|
|
TagResponse,
|
|
TextModelRequest,
|
|
TextResponse,
|
|
)
|
|
|
|
app = FastAPI()
|
|
|
|
|
|
def init_state() -> None:
|
|
app.state.model_cache = ModelCache(ttl=settings.model_ttl, revalidate=settings.model_ttl > 0)
|
|
# asyncio is a huge bottleneck for performance, so we use a thread pool to run blocking code
|
|
app.state.thread_pool = ThreadPoolExecutor(settings.request_threads)
|
|
|
|
|
|
async def load_models() -> None:
|
|
models: list[tuple[str, ModelType, dict[str, Any]]] = [
|
|
(settings.classification_model, ModelType.IMAGE_CLASSIFICATION, {}),
|
|
(settings.clip_image_model, ModelType.CLIP, {"mode": "vision"}),
|
|
(settings.clip_text_model, ModelType.CLIP, {"mode": "text"}),
|
|
(settings.facial_recognition_model, ModelType.FACIAL_RECOGNITION, {}),
|
|
]
|
|
|
|
# Get all models
|
|
for model_name, model_type, model_kwargs in models:
|
|
await app.state.model_cache.get(model_name, model_type, eager=settings.eager_startup, **model_kwargs)
|
|
|
|
|
|
@app.on_event("startup")
|
|
async def startup_event() -> None:
|
|
init_state()
|
|
await load_models()
|
|
|
|
|
|
@app.on_event("shutdown")
|
|
async def shutdown_event() -> None:
|
|
app.state.thread_pool.shutdown()
|
|
|
|
|
|
def dep_pil_image(byte_image: bytes = Body(...)) -> Image.Image:
|
|
return Image.open(BytesIO(byte_image))
|
|
|
|
|
|
def dep_cv_image(byte_image: bytes = Body(...)) -> np.ndarray[int, np.dtype[Any]]:
|
|
byte_image_np = np.frombuffer(byte_image, np.uint8)
|
|
return cv2.imdecode(byte_image_np, cv2.IMREAD_COLOR)
|
|
|
|
|
|
@app.get("/", response_model=MessageResponse)
|
|
async def root() -> dict[str, str]:
|
|
return {"message": "Immich ML"}
|
|
|
|
|
|
@app.get("/ping", response_model=TextResponse)
|
|
def ping() -> str:
|
|
return "pong"
|
|
|
|
|
|
@app.post(
|
|
"/image-classifier/tag-image",
|
|
response_model=TagResponse,
|
|
status_code=200,
|
|
)
|
|
async def image_classification(
|
|
image: Image.Image = Depends(dep_pil_image),
|
|
) -> list[str]:
|
|
model = await app.state.model_cache.get(settings.classification_model, ModelType.IMAGE_CLASSIFICATION)
|
|
labels = await predict(model, image)
|
|
return labels
|
|
|
|
|
|
@app.post(
|
|
"/sentence-transformer/encode-image",
|
|
response_model=EmbeddingResponse,
|
|
status_code=200,
|
|
)
|
|
async def clip_encode_image(
|
|
image: Image.Image = Depends(dep_pil_image),
|
|
) -> list[float]:
|
|
model = await app.state.model_cache.get(settings.clip_image_model, ModelType.CLIP, mode="vision")
|
|
embedding = await predict(model, image)
|
|
return embedding
|
|
|
|
|
|
@app.post(
|
|
"/sentence-transformer/encode-text",
|
|
response_model=EmbeddingResponse,
|
|
status_code=200,
|
|
)
|
|
async def clip_encode_text(payload: TextModelRequest) -> list[float]:
|
|
model = await app.state.model_cache.get(settings.clip_text_model, ModelType.CLIP, mode="text")
|
|
embedding = await predict(model, payload.text)
|
|
return embedding
|
|
|
|
|
|
@app.post(
|
|
"/facial-recognition/detect-faces",
|
|
response_model=FaceResponse,
|
|
status_code=200,
|
|
)
|
|
async def facial_recognition(
|
|
image: cv2.Mat = Depends(dep_cv_image),
|
|
) -> list[dict[str, Any]]:
|
|
model = await app.state.model_cache.get(settings.facial_recognition_model, ModelType.FACIAL_RECOGNITION)
|
|
faces = await predict(model, image)
|
|
return faces
|
|
|
|
|
|
async def predict(model: InferenceModel, inputs: Any) -> Any:
|
|
return await asyncio.get_running_loop().run_in_executor(app.state.thread_pool, model.predict, inputs)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
is_dev = os.getenv("NODE_ENV") == "development"
|
|
uvicorn.run(
|
|
"app.main:app",
|
|
host=settings.host,
|
|
port=settings.port,
|
|
reload=is_dev,
|
|
workers=settings.workers,
|
|
)
|