google_nest_sdm.google_nest_subscriber

Subscriber for the Smart Device Management event based API.

  1"""Subscriber for the Smart Device Management event based API."""
  2
  3from __future__ import annotations
  4
  5import datetime
  6import asyncio
  7import enum
  8import logging
  9import re
 10import time
 11from typing import Awaitable, Callable
 12
 13
 14from .auth import AbstractAuth
 15from .device_manager import DeviceManager
 16from .diagnostics import SUBSCRIBER_DIAGNOSTICS as DIAGNOSTICS
 17from .event import EventMessage
 18from .event_media import CachePolicy
 19from .exceptions import (
 20    ConfigurationException,
 21)
 22from .google_nest_api import GoogleNestAPI
 23from .streaming_manager import StreamingManager, Message
 24
 25__all__ = [
 26    "GoogleNestSubscriber",
 27    "ApiEnv",
 28]
 29
 30
 31_LOGGER = logging.getLogger(__name__)
 32
 33# Used to catch invalid subscriber id
 34EXPECTED_SUBSCRIBER_REGEXP = re.compile("projects/.*/subscriptions/.*")
 35
 36MESSAGE_ACK_TIMEOUT_SECONDS = 30.0
 37
 38# Detect changes to devices every 12 hours
 39BACKGROUND_REFRESH_SECONDS = datetime.timedelta(hours=12).total_seconds()
 40
 41# Note: Users of non-prod instances will have to manually configure a topic
 42TOPIC_FORMAT = "projects/sdm-prod/topics/enterprise-{project_id}"
 43
 44OAUTH2_AUTHORIZE_FORMAT = (
 45    "https://nestservices.google.com/partnerconnections/{project_id}/auth"
 46)
 47OAUTH2_TOKEN = "https://www.googleapis.com/oauth2/v4/token"
 48SDM_SCOPES = [
 49    "https://www.googleapis.com/auth/sdm.service",
 50    "https://www.googleapis.com/auth/pubsub",
 51]
 52API_URL = "https://smartdevicemanagement.googleapis.com/v1"
 53
 54
 55class ApiEnv(enum.Enum):
 56    PROD = (OAUTH2_AUTHORIZE_FORMAT, API_URL)
 57    PREPROD = (
 58        "https://sdmresourcepicker-preprod.sandbox.google.com/partnerconnections/{project_id}/auth",
 59        "https://preprod-smartdevicemanagement.googleapis.com/v1",
 60    )
 61
 62    def __init__(self, authorize_url: str, api_url: str) -> None:
 63        """Init ApiEnv."""
 64        self._authorize_url = authorize_url
 65        self._api_url = api_url
 66
 67    @property
 68    def authorize_url_format(self) -> str:
 69        """OAuth Authorize url format string."""
 70        return self._authorize_url
 71
 72    @property
 73    def api_url(self) -> str:
 74        """API url."""
 75        return self._api_url
 76
 77
 78def get_api_env(env: str | None) -> ApiEnv:
 79    """Create an ApiEnv from a string."""
 80    if env is None or env == "prod":
 81        return ApiEnv.PROD
 82    if env == "preprod":
 83        return ApiEnv.PREPROD
 84    raise ValueError("Invalid ApiEnv: %s" % env)
 85
 86
 87def _validate_subscription_name(subscription_name: str) -> None:
 88    """Validates that a subscription name is correct.
 89
 90    Raises ConfigurationException on failure.
 91    """
 92    if not EXPECTED_SUBSCRIBER_REGEXP.match(subscription_name):
 93        DIAGNOSTICS.increment("subscription_name_invalid")
 94        _LOGGER.debug("Subscription name did not match pattern: %s", subscription_name)
 95        raise ConfigurationException(
 96            "Subscription misconfigured. Expected subscriber_id to "
 97            f"match '{EXPECTED_SUBSCRIBER_REGEXP.pattern}' but was "
 98            f"'{subscription_name}'"
 99        )
