google_nest_sdm.device_manager

Device Manager keeps track of the current state of all devices.

  1"""Device Manager keeps track of the current state of all devices."""
  2
  3from __future__ import annotations
  4
  5import logging
  6from typing import Awaitable, Callable, Dict
  7
  8from .device import Device, ParentRelation
  9from .exceptions import ApiException
 10from .event import EventMessage, RelationUpdate
 11from .event_media import CachePolicy
 12from .google_nest_api import GoogleNestAPI
 13from .structure import Structure
 14from .diagnostics import DEVICE_MANAGER_DIAGNOSTICS as DIAGNOSTICS
 15
 16_LOGGER = logging.getLogger(__name__)
 17
 18
 19class DeviceManager:
 20    """DeviceManager holds current state of all devices."""
 21
 22    def __init__(
 23        self, api: GoogleNestAPI, cache_policy: CachePolicy | None = None
 24    ) -> None:
 25        """Initialize DeviceManager."""
 26        self._api = api
 27        self._devices: Dict[str, Device] = {}
 28        self._structures: Dict[str, Structure] = {}
 29        self._cache_policy = cache_policy if cache_policy else CachePolicy()
 30        self._update_callback: Callable[[EventMessage], Awaitable[None]] | None = None
 31        self._change_callback: Callable[[], Awaitable[None]] | None = None
 32
 33    @property
 34    def devices(self) -> Dict[str, Device]:
 35        """Return current state of devices."""
 36        return self._devices
 37
 38    @property
 39    def structures(self) -> Dict[str, Structure]:
 40        """Return current state of structures."""
 41        return self._structures
 42
 43    @property
 44    def cache_policy(self) -> CachePolicy:
 45        """Return cache policy shared by device EventMediaManager objects."""
 46        return self._cache_policy
 47
 48    def set_update_callback(
 49        self, target: Callable[[EventMessage], Awaitable[None]]
 50    ) -> None:
 51        """Register a callback invoked when new messages are received.
 52
 53        If the event is associated with media, then the callback will only
 54        be invoked once the media has been fetched.
 55        """
 56        self._update_callback = target
 57        for device in self._devices.values():
 58            device.event_media_manager.set_update_callback(target)
 59
 60    async def async_handle_event(self, event_message: EventMessage) -> None:
 61        """Handle a new message received."""
 62
 63        if _is_invalid_thermostat_trait_update(event_message):
 64            _LOGGER.debug(
 65                "Ignoring event with invalid update traits; Refreshing devices: %s",
 66                event_message.resource_update_traits,
 67            )
 68            await self._hack_refresh_devices()
 69            return
 70
 71        if event_message.relation_update:
 72            _LOGGER.debug("Handling relation update: %s", event_message.relation_update)
 73            self._handle_device_relation(event_message.relation_update)
 74            # Also discover any new devices/structures
 75            try:
 76                await self.async_refresh()
 77            except ApiException:
 78                _LOGGER.debug("Failed to refresh devices")
 79            if self._update_callback:
 80                await self._update_callback(event_message)
 81            return
 82
 83        if event_message.resource_update_name:
 84            device_id = event_message.resource_update_name
 85            if device_id in self._devices:
 86                device = self._devices[device_id]
 87                await device.async_handle_event(event_message)
 88
 89    def _structure_name(self, relation_subject: str) -> str:
 90        if relation_subject in self._structures:
 91            structure = self._structures[relation_subject]
 92            for trait in [structure.info, structure.room_info]:
 93                if trait and trait.custom_name:
 94                    return trait.custom_name
 95        return "Unknown"
 96
 97    def _handle_device_relation(self, relation: RelationUpdate) -> None:
 98        if relation.object not in self._devices:
 99            return
