google_nest_sdm.auth

Authentication library, implemented by users of the API.

This library is a simple aiohttp that handles authentication when talking to the API. Users are expected to provide their own implementation that provides credentials obtained using the standard Google authentication approaches described at https://developers.google.com/nest/device-access/api/authorization

An implementation of AbstractAuth implements async_get_access_token to provide authentication credentials to the SDM library. The implementation is responsible for managing the lifecycle of the token (any persistence needed, or refresh to deal with expiration, etc).

  1"""Authentication library, implemented by users of the API.
  2
  3This library is a simple `aiohttp` that handles authentication when talking
  4to the API. Users are expected to provide their own implementation that provides
  5credentials obtained using the standard Google authentication approaches
  6described at https://developers.google.com/nest/device-access/api/authorization
  7
  8An implementation of `AbstractAuth` implements `async_get_access_token`
  9to provide authentication credentials to the SDM library. The implementation is
 10responsible for managing the lifecycle of the token (any persistence needed,
 11or refresh to deal with expiration, etc).
 12"""
 13
 14from __future__ import annotations
 15
 16import logging
 17from abc import ABC, abstractmethod
 18from dataclasses import dataclass, field
 19from asyncio import TimeoutError
 20from typing import Any
 21from http import HTTPStatus
 22
 23import aiohttp
 24from aiohttp.client_exceptions import ClientError
 25from google.auth.credentials import Credentials
 26from google.oauth2.credentials import Credentials as OAuthCredentials
 27from mashumaro.mixins.json import DataClassJSONMixin
 28
 29from .exceptions import (
 30    ApiException,
 31    ApiTimeoutException,
 32    AuthException,
 33    ApiForbiddenException,
 34    NotFoundException,
 35)
 36
 37_LOGGER = logging.getLogger(__name__)
 38
 39__all__ = ["AbstractAuth"]
 40
 41HTTP_UNAUTHORIZED = 401
 42AUTHORIZATION_HEADER = "Authorization"
 43ERROR = "error"
 44STATUS = "status"
 45MESSAGE = "message"
 46
 47
 48@dataclass
 49class Status(DataClassJSONMixin):
 50    """Status of the media item."""
 51
 52    code: int = field(default=HTTPStatus.OK)
 53    """The status code, which should be an enum value of google.rpc.Code"""
 54
 55    message: str | None = None
 56    """A developer-facing error message, which should be in English"""
 57
 58    details: list[dict[str, Any]] = field(default_factory=list)
 59    """A list of messages that carry the error details"""
 60
 61
 62@dataclass
 63class Error:
 64    """Error details from the API response."""
 65
 66    status: str | None = None
 67    code: int | None = None
 68    message: str | None = None
 69    details: list[dict[str, Any]] | None = field(default_factory=list)
 70
 71    def __str__(self) -> str:
 72        """Return a string representation of the error details."""
 73        error_message = ""
 74        if self.status:
 75            error_message += self.status
 76        if self.code:
 77            if error_message:
 78                error_message += f" ({self.code})"
 79            else:
 80                error_message += str(self.code)
 81        if self.message:
 82            if error_message:
 83                error_message += ": "
 84            error_message += self.message
 85        if self.details:
 86            error_message += f"\nError details: ({self.details})"
 87        return error_message
 88
 89
 90@dataclass
 91class ErrorResponse(DataClassJSONMixin):
 92    """A response message that contains an error message."""
 93
 94    error: Error | None = None
 95
 96
 97class AbstractAuth(ABC):
 98    """Abstract class to make authenticated requests."""
 99
