Snippets Python
Intégration de l’API THE HIVE depuis Python 3.11+ — scripts d’automatisation, notebooks, services FastAPI.
Flow général
1. Client minimal avec requests
import requests
# Root URL shared by the snippets below; endpoint paths are appended to it.
BASE = "https://api.wethehivers.com/v1/api"
def login(email: str, password: str) -> dict:
    """Authenticate with e-mail and password and return the decoded response.

    Args:
        email: Value sent as the ``email`` field of the JSON body.
        password: Value sent as the ``password`` field of the JSON body.

    Returns:
        The JSON body of ``POST {BASE}/auth/login`` as a dict.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    credentials = {"email": email, "password": password}
    response = requests.post(f"{BASE}/auth/login", json=credentials, timeout=10)
    response.raise_for_status()
    return response.json()
# Log in once and keep both tokens found in the login response.
tokens = login("test@example.com", "Passw0rd!")
access, refresh = tokens["accessToken"], tokens["refreshToken"]
2. Recherche d’offres
def search_offres(
    q: str,
    type_contrat: str | None = None,
    token: str | None = None,
    *,
    page: int = 0,
    size: int = 20,
) -> dict:
    """Search job offers and return one page of results.

    Args:
        q: Free-text query sent as the ``q`` parameter.
        type_contrat: Optional contract type; sent as ``typeContrat`` only
            when truthy.
        token: Optional access token; when given it is sent as a Bearer
            ``Authorization`` header, otherwise the call is anonymous.
        page: Zero-based page index (defaults to the first page).
        size: Number of results per page (defaults to 20).

    Returns:
        The JSON body of ``GET {BASE}/offres/search`` as a dict.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    params = {"q": q, "page": page, "size": size}
    if type_contrat:
        params["typeContrat"] = type_contrat
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    r = requests.get(
        f"{BASE}/offres/search",
        params=params,
        headers=headers,
        timeout=10,
    )
    r.raise_for_status()
    return r.json()
