google_nest_sdm.google_nest_subscriber
Subscriber for the Smart Device Management event based API.
1"""Subscriber for the Smart Device Management event based API.""" 2 3from __future__ import annotations 4 5import datetime 6import asyncio 7import enum 8import logging 9import re 10import time 11from typing import Awaitable, Callable 12 13 14from .auth import AbstractAuth 15from .device_manager import DeviceManager 16from .diagnostics import SUBSCRIBER_DIAGNOSTICS as DIAGNOSTICS 17from .event import EventMessage 18from .event_media import CachePolicy 19from .exceptions import ( 20 ConfigurationException, 21) 22from .google_nest_api import GoogleNestAPI 23from .streaming_manager import StreamingManager, Message 24 25__all__ = [ 26 "GoogleNestSubscriber", 27 "ApiEnv", 28] 29 30 31_LOGGER = logging.getLogger(__name__) 32 33# Used to catch invalid subscriber id 34EXPECTED_SUBSCRIBER_REGEXP = re.compile("projects/.*/subscriptions/.*") 35 36MESSAGE_ACK_TIMEOUT_SECONDS = 30.0 37 38# Detect changes to devices every 12 hours 39BACKGROUND_REFRESH_SECONDS = datetime.timedelta(hours=12).total_seconds() 40 41# Note: Users of non-prod instances will have to manually configure a topic 42TOPIC_FORMAT = "projects/sdm-prod/topics/enterprise-{project_id}" 43 44OAUTH2_AUTHORIZE_FORMAT = ( 45 "https://nestservices.google.com/partnerconnections/{project_id}/auth" 46) 47OAUTH2_TOKEN = "https://www.googleapis.com/oauth2/v4/token" 48SDM_SCOPES = [ 49 "https://www.googleapis.com/auth/sdm.service", 50 "https://www.googleapis.com/auth/pubsub", 51] 52API_URL = "https://smartdevicemanagement.googleapis.com/v1" 53 54 55class ApiEnv(enum.Enum): 56 PROD = (OAUTH2_AUTHORIZE_FORMAT, API_URL) 57 PREPROD = ( 58 "https://sdmresourcepicker-preprod.sandbox.google.com/partnerconnections/{project_id}/auth", 59 "https://preprod-smartdevicemanagement.googleapis.com/v1", 60 ) 61 62 def __init__(self, authorize_url: str, api_url: str) -> None: 63 """Init ApiEnv.""" 64 self._authorize_url = authorize_url 65 self._api_url = api_url 66 67 @property 68 def authorize_url_format(self) -> str: 69 """OAuth Authorize 
url format string.""" 70 return self._authorize_url 71 72 @property 73 def api_url(self) -> str: 74 """API url.""" 75 return self._api_url 76 77 78def get_api_env(env: str | None) -> ApiEnv: 79 """Create an ApiEnv from a string.""" 80 if env is None or env == "prod": 81 return ApiEnv.PROD 82 if env == "preprod": 83 return ApiEnv.PREPROD 84 raise ValueError("Invalid ApiEnv: %s" % env) 85 86 87def _validate_subscription_name(subscription_name: str) -> None: 88 """Validates that a subscription name is correct. 89 90 Raises ConfigurationException on failure. 91 """ 92 if not EXPECTED_SUBSCRIBER_REGEXP.match(subscription_name): 93 DIAGNOSTICS.increment("subscription_name_invalid") 94 _LOGGER.debug("Subscription name did not match pattern: %s", subscription_name) 95 raise ConfigurationException( 96 "Subscription misconfigured. Expected subscriber_id to " 97 f"match '{EXPECTED_SUBSCRIBER_REGEXP.pattern}' but was " 98 f"'{subscription_name}'" 99 ) 100 101 102class GoogleNestSubscriber: 103 """Subscribe to events from the Google Nest feed.""" 104 105 def __init__( 106 self, 107 auth: AbstractAuth, 108 project_id: str, 109 subscription_name: str, 110 ) -> None: 111 """Initialize the subscriber for the specified topic.""" 112 self._auth = auth 113 self._subscription_name = subscription_name 114 self._project_id = project_id 115 self._api = GoogleNestAPI(auth, project_id) 116 self._device_manager_task: asyncio.Task[DeviceManager] | None = None 117 self._refresh_task: asyncio.Task[None] | None = None 118 self._callback: Callable[[EventMessage], Awaitable[None]] | None = None 119 self._cache_policy = CachePolicy() 120 121 @property 122 def subscription_name(self) -> str: 123 """Return the configured subscriber name.""" 124 return self._subscription_name 125 126 @property 127 def project_id(self) -> str: 128 """Return the configured SDM project_id.""" 129 return self._project_id 130 131 def set_update_callback( 132 self, target: Callable[[EventMessage], Awaitable[None]] 133 ) -> 
None: 134 """Register a callback invoked when new messages are received. 135 136 If the event is associated with media, then the callback will only 137 be invoked once the media has been fetched. 138 """ 139 self._callback = target 140 if self._device_manager_task and self._device_manager_task.done(): 141 self._device_manager_task.result().set_update_callback(target) 142 143 async def start_async(self) -> Callable[[], None]: 144 """Start the subscription. 145 146 Returns a callable used to stop/cancel the subscription. Received 147 messages are passed to the callback provided to `set_update_callback`. 148 """ 149 _validate_subscription_name(self._subscription_name) 150 _LOGGER.debug("Starting subscription %s", self._subscription_name) 151 DIAGNOSTICS.increment("start") 152 153 stream = StreamingManager( 154 auth=self._auth, 155 subscription_name=self._subscription_name, 156 callback=self._async_message_callback_with_timeout, 157 ) 158 await stream.start() 159 160 def stop_subscription() -> None: 161 if self._refresh_task: 162 self._refresh_task.cancel() 163 stream.stop() 164 165 return stop_subscription 166 167 @property 168 def cache_policy(self) -> CachePolicy: 169 """Return cache policy shared by device EventMediaManager objects.""" 170 return self._cache_policy 171 172 async def async_get_device_manager(self) -> DeviceManager: 173 """Return the DeviceManger with the current state of devices.""" 174 if not self._device_manager_task: 175 self._device_manager_task = asyncio.create_task( 176 self._async_create_device_manager() 177 ) 178 device_manager = await self._device_manager_task 179 if not self._refresh_task: 180 self._refresh_task = asyncio.create_task( 181 self._async_run_refresh(device_manager) 182 ) 183 return device_manager 184 185 async def _async_create_device_manager(self) -> DeviceManager: 186 """Create a DeviceManager, populated with initial state.""" 187 device_manager = DeviceManager(self._api, self._cache_policy) 188 await 
device_manager.async_refresh() 189 if self._callback: 190 device_manager.set_update_callback(self._callback) 191 return device_manager 192 193 async def _async_message_callback_with_timeout(self, message: Message) -> None: 194 """Handle a received message.""" 195 try: 196 async with asyncio.timeout(MESSAGE_ACK_TIMEOUT_SECONDS): 197 await self._async_message_callback(message) 198 except TimeoutError as err: 199 DIAGNOSTICS.increment("message_ack_timeout") 200 raise TimeoutError("Message ack timeout processing message") from err 201 202 async def _async_message_callback(self, message: Message) -> None: 203 """Handle a received message.""" 204 event = EventMessage.create_event(message.payload, self._auth) 205 recv = time.time() 206 latency_ms = int((recv - event.timestamp.timestamp()) * 1000) 207 DIAGNOSTICS.elapsed("message_received", latency_ms) 208 # Only accept device events once the Device Manager has been loaded. 209 # We are ok with missing messages on startup since the device manager 210 # will do a live read. This checks for an exception to avoid throwing 211 # inside the pubsub callback and further wedging the pubsub client library. 212 if ( 213 self._device_manager_task 214 and self._device_manager_task.done() 215 and not self._device_manager_task.exception() 216 ): 217 device_manager = self._device_manager_task.result() 218 await device_manager.async_handle_event(event) 219 220 process_latency_ms = int((time.time() - recv) * 1000) 221 DIAGNOSTICS.elapsed("message_processed", process_latency_ms) 222 223 async def _async_run_refresh(self, device_manager: DeviceManager) -> None: 224 """Run a background refresh of devices.""" 225 while True: 226 try: 227 await asyncio.sleep(BACKGROUND_REFRESH_SECONDS) 228 _LOGGER.debug("Refreshing devices") 229 await device_manager.async_refresh() 230 except asyncio.CancelledError: 231 return 232 except Exception: # pylint: disable=broad-except-clause 233 _LOGGER.exception("Unexpected error during device refresh")
class
GoogleNestSubscriber:
class GoogleNestSubscriber:
    """Subscribe to events from the Google Nest feed."""

    def __init__(
        self,
        auth: AbstractAuth,
        project_id: str,
        subscription_name: str,
    ) -> None:
        """Initialize the subscriber for the specified topic."""
        self._auth = auth
        self._subscription_name = subscription_name
        self._project_id = project_id
        self._api = GoogleNestAPI(auth, project_id)
        # Both tasks are created lazily by `async_get_device_manager`.
        self._device_manager_task: asyncio.Task[DeviceManager] | None = None
        self._refresh_task: asyncio.Task[None] | None = None
        self._callback: Callable[[EventMessage], Awaitable[None]] | None = None
        self._cache_policy = CachePolicy()

    @property
    def subscription_name(self) -> str:
        """Return the configured subscriber name."""
        return self._subscription_name

    @property
    def project_id(self) -> str:
        """Return the configured SDM project_id."""
        return self._project_id

    def set_update_callback(
        self, target: Callable[[EventMessage], Awaitable[None]]
    ) -> None:
        """Register a callback invoked when new messages are received.

        If the event is associated with media, then the callback will only
        be invoked once the media has been fetched.

        The callback is always remembered. It is forwarded to the device
        manager immediately only when the device manager task has finished
        successfully; otherwise it is applied when a device manager is created.
        """
        self._callback = target
        task = self._device_manager_task
        # `result()` re-raises the task's exception (or CancelledError), so
        # only read it from a task that completed without either.
        if (
            task
            and task.done()
            and not task.cancelled()
            and not task.exception()
        ):
            task.result().set_update_callback(target)

    async def start_async(self) -> Callable[[], None]:
        """Start the subscription.

        Returns a callable used to stop/cancel the subscription. Received
        messages are passed to the callback provided to `set_update_callback`.

        Raises ConfigurationException if the subscription name is malformed.
        """
        _validate_subscription_name(self._subscription_name)
        _LOGGER.debug("Starting subscription %s", self._subscription_name)
        DIAGNOSTICS.increment("start")

        stream = StreamingManager(
            auth=self._auth,
            subscription_name=self._subscription_name,
            callback=self._async_message_callback_with_timeout,
        )
        await stream.start()

        def stop_subscription() -> None:
            # Stop the background device refresh along with the stream.
            if self._refresh_task:
                self._refresh_task.cancel()
            stream.stop()

        return stop_subscription

    @property
    def cache_policy(self) -> CachePolicy:
        """Return cache policy shared by device EventMediaManager objects."""
        return self._cache_policy

    async def async_get_device_manager(self) -> DeviceManager:
        """Return the DeviceManager with the current state of devices.

        The first call starts the task that creates the device manager; once
        it is available the background refresh task is started as well.
        """
        if not self._device_manager_task:
            self._device_manager_task = asyncio.create_task(
                self._async_create_device_manager()
            )
        device_manager = await self._device_manager_task
        if not self._refresh_task:
            self._refresh_task = asyncio.create_task(
                self._async_run_refresh(device_manager)
            )
        return device_manager

    async def _async_create_device_manager(self) -> DeviceManager:
        """Create a DeviceManager, populated with initial state."""
        device_manager = DeviceManager(self._api, self._cache_policy)
        await device_manager.async_refresh()
        if self._callback:
            device_manager.set_update_callback(self._callback)
        return device_manager

    async def _async_message_callback_with_timeout(self, message: Message) -> None:
        """Handle a received message, bounded by MESSAGE_ACK_TIMEOUT_SECONDS.

        Raises TimeoutError if handling does not finish in time.
        """
        try:
            async with asyncio.timeout(MESSAGE_ACK_TIMEOUT_SECONDS):
                await self._async_message_callback(message)
        except TimeoutError as err:
            DIAGNOSTICS.increment("message_ack_timeout")
            raise TimeoutError("Message ack timeout processing message") from err

    async def _async_message_callback(self, message: Message) -> None:
        """Handle a received message."""
        event = EventMessage.create_event(message.payload, self._auth)
        recv = time.time()
        latency_ms = int((recv - event.timestamp.timestamp()) * 1000)
        DIAGNOSTICS.elapsed("message_received", latency_ms)
        # Only accept device events once the Device Manager has been loaded.
        # We are ok with missing messages on startup since the device manager
        # will do a live read. This checks for cancellation and for an
        # exception to avoid throwing inside the pubsub callback and further
        # wedging the pubsub client library. `exception()` itself raises
        # CancelledError on a cancelled task, hence the `cancelled()` check.
        task = self._device_manager_task
        if (
            task
            and task.done()
            and not task.cancelled()
            and not task.exception()
        ):
            device_manager = task.result()
            await device_manager.async_handle_event(event)

        process_latency_ms = int((time.time() - recv) * 1000)
        DIAGNOSTICS.elapsed("message_processed", process_latency_ms)

    async def _async_run_refresh(self, device_manager: DeviceManager) -> None:
        """Run a background refresh of devices.

        Loops until cancelled, refreshing every BACKGROUND_REFRESH_SECONDS.
        Other errors are logged and the loop continues.
        """
        while True:
            try:
                await asyncio.sleep(BACKGROUND_REFRESH_SECONDS)
                _LOGGER.debug("Refreshing devices")
                await device_manager.async_refresh()
            except asyncio.CancelledError:
                return
            except Exception:  # pylint: disable=broad-except-clause
                _LOGGER.exception("Unexpected error during device refresh")
Subscribe to events from the Google Nest feed.
GoogleNestSubscriber( auth: google_nest_sdm.auth.AbstractAuth, project_id: str, subscription_name: str)
def __init__(
    self,
    auth: AbstractAuth,
    project_id: str,
    subscription_name: str,
) -> None:
    """Set up a subscriber for the given project and subscription.

    Stores the credentials and identifiers, builds the API client and the
    shared cache policy, and leaves the lazily created tasks and the update
    callback unset.
    """
    # Plain configuration values.
    self._auth = auth
    self._subscription_name = subscription_name
    self._project_id = project_id

    # API client bound to the same credentials and project.
    api_client = GoogleNestAPI(auth, project_id)
    self._api = api_client

    # Nothing is started or registered until explicitly requested.
    self._device_manager_task: asyncio.Task[DeviceManager] | None = None
    self._refresh_task: asyncio.Task[None] | None = None
    self._callback: Callable[[EventMessage], Awaitable[None]] | None = None

    self._cache_policy = CachePolicy()
Initialize the subscriber for the specified topic.
subscription_name: str
@property
def subscription_name(self) -> str:
    """Subscription name this subscriber was configured with."""
    configured_name = self._subscription_name
    return configured_name
Return the configured subscriber name.
project_id: str
@property
def project_id(self) -> str:
    """SDM project_id this subscriber was configured with."""
    configured_project = self._project_id
    return configured_project
Return the configured SDM project_id.
def
set_update_callback( self, target: Callable[[google_nest_sdm.event.EventMessage], Awaitable[NoneType]]) -> None:
def set_update_callback(
    self, target: Callable[[EventMessage], Awaitable[None]]
) -> None:
    """Register a callback invoked when new messages are received.

    If the event is associated with media, then the callback will only
    be invoked once the media has been fetched.

    The callback is always remembered. It is forwarded to the device
    manager immediately only when the device manager task has finished
    successfully; a pending, failed or cancelled task is left untouched.
    """
    self._callback = target
    task = self._device_manager_task
    # `result()` re-raises the task's exception (or CancelledError), and
    # `exception()` raises on a cancelled task, so check in this order.
    if (
        task
        and task.done()
        and not task.cancelled()
        and not task.exception()
    ):
        task.result().set_update_callback(target)
Register a callback invoked when new messages are received.
If the event is associated with media, then the callback will only be invoked once the media has been fetched.
async def
start_async(self) -> Callable[[], NoneType]:
async def start_async(self) -> Callable[[], None]:
    """Validate the subscription name and start streaming messages.

    The returned callable stops the stream and cancels the background
    refresh task if one is running. Messages received while the stream is
    active are delivered to the callback given to `set_update_callback`.
    """
    name = self._subscription_name
    _validate_subscription_name(name)
    _LOGGER.debug("Starting subscription %s", name)
    DIAGNOSTICS.increment("start")

    manager = StreamingManager(
        auth=self._auth,
        subscription_name=name,
        callback=self._async_message_callback_with_timeout,
    )
    await manager.start()

    def stop_subscription() -> None:
        # Read the task at stop time; it may not exist when the stream starts.
        refresh = self._refresh_task
        if refresh:
            refresh.cancel()
        manager.stop()

    return stop_subscription
Start the subscription.
Returns a callable used to stop/cancel the subscription. Received
messages are passed to the callback provided to set_update_callback.
cache_policy: google_nest_sdm.event_media.CachePolicy
@property
def cache_policy(self) -> CachePolicy:
    """Cache policy shared by device EventMediaManager objects."""
    shared_policy = self._cache_policy
    return shared_policy
Return cache policy shared by device EventMediaManager objects.
async def async_get_device_manager(self) -> DeviceManager:
    """Return the DeviceManager with the current state of devices.

    The creation task is started on first use and reused afterwards. Once
    the device manager is available, the background refresh task is started
    if it is not already set.
    """
    pending = self._device_manager_task
    if not pending:
        pending = asyncio.create_task(self._async_create_device_manager())
        self._device_manager_task = pending

    manager = await pending

    if not self._refresh_task:
        refresh = asyncio.create_task(self._async_run_refresh(manager))
        self._refresh_task = refresh
    return manager
Return the DeviceManager with the current state of devices.
class
ApiEnv(enum.Enum):
class ApiEnv(enum.Enum):
    """API environment, pairing an OAuth authorize url format with an API url.

    Each member value is a `(authorize_url, api_url)` tuple that is unpacked
    into `__init__`.
    """

    PROD = (OAUTH2_AUTHORIZE_FORMAT, API_URL)
    PREPROD = (
        "https://sdmresourcepicker-preprod.sandbox.google.com/partnerconnections/{project_id}/auth",
        "https://preprod-smartdevicemanagement.googleapis.com/v1",
    )

    def __init__(self, authorize_url: str, api_url: str) -> None:
        """Store the two urls carried by the member value."""
        self._authorize_url, self._api_url = authorize_url, api_url

    @property
    def api_url(self) -> str:
        """Base url of the API for this environment."""
        return self._api_url

    @property
    def authorize_url_format(self) -> str:
        """OAuth authorize url format string for this environment."""
        return self._authorize_url
ApiEnv(authorize_url: str, api_url: str)
def __init__(self, authorize_url: str, api_url: str) -> None:
    """Store the authorize url format and API url on the instance."""
    self._authorize_url, self._api_url = authorize_url, api_url
Init ApiEnv.
PROD =
<ApiEnv.PROD: ('https://nestservices.google.com/partnerconnections/{project_id}/auth', 'https://smartdevicemanagement.googleapis.com/v1')>
PREPROD =
<ApiEnv.PREPROD: ('https://sdmresourcepicker-preprod.sandbox.google.com/partnerconnections/{project_id}/auth', 'https://preprod-smartdevicemanagement.googleapis.com/v1')>