100
101
class GoogleNestSubscriber:
    """Subscribe to events from the Google Nest feed.

    Received messages are turned into EventMessage objects and passed to the
    DeviceManager once its initial load has finished successfully.
    """

    def __init__(
        self,
        auth: AbstractAuth,
        project_id: str,
        subscription_name: str,
    ) -> None:
        """Initialize the subscriber for the specified topic.

        The subscription name is only validated when `start_async` is called.
        """
        self._auth = auth
        self._subscription_name = subscription_name
        self._project_id = project_id
        self._api = GoogleNestAPI(auth, project_id)
        # Created lazily by `async_get_device_manager`.
        self._device_manager_task: asyncio.Task[DeviceManager] | None = None
        self._refresh_task: asyncio.Task[None] | None = None
        self._callback: Callable[[EventMessage], Awaitable[None]] | None = None
        self._cache_policy = CachePolicy()

    @property
    def subscription_name(self) -> str:
        """Return the configured subscriber name."""
        return self._subscription_name

    @property
    def project_id(self) -> str:
        """Return the configured SDM project_id."""
        return self._project_id

    def _loaded_device_manager(self) -> DeviceManager | None:
        """Return the DeviceManager if its load task finished successfully.

        Returns None while the task is missing or pending, and also when it was
        cancelled or failed. `cancelled()` is checked before `exception()`
        because `exception()` raises CancelledError on a cancelled task.
        """
        task = self._device_manager_task
        if task is None or not task.done() or task.cancelled():
            return None
        if task.exception() is not None:
            return None
        return task.result()

    def set_update_callback(
        self, target: Callable[[EventMessage], Awaitable[None]]
    ) -> None:
        """Register a callback invoked when new messages are received.

        If the event is associated with media, then the callback will only
        be invoked once the media has been fetched.

        The callback is always remembered. It is forwarded to the
        DeviceManager right away only when the DeviceManager has been loaded
        successfully; a failed or cancelled load no longer makes this setter
        raise.
        """
        self._callback = target
        if (device_manager := self._loaded_device_manager()) is not None:
            device_manager.set_update_callback(target)

    async def start_async(self) -> Callable[[], None]:
        """Start the subscription.

        Returns a callable used to stop/cancel the subscription. Received
        messages are passed to the callback provided to `set_update_callback`.

        Raises ConfigurationException if the subscription name is invalid.
        """
        _validate_subscription_name(self._subscription_name)
        _LOGGER.debug("Starting subscription %s", self._subscription_name)
        DIAGNOSTICS.increment("start")

        stream = StreamingManager(
            auth=self._auth,
            subscription_name=self._subscription_name,
            callback=self._async_message_callback_with_timeout,
        )
        await stream.start()

        def stop_subscription() -> None:
            """Cancel the background refresh, if running, and stop the stream."""
            if self._refresh_task:
                self._refresh_task.cancel()
            stream.stop()

        return stop_subscription

    @property
    def cache_policy(self) -> CachePolicy:
        """Return cache policy shared by device EventMediaManager objects."""
        return self._cache_policy

    async def async_get_device_manager(self) -> DeviceManager:
        """Return the DeviceManager with the current state of devices.

        The DeviceManager is created on the first call and shared afterwards.
        The background refresh task is started once the DeviceManager is
        available.
        """
        if not self._device_manager_task:
            self._device_manager_task = asyncio.create_task(
                self._async_create_device_manager()
            )
        device_manager = await self._device_manager_task
        if not self._refresh_task:
            self._refresh_task = asyncio.create_task(
                self._async_run_refresh(device_manager)
            )
        return device_manager

    async def _async_create_device_manager(self) -> DeviceManager:
        """Create a DeviceManager, populated with initial state."""
        device_manager = DeviceManager(self._api, self._cache_policy)
        await device_manager.async_refresh()
        if self._callback:
            device_manager.set_update_callback(self._callback)
        return device_manager

    async def _async_message_callback_with_timeout(self, message: Message) -> None:
        """Handle a received message, bounded by MESSAGE_ACK_TIMEOUT_SECONDS.

        Raises TimeoutError when processing takes longer than the limit.
        """
        try:
            async with asyncio.timeout(MESSAGE_ACK_TIMEOUT_SECONDS):
                await self._async_message_callback(message)
        except TimeoutError as err:
            DIAGNOSTICS.increment("message_ack_timeout")
            raise TimeoutError("Message ack timeout processing message") from err

    async def _async_message_callback(self, message: Message) -> None:
        """Handle a received message and record latency diagnostics."""
        event = EventMessage.create_event(message.payload, self._auth)
        recv = time.time()
        latency_ms = int((recv - event.timestamp.timestamp()) * 1000)
        DIAGNOSTICS.elapsed("message_received", latency_ms)
        # Only accept device events once the Device Manager has been loaded.
        # We are ok with missing messages on startup since the device manager
        # will do a live read. A failed or cancelled load is skipped to avoid
        # throwing inside the pubsub callback and further wedging the pubsub
        # client library.
        device_manager = self._loaded_device_manager()
        if device_manager is not None:
            await device_manager.async_handle_event(event)

        process_latency_ms = int((time.time() - recv) * 1000)
        DIAGNOSTICS.elapsed("message_processed", process_latency_ms)

    async def _async_run_refresh(self, device_manager: DeviceManager) -> None:
        """Run a background refresh of devices until cancelled."""
        while True:
            try:
                await asyncio.sleep(BACKGROUND_REFRESH_SECONDS)
                _LOGGER.debug("Refreshing devices")
                await device_manager.async_refresh()
            except asyncio.CancelledError:
                return
            except Exception:  # pylint: disable=broad-except-clause
                # Keep the loop alive; the next refresh is attempted after
                # another full interval.
                _LOGGER.exception("Unexpected error during device refresh")