100    def __init__(self, websession: aiohttp.ClientSession, host: str):
101        """Initialize the AbstractAuth."""
102        self._websession = websession
103        self._host = host
104
105    @abstractmethod
106    async def async_get_access_token(self) -> str:
107        """Return a valid access token."""
108
109    async def async_get_creds(self) -> Credentials:
110        """Return creds for subscriber API."""
111        token = await self.async_get_access_token()
112        return OAuthCredentials(token=token)  # type: ignore[no-untyped-call]
113
114    async def request(
115        self,
116        method: str,
117        url: str,
118        **kwargs: Any,
119    ) -> aiohttp.ClientResponse:
120        """Make a request."""
121        headers = kwargs.get("headers")
122
123        if headers is None:
124            headers = {}
125        else:
126            headers = dict(headers)
127            del kwargs["headers"]
128        if AUTHORIZATION_HEADER not in headers:
129            try:
130                access_token = await self.async_get_access_token()
131            except TimeoutError as err:
132                raise ApiTimeoutException(
133                    f"Timeout requesting API token: {err}"
134                ) from err
135            except ClientError as err:
136                raise AuthException(f"Access token failure: {err}") from err
137            headers[AUTHORIZATION_HEADER] = f"Bearer {access_token}"
138        if not (url.startswith("http://") or url.startswith("https://")):
139            url = f"{self._host}/{url}"
140        _LOGGER.debug("request[%s]=%s", method, url)
141        if method == "post" and "json" in kwargs:
142            _LOGGER.debug("request[post json]=%s", kwargs["json"])
143        try:
144            return await self._request(method, url, headers=headers, **kwargs)
145        except TimeoutError as err:
146            raise ApiTimeoutException(f"Timeout connecting to API: {err}") from err
147        except ClientError as err:
148            raise ApiException(f"Error connecting to API: {err}") from err
149
150    async def _request(
151        self, method: str, url: str, headers: dict[str, str], **kwargs: Any
152    ) -> aiohttp.ClientResponse:
153        return await self._websession.request(method, url, **kwargs, headers=headers)
154
155    async def get(self, url: str, **kwargs: Any) -> aiohttp.ClientResponse:
156        """Make a get request."""
157        response = await self.request("get", url, **kwargs)
158        return await AbstractAuth._raise_for_status(response)
159
160    async def get_json(self, url: str, **kwargs: Any) -> dict[str, Any]:
161        """Make a get request and return json response."""
162        resp = await self.get(url, **kwargs)
163        try:
164            result = await resp.json()
165        except ClientError as err:
166            raise ApiException("Server returned malformed response") from err
167        if not isinstance(result, dict):
168            raise ApiException("Server return malformed response: %s" % result)
169        _LOGGER.debug("response=%s", result)
170        return result
171
172    async def post(self, url: str, **kwargs: Any) -> aiohttp.ClientResponse:
173        """Make a post request."""
174        response = await self.request("post", url, **kwargs)
175        return await AbstractAuth._raise_for_status(response)
176
177    async def post_json(self, url: str, **kwargs: Any) -> dict[str, Any]:
178        """Make a post request and return a json response."""
179        resp = await self.post(url, **kwargs)
180        try:
181            result = await resp.json()
182        except ClientError as err:
183            raise ApiException("Server returned malformed response") from err
184        if not isinstance(result, dict):
185            raise ApiException("Server returned malformed response: %s" % result)
186        _LOGGER.debug("response=%s", result)
187        return result
188
189    async def put(self, url: str, **kwargs: Any) -> aiohttp.ClientResponse:
190        """Make a put request."""
191        response = await self.request("put", url, **kwargs)
192        return await AbstractAuth._raise_for_status(response)
193
194    async def delete(self, url: str, **kwargs: Any) -> aiohttp.ClientResponse:
195        """Make a delete request."""
196        response = await self.request("delete", url, **kwargs)
197        return await AbstractAuth._raise_for_status(response)
198
199    @classmethod
200    async def _raise_for_status(
201        cls, resp: aiohttp.ClientResponse
202    ) -> aiohttp.ClientResponse:
203        """Raise exceptions on failure methods."""
204        error_detail = await cls._error_detail(resp)
205        try:
206            resp.raise_for_status()
207        except aiohttp.ClientResponseError as err:
208            error_message = f"{err.message} response from API ({resp.status})"
209            if error_detail:
210                error_message += f": {error_detail}"
211            if err.status == HTTPStatus.FORBIDDEN:
212                raise ApiForbiddenException(error_message)
213            if err.status == HTTPStatus.UNAUTHORIZED:
214                raise AuthException(error_message)
215            if err.status == HTTPStatus.NOT_FOUND:
216                raise NotFoundException(error_message)
217            raise ApiException(error_message) from err
218        except aiohttp.ClientError as err:
219            raise ApiException(f"Error from API: {err}") from err
220        return resp
221
222    @classmethod
223    async def _error_detail(cls, resp: aiohttp.ClientResponse) -> Error | None:
224        """Returns an error message string from the APi response."""
225        if resp.status < 400:
226            return None
227        try:
228            result = await resp.text()
229        except ClientError:
230            return None
231        try:
232            error_response = ErrorResponse.from_json(result)
233        except (LookupError, ValueError):
234            return None
235        return error_response.error
class AbstractAuth(abc.ABC):
 98class AbstractAuth(ABC):
 99    """Abstract class to make authenticated requests."""