100
101        device = self._devices[relation.object]
102        if relation.type == "DELETED":
103            # Delete device from room/structure
104            device.delete_relation(relation.subject)
105
106        if relation.type == "UPDATED" or relation.type == "CREATED":
107            # Device moved to a room
108            assert relation.subject
109            device.create_relation(
110                ParentRelation.from_dict(
111                    {
112                        "parent": relation.subject,
113                        "displayName": self._structure_name(relation.subject),
114                    }
115                )
116            )
117
118    async def async_refresh(self) -> None:
119        """Refresh devices and structures from the API."""
120        # Refresh structures
121        structures = await self._api.async_get_structures()
122        old_structure_ids = set(self._structures.keys())
123        new_structures = {
124            structure.name: structure for structure in structures if structure.name
125        }
126        for structure in new_structures.values():
127            if structure.name not in self._structures:
128                _LOGGER.debug("Adding structure %s", structure.name)
129                self._structures[structure.name] = structure
130        removed_structure_ids = old_structure_ids - set(new_structures.keys())
131        for structure_id in removed_structure_ids:
132            _LOGGER.debug("Removing structure %s", structure_id)
133            del self._structures[structure_id]
134
135        # Refresh devices
136        devices = await self._api.async_get_devices()
137        old_device_ids = set(self._devices.keys())
138        new_devices = {device.name: device for device in devices if device.name}
139        for device in new_devices.values():
140            if existing_device := self._devices.get(device.name):
141                existing_device.merge_from_update(device)
142            else:
143                _LOGGER.debug("Adding device %s", device.name)
144                self._add_device(device)
145
146        removed_device_ids = old_device_ids - set(new_devices.keys())
147        for device_id in removed_device_ids:
148            _LOGGER.debug("Removing device %s", device_id)
149            del self._devices[device_id]
150
151        if self._change_callback and (
152            old_device_ids != set(self._devices.keys())
153            or old_structure_ids != set(self._structures.keys())
154        ):
155            await self._change_callback()
156
157    def _add_device(self, device: Device) -> None:
158        """Track the specified device."""
159        assert device.name
160        self._devices[device.name] = device
161        # Share a single cache policy across all devices
162        device.event_media_manager.cache_policy = self._cache_policy
163        if self._update_callback:
164            device.event_media_manager.set_update_callback(self._update_callback)
165
166    def set_change_callback(self, target: Callable[[], Awaitable[None]]) -> None:
167        """Register a callback invoked when devices or structures change."""
168        self._change_callback = target
169
170    async def _hack_refresh_devices(self) -> None:
171        """Update the device manager with refreshed devices from the API."""
172        DIAGNOSTICS.increment("invalid-thermostat-update")
173        try:
174            await self.async_refresh()
175        except ApiException:
176            DIAGNOSTICS.increment("invalid-thermostat-update-refresh-failure")
177            _LOGGER.debug("Failed to refresh devices after invalid message")
178        else:
179            DIAGNOSTICS.increment("invalid-thermostat-update-refresh-success")
180
181
def _is_invalid_thermostat_trait_update(event: EventMessage) -> bool:
    """Return true if this is an invalid thermostat trait update.

    An update is treated as invalid when its ThermostatMode trait reports
    ``availableModes`` equal to exactly ``["OFF"]``.
    """
    traits = event.resource_update_traits
    if traits is None:
        return False
    mode_trait = traits.get("sdm.devices.traits.ThermostatMode")
    if not mode_trait:
        # Trait absent (or empty) in this update
        return False
    # A missing "availableModes" key yields None, which never equals ["OFF"]
    return mode_trait.get("availableModes") == ["OFF"]
class DeviceManager:
 20class DeviceManager:
 21    """DeviceManager holds current state of all devices."""
 22
 23    def __init__(
 24        self, api: GoogleNestAPI, cache_policy: CachePolicy | None = None
 25    ) -> None:
 26        """Initialize DeviceManager."""
 27        self._api = api
 28        self._devices: Dict[str, Device] = {}
 29        self._structures: Dict[str, Structure] = {}
 30        self._cache_policy = cache_policy if cache_policy else CachePolicy()
 31        self._update_callback: Callable[[EventMessage], Awaitable[None]] | None = None
 32        self._change_callback: Callable[[], Awaitable[None]] | None = None
 33
 34    @property
 35    def devices(self) -> Dict[str, Device]:
 36        """Return current state of devices."""
 37        return self._devices
 38
 39    @property
 40    def structures(self) -> Dict[str, Structure]:
 41        """Return current state of structures."""
 42        return self._structures
 43
 44    @property
 45    def cache_policy(self) -> CachePolicy:
 46        """Return cache policy shared by device EventMediaManager objects."""
 47        return self._cache_policy
 48
 49    def set_update_callback(
 50        self, target: Callable[[EventMessage], Awaitable[None]]
 51    ) -> None:
 52        """Register a callback invoked when new messages are received.
 53
 54        If the event is associated with media, then the callback will only
 55        be invoked once the media has been fetched.
 56        """
 57        self._update_callback = target
 58        for device in self._devices.values():
 59            device.event_media_manager.set_update_callback(target)
 60
 61    async def async_handle_event(self, event_message: EventMessage) -> None:
 62        """Handle a new message received."""
 63
 64        if _is_invalid_thermostat_trait_update(event_message):
 65            _LOGGER.debug(
 66                "Ignoring event with invalid update traits; Refreshing devices: %s",
 67                event_message.resource_update_traits,
 68            )
 69            await self._hack_refresh_devices()
 70            return
 71
 72        if event_message.relation_update:
 73            _LOGGER.debug("Handling relation update: %s", event_message.relation_update)
 74            self._handle_device_relation(event_message.relation_update)
 75            # Also discover any new devices/structures
 76            try:
 77                await self.async_refresh()
 78            except ApiException:
 79                _LOGGER.debug("Failed to refresh devices")
 80            if self._update_callback:
 81                await self._update_callback(event_message)
 82            return
 83
 84        if event_message.resource_update_name:
 85            device_id = event_message.resource_update_name
 86            if device_id in self._devices:
 87                device = self._devices[device_id]
 88                await device.async_handle_event(event_message)
 89
 90    def _structure_name(self, relation_subject: str) -> str:
 91        if relation_subject in self._structures:
 92            structure = self._structures[relation_subject]
 93            for trait in [structure.info, structure.room_info]:
 94                if trait and trait.custom_name:
 95                    return trait.custom_name
 96        return "Unknown"
 97
 98    def _handle_device_relation(self, relation: RelationUpdate) -> None:
 99        if relation.object not in self._devices:
