Skip to main content

Snippets Python

Intégration de l’API THE HIVE depuis Python 3.11+ — scripts d’automatisation, notebooks, services FastAPI.

Flow général

1. Client minimal avec requests

import requests

BASE = "https://api.wethehivers.com/v1/api"

def login(email: str, password: str) -> dict:
    r = requests.post(f"{BASE}/auth/login",
                      json={"email": email, "password": password},
                      timeout=10)
    r.raise_for_status()
    return r.json()

tokens = login("test@example.com", "Passw0rd!")
access, refresh = tokens["accessToken"], tokens["refreshToken"]

2. Recherche d’offres

def search_offres(q: str, type_contrat: str = None, token: str = None) -> dict:
    params = {"q": q, "page": 0, "size": 20}
    if type_contrat:
        params["typeContrat"] = type_contrat
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    r = requests.get(f"{BASE}/offres/search", params=params,
                     headers=headers, timeout=10)
    r.raise_for_status()
    return r.json()

data = search_offres("java", "CDI")
print(f"{data['totalElements']} offres trouvées")

3. Upload CV (multipart)

from pathlib import Path

def upload_cv(pdf_path: Path, token: str) -> int:
    with pdf_path.open("rb") as f:
        r = requests.post(
            f"{BASE}/candidats/me/cv",
            files={"file": (pdf_path.name, f, "application/pdf")},
            headers={"Authorization": f"Bearer {token}"},
            timeout=30,
        )
    r.raise_for_status()
    return r.json()["cvId"]

4. TokenManager avec auto-refresh

import base64, json, time, threading

class TokenManager:
    def __init__(self, refresh_token: str):
        self._refresh = refresh_token
        self._access: str | None = None
        self._lock = threading.Lock()

    def valid_access(self) -> str:
        if self._access and not self._is_expired(self._access):
            return self._access
        with self._lock:
            if self._access and not self._is_expired(self._access):
                return self._access
            return self._do_refresh()

    def _do_refresh(self) -> str:
        r = requests.post(f"{BASE}/auth/refresh-token",
                          json={"refreshToken": self._refresh}, timeout=10)
        r.raise_for_status()
        data = r.json()
        self._access = data["accessToken"]
        self._refresh = data["refreshToken"]
        return self._access

    def _is_expired(self, jwt: str) -> bool:
        payload_b64 = jwt.split(".")[1]
        payload_b64 += "=" * (-len(payload_b64) % 4)
        payload = json.loads(base64.urlsafe_b64decode(payload_b64))
        return time.time() >= payload["exp"] - 30

5. Session avec authentification auto

class HiveSession(requests.Session):
    def __init__(self, tm: TokenManager):
        super().__init__()
        self.tm = tm

    def request(self, method, url, **kwargs):
        headers = kwargs.setdefault("headers", {})
        headers["Authorization"] = f"Bearer {self.tm.valid_access()}"
        if not url.startswith("http"):
            url = f"{BASE}{url}"
        resp = super().request(method, url, timeout=10, **kwargs)
        if resp.status_code == 401:
            self.tm._do_refresh()
            headers["Authorization"] = f"Bearer {self.tm.valid_access()}"
            resp = super().request(method, url, timeout=10, **kwargs)
        return resp

session = HiveSession(tm)
offres = session.get("/offres/search?q=python").json()

6. Client async avec httpx

import httpx, asyncio

class AsyncHiveClient:
    def __init__(self, tm: TokenManager):
        self.tm = tm
        self.client = httpx.AsyncClient(base_url=BASE, timeout=10.0)

    async def _auth(self) -> dict:
        return {"Authorization": f"Bearer {self.tm.valid_access()}"}

    async def search(self, q: str) -> dict:
        r = await self.client.get("/offres/search",
                                  params={"q": q}, headers=await self._auth())
        r.raise_for_status()
        return r.json()

    async def close(self):
        await self.client.aclose()