100
101    def __init__(self, websession: aiohttp.ClientSession, host: str):
102        """Initialize the AbstractAuth."""
103        self._websession = websession
104        self._host = host
105
106    @abstractmethod
107    async def async_get_access_token(self) -> str:
108        """Return a valid access token."""
109
110    async def async_get_creds(self) -> Credentials:
111        """Return creds for subscriber API."""
112        token = await self.async_get_access_token()
113        return OAuthCredentials(token=token)  # type: ignore[no-untyped-call]
114
115    async def request(
116        self,
117        method: str,
118        url: str,
119        **kwargs: Any,
120    ) -> aiohttp.ClientResponse:
121        """Make a request."""
122        headers = kwargs.get("headers")
123
124        if headers is None:
125            headers = {}
126        else:
127            headers = dict(headers)
128            del kwargs["headers"]
129        if AUTHORIZATION_HEADER not in headers:
130            try:
131                access_token = await self.async_get_access_token()
132            except TimeoutError as err:
133                raise ApiTimeoutException(
134                    f"Timeout requesting API token: {err}"
135                ) from err
136            except ClientError as err:
137                raise AuthException(f"Access token failure: {err}") from err
138            headers[AUTHORIZATION_HEADER] = f"Bearer {access_token}"
139        if not (url.startswith("http://") or url.startswith("https://")):
140            url = f"{self._host}/{url}"
141        _LOGGER.debug("request[%s]=%s", method, url)
142        if method == "post" and "json" in kwargs:
143            _LOGGER.debug("request[post json]=%s", kwargs["json"])
144        try:
145            return await self._request(method, url, headers=headers, **kwargs)
146        except TimeoutError as err:
147            raise ApiTimeoutException(f"Timeout connecting to API: {err}") from err
148        except ClientError as err:
149            raise ApiException(f"Error connecting to API: {err}") from err
150
151    async def _request(
152        self, method: str, url: str, headers: dict[str, str], **kwargs: Any
153    ) -> aiohttp.ClientResponse:
154        return await self._websession.request(method, url, **kwargs, headers=headers)
155
156    async def get(self, url: str, **kwargs: Any) -> aiohttp.ClientResponse:
157        """Make a get request."""
158        response = await self.request("get", url, **kwargs)
159        return await AbstractAuth._raise_for_status(response)
160
161    async def get_json(self, url: str, **kwargs: Any) -> dict[str, Any]:
162        """Make a get request and return json response."""
163        resp = await self.get(url, **kwargs)
164        try:
165            result = await resp.json()
166        except ClientError as err:
167            raise ApiException("Server returned malformed response") from err
168        if not isinstance(result, dict):
169            raise ApiException("Server return malformed response: %s" % result)
170        _LOGGER.debug("response=%s", result)
171        return result
172
173    async def post(self, url: str, **kwargs: Any) -> aiohttp.ClientResponse:
174        """Make a post request."""
175        response = await self.request("post", url, **kwargs)
176        return await AbstractAuth._raise_for_status(response)
177
178    async def post_json(self, url: str, **kwargs: Any) -> dict[str, Any]:
179        """Make a post request and return a json response."""
180        resp = await self.post(url, **kwargs)
181        try:
182            result = await resp.json()
183        except ClientError as err:
184            raise ApiException("Server returned malformed response") from err
185        if not isinstance(result, dict):
186            raise ApiException("Server returned malformed response: %s" % result)
187        _LOGGER.debug("response=%s", result)
188        return result
189
190    async def put(self, url: str, **kwargs: Any) -> aiohttp.ClientResponse:
191        """Make a put request."""
192        response = await self.request("put", url, **kwargs)
193        return await AbstractAuth._raise_for_status(response)
194
195    async def delete(self, url: str, **kwargs: Any) -> aiohttp.ClientResponse:
196        """Make a delete request."""
197        response = await self.request("delete", url, **kwargs)
198        return await AbstractAuth._raise_for_status(response)
199
200    @classmethod
201    async def _raise_for_status(
202        cls, resp: aiohttp.ClientResponse
203    ) -> aiohttp.ClientResponse:
204        """Raise exceptions on failure methods."""
205        error_detail = await cls._error_detail(resp)
206        try:
207            resp.raise_for_status()
208        except aiohttp.ClientResponseError as err:
209            error_message = f"{err.message} response from API ({resp.status})"
210            if error_detail:
211                error_message += f": {error_detail}"
212            if err.status == HTTPStatus.FORBIDDEN:
213                raise ApiForbiddenException(error_message)
214            if err.status == HTTPStatus.UNAUTHORIZED:
215                raise AuthException(error_message)
216            if err.status == HTTPStatus.NOT_FOUND:
217                raise NotFoundException(error_message)
218            raise ApiException(error_message) from err
219        except aiohttp.ClientError as err:
220            raise ApiException(f"Error from API: {err}") from err
221        return resp
222
223    @classmethod
224    async def _error_detail(cls, resp: aiohttp.ClientResponse) -> Error | None:
225        """Returns an error message string from the APi response."""
226        if resp.status < 400:
227            return None
228        try:
229            result = await resp.text()
230        except ClientError:
231            return None
232        try:
233            error_response = ErrorResponse.from_json(result)
234        except (LookupError, ValueError):
235            return None
236        return error_response.error

Abstract class to make authenticated requests.