100            return
101
102        device = self._devices[relation.object]
103        if relation.type == "DELETED":
104            # Delete device from room/structure
105            device.delete_relation(relation.subject)
106
107        if relation.type == "UPDATED" or relation.type == "CREATED":
108            # Device moved to a room
109            assert relation.subject
110            device.create_relation(
111                ParentRelation.from_dict(
112                    {
113                        "parent": relation.subject,
114                        "displayName": self._structure_name(relation.subject),
115                    }
116                )
117            )
118
119    async def async_refresh(self) -> None:
120        """Refresh devices and structures from the API."""
121        # Refresh structures
122        structures = await self._api.async_get_structures()
123        old_structure_ids = set(self._structures.keys())
124        new_structures = {
125            structure.name: structure for structure in structures if structure.name
126        }
127        for structure in new_structures.values():
128            if structure.name not in self._structures:
129                _LOGGER.debug("Adding structure %s", structure.name)
130                self._structures[structure.name] = structure
131        removed_structure_ids = old_structure_ids - set(new_structures.keys())
132        for structure_id in removed_structure_ids:
133            _LOGGER.debug("Removing structure %s", structure_id)
134            del self._structures[structure_id]
135
136        # Refresh devices
137        devices = await self._api.async_get_devices()
138        old_device_ids = set(self._devices.keys())
139        new_devices = {device.name: device for device in devices if device.name}
140        for device in new_devices.values():
141            if existing_device := self._devices.get(device.name):
142                existing_device.merge_from_update(device)
143            else:
144                _LOGGER.debug("Adding device %s", device.name)
145                self._add_device(device)
146
147        removed_device_ids = old_device_ids - set(new_devices.keys())
148        for device_id in removed_device_ids:
149            _LOGGER.debug("Removing device %s", device_id)
150            del self._devices[device_id]
151
152        if self._change_callback and (
153            old_device_ids != set(self._devices.keys())
154            or old_structure_ids != set(self._structures.keys())
155        ):
156            await self._change_callback()
157
158    def _add_device(self, device: Device) -> None:
159        """Track the specified device."""
160        assert device.name
161        self._devices[device.name] = device
162        # Share a single cache policy across all devices
163        device.event_media_manager.cache_policy = self._cache_policy
164        if self._update_callback:
165            device.event_media_manager.set_update_callback(self._update_callback)
166
167    def set_change_callback(self, target: Callable[[], Awaitable[None]]) -> None:
168        """Register a callback invoked when devices or structures change."""
169        self._change_callback = target
170
171    async def _hack_refresh_devices(self) -> None:
172        """Update the device manager with refreshed devices from the API."""
173        DIAGNOSTICS.increment("invalid-thermostat-update")
174        try:
175            await self.async_refresh()
176        except ApiException:
177            DIAGNOSTICS.increment("invalid-thermostat-update-refresh-failure")
178            _LOGGER.debug("Failed to refresh devices after invalid message")
179        else:
180            DIAGNOSTICS.increment("invalid-thermostat-update-refresh-success")

DeviceManager holds current state of all devices.

DeviceManager(api: google_nest_sdm.google_nest_api.GoogleNestAPI, cache_policy: google_nest_sdm.event_media.CachePolicy | None = None)
23    def __init__(
24        self, api: GoogleNestAPI, cache_policy: CachePolicy | None = None
25    ) -> None:
26        """Initialize DeviceManager."""
27        self._api = api
28        self._devices: Dict[str, Device] = {}
29        self._structures: Dict[str, Structure] = {}
30        self._cache_policy = cache_policy if cache_policy else CachePolicy()
31        self._update_callback: Callable[[EventMessage], Awaitable[None]] | None = None
32        self._change_callback: Callable[[], Awaitable[None]] | None = None

