google_nest_sdm.device_manager
Device Manager keeps track of the current state of all devices.
1"""Device Manager keeps track of the current state of all devices.""" 2 3from __future__ import annotations 4 5import logging 6from typing import Awaitable, Callable, Dict 7 8from .device import Device, ParentRelation 9from .exceptions import ApiException 10from .event import EventMessage, RelationUpdate 11from .event_media import CachePolicy 12from .google_nest_api import GoogleNestAPI 13from .structure import Structure 14from .diagnostics import DEVICE_MANAGER_DIAGNOSTICS as DIAGNOSTICS 15 16_LOGGER = logging.getLogger(__name__) 17 18 19class DeviceManager: 20 """DeviceManager holds current state of all devices.""" 21 22 def __init__( 23 self, api: GoogleNestAPI, cache_policy: CachePolicy | None = None 24 ) -> None: 25 """Initialize DeviceManager.""" 26 self._api = api 27 self._devices: Dict[str, Device] = {} 28 self._structures: Dict[str, Structure] = {} 29 self._cache_policy = cache_policy if cache_policy else CachePolicy() 30 self._update_callback: Callable[[EventMessage], Awaitable[None]] | None = None 31 self._change_callback: Callable[[], Awaitable[None]] | None = None 32 33 @property 34 def devices(self) -> Dict[str, Device]: 35 """Return current state of devices.""" 36 return self._devices 37 38 @property 39 def structures(self) -> Dict[str, Structure]: 40 """Return current state of structures.""" 41 return self._structures 42 43 @property 44 def cache_policy(self) -> CachePolicy: 45 """Return cache policy shared by device EventMediaManager objects.""" 46 return self._cache_policy 47 48 def set_update_callback( 49 self, target: Callable[[EventMessage], Awaitable[None]] 50 ) -> None: 51 """Register a callback invoked when new messages are received. 52 53 If the event is associated with media, then the callback will only 54 be invoked once the media has been fetched. 55 """ 56 self._update_callback = target 57 for device in self._devices.values(): 58 device.event_media_manager.set_update_callback(target) 59 60 async def async_handle_event(self, event_message: EventMessage) -> None: 61 """Handle a new message received.""" 62 63 if _is_invalid_thermostat_trait_update(event_message): 64 _LOGGER.debug( 65 "Ignoring event with invalid update traits; Refreshing devices: %s", 66 event_message.resource_update_traits, 67 ) 68 await self._hack_refresh_devices() 69 return 70 71 if event_message.relation_update: 72 _LOGGER.debug("Handling relation update: %s", event_message.relation_update) 73 self._handle_device_relation(event_message.relation_update) 74 # Also discover any new devices/structures 75 try: 76 await self.async_refresh() 77 except ApiException: 78 _LOGGER.debug("Failed to refresh devices") 79 if self._update_callback: 80 await self._update_callback(event_message) 81 return 82 83 if event_message.resource_update_name: 84 device_id = event_message.resource_update_name 85 if device_id in self._devices: 86 device = self._devices[device_id] 87 await device.async_handle_event(event_message) 88 89 def _structure_name(self, relation_subject: str) -> str: 90 if relation_subject in self._structures: 91 structure = self._structures[relation_subject] 92 for trait in [structure.info, structure.room_info]: 93 if trait and trait.custom_name: 94 return trait.custom_name 95 return "Unknown" 96 97 def _handle_device_relation(self, relation: RelationUpdate) -> None: 98 if relation.object not in self._devices: 99 return 100 101 device = self._devices[relation.object] 102 if relation.type == "DELETED": 103 # Delete device from room/structure 104 device.delete_relation(relation.subject) 105 106 if relation.type == "UPDATED" or relation.type == "CREATED": 107 # Device moved to a room 108 assert relation.subject 109 device.create_relation( 110 ParentRelation.from_dict( 111 { 112 "parent": relation.subject, 113 "displayName": self._structure_name(relation.subject), 114 } 115 ) 116 ) 117 118 async def async_refresh(self) -> None: 119 """Refresh devices and structures from the API.""" 120 # Refresh structures 121 structures = await self._api.async_get_structures() 122 old_structure_ids = set(self._structures.keys()) 123 new_structures = { 124 structure.name: structure for structure in structures if structure.name 125 } 126 for structure in new_structures.values(): 127 if structure.name not in self._structures: 128 _LOGGER.debug("Adding structure %s", structure.name) 129 self._structures[structure.name] = structure 130 removed_structure_ids = old_structure_ids - set(new_structures.keys()) 131 for structure_id in removed_structure_ids: 132 _LOGGER.debug("Removing structure %s", structure_id) 133 del self._structures[structure_id] 134 135 # Refresh devices 136 devices = await self._api.async_get_devices() 137 old_device_ids = set(self._devices.keys()) 138 new_devices = {device.name: device for device in devices if device.name} 139 for device in new_devices.values(): 140 if existing_device := self._devices.get(device.name): 141 existing_device.merge_from_update(device) 142 else: 143 _LOGGER.debug("Adding device %s", device.name) 144 self._add_device(device) 145 146 removed_device_ids = old_device_ids - set(new_devices.keys()) 147 for device_id in removed_device_ids: 148 _LOGGER.debug("Removing device %s", device_id) 149 del self._devices[device_id] 150 151 if self._change_callback and ( 152 old_device_ids != set(self._devices.keys()) 153 or old_structure_ids != set(self._structures.keys()) 154 ): 155 await self._change_callback() 156 157 def _add_device(self, device: Device) -> None: 158 """Track the specified device.""" 159 assert device.name 160 self._devices[device.name] = device 161 # Share a single cache policy across all devices 162 device.event_media_manager.cache_policy = self._cache_policy 163 if self._update_callback: 164 device.event_media_manager.set_update_callback(self._update_callback) 165 166 def set_change_callback(self, target: Callable[[], Awaitable[None]]) -> None: 167 """Register a callback invoked when devices or structures change.""" 168 self._change_callback = target 169 170 async def _hack_refresh_devices(self) -> None: 171 """Update the device manager with refreshed devices from the API.""" 172 DIAGNOSTICS.increment("invalid-thermostat-update") 173 try: 174 await self.async_refresh() 175 except ApiException: 176 DIAGNOSTICS.increment("invalid-thermostat-update-refresh-failure") 177 _LOGGER.debug("Failed to refresh devices after invalid message") 178 else: 179 DIAGNOSTICS.increment("invalid-thermostat-update-refresh-success") 180 181 182def _is_invalid_thermostat_trait_update(event: EventMessage) -> bool: 183 """Return true if this is an invalid thermostat trait update.""" 184 if ( 185 event.resource_update_traits is not None 186 and ( 187 thermostat_mode := event.resource_update_traits.get( 188 "sdm.devices.traits.ThermostatMode" 189 ) 190 ) 191 and (available_modes := thermostat_mode.get("availableModes")) is not None 192 and available_modes == ["OFF"] 193 ): 194 return True 195 return False
class
DeviceManager:
20class DeviceManager: 21 """DeviceManager holds current state of all devices.""" 22 23 def __init__( 24 self, api: GoogleNestAPI, cache_policy: CachePolicy | None = None 25 ) -> None: 26 """Initialize DeviceManager.""" 27 self._api = api 28 self._devices: Dict[str, Device] = {} 29 self._structures: Dict[str, Structure] = {} 30 self._cache_policy = cache_policy if cache_policy else CachePolicy() 31 self._update_callback: Callable[[EventMessage], Awaitable[None]] | None = None 32 self._change_callback: Callable[[], Awaitable[None]] | None = None 33 34 @property 35 def devices(self) -> Dict[str, Device]: 36 """Return current state of devices.""" 37 return self._devices 38 39 @property 40 def structures(self) -> Dict[str, Structure]: 41 """Return current state of structures.""" 42 return self._structures 43 44 @property 45 def cache_policy(self) -> CachePolicy: 46 """Return cache policy shared by device EventMediaManager objects.""" 47 return self._cache_policy 48 49 def set_update_callback( 50 self, target: Callable[[EventMessage], Awaitable[None]] 51 ) -> None: 52 """Register a callback invoked when new messages are received. 53 54 If the event is associated with media, then the callback will only 55 be invoked once the media has been fetched. 56 """ 57 self._update_callback = target 58 for device in self._devices.values(): 59 device.event_media_manager.set_update_callback(target) 60 61 async def async_handle_event(self, event_message: EventMessage) -> None: 62 """Handle a new message received.""" 63 64 if _is_invalid_thermostat_trait_update(event_message): 65 _LOGGER.debug( 66 "Ignoring event with invalid update traits; Refreshing devices: %s", 67 event_message.resource_update_traits, 68 ) 69 await self._hack_refresh_devices() 70 return 71 72 if event_message.relation_update: 73 _LOGGER.debug("Handling relation update: %s", event_message.relation_update) 74 self._handle_device_relation(event_message.relation_update) 75 # Also discover any new devices/structures 76 try: 77 await self.async_refresh() 78 except ApiException: 79 _LOGGER.debug("Failed to refresh devices") 80 if self._update_callback: 81 await self._update_callback(event_message) 82 return 83 84 if event_message.resource_update_name: 85 device_id = event_message.resource_update_name 86 if device_id in self._devices: 87 device = self._devices[device_id] 88 await device.async_handle_event(event_message) 89 90 def _structure_name(self, relation_subject: str) -> str: 91 if relation_subject in self._structures: 92 structure = self._structures[relation_subject] 93 for trait in [structure.info, structure.room_info]: 94 if trait and trait.custom_name: 95 return trait.custom_name 96 return "Unknown" 97 98 def _handle_device_relation(self, relation: RelationUpdate) -> None: 99 if relation.object not in self._devices: 100 return 101 102 device = self._devices[relation.object] 103 if relation.type == "DELETED": 104 # Delete device from room/structure 105 device.delete_relation(relation.subject) 106 107 if relation.type == "UPDATED" or relation.type == "CREATED": 108 # Device moved to a room 109 assert relation.subject 110 device.create_relation( 111 ParentRelation.from_dict( 112 { 113 "parent": relation.subject, 114 "displayName": self._structure_name(relation.subject), 115 } 116 ) 117 ) 118 119 async def async_refresh(self) -> None: 120 """Refresh devices and structures from the API.""" 121 # Refresh structures 122 structures = await self._api.async_get_structures() 123 old_structure_ids = set(self._structures.keys()) 124 new_structures = { 125 structure.name: structure for structure in structures if structure.name 126 } 127 for structure in new_structures.values(): 128 if structure.name not in self._structures: 129 _LOGGER.debug("Adding structure %s", structure.name) 130 self._structures[structure.name] = structure 131 removed_structure_ids = old_structure_ids - set(new_structures.keys()) 132 for structure_id in removed_structure_ids: 133 _LOGGER.debug("Removing structure %s", structure_id) 134 del self._structures[structure_id] 135 136 # Refresh devices 137 devices = await self._api.async_get_devices() 138 old_device_ids = set(self._devices.keys()) 139 new_devices = {device.name: device for device in devices if device.name} 140 for device in new_devices.values(): 141 if existing_device := self._devices.get(device.name): 142 existing_device.merge_from_update(device) 143 else: 144 _LOGGER.debug("Adding device %s", device.name) 145 self._add_device(device) 146 147 removed_device_ids = old_device_ids - set(new_devices.keys()) 148 for device_id in removed_device_ids: 149 _LOGGER.debug("Removing device %s", device_id) 150 del self._devices[device_id] 151 152 if self._change_callback and ( 153 old_device_ids != set(self._devices.keys()) 154 or old_structure_ids != set(self._structures.keys()) 155 ): 156 await self._change_callback() 157 158 def _add_device(self, device: Device) -> None: 159 """Track the specified device.""" 160 assert device.name 161 self._devices[device.name] = device 162 # Share a single cache policy across all devices 163 device.event_media_manager.cache_policy = self._cache_policy 164 if self._update_callback: 165 device.event_media_manager.set_update_callback(self._update_callback) 166 167 def set_change_callback(self, target: Callable[[], Awaitable[None]]) -> None: 168 """Register a callback invoked when devices or structures change.""" 169 self._change_callback = target 170 171 async def _hack_refresh_devices(self) -> None: 172 """Update the device manager with refreshed devices from the API.""" 173 DIAGNOSTICS.increment("invalid-thermostat-update") 174 try: 175 await self.async_refresh() 176 except ApiException: 177 DIAGNOSTICS.increment("invalid-thermostat-update-refresh-failure") 178 _LOGGER.debug("Failed to refresh devices after invalid message") 179 else: 180 DIAGNOSTICS.increment("invalid-thermostat-update-refresh-success")
DeviceManager holds current state of all devices.
DeviceManager( api: google_nest_sdm.google_nest_api.GoogleNestAPI, cache_policy: google_nest_sdm.event_media.CachePolicy | None = None)
23 def __init__( 24 self, api: GoogleNestAPI, cache_policy: CachePolicy | None = None 25 ) -> None: 26 """Initialize DeviceManager.""" 27 self._api = api 28 self._devices: Dict[str, Device] = {} 29 self._structures: Dict[str, Structure] = {} 30 self._cache_policy = cache_policy if cache_policy else CachePolicy() 31 self._update_callback: Callable[[EventMessage], Awaitable[None]] | None = None 32 self._change_callback: Callable[[], Awaitable[None]] | None = None
Initialize DeviceManager.
devices: Dict[str, google_nest_sdm.device.Device]
34 @property 35 def devices(self) -> Dict[str, Device]: 36 """Return current state of devices.""" 37 return self._devices
Return current state of devices.
structures: Dict[str, google_nest_sdm.structure.Structure]
39 @property 40 def structures(self) -> Dict[str, Structure]: 41 """Return current state of structures.""" 42 return self._structures
Return current state of structures.
cache_policy: google_nest_sdm.event_media.CachePolicy
44 @property 45 def cache_policy(self) -> CachePolicy: 46 """Return cache policy shared by device EventMediaManager objects.""" 47 return self._cache_policy
Return cache policy shared by device EventMediaManager objects.
def
set_update_callback( self, target: Callable[[google_nest_sdm.event.EventMessage], Awaitable[NoneType]]) -> None:
49 def set_update_callback( 50 self, target: Callable[[EventMessage], Awaitable[None]] 51 ) -> None: 52 """Register a callback invoked when new messages are received. 53 54 If the event is associated with media, then the callback will only 55 be invoked once the media has been fetched. 56 """ 57 self._update_callback = target 58 for device in self._devices.values(): 59 device.event_media_manager.set_update_callback(target)
Register a callback invoked when new messages are received.
If the event is associated with media, then the callback will only be invoked once the media has been fetched.
61 async def async_handle_event(self, event_message: EventMessage) -> None: 62 """Handle a new message received.""" 63 64 if _is_invalid_thermostat_trait_update(event_message): 65 _LOGGER.debug( 66 "Ignoring event with invalid update traits; Refreshing devices: %s", 67 event_message.resource_update_traits, 68 ) 69 await self._hack_refresh_devices() 70 return 71 72 if event_message.relation_update: 73 _LOGGER.debug("Handling relation update: %s", event_message.relation_update) 74 self._handle_device_relation(event_message.relation_update) 75 # Also discover any new devices/structures 76 try: 77 await self.async_refresh() 78 except ApiException: 79 _LOGGER.debug("Failed to refresh devices") 80 if self._update_callback: 81 await self._update_callback(event_message) 82 return 83 84 if event_message.resource_update_name: 85 device_id = event_message.resource_update_name 86 if device_id in self._devices: 87 device = self._devices[device_id] 88 await device.async_handle_event(event_message)
Handle a new message received.
async def
async_refresh(self) -> None:
119 async def async_refresh(self) -> None: 120 """Refresh devices and structures from the API.""" 121 # Refresh structures 122 structures = await self._api.async_get_structures() 123 old_structure_ids = set(self._structures.keys()) 124 new_structures = { 125 structure.name: structure for structure in structures if structure.name 126 } 127 for structure in new_structures.values(): 128 if structure.name not in self._structures: 129 _LOGGER.debug("Adding structure %s", structure.name) 130 self._structures[structure.name] = structure 131 removed_structure_ids = old_structure_ids - set(new_structures.keys()) 132 for structure_id in removed_structure_ids: 133 _LOGGER.debug("Removing structure %s", structure_id) 134 del self._structures[structure_id] 135 136 # Refresh devices 137 devices = await self._api.async_get_devices() 138 old_device_ids = set(self._devices.keys()) 139 new_devices = {device.name: device for device in devices if device.name} 140 for device in new_devices.values(): 141 if existing_device := self._devices.get(device.name): 142 existing_device.merge_from_update(device) 143 else: 144 _LOGGER.debug("Adding device %s", device.name) 145 self._add_device(device) 146 147 removed_device_ids = old_device_ids - set(new_devices.keys()) 148 for device_id in removed_device_ids: 149 _LOGGER.debug("Removing device %s", device_id) 150 del self._devices[device_id] 151 152 if self._change_callback and ( 153 old_device_ids != set(self._devices.keys()) 154 or old_structure_ids != set(self._structures.keys()) 155 ): 156 await self._change_callback()
Refresh devices and structures from the API.
def
set_change_callback(self, target: Callable[[], Awaitable[NoneType]]) -> None:
167 def set_change_callback(self, target: Callable[[], Awaitable[None]]) -> None: 168 """Register a callback invoked when devices or structures change.""" 169 self._change_callback = target
Register a callback invoked when devices or structures change.