AbstractAuth(websession: aiohttp.client.ClientSession, host: str)
101    def __init__(self, websession: aiohttp.ClientSession, host: str):
102        """Initialize the AbstractAuth."""
103        self._websession = websession
104        self._host = host

Initialize the AbstractAuth.

@abstractmethod
async def async_get_access_token(self) -> str:
106    @abstractmethod
107    async def async_get_access_token(self) -> str:
108        """Return a valid access token."""

Return a valid access token.

async def async_get_creds(self) -> google.auth.credentials.Credentials:
110    async def async_get_creds(self) -> Credentials:
111        """Return creds for subscriber API."""
112        token = await self.async_get_access_token()
113        return OAuthCredentials(token=token)  # type: ignore[no-untyped-call]

Return creds for subscriber API.

async def request( self, method: str, url: str, **kwargs: Any) -> aiohttp.client_reqrep.ClientResponse:
115    async def request(
116        self,
117        method: str,
118        url: str,
119        **kwargs: Any,
120    ) -> aiohttp.ClientResponse:
121        """Make a request."""
122        headers = kwargs.get("headers")
123
124        if headers is None:
125            headers = {}
126        else:
127            headers = dict(headers)
128            del kwargs["headers"]
129        if AUTHORIZATION_HEADER not in headers:
130            try:
131                access_token = await self.async_get_access_token()
132            except TimeoutError as err:
133                raise ApiTimeoutException(
134                    f"Timeout requesting API token: {err}"
135                ) from err
136            except ClientError as err:
137                raise AuthException(f"Access token failure: {err}") from err
138            headers[AUTHORIZATION_HEADER] = f"Bearer {access_token}"
139        if not (url.startswith("http://") or url.startswith("https://")):
140            url = f"{self._host}/{url}"
141        _LOGGER.debug("request[%s]=%s", method, url)
142        if method == "post" and "json" in kwargs:
143            _LOGGER.debug("request[post json]=%s", kwargs["json"])
144        try:
145            return await self._request(method, url, headers=headers, **kwargs)
146        except TimeoutError as err:
147            raise ApiTimeoutException(f"Timeout connecting to API: {err}") from err
148        except ClientError as err:
149            raise ApiException(f"Error connecting to API: {err}") from err

Make a request.

async def get(self, url: str, **kwargs: Any) -> aiohttp.client_reqrep.ClientResponse:
156    async def get(self, url: str, **kwargs: Any) -> aiohttp.ClientResponse:
157        """Make a get request."""
158        response = await self.request("get", url, **kwargs)
159        return await AbstractAuth._raise_for_status(response)

Make a get request.

async def get_json(self, url: str, **kwargs: Any) -> dict[str, typing.Any]:
161    async def get_json(self, url: str, **kwargs: Any) -> dict[str, Any]:
162        """Make a get request and return json response."""
163        resp = await self.get(url, **kwargs)
164        try:
165            result = await resp.json()
166        except ClientError as err:
167            raise ApiException("Server returned malformed response") from err
168        if not isinstance(result, dict):
169            raise ApiException("Server return malformed response: %s" % result)
170        _LOGGER.debug("response=%s", result)
171        return result

Make a get request and return json response.

async def post(self, url: str, **kwargs: Any) -> aiohttp.client_reqrep.ClientResponse:
173    async def post(self, url: str, **kwargs: Any) -> aiohttp.ClientResponse:
174        """Make a post request."""
175        response = await self.request("post", url, **kwargs)
176        return await AbstractAuth._raise_for_status(response)

Make a post request.

async def post_json(self, url: str, **kwargs: Any) -> dict[str, typing.Any]:
178    async def post_json(self, url: str, **kwargs: Any) -> dict[str, Any]:
179        """Make a post request and return a json response."""
180        resp = await self.post(url, **kwargs)
181        try:
182            result = await resp.json()
183        except ClientError as err:
184            raise ApiException("Server returned malformed response") from err
185        if not isinstance(result, dict):
186            raise ApiException("Server returned malformed response: %s" % result)
187        _LOGGER.debug("response=%s", result)
188        return result

Make a post request and return a json response.

async def put(self, url: str, **kwargs: Any) -> aiohttp.client_reqrep.ClientResponse:
190    async def put(self, url: str, **kwargs: Any) -> aiohttp.ClientResponse:
191        """Make a put request."""
192        response = await self.request("put", url, **kwargs)
193        return await AbstractAuth._raise_for_status(response)

Make a put request.

async def delete(self, url: str, **kwargs: Any) -> aiohttp.client_reqrep.ClientResponse:
195    async def delete(self, url: str, **kwargs: Any) -> aiohttp.ClientResponse:
196        """Make a delete request."""
197        response = await self.request("delete", url, **kwargs)
198        return await AbstractAuth._raise_for_status(response)

Make a delete request.