class GoogleNestSubscriber:
class GoogleNestSubscriber:
    """Subscribe to events from the Google Nest feed.

    Received messages are turned into EventMessage objects and passed to the
    DeviceManager once its initial load has finished successfully.
    """

    def __init__(
        self,
        auth: AbstractAuth,
        project_id: str,
        subscription_name: str,
    ) -> None:
        """Initialize the subscriber for the specified topic.

        The subscription name is only validated when `start_async` is called.
        """
        self._auth = auth
        self._subscription_name = subscription_name
        self._project_id = project_id
        self._api = GoogleNestAPI(auth, project_id)
        # Created lazily by `async_get_device_manager`.
        self._device_manager_task: asyncio.Task[DeviceManager] | None = None
        self._refresh_task: asyncio.Task[None] | None = None
        self._callback: Callable[[EventMessage], Awaitable[None]] | None = None
        self._cache_policy = CachePolicy()

    @property
    def subscription_name(self) -> str:
        """Return the configured subscriber name."""
        return self._subscription_name

    @property
    def project_id(self) -> str:
        """Return the configured SDM project_id."""
        return self._project_id

    def _loaded_device_manager(self) -> DeviceManager | None:
        """Return the DeviceManager if its load task finished successfully.

        Returns None while the task is missing or pending, and also when it was
        cancelled or failed. `cancelled()` is checked before `exception()`
        because `exception()` raises CancelledError on a cancelled task.
        """
        task = self._device_manager_task
        if task is None or not task.done() or task.cancelled():
            return None
        if task.exception() is not None:
            return None
        return task.result()

    def set_update_callback(
        self, target: Callable[[EventMessage], Awaitable[None]]
    ) -> None:
        """Register a callback invoked when new messages are received.

        If the event is associated with media, then the callback will only
        be invoked once the media has been fetched.

        The callback is always remembered. It is forwarded to the
        DeviceManager right away only when the DeviceManager has been loaded
        successfully; a failed or cancelled load no longer makes this setter
        raise.
        """
        self._callback = target
        if (device_manager := self._loaded_device_manager()) is not None:
            device_manager.set_update_callback(target)

    async def start_async(self) -> Callable[[], None]:
        """Start the subscription.

        Returns a callable used to stop/cancel the subscription. Received
        messages are passed to the callback provided to `set_update_callback`.

        Raises ConfigurationException if the subscription name is invalid.
        """
        _validate_subscription_name(self._subscription_name)
        _LOGGER.debug("Starting subscription %s", self._subscription_name)
        DIAGNOSTICS.increment("start")

        stream = StreamingManager(
            auth=self._auth,
            subscription_name=self._subscription_name,
            callback=self._async_message_callback_with_timeout,
        )
        await stream.start()

        def stop_subscription() -> None:
            """Cancel the background refresh, if running, and stop the stream."""
            if self._refresh_task:
                self._refresh_task.cancel()
            stream.stop()

        return stop_subscription

    @property
    def cache_policy(self) -> CachePolicy:
        """Return cache policy shared by device EventMediaManager objects."""
        return self._cache_policy

    async def async_get_device_manager(self) -> DeviceManager:
        """Return the DeviceManager with the current state of devices.

        The DeviceManager is created on the first call and shared afterwards.
        The background refresh task is started once the DeviceManager is
        available.
        """
        if not self._device_manager_task:
            self._device_manager_task = asyncio.create_task(
                self._async_create_device_manager()
            )
        device_manager = await self._device_manager_task
        if not self._refresh_task:
            self._refresh_task = asyncio.create_task(
                self._async_run_refresh(device_manager)
            )
        return device_manager

    async def _async_create_device_manager(self) -> DeviceManager:
        """Create a DeviceManager, populated with initial state."""
        device_manager = DeviceManager(self._api, self._cache_policy)
        await device_manager.async_refresh()
        if self._callback:
            device_manager.set_update_callback(self._callback)
        return device_manager

    async def _async_message_callback_with_timeout(self, message: Message) -> None:
        """Handle a received message, bounded by MESSAGE_ACK_TIMEOUT_SECONDS.

        Raises TimeoutError when processing takes longer than the limit.
        """
        try:
            async with asyncio.timeout(MESSAGE_ACK_TIMEOUT_SECONDS):
                await self._async_message_callback(message)
        except TimeoutError as err:
            DIAGNOSTICS.increment("message_ack_timeout")
            raise TimeoutError("Message ack timeout processing message") from err

    async def _async_message_callback(self, message: Message) -> None:
        """Handle a received message and record latency diagnostics."""
        event = EventMessage.create_event(message.payload, self._auth)
        recv = time.time()
        latency_ms = int((recv - event.timestamp.timestamp()) * 1000)
        DIAGNOSTICS.elapsed("message_received", latency_ms)
        # Only accept device events once the Device Manager has been loaded.
        # We are ok with missing messages on startup since the device manager
        # will do a live read. A failed or cancelled load is skipped to avoid
        # throwing inside the pubsub callback and further wedging the pubsub
        # client library.
        device_manager = self._loaded_device_manager()
        if device_manager is not None:
            await device_manager.async_handle_event(event)

        process_latency_ms = int((time.time() - recv) * 1000)
        DIAGNOSTICS.elapsed("message_processed", process_latency_ms)

    async def _async_run_refresh(self, device_manager: DeviceManager) -> None:
        """Run a background refresh of devices until cancelled."""
        while True:
            try:
                await asyncio.sleep(BACKGROUND_REFRESH_SECONDS)
                _LOGGER.debug("Refreshing devices")
                await device_manager.async_refresh()
            except asyncio.CancelledError:
                return
            except Exception:  # pylint: disable=broad-except-clause
                # Keep the loop alive; the next refresh is attempted after
                # another full interval.
                _LOGGER.exception("Unexpected error during device refresh")