# Search without a token, restricted to the "CDI" contract type.
data = search_offres("java", "CDI")
print(f"{data['totalElements']} offres trouvées")
3. Upload CV (multipart)
from pathlib import Path
def upload_cv(pdf_path: Path, token: str) -> int:
    """Upload a PDF résumé as a multipart form and return its identifier.

    Args:
        pdf_path: Path of the PDF file to send; its file name is reused as
            the multipart file name.
        token: Access token sent as a Bearer ``Authorization`` header.

    Returns:
        The ``cvId`` field of the JSON response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    auth_header = {"Authorization": f"Bearer {token}"}
    with pdf_path.open("rb") as stream:
        file_part = (pdf_path.name, stream, "application/pdf")
        response = requests.post(
            f"{BASE}/candidats/me/cv",
            files={"file": file_part},
            headers=auth_header,
            timeout=30,
        )
    response.raise_for_status()
    body = response.json()
    return body["cvId"]
4. TokenManager avec auto-refresh
import base64, json, time, threading
class TokenManager:
    """Keep an access/refresh token pair and renew the access token on demand.

    The access token is treated as a JWT: its ``exp`` claim is read locally
    to decide when a refresh is needed. All writes to the token pair happen
    under a reentrant lock so that callers invoking ``_do_refresh`` directly
    cannot race with ``valid_access``.
    """

    def __init__(self, refresh_token: str):
        """Store the refresh token; no access token is fetched yet."""
        self._refresh = refresh_token
        self._access: str | None = None
        # Reentrant on purpose: valid_access() holds the lock while calling
        # _do_refresh(), which acquires it again.
        self._lock = threading.RLock()

    def valid_access(self) -> str:
        """Return a non-expired access token, refreshing it if necessary.

        Raises:
            requests.HTTPError: If the refresh endpoint answers with an
                error status.
        """
        # Read the attribute once so a concurrent refresh cannot change the
        # value between the check and the return.
        token = self._access
        if token and not self._is_expired(token):
            return token
        with self._lock:
            # Re-check under the lock: another thread may have refreshed
            # while this one was waiting.
            token = self._access
            if token and not self._is_expired(token):
                return token
            return self._do_refresh()

    def _do_refresh(self) -> str:
        """Exchange the refresh token for a new pair and return the access token.

        Both tokens are replaced with the values found in the response
        (``accessToken`` and ``refreshToken``).
        """
        with self._lock:
            r = requests.post(
                f"{BASE}/auth/refresh-token",
                json={"refreshToken": self._refresh},
                timeout=10,
            )
            r.raise_for_status()
            data = r.json()
            self._access = data["accessToken"]
            self._refresh = data["refreshToken"]
            return self._access

    def _is_expired(self, jwt: str) -> bool:
        """Return True when the token's ``exp`` claim is under 30 s away or past.

        The payload segment is decoded without verifying the signature.
        """
        payload_b64 = jwt.split(".")[1]
        # JWT segments drop base64 padding; restore it before decoding.
        payload_b64 += "=" * (-len(payload_b64) % 4)
        payload = json.loads(base64.urlsafe_b64decode(payload_b64))
        return time.time() >= payload["exp"] - 30
5. Session avec authentification auto
class HiveSession(requests.Session):
    """``requests.Session`` that injects a Bearer token on every request.

    Relative URLs are prefixed with ``BASE``. A 401 answer triggers one
    forced token refresh followed by a single retry of the same request.
    """

    def __init__(self, tm: TokenManager):
        """Create the session around the given token manager."""
        super().__init__()
        self.tm = tm

    def request(self, method, url, **kwargs):
        """Send a request with authentication, retrying once on HTTP 401.

        Args:
            method: HTTP method name.
            url: Absolute URL, or a path that is appended to ``BASE``.
            **kwargs: Forwarded to ``requests.Session.request``. A caller
                supplied ``timeout`` is honoured; otherwise 10 seconds is
                used.

        Returns:
            The response of the first attempt, or of the retry when the
            first attempt returned 401.
        """
        if not url.startswith("http"):
            url = f"{BASE}{url}"
        # setdefault avoids "multiple values for keyword argument 'timeout'"
        # when the caller passes its own timeout.
        kwargs.setdefault("timeout", 10)
        # Work on a copy so the caller's dict is not mutated, and accept
        # an explicit headers=None.
        headers = dict(kwargs.get("headers") or {})
        kwargs["headers"] = headers
        headers["Authorization"] = f"Bearer {self.tm.valid_access()}"
        resp = super().request(method, url, **kwargs)
        if resp.status_code == 401:
            # Release the connection of the rejected attempt before retrying.
            resp.close()
            headers["Authorization"] = f"Bearer {self.tm._do_refresh()}"
            # NOTE: a file-like/streamed body already consumed by the first
            # attempt is not rewound here.
            resp = super().request(method, url, **kwargs)
        return resp
# NOTE(review): `tm` must be an existing TokenManager; it is not created in this snippet.
session = HiveSession(tm)
# The relative path is prefixed with BASE by HiveSession.request.
offres = session.get("/offres/search?q=python").json()
6. Client async avec httpx
import httpx, asyncio
class AsyncHiveClient:
    """Asynchronous API client built on ``httpx.AsyncClient``.

    Can be used as an async context manager so the underlying HTTP client
    is always closed.
    """

    def __init__(self, tm: TokenManager):
        """Create the HTTP client bound to ``BASE`` with a 10 s timeout."""
        self.tm = tm
        self.client = httpx.AsyncClient(base_url=BASE, timeout=10.0)

    async def __aenter__(self):
        """Return the client itself for ``async with`` usage."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the HTTP client when leaving the ``async with`` block."""
        await self.close()

    async def _auth(self) -> dict:
        """Build the ``Authorization`` header with a valid access token."""
        # valid_access() performs a blocking HTTP call when a refresh is
        # needed; run it in a worker thread so the event loop keeps running.
        token = await asyncio.to_thread(self.tm.valid_access)
        return {"Authorization": f"Bearer {token}"}

    async def search(self, q: str) -> dict:
        """Search offers matching ``q`` and return the decoded JSON body.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error
                status.
        """
        r = await self.client.get(
            "/offres/search",
            params={"q": q},
            headers=await self._auth(),
        )
        r.raise_for_status()
        return r.json()

    async def close(self):
        """Close the underlying ``httpx.AsyncClient``."""
        await self.client.aclose()
