python/aiovieter/vieter.py

62 lines
1.8 KiB
Python

import aiohttp
from .targets import VieterTargets
from .logs import VieterBuildLogs
# Path prefix placed in front of every request path built by
# Vieter._do_req and Vieter._do_req_json.
API_PREFIX = "/api/v1"
class VieterApiException(Exception):
    """Error raised by the client when an API request is answered with a failure."""
class Vieter:
    """Asynchronous HTTP client for an API served under ``API_PREFIX``.

    The client owns one ``aiohttp.ClientSession`` whose base URL is the given
    address and which sends the API key in the ``X-Api-Key`` header on every
    request. ``targets`` and ``logs`` are helper objects constructed with this
    client instance.

    The session must be closed when the client is no longer needed: either
    call :meth:`close` explicitly or use the client in an ``async with``
    statement.
    """

    def __init__(self, address: str, api_key: str) -> None:
        """Create the underlying session and the helper objects.

        Args:
            address: Base URL handed to the session as ``base_url``.
            api_key: Value sent in the ``X-Api-Key`` request header.
        """
        self._address = address
        self._api_key = api_key
        self._client = aiohttp.ClientSession(
            base_url=address, headers={"X-Api-Key": api_key}
        )
        self.targets = VieterTargets(self)
        self.logs = VieterBuildLogs(self)

    async def __aenter__(self):
        """Return the client itself so it can be used with ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the session when the ``async with`` body finishes."""
        await self.close()

    async def close(self):
        """Close the underlying HTTP session."""
        await self._client.close()

    @staticmethod
    def _clean_params(params):
        """Return a copy of ``params`` without entries whose value is ``None``.

        ``None`` is passed through unchanged so that "no query parameters"
        stays distinguishable from an empty mapping. Only ``None`` values are
        dropped; other falsy values such as ``0`` or ``""`` are kept.
        """
        if params is None:
            return None
        return {key: value for key, value in params.items() if value is not None}

    async def _do_req(self, method: str, path: str, params: dict = None):
        """Send a request and discard the response.

        Args:
            method: HTTP method name, e.g. ``"POST"``.
            path: Request path relative to ``API_PREFIX``.
            params: Optional query parameters; entries set to ``None`` are
                omitted from the request.

        Returns:
            None.

        NOTE: the response status is not inspected here, so error responses
        are not reported to the caller.
        """
        # Entering and leaving the context manager releases the response, so
        # the connection is handed back to the session's pool instead of
        # staying checked out until the response object is garbage collected.
        async with self._client.request(
            method, f"{API_PREFIX}{path}", params=self._clean_params(params)
        ):
            pass

    async def _do_req_json(self, method: str, path: str, params: dict = None):
        """Send a request and return the ``"data"`` field of its JSON body.

        Args:
            method: HTTP method name, e.g. ``"GET"``.
            path: Request path relative to ``API_PREFIX``.
            params: Optional query parameters; entries set to ``None`` are
                omitted from the request.

        Returns:
            The value stored under ``"data"`` in the decoded JSON response.

        Raises:
            VieterApiException: If the response status is 404 (message
                ``"Error 404"``), or any other status than 200 (message taken
                from the ``"message"`` field of the JSON body).
        """
        async with self._client.request(
            method, f"{API_PREFIX}{path}", params=self._clean_params(params)
        ) as res:
            # 404 is handled before the body is decoded as JSON.
            if res.status == 404:
                raise VieterApiException("Error 404")

            data = await res.json()

            if res.status != 200:
                raise VieterApiException(data["message"])

            return data["data"]

    async def _get(self, *args, **kwargs):
        """Issue a GET request via :meth:`_do_req_json` and return its result."""
        return await self._do_req_json("GET", *args, **kwargs)

    async def _post(self, *args, **kwargs):
        """Issue a POST request via :meth:`_do_req`; the response is discarded."""
        return await self._do_req("POST", *args, **kwargs)

    async def _patch(self, *args, **kwargs):
        """Issue a PATCH request via :meth:`_do_req`; the response is discarded."""
        return await self._do_req("PATCH", *args, **kwargs)

    async def _delete(self, *args, **kwargs):
        """Issue a DELETE request via :meth:`_do_req`; the response is discarded."""
        return await self._do_req("DELETE", *args, **kwargs)