Subscribe to events from the Google Nest feed.

GoogleNestSubscriber( auth: google_nest_sdm.auth.AbstractAuth, project_id: str, subscription_name: str)
106    def __init__(
107        self,
108        auth: AbstractAuth,
109        project_id: str,
110        subscription_name: str,
111    ) -> None:
112        """Initialize the subscriber for the specified topic."""
113        self._auth = auth
114        self._subscription_name = subscription_name
115        self._project_id = project_id
116        self._api = GoogleNestAPI(auth, project_id)
117        self._device_manager_task: asyncio.Task[DeviceManager] | None = None
118        self._refresh_task: asyncio.Task[None] | None = None
119        self._callback: Callable[[EventMessage], Awaitable[None]] | None = None
120        self._cache_policy = CachePolicy()

Initialize the subscriber for the specified topic.

subscription_name: str
122    @property
123    def subscription_name(self) -> str:
124        """Return the configured subscriber name."""
125        return self._subscription_name

Return the configured subscriber name.

project_id: str
127    @property
128    def project_id(self) -> str:
129        """Return the configured SDM project_id."""
130        return self._project_id

Return the configured SDM project_id.

def set_update_callback( self, target: Callable[[google_nest_sdm.event.EventMessage], Awaitable[NoneType]]) -> None:
132    def set_update_callback(
133        self, target: Callable[[EventMessage], Awaitable[None]]
134    ) -> None:
135        """Register a callback invoked when new messages are received.
136
137        If the event is associated with media, then the callback will only
138        be invoked once the media has been fetched.
139        """
140        self._callback = target
141        if self._device_manager_task and self._device_manager_task.done():
142            self._device_manager_task.result().set_update_callback(target)