Initialize DeviceManager.

devices: Dict[str, google_nest_sdm.device.Device]
34    @property
35    def devices(self) -> Dict[str, Device]:
36        """Return current state of devices."""
37        return self._devices

Return current state of devices.

structures: Dict[str, google_nest_sdm.structure.Structure]
39    @property
40    def structures(self) -> Dict[str, Structure]:
41        """Return current state of structures."""
42        return self._structures

Return current state of structures.

cache_policy: google_nest_sdm.event_media.CachePolicy
44    @property
45    def cache_policy(self) -> CachePolicy:
46        """Return cache policy shared by device EventMediaManager objects."""
47        return self._cache_policy

Return cache policy shared by device EventMediaManager objects.

def set_update_callback(self, target: Callable[[google_nest_sdm.event.EventMessage], Awaitable[NoneType]]) -> None:
49    def set_update_callback(
50        self, target: Callable[[EventMessage], Awaitable[None]]
51    ) -> None:
52        """Register a callback invoked when new messages are received.
53
54        If the event is associated with media, then the callback will only
55        be invoked once the media has been fetched.
56        """
57        self._update_callback = target
58        for device in self._devices.values():
59            device.event_media_manager.set_update_callback(target)

Register a callback invoked when new messages are received.

If the event is associated with media, then the callback will only be invoked once the media has been fetched.

async def async_handle_event(self, event_message: google_nest_sdm.event.EventMessage) -> None:
61    async def async_handle_event(self, event_message: EventMessage) -> None:
62        """Handle a new message received."""
63
64        if _is_invalid_thermostat_trait_update(event_message):
65            _LOGGER.debug(
66                "Ignoring event with invalid update traits; Refreshing devices: %s",
67                event_message.resource_update_traits,
68            )
69            await self._hack_refresh_devices()
70            return
71
72        if event_message.relation_update:
73            _LOGGER.debug("Handling relation update: %s", event_message.relation_update)
74            self._handle_device_relation(event_message.relation_update)
75            # Also discover any new devices/structures
76            try:
77                await self.async_refresh()
78            except ApiException:
79                _LOGGER.debug("Failed to refresh devices")
80            if self._update_callback:
81                await self._update_callback(event_message)
82            return
83
84        if event_message.resource_update_name:
85            device_id = event_message.resource_update_name
86            if device_id in self._devices:
87                device = self._devices[device_id]
88                await device.async_handle_event(event_message)

Handle a new message received.

async def async_refresh(self) -> None:
119    async def async_refresh(self) -> None:
120        """Refresh devices and structures from the API."""
121        # Refresh structures
122        structures = await self._api.async_get_structures()
123        old_structure_ids = set(self._structures.keys())
124        new_structures = {
125            structure.name: structure for structure in structures if structure.name
126        }
127        for structure in new_structures.values():
128            if structure.name not in self._structures:
129                _LOGGER.debug("Adding structure %s", structure.name)
130                self._structures[structure.name] = structure
131        removed_structure_ids = old_structure_ids - set(new_structures.keys())
132        for structure_id in removed_structure_ids:
133            _LOGGER.debug("Removing structure %s", structure_id)
134            del self._structures[structure_id]
135
136        # Refresh devices
137        devices = await self._api.async_get_devices()
138        old_device_ids = set(self._devices.keys())
139        new_devices = {device.name: device for device in devices if device.name}
140        for device in new_devices.values():
141            if existing_device := self._devices.get(device.name):
142                existing_device.merge_from_update(device)
143            else:
144                _LOGGER.debug("Adding device %s", device.name)
145                self._add_device(device)
146
147        removed_device_ids = old_device_ids - set(new_devices.keys())
148        for device_id in removed_device_ids:
149            _LOGGER.debug("Removing device %s", device_id)
150            del self._devices[device_id]
151
152        if self._change_callback and (
153            old_device_ids != set(self._devices.keys())
154            or old_structure_ids != set(self._structures.keys())
155        ):
156            await self._change_callback()

Refresh devices and structures from the API.

def set_change_callback(self, target: Callable[[], Awaitable[NoneType]]) -> None:
167    def set_change_callback(self, target: Callable[[], Awaitable[None]]) -> None:
168        """Register a callback invoked when devices or structures change."""
169        self._change_callback = target

Register a callback invoked when devices or structures change.