async def main():
    client = AsyncHiveClient(tm)
    results = await asyncio.gather(
        client.search("java"),
        client.search("python"),
        client.search("devops"),
    )
    await client.close()
    return results

7. Retry avec tenacity (rate limit)

from tenacity import retry, retry_if_exception_type, wait_exponential, stop_after_attempt

class RateLimited(Exception): ...

@retry(
    retry=retry_if_exception_type(RateLimited),
    wait=wait_exponential(multiplier=2, max=30),
    stop=stop_after_attempt(4),
)
def fetch_offre(offre_id: int, token: str) -> dict:
    r = requests.get(f"{BASE}/offres/{offre_id}",
                     headers={"Authorization": f"Bearer {token}"}, timeout=10)
    if r.status_code == 429:
        raise RateLimited(r.headers.get("Retry-After", "2"))
    r.raise_for_status()
    return r.json()

8. Pagination automatique

def paginate(session: HiveSession, path: str, **params):
    page = 0
    while True:
        resp = session.get(path, params={**params, "page": page, "size": 50}).json()
        yield from resp["content"]
        if page + 1 >= resp["totalPages"]:
            return
        page += 1

for offre in paginate(session, "/offres/search", q="java"):
    print(offre["titre"])

9. Upload CV avec progression (tqdm)

from tqdm import tqdm
import requests_toolbelt as rtb

def upload_with_progress(pdf_path: Path, token: str):
    encoder = rtb.MultipartEncoder(
        fields={"file": (pdf_path.name, open(pdf_path, "rb"), "application/pdf")}
    )
    bar = tqdm(total=encoder.len, unit="B", unit_scale=True)
    monitor = rtb.MultipartEncoderMonitor(
        encoder, lambda m: bar.update(m.bytes_read - bar.n)
    )
    r = requests.post(
        f"{BASE}/candidats/me/cv",
        data=monitor,
        headers={"Content-Type": monitor.content_type,
                 "Authorization": f"Bearer {token}"},
    )
    bar.close()
    r.raise_for_status()
    return r.json()

10. Client typé depuis OpenAPI

Via openapi-python-client :
pip install openapi-python-client
openapi-python-client generate --url https://api.wethehivers.com/v3/api-docs
from hive_api_client import Client
from hive_api_client.api.offres import search_offres
from hive_api_client.models import OffreSearchParams

client = Client(base_url=BASE).with_headers({"Authorization": f"Bearer {token}"})
resp = search_offres.sync(client=client, q="java", type_contrat="CDI")

11. Intégration FastAPI (proxy interne)

from fastapi import FastAPI, Depends, HTTPException

app = FastAPI()
tm = TokenManager(refresh_token=os.environ["HIVE_REFRESH"])
hive = HiveSession(tm)

@app.get("/internal/offres")
async def proxy_offres(q: str):
    r = hive.get("/offres/search", params={"q": q})
    if not r.ok:
        raise HTTPException(r.status_code, r.text)
    return r.json()

12. Scripts d’administration

# Export de toutes les candidatures d'une offre en CSV
import csv

def export_candidatures(offre_id: int, session: HiveSession, out: Path):
    with out.open("w", newline="") as f:
        w = csv.writer(f)
        w.writerow(["id", "candidat", "statut", "créée le"])
        for c in paginate(session, f"/recruiters/me/offres/{offre_id}/candidatures"):
            w.writerow([c["id"], c["candidat"]["nomComplet"],
                        c["statut"], c["createdAt"]])

export_candidatures(42, session, Path("candidatures_42.csv"))

13. Tests avec respx (mock httpx)

import respx, httpx, pytest

@respx.mock
def test_search():
    respx.get(f"{BASE}/offres/search").mock(
        return_value=httpx.Response(200, json={
            "content": [{"id": 1, "titre": "Dev Python", "slug": "dev-python"}],
            "totalElements": 1, "totalPages": 1,
        })
    )
    r = requests.get(f"{BASE}/offres/search", params={"q": "python"})
    assert r.json()["totalElements"] == 1

Voir aussi