Register a callback invoked when new messages are received.

If the event is associated with media, then the callback will only be invoked once the media has been fetched.

async def start_async(self) -> Callable[[], NoneType]:
144    async def start_async(self) -> Callable[[], None]:
145        """Start the subscription.
146
147        Returns a callable used to stop/cancel the subscription. Received
148        messages are passed to the callback provided to `set_update_callback`.
149        """
150        _validate_subscription_name(self._subscription_name)
151        _LOGGER.debug("Starting subscription %s", self._subscription_name)
152        DIAGNOSTICS.increment("start")
153
154        stream = StreamingManager(
155            auth=self._auth,
156            subscription_name=self._subscription_name,
157            callback=self._async_message_callback_with_timeout,
158        )
159        await stream.start()
160
161        def stop_subscription() -> None:
162            if self._refresh_task:
163                self._refresh_task.cancel()
164            stream.stop()
165
166        return stop_subscription

Start the subscription.

Returns a callable used to stop/cancel the subscription. Received messages are passed to the callback provided to set_update_callback.

cache_policy: google_nest_sdm.event_media.CachePolicy
168    @property
169    def cache_policy(self) -> CachePolicy:
170        """Return cache policy shared by device EventMediaManager objects."""
171        return self._cache_policy

Return cache policy shared by device EventMediaManager objects.

async def async_get_device_manager(self) -> google_nest_sdm.device_manager.DeviceManager:
173    async def async_get_device_manager(self) -> DeviceManager:
174        """Return the DeviceManger with the current state of devices."""
175        if not self._device_manager_task:
176            self._device_manager_task = asyncio.create_task(
177                self._async_create_device_manager()
178            )
179        device_manager = await self._device_manager_task
180        if not self._refresh_task:
181            self._refresh_task = asyncio.create_task(
182                self._async_run_refresh(device_manager)
183            )
184        return device_manager

Return the DeviceManager with the current state of devices.

class ApiEnv(enum.Enum):
class ApiEnv(enum.Enum):
    """API environments, each carrying an authorize url format and an API url.

    A member's value is an ``(authorize_url, api_url)`` tuple, which the enum
    machinery unpacks into ``__init__``.
    """

    PROD = (OAUTH2_AUTHORIZE_FORMAT, API_URL)
    PREPROD = (
        "https://sdmresourcepicker-preprod.sandbox.google.com/partnerconnections/{project_id}/auth",
        "https://preprod-smartdevicemanagement.googleapis.com/v1",
    )

    def __init__(self, authorize_url: str, api_url: str) -> None:
        """Keep both urls from the member value on the member."""
        self._authorize_url, self._api_url = authorize_url, api_url

    @property
    def authorize_url_format(self) -> str:
        """Format string for the OAuth authorize url of this environment."""
        return self._authorize_url

    @property
    def api_url(self) -> str:
        """Base API url of this environment."""
        return self._api_url
ApiEnv(authorize_url: str, api_url: str)
63    def __init__(self, authorize_url: str, api_url: str) -> None:
64        """Init ApiEnv."""
65        self._authorize_url = authorize_url
66        self._api_url = api_url

Init ApiEnv.

PROD = <ApiEnv.PROD: ('https://nestservices.google.com/partnerconnections/{project_id}/auth', 'https://smartdevicemanagement.googleapis.com/v1')>
PREPROD = <ApiEnv.PREPROD: ('https://sdmresourcepicker-preprod.sandbox.google.com/partnerconnections/{project_id}/auth', 'https://preprod-smartdevicemanagement.googleapis.com/v1')>
authorize_url_format: str
68    @property
69    def authorize_url_format(self) -> str:
70        """OAuth Authorize url format string."""
71        return self._authorize_url

OAuth Authorize url format string.

api_url: str
73    @property
74    def api_url(self) -> str:
75        """API url."""
76        return self._api_url

API url.