async def main():
    """Run three searches concurrently and return their results in order.

    The client is closed even when one of the searches raises.
    """
    client = AsyncHiveClient(tm)
    try:
        return await asyncio.gather(
            client.search("java"),
            client.search("python"),
            client.search("devops"),
        )
    finally:
        # Without this, a failing search would leak the HTTP client.
        await client.close()
7. Retry avec tenacity (rate limit)
from tenacity import retry, retry_if_exception_type, wait_exponential, stop_after_attempt
class RateLimited(Exception):
    """Signal that the API answered with HTTP 429."""
@retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=2, max=30),
    retry=retry_if_exception_type(RateLimited),
)
def fetch_offre(offre_id: int, token: str) -> dict:
    """Fetch one offer by id, retrying when the API reports rate limiting.

    Only ``RateLimited`` triggers a retry; the wait between attempts is
    exponential and capped at 30, for at most 4 attempts. The
    ``Retry-After`` header value is carried as the exception message only;
    it does not drive the wait.

    Args:
        offre_id: Identifier interpolated into ``/offres/{offre_id}``.
        token: Access token sent as a Bearer ``Authorization`` header.

    Returns:
        The decoded JSON body of the offer.

    Raises:
        RateLimited: When the server answers 429.
        requests.HTTPError: For any other error status.
    """
    auth_header = {"Authorization": f"Bearer {token}"}
    response = requests.get(
        f"{BASE}/offres/{offre_id}",
        headers=auth_header,
        timeout=10,
    )
    if response.status_code != 429:
        response.raise_for_status()
        return response.json()
    raise RateLimited(response.headers.get("Retry-After", "2"))
8. Pagination automatique
def paginate(session: HiveSession, path: str, **params):
    """Yield every item of a paginated endpoint, page after page.

    Pages are requested with ``size=50`` starting at ``page=0`` until the
    page index reaches the ``totalPages`` value reported by the response.

    Args:
        session: Session used to issue the GET requests.
        path: Endpoint path or URL passed to ``session.get``.
        **params: Extra query parameters sent with every page request;
            ``page`` and ``size`` are always overridden.

    Yields:
        Each element of the ``content`` list of every page.

    Raises:
        requests.HTTPError: If a page request answers with an error status.
    """
    page = 0
    while True:
        query = {**params, "page": page, "size": 50}
        response = session.get(path, params=query)
        # Fail on HTTP errors instead of a KeyError on the error body.
        response.raise_for_status()
        body = response.json()
        yield from body["content"]
        if page + 1 >= body["totalPages"]:
            return
        page += 1
# Lazily walk every page of the "java" search and print each offer title.
java_offres = paginate(session, "/offres/search", q="java")
for offre in java_offres:
    print(offre["titre"])
9. Upload CV avec progression (tqdm)
from tqdm import tqdm
import requests_toolbelt as rtb
def upload_with_progress(pdf_path: Path, token: str):
    """Upload a PDF résumé while displaying a byte-level progress bar.

    The file handle and the progress bar are both released even when the
    request fails.

    Args:
        pdf_path: Path of the PDF file to send.
        token: Access token sent as a Bearer ``Authorization`` header.

    Returns:
        The decoded JSON body of the upload response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    with pdf_path.open("rb") as pdf_file:
        encoder = rtb.MultipartEncoder(
            fields={"file": (pdf_path.name, pdf_file, "application/pdf")}
        )
        with tqdm(total=encoder.len, unit="B", unit_scale=True) as bar:
            # The monitor reports the cumulative byte count; convert it to
            # the increment tqdm expects.
            monitor = rtb.MultipartEncoderMonitor(
                encoder, lambda m: bar.update(m.bytes_read - bar.n)
            )
            r = requests.post(
                f"{BASE}/candidats/me/cv",
                data=monitor,
                headers={
                    "Content-Type": monitor.content_type,
                    "Authorization": f"Bearer {token}",
                },
                # Same timeout as the plain multipart upload helper.
                timeout=30,
            )
    r.raise_for_status()
    return r.json()
10. Client typé depuis OpenAPI
Via openapi-python-client :
pip install openapi-python-client
openapi-python-client generate --url https://api.wethehivers.com/v3/api-docs
from hive_api_client import Client
from hive_api_client.api.offres import search_offres
from hive_api_client.models import OffreSearchParams
# NOTE(review): `token` must already hold an access token; it is not defined in this snippet.
client = Client(base_url=BASE).with_headers({"Authorization": f"Bearer {token}"})
# `search_offres` here is the generated module imported above, called synchronously.
resp = search_offres.sync(client=client, q="java", type_contrat="CDI")
11. Intégration FastAPI (proxy interne)
from fastapi import FastAPI, Depends, HTTPException
import os  # required by os.environ below; not imported anywhere else in the file

app = FastAPI()
# The refresh token comes from the HIVE_REFRESH environment variable;
# a missing variable raises KeyError at import time.
tm = TokenManager(refresh_token=os.environ["HIVE_REFRESH"])
# Single authenticated session shared by the proxy endpoints.
hive = HiveSession(tm)
@app.get("/internal/offres")
async def proxy_offres(q: str):
    """Proxy an offer search to the upstream API.

    Args:
        q: Free-text query forwarded as the ``q`` parameter.

    Returns:
        The decoded JSON body of the upstream response.

    Raises:
        HTTPException: With the upstream status code and body text when the
            upstream response is not OK.
    """
    import asyncio  # local import keeps this snippet self-contained

    # hive.get is a blocking requests call; run it in a worker thread so
    # the event loop can keep serving other requests.
    r = await asyncio.to_thread(hive.get, "/offres/search", params={"q": q})
    if not r.ok:
        raise HTTPException(r.status_code, r.text)
    return r.json()
12. Scripts d’administration
# Export de toutes les candidatures d'une offre en CSV
import csv
def export_candidatures(
    offre_id: int,
    session: HiveSession,
    out: Path,
    *,
    encoding: str = "utf-8",
):
    """Write every application of an offer to a CSV file.

    Args:
        offre_id: Offer identifier interpolated into the endpoint path.
        session: Authenticated session used to page through the results.
        out: Destination file; it is created or overwritten.
        encoding: Text encoding of the CSV file. It is set explicitly
            because the header contains non-ASCII characters and the
            platform default encoding is not reliable.
    """
    # newline="" lets the csv module control line endings.
    with out.open("w", newline="", encoding=encoding) as f:
        w = csv.writer(f)
        w.writerow(["id", "candidat", "statut", "créée le"])
        for c in paginate(session, f"/recruiters/me/offres/{offre_id}/candidatures"):
            w.writerow(
                [c["id"], c["candidat"]["nomComplet"], c["statut"], c["createdAt"]]
            )
# Export the applications of offer 42 to a CSV file in the working directory.
export_candidatures(42, session, Path("candidatures_42.csv"))
13. Tests avec respx (mock httpx)
import respx, httpx, pytest
@respx.mock
def test_search():
    """Check that a mocked offer search returns the stubbed payload.

    respx only intercepts httpx traffic, so the request must be issued
    with httpx for the mocked route to be hit.
    """
    route = respx.get(f"{BASE}/offres/search").mock(
        return_value=httpx.Response(200, json={
            "content": [{"id": 1, "titre": "Dev Python", "slug": "dev-python"}],
            "totalElements": 1, "totalPages": 1,
        })
    )
    r = httpx.get(f"{BASE}/offres/search", params={"q": "python"})
    # Guard against the request silently bypassing the mock.
    assert route.called
    assert r.json()["totalElements"] == 1