google_nest_sdm.event_media

Libraries related to providing a device level interface for event related media.

An EventMediaManager is associated with a single device and manages the state for events and the lifecycle of media for those events. The manager is invoked by the subscriber when new events arrive and it handles any fetching related to the media, as well as transcoding video clips if needed. The CachePolicy settings determine lifecycle options such as how many events to keep around in the underlying store.

  1"""Libraries related to providing a device level interface for event related media.
  2
  3An `EventMediaManager` is associated with a single device and manages the
  4state for events and the lifecycle of media for those events. The manager is
  5invoked by the subscriber when new events arrive and it handles any fetching
  6related to the media, as well as transcoding video clips if needed. The
  7`CachePolicy` settings determine lifecycle options such as how many events
  8to keep around in the underlying store.
  9"""
 10
 11from __future__ import annotations
 12
 13import asyncio
 14import datetime
 15import itertools
 16import logging
 17import time
 18from abc import ABC, abstractmethod
 19from collections import OrderedDict
 20from collections.abc import Iterable
 21from dataclasses import dataclass, field
 22from typing import Any, Awaitable, Callable, cast
 23
 24from mashumaro import DataClassDictMixin
 25from mashumaro.types import SerializationStrategy
 26from mashumaro.config import BaseConfig
 27
 28from .camera_traits import (
 29    CameraClipPreviewTrait,
 30    CameraEventImageTrait,
 31    EventImageContentType,
 32)
 33from .diagnostics import EVENT_MEDIA_DIAGNOSTICS as DIAGNOSTICS
 34from .diagnostics import Diagnostics
 35from .event import (
 36    CameraClipPreviewEvent,
 37    EventImageType,
 38    EventMessage,
 39    EventToken,
 40    ImageEventBase,
 41    session_event_image_type,
 42    CameraPersonEvent,
 43    CameraMotionEvent,
 44    CameraSoundEvent,
 45    DoorbellChimeEvent,
 46)
 47from .exceptions import GoogleNestException, TranscodeException
 48from .transcoder import Transcoder
 49
# Names exported as the public API of this module.
__all__ = [
    "EventMediaManager",
    "ImageSession",
    "ClipPreviewSession",
    "Media",
    "EventMediaStore",
    "CachePolicy",
]

# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)

# Width (in pixels) requested when loading the contents of an event image.
# Should be large enough for processing, but not too large to be a size issue
SNAPSHOT_WIDTH_PX = 1600

# Default value for CachePolicy.event_cache_size.
DEFAULT_CACHE_SIZE = 2

# Event types that can act as the visible event of a session.
# Events are collapsed in the order shown here
VISIBLE_EVENTS = [
    DoorbellChimeEvent.NAME,
    CameraPersonEvent.NAME,
    CameraMotionEvent.NAME,
    CameraSoundEvent.NAME,
]

# Fraction (0.05 == 5%) of the configured cache size to delete when bulk
# purging from the cache; at least one item is always purged.
EXPIRE_CACHE_BATCH_SIZE = 0.05
 76
 77
 78@dataclass
 79class Media:
 80    """Represents media related to an event."""
 81
 82    contents: bytes
 83    """Media content."""
 84
 85    event_image_type: EventImageContentType
 86    """Content event image type of the media."""
 87
 88    @property
 89    def content_type(self) -> str:
 90        """Content type of the media."""
 91        return self.event_image_type.content_type
 92
 93
 94@dataclass
 95class ImageSession:
 96    """An object that holds an image based event."""
 97
 98    event_token: str
 99    """A token that can be used to fetch the media for the event."""
100
101    timestamp: datetime.datetime
102    """Timestamp when the event happened."""
103
104    event_type: str
105    """A label for the type of event."""
106
107
@dataclass
class ClipPreviewSession:
    """Summary of a clip based event session."""

    event_token: str
    """Opaque token used to look up the media that belongs to the event."""

    timestamp: datetime.datetime
    """Point in time at which the event occurred."""

    event_types: list[str]
    """Labels for the types of events that make up the session."""
120
121
class EventMediaStore(ABC):
    """Abstract interface to external storage for event data and media.

    Implementations must provide `async_load`, `async_save` and
    `get_media_key`. The remaining methods have defaults: the specialised
    key helpers delegate to `get_media_key` (thumbnails are disabled by
    returning `None`), and the media load/save/remove hooks do nothing.
    """

    @abstractmethod
    async def async_load(self) -> dict | None:
        """Return the previously saved data, or `None` if there is none."""

    @abstractmethod
    async def async_save(self, data: dict) -> None:
        """Persist `data` so a later `async_load` can return it."""

    @abstractmethod
    def get_media_key(self, device_id: str, event: ImageEventBase) -> str:
        """Return the filename (media key) for the given device and event."""

    def get_image_media_key(self, device_id: str, event: ImageEventBase) -> str:
        """Return the media key for an image; defaults to `get_media_key`."""
        key = self.get_media_key(device_id, event)
        return key

    def get_clip_preview_media_key(self, device_id: str, event: ImageEventBase) -> str:
        """Return the media key for a clip preview; defaults to `get_media_key`."""
        key = self.get_media_key(device_id, event)
        return key

    def get_clip_preview_thumbnail_media_key(
        self, device_id: str, event: ImageEventBase
    ) -> str | None:
        """Return the media key for a clip preview thumbnail.

        The default implementation returns `None`.
        """
        return None

    async def async_load_media(self, media_key: str) -> bytes | None:
        """Load the media stored under `media_key` (default: nothing)."""
        return None

    async def async_save_media(self, media_key: str, content: bytes) -> None:
        """Write `content` under `media_key` (default: no-op)."""
        return None

    async def async_remove_media(self, media_key: str) -> None:
        """Remove the media stored under `media_key` (default: no-op)."""
        return None
159
160
class InMemoryEventMediaStore(EventMediaStore):
    """EventMediaStore implementation that keeps everything in memory.

    The saved data is held as a single object and media content is held in
    a dict keyed by media key.
    """

    def __init__(self) -> None:
        self._data: dict | None = None
        self._media: dict[str, bytes] = {}

    async def async_load(self) -> dict | None:
        """Return the data most recently passed to `async_save`, if any."""
        return self._data

    async def async_save(self, data: dict) -> None:
        """Keep a reference to `data` for later loads."""
        self._data = data

    def get_media_key(self, device_id: str, event: ImageEventBase) -> str:
        """Return the media key for the device and event.

        The suffix is "jpg" for image events and "mp4" otherwise.
        """
        if event.event_image_type == EventImageType.IMAGE:
            suffix = "jpg"
        else:
            suffix = "mp4"
        return f"{device_id}_{event.timestamp}_{event.event_session_id}.{suffix}"

    def get_image_media_key(self, device_id: str, event: ImageEventBase) -> str:
        """Return the per-event image media key (includes the event id)."""
        return (
            f"{device_id}_{event.timestamp}_{event.event_session_id}_"
            f"{event.event_id}.jpg"
        )

    def get_clip_preview_media_key(self, device_id: str, event: ImageEventBase) -> str:
        """Return the media key for the clip preview of the event session."""
        return f"{device_id}_{event.timestamp}_{event.event_session_id}.mp4"

    def get_clip_preview_thumbnail_media_key(
        self, device_id: str, event: ImageEventBase
    ) -> str | None:
        """Return the media key to use for the clip preview thumbnail."""
        return f"{device_id}_{event.timestamp}_{event.event_session_id}_thumb.jpg"

    async def async_load_media(self, media_key: str) -> bytes | None:
        """Return the content stored under `media_key`, or `None` if absent."""
        with DIAGNOSTICS.timer("load_media"):
            return self._media.get(media_key)

    async def async_save_media(self, media_key: str, content: bytes) -> None:
        """Store `content` under `media_key`, replacing any previous content."""
        with DIAGNOSTICS.timer("save_media"):
            self._media[media_key] = content

    async def async_remove_media(self, media_key: str) -> None:
        """Drop the content stored under `media_key`; unknown keys are ignored."""
        with DIAGNOSTICS.timer("remove_media"):
            self._media.pop(media_key, None)
213
214
@dataclass
class CachePolicy:
    """Settings controlling event caching and media handling for a device."""

    event_cache_size: int = DEFAULT_CACHE_SIZE
    """Number of events to keep per device before older ones are expired."""

    fetch: bool = False
    """Whether event media should be pre-fetched when events arrive."""

    store: EventMediaStore = field(default_factory=InMemoryEventMediaStore)
    """The EventMediaStore used to persist event data and media content."""

    transcoder: Transcoder | None = None
    """Optional transcoder used for encoding media."""

    @property
    def event_cache_expire_count(self) -> int:
        """Number of events to remove in one bulk purge of the cache.

        This is `EXPIRE_CACHE_BATCH_SIZE` times `event_cache_size`, truncated
        to an integer, and never less than one.
        """
        batch = int(self.event_cache_size * EXPIRE_CACHE_BATCH_SIZE)
        return max(batch, 1)
235
236
class ImageEventSerializationStrategy(SerializationStrategy):
    """Converts a mapping of event type to ImageEventBase to and from dicts."""

    def serialize(self, value: dict[str, ImageEventBase]) -> dict[str, Any]:
        """Return a dict with every event replaced by its `as_dict()` form."""
        return {event_type: event.as_dict() for event_type, event in value.items()}

    def deserialize(self, value: dict[str, Any]) -> dict[str, ImageEventBase]:
        """Rebuild the events from their dict form.

        Entries that `ImageEventBase.parse_event_dict` does not turn into an
        event are dropped. All resulting events are assigned the image type
        computed for the session as a whole.
        """
        parsed: dict[str, ImageEventBase] = {}
        for event_type, event_data in value.items():
            # Copy the outer timestamp down into the nested event payload
            timestamp = event_data.get("timestamp")
            if timestamp:
                nested = event_data.get("event_data")
                if nested:
                    nested["timestamp"] = timestamp
            event = ImageEventBase.parse_event_dict(event_data)
            if event:
                parsed[event_type] = event

        # Every event in the session shares the same image type
        shared_image_type = session_event_image_type(parsed.values())
        for event in parsed.values():
            event.event_image_type = shared_image_type
        return parsed
261
262
@dataclass
class EventMediaModelItem(DataClassDictMixin):
    """Record persisted in the EventMediaStore for one event session.

    Holds the events of the session keyed by event type, the media keys
    assigned to them, and the set of event keys still pending notification.
    """

    event_session_id: str
    events: dict[str, ImageEventBase] = field(default_factory=dict)
    media_key: str | None = field(default=None)
    event_media_keys: dict[str, str] = field(default_factory=dict)
    thumbnail_media_key: str | None = field(default=None)
    pending_event_keys: set[str] = field(default_factory=set)

    @property
    def visible_event(self) -> ImageEventBase | None:
        """Return the first event found when scanning `VISIBLE_EVENTS` in order."""
        candidates = (self.events.get(event_type) for event_type in VISIBLE_EVENTS)
        return next((event for event in candidates if event), None)

    def merge_events(self, new_events: dict[str, ImageEventBase]) -> None:
        """Add `new_events` to this record.

        Keys not previously present are marked as pending notification.
        """
        # Must be computed before the update below adds the keys
        added = new_events.keys() - self.events.keys()
        self.events.update(new_events)
        self.pending_event_keys.update(added)

    @property
    def pending_events(self) -> dict[str, ImageEventBase]:
        """Return the events whose keys are still pending notification."""
        pending = self.pending_event_keys
        return {key: self.events[key] for key in self.events if key in pending}

    def notified(self, event_keys: Iterable[str]) -> None:
        """Remove the given event keys from the pending set."""
        self.pending_event_keys = self.pending_event_keys.difference(event_keys)

    def media_key_for_token(self, token: EventToken) -> str | None:
        """Return the media key matching the event token.

        A per-event media key is preferred when the token has an event id
        with an entry in `event_media_keys`; otherwise the legacy single
        `media_key` of the session is returned.
        """
        event_id = token.event_id
        if event_id and event_id in self.event_media_keys:
            return self.event_media_keys[event_id]
        return self.media_key

    @property
    def any_media_key(self) -> str | None:
        """Return some media key of this record for legacy APIs, if one exists."""
        return self.media_key or next(iter(self.event_media_keys.values()), None)

    @property
    def all_media_keys(self) -> list[str]:
        """Return every media key held by this record, used when purging."""
        candidates = [
            self.media_key,
            self.thumbnail_media_key,
            *self.event_media_keys.values(),
        ]
        return [key for key in candidates if key is not None]

    class Config(BaseConfig):
        serialization_strategy = {
            dict[str, ImageEventBase]: ImageEventSerializationStrategy(),
        }
330
331
332class EventMediaManager:
333    """Responsible for handling recent events and fetching associated media."""
334
335    def __init__(
336        self,
337        device_id: str,
338        traits: dict[str, Any],
339        event_traits: set[str],
340        diagnostics: Diagnostics,
341    ) -> None:
342        """Initialize DeviceEventMediaManager."""
343        self._device_id = device_id
344        self._traits = traits
345        self._event_traits = event_traits
346        self._cache_policy = CachePolicy()
347        self._callback: Callable[[EventMessage], Awaitable[None]] | None = None
348        self._support_fetch = (
349            CameraClipPreviewTrait.NAME in traits
350            or CameraEventImageTrait.NAME in traits
351        )
352        self._diagnostics = diagnostics
353        self._lock: asyncio.Lock | None = None
354
355    @property
356    def cache_policy(self) -> CachePolicy:
357        """Return the current CachePolicy."""
358        return self._cache_policy
359
360    @cache_policy.setter
361    def cache_policy(self, value: CachePolicy) -> None:
362        """Update the CachePolicy."""
363        self._cache_policy = value
364
365    async def _async_load(self) -> OrderedDict[str, EventMediaModelItem]:
366        """Load the device specific data from the store."""
367        store_data = await self._cache_policy.store.async_load()
368        event_data: OrderedDict[str, EventMediaModelItem] = OrderedDict()
369        if store_data:
370            device_data = store_data.get(self._device_id, [])
371            for item_data in device_data:
372                try:
373                    item = EventMediaModelItem.from_dict(item_data)
374                except Exception as err:
375                    _LOGGER.debug("Failed to parse event item: %s", str(err))
376                    raise err
377                event_data[item.event_session_id] = item
378        return event_data
379
380    async def _async_update(self, event_data: dict[str, EventMediaModelItem]) -> None:
381        """Save the device specific model to the store."""
382        # Event order is preserved so popping from the oldest entry works
383        device_data: list[dict[str, Any]] = []
384        for item in event_data.values():
385            device_data.append(item.to_dict())
386
387        # Read data from the store and update information for this device
388        store_data = await self._cache_policy.store.async_load()
389        if not store_data:
390            store_data = {}
391        store_data[self._device_id] = device_data
392        await self._cache_policy.store.async_save(store_data)
393
394    async def _async_load_item(
395        self, event_session_id: str
396    ) -> EventMediaModelItem | None:
397        """Load the specific item from the store."""
398        event_data = await self._async_load()
399        return event_data.get(event_session_id)
400
401    async def _async_update_item(self, item: EventMediaModelItem) -> None:
402        """Update the specific item in the store."""
403        if not self._lock:
404            self._lock = asyncio.Lock()
405        async with self._lock:
406            event_data = await self._async_load()
407            event_data[item.event_session_id] = item
408            await self._async_update(event_data)
409
410    async def _expire_cache(self) -> None:
411        """Garbage collect any items from the cache."""
412        if not self._lock:
413            self._lock = asyncio.Lock()
414        async with self._lock:
415            event_data = await self._async_load()
416            _LOGGER.debug("Checking cache size %s", len(event_data))
417            if len(event_data) <= self._cache_policy.event_cache_size:
418                return
419            _LOGGER.debug(
420                "Expiring cache %s", self._cache_policy.event_cache_expire_count
421            )
422            # Bulk pop items
423            for i in range(0, self._cache_policy.event_cache_expire_count):
424                (key, old_item) = event_data.popitem(last=False)
425                _LOGGER.debug(
426                    "Expiring media %s (%s)",
427                    old_item.all_media_keys,
428                    old_item.event_session_id,
429                )
430                for media_key in old_item.all_media_keys:
431                    await self._cache_policy.store.async_remove_media(media_key)
432            await self._async_update(event_data)
433
434    async def _fetch_media(self, item: EventMediaModelItem) -> None:
435        """Fetch media from the server in response to a pubsub event."""
436        store = self._cache_policy.store
437        if CameraClipPreviewTrait.NAME in self._traits:
438            self._diagnostics.increment("fetch_clip")
439            if (
440                item.media_key
441                or not item.visible_event
442                or not (
443                    clip_event := cast(
444                        CameraClipPreviewEvent,
445                        item.events.get(CameraClipPreviewEvent.NAME),
446                    )
447                )
448                or clip_event.is_expired
449            ):
450                self._diagnostics.increment("fetch_clip.skip")
451                return
452            clip_preview_trait: CameraClipPreviewTrait = self._traits[
453                CameraClipPreviewTrait.NAME
454            ]
455            event_image = await clip_preview_trait.generate_event_image(
456                clip_event.preview_url
457            )
458            if event_image:
459                content = await event_image.contents()
460                # Caller will persist the media key assignment
461                media_key = store.get_clip_preview_media_key(
462                    self._device_id, item.visible_event
463                )
464                item.media_key = media_key
465                _LOGGER.debug("Saving media %s (%s)", media_key, item.event_session_id)
466                self._diagnostics.increment("fetch_clip.save")
467                await store.async_save_media(media_key, content)
468                return
469
470        if CameraEventImageTrait.NAME not in self._traits:
471            return
472        self._diagnostics.increment("fetch_image")
473
474        event_image_trait: CameraEventImageTrait = self._traits[
475            CameraEventImageTrait.NAME
476        ]
477        for event in item.events.values():
478            if event.event_id in item.event_media_keys or event.is_expired:
479                self._diagnostics.increment("fetch_image.skip")
480                continue
481            event_image = await event_image_trait.generate_image(event.event_id)
482            content = await event_image.contents(width=SNAPSHOT_WIDTH_PX)
483
484            # Caller will persist the media key assignment
485            media_key = store.get_image_media_key(self._device_id, event)
486            item.event_media_keys[event.event_id] = media_key
487            _LOGGER.debug("Saving media %s (%s)", media_key, item.event_session_id)
488            self._diagnostics.increment("fetch_image.save")
489            await store.async_save_media(media_key, content)
490
491    async def get_media_from_token(self, event_token: str) -> Media | None:
492        """Get media based on the event token."""
493        token = EventToken.decode(event_token)
494        if not (item := await self._async_load_item(token.event_session_id)):
495            self._diagnostics.increment("get_media.invalid_event")
496            _LOGGER.debug(
497                "No event information found for event id: %s", token.event_session_id
498            )
499            return None
500        media_key = item.media_key_for_token(token)
501        if not media_key:
502            self._diagnostics.increment("get_media.no_media")
503            _LOGGER.debug("No persisted media for event id %s", token)
504            return None
505        contents = await self._cache_policy.store.async_load_media(media_key)
506        if not contents:
507            self._diagnostics.increment("get_media.empty")
508            _LOGGER.debug(
509                "Unable to load persisted media for event id: (%s, %s, %s)",
510                token.event_session_id,
511                token.event_id,
512                item.media_key,
513            )
514            return None
515        assert item.visible_event
516        self._diagnostics.increment("get_media.success")
517        return Media(contents, item.visible_event.event_image_type)
518
519    async def get_clip_thumbnail_from_token(self, event_token: str) -> Media | None:
520        """Get a thumbnail from the event token."""
521        self._diagnostics.increment("get_clip")
522        token = EventToken.decode(event_token)
523        if (
524            not (item := await self._async_load_item(token.event_session_id))
525            or not item.visible_event
526        ):
527            self._diagnostics.increment("get_clip.invalid_event")
528            _LOGGER.debug(
529                "No event information found for event id: %s", token.event_session_id
530            )
531            return None
532
533        if item.thumbnail_media_key:
534            # Load cached thumbnail
535            contents = await self._cache_policy.store.async_load_media(
536                item.thumbnail_media_key
537            )
538            if contents:
539                self._diagnostics.increment("get_clip.cached")
540                return Media(contents, EventImageType.IMAGE_PREVIEW)
541            _LOGGER.debug(
542                "Thumbnail %s does not exist; transcoding", item.thumbnail_media_key
543            )
544
545        # Check for existing primary media
546        media_key = item.media_key_for_token(token)
547        if not media_key:
548            self._diagnostics.increment("get_clip.no_media")
549            _LOGGER.debug("No persisted media for event id %s", token)
550            return None
551
552        thumbnail_media_key = (
553            self._cache_policy.store.get_clip_preview_thumbnail_media_key(
554                self._device_id, item.visible_event
555            )
556        )
557        if not self._cache_policy.transcoder or not thumbnail_media_key:
558            self._diagnostics.increment("get_clip.no_transcoding")
559            _LOGGER.debug("Clip transcoding disabled")
560            return None
561
562        try:
563            await self._cache_policy.transcoder.transcode_clip(
564                media_key, thumbnail_media_key
565            )
566        except TranscodeException as err:
567            self._diagnostics.increment("get_clip.transcode_error")
568            _LOGGER.debug("Failure to transcode clip thumbnail: %s", str(err))
569            return None
570
571        contents = await self._cache_policy.store.async_load_media(thumbnail_media_key)
572        if not contents:
573            self._diagnostics.increment("get_clip.load_error")
574            _LOGGER.debug(
575                "Failed to load transcoded clip: %s", item.thumbnail_media_key
576            )
577            return None
578
579        item.thumbnail_media_key = thumbnail_media_key
580        await self._async_update_item(item)
581
582        self._diagnostics.increment("get_clip.success")
583        return Media(contents, EventImageType.IMAGE_PREVIEW)
584
585    async def async_image_sessions(self) -> list[ImageSession]:
586        """Return revent events."""
587        self._diagnostics.increment("load_image_sessions")
588
589        def _get_events(x: EventMediaModelItem) -> list[ImageEventBase]:
590            # Only return events that have successful media fetches
591            return [
592                y
593                for y in x.events.values()
594                if x.media_key or y.event_id in x.event_media_keys
595            ]
596
597        result = await self._items_with_media()
598        events_list = list(map(_get_events, result))
599        events: Iterable[ImageEventBase] = itertools.chain(*events_list)
600
601        def _get_session(x: ImageEventBase) -> ImageSession:
602            return ImageSession(x.event_token, x.timestamp, x.event_type)
603
604        event_result = list(map(_get_session, events))
605        event_result.sort(key=lambda x: x.timestamp, reverse=True)
606        return event_result
607
608    async def async_clip_preview_sessions(self) -> list[ClipPreviewSession]:
609        """Return revent events for a device that supports clips."""
610        self._diagnostics.increment("load_clip_previews")
611
612        def _event_visible(x: ImageEventBase) -> bool:
613            return x.event_type in VISIBLE_EVENTS
614
615        def _get_event_session(x: EventMediaModelItem) -> ClipPreviewSession | None:
616            assert x.visible_event
617            events = list(filter(_event_visible, x.events.values()))
618            events.sort(key=lambda x: x.timestamp)
619            if not events:
620                _LOGGER.debug("Partial event in storage")
621                return None
622            visible_event = events[0]
623            return ClipPreviewSession(
624                visible_event.event_token,
625                visible_event.timestamp,
626                [y.event_type for y in events],
627            )
628
629        result = await self._items_with_media()
630        clips: Iterable[ClipPreviewSession | None] = iter(
631            map(_get_event_session, result)
632        )
633        valid_clips: list[ClipPreviewSession] = [x for x in clips if x is not None]
634        valid_clips.sort(key=lambda x: x.timestamp, reverse=True)
635        return valid_clips
636
637    async def _items_with_media(self) -> list[EventMediaModelItem]:
638        """Return items in the model that have media for serving."""
639
640        def _filter(x: EventMediaModelItem) -> bool:
641            """Return events already fetched or that could be fetched."""
642            if x.media_key or x.event_media_keys:
643                return True
644            return False
645
646        event_data = await self._async_load()
647        return list(filter(_filter, event_data.values()))
648
649    def set_update_callback(
650        self, target: Callable[[EventMessage], Awaitable[None]]
651    ) -> None:
652        """Register a callback invoked when new messages are received."""
653        self._callback = target
654
655    async def async_handle_events(self, event_message: EventMessage) -> None:
656        """Handle the EventMessage."""
657        self._diagnostics.increment("event")
658        event_sessions: dict[str, dict[str, ImageEventBase]] | None = (
659            event_message.event_sessions
660        )
661        if not event_sessions:
662            return
663        _LOGGER.debug("Event Update %s", event_sessions.keys())
664        recv_latency_ms = int((time.time() - event_message.timestamp.timestamp()) * 100)
665
666        # Notify traits to cache most recent event
667        pairs = list(event_sessions.items())
668        for event_session_id, event_dict in pairs:
669            supported = False
670            for event_name, event in event_dict.items():
671                if not event.is_expired:
672                    self._diagnostics.elapsed(event_name, recv_latency_ms)
673                else:
674                    self._diagnostics.elapsed(f"{event_name}_expired", recv_latency_ms)
675                if event_name not in self._event_traits:
676                    self._diagnostics.increment(f"event.unsupported.{event_name}")
677                    _LOGGER.debug("Unsupported event trait: %s", event_name)
678                    continue
679                supported = True
680
681            # Skip any entirely unsupported events
682            if not supported:
683                del event_sessions[event_session_id]
684
685        model_items = []
686        failure = False
687        for event_session_id, event_dict in event_sessions.items():
688            # Track all related events together with the same session
689            if model_item := await self._async_load_item(event_session_id):
690                self._diagnostics.increment("event.update")
691                model_item.merge_events(event_dict)
692            else:
693                self._diagnostics.increment("event.new")
694                # A new event session
695                model_item = EventMediaModelItem(
696                    event_session_id=event_session_id,
697                    events=event_dict,
698                    media_key=None,
699                    event_media_keys={},
700                    thumbnail_media_key=None,
701                    pending_event_keys=set(event_dict.keys()),
702                )
703            model_items.append(model_item)
704
705            if self._support_fetch and self._cache_policy.fetch:
706                self._diagnostics.increment("event.fetch")
707                try:
708                    await self._fetch_media(model_item)
709                except GoogleNestException as err:
710                    self._diagnostics.increment("event.fetch_error")
711                    failure = True
712                    _LOGGER.warning(
713                        "Failure when pre-fetching event '%s': %s",
714                        event.event_session_id,
715                        str(err),
716                    )
717
718        # Send notifications for any undelivered events that have media.
719        pending_events: dict[str, ImageEventBase] = {}
720        for model_item in model_items:
721            if (
722                model_item.any_media_key is None
723                and self._support_fetch
724                and self._cache_policy.fetch
725                and not event_message.is_thread_ended
726                and not failure
727            ):
728                continue
729            pending_events.update(model_item.pending_events)
730
731        if pending_events:
732            _LOGGER.debug("Message contains notifiable events: %s", pending_events)
733            event_message = event_message.with_events(
734                pending_events.keys(), pending_events
735            )
736            if self._callback:
737                self._diagnostics.increment("event.notify")
738                await self._callback(event_message)
739        else:
740            _LOGGER.debug("Message did not contain notifiable events")
741
742        for model_item in model_items:
743            model_item.notified(pending_events.keys())
744            await self._async_update_item(model_item)
745
746        await self._expire_cache()
class EventMediaManager:
class EventMediaManager:
    """Responsible for handling recent events and fetching associated media.

    A manager is bound to one device id. Event state is loaded from and saved
    to the store exposed by the current `CachePolicy`, keyed by that device
    id. A registered callback, if any, is awaited for messages that contain
    notifiable events.
    """

    def __init__(
        self,
        device_id: str,
        traits: dict[str, Any],
        event_traits: set[str],
        diagnostics: Diagnostics,
    ) -> None:
        """Initialize EventMediaManager.

        Args:
            device_id: Key under which this device's events live in the store.
            traits: Device traits by name; media traits are looked up here.
            event_traits: Names of the event types handled for this device.
            diagnostics: Sink for counters and latency measurements.
        """
        self._device_id = device_id
        self._traits = traits
        self._event_traits = event_traits
        self._cache_policy = CachePolicy()
        self._callback: Callable[[EventMessage], Awaitable[None]] | None = None
        # Media can only be fetched when one of the camera media traits exists.
        self._support_fetch = (
            CameraClipPreviewTrait.NAME in traits
            or CameraEventImageTrait.NAME in traits
        )
        self._diagnostics = diagnostics
        # Created lazily on first use, see _ensure_lock().
        self._lock: asyncio.Lock | None = None

    @property
    def cache_policy(self) -> CachePolicy:
        """Return the current CachePolicy."""
        return self._cache_policy

    @cache_policy.setter
    def cache_policy(self, value: CachePolicy) -> None:
        """Update the CachePolicy."""
        self._cache_policy = value

    def _ensure_lock(self) -> asyncio.Lock:
        """Return the lock guarding store read-modify-write cycles."""
        if not self._lock:
            self._lock = asyncio.Lock()
        return self._lock

    async def _async_load(self) -> OrderedDict[str, EventMediaModelItem]:
        """Load the device specific data from the store.

        Returns:
            Items keyed by event session id, in the order they were stored.

        Raises:
            Exception: Whatever `EventMediaModelItem.from_dict` raises for an
                item that fails to parse; it is logged and re-raised.
        """
        store_data = await self._cache_policy.store.async_load()
        event_data: OrderedDict[str, EventMediaModelItem] = OrderedDict()
        if store_data:
            device_data = store_data.get(self._device_id, [])
            for item_data in device_data:
                try:
                    item = EventMediaModelItem.from_dict(item_data)
                except Exception as err:
                    _LOGGER.debug("Failed to parse event item: %s", str(err))
                    # Bare raise keeps the original traceback intact.
                    raise
                event_data[item.event_session_id] = item
        return event_data

    async def _async_update(self, event_data: dict[str, EventMediaModelItem]) -> None:
        """Save the device specific model to the store."""
        # Event order is preserved so popping from the oldest entry works
        device_data: list[dict[str, Any]] = [
            item.to_dict() for item in event_data.values()
        ]

        # Read data from the store and update information for this device
        store_data = await self._cache_policy.store.async_load()
        if not store_data:
            store_data = {}
        store_data[self._device_id] = device_data
        await self._cache_policy.store.async_save(store_data)

    async def _async_load_item(
        self, event_session_id: str
    ) -> EventMediaModelItem | None:
        """Load the specific item from the store, or None when absent."""
        event_data = await self._async_load()
        return event_data.get(event_session_id)

    async def _async_update_item(self, item: EventMediaModelItem) -> None:
        """Update the specific item in the store."""
        async with self._ensure_lock():
            event_data = await self._async_load()
            event_data[item.event_session_id] = item
            await self._async_update(event_data)

    async def _expire_cache(self) -> None:
        """Garbage collect any items from the cache.

        Nothing happens until the number of stored items exceeds
        `event_cache_size`; then up to `event_cache_expire_count` of the
        oldest items are dropped together with their media.
        """
        async with self._ensure_lock():
            event_data = await self._async_load()
            _LOGGER.debug("Checking cache size %s", len(event_data))
            if len(event_data) <= self._cache_policy.event_cache_size:
                return
            expire_count = self._cache_policy.event_cache_expire_count
            _LOGGER.debug("Expiring cache %s", expire_count)
            # Bulk pop items
            for _ in range(expire_count):
                if not event_data:
                    # popitem() raises KeyError on an empty dict; stop when the
                    # expire count is larger than what is stored.
                    break
                _, old_item = event_data.popitem(last=False)
                _LOGGER.debug(
                    "Expiring media %s (%s)",
                    old_item.all_media_keys,
                    old_item.event_session_id,
                )
                for media_key in old_item.all_media_keys:
                    await self._cache_policy.store.async_remove_media(media_key)
            await self._async_update(event_data)

    async def _fetch_media(self, item: EventMediaModelItem) -> None:
        """Fetch media from the server in response to a pubsub event.

        A clip preview is tried first when the device has the clip preview
        trait; otherwise (or when no clip content was produced) an image is
        fetched per event through the event image trait. Media keys are set on
        `item` but the item itself is not persisted here.
        """
        store = self._cache_policy.store
        if CameraClipPreviewTrait.NAME in self._traits:
            self._diagnostics.increment("fetch_clip")
            if (
                item.media_key
                or not item.visible_event
                or not (
                    clip_event := cast(
                        CameraClipPreviewEvent,
                        item.events.get(CameraClipPreviewEvent.NAME),
                    )
                )
                or clip_event.is_expired
            ):
                self._diagnostics.increment("fetch_clip.skip")
                return
            clip_preview_trait: CameraClipPreviewTrait = self._traits[
                CameraClipPreviewTrait.NAME
            ]
            event_image = await clip_preview_trait.generate_event_image(
                clip_event.preview_url
            )
            if event_image:
                content = await event_image.contents()
                # Caller will persist the media key assignment
                media_key = store.get_clip_preview_media_key(
                    self._device_id, item.visible_event
                )
                item.media_key = media_key
                _LOGGER.debug("Saving media %s (%s)", media_key, item.event_session_id)
                self._diagnostics.increment("fetch_clip.save")
                await store.async_save_media(media_key, content)
                return

        if CameraEventImageTrait.NAME not in self._traits:
            return
        self._diagnostics.increment("fetch_image")

        event_image_trait: CameraEventImageTrait = self._traits[
            CameraEventImageTrait.NAME
        ]
        for event in item.events.values():
            # Skip events that already have media or can no longer be fetched.
            if event.event_id in item.event_media_keys or event.is_expired:
                self._diagnostics.increment("fetch_image.skip")
                continue
            event_image = await event_image_trait.generate_image(event.event_id)
            content = await event_image.contents(width=SNAPSHOT_WIDTH_PX)

            # Caller will persist the media key assignment
            media_key = store.get_image_media_key(self._device_id, event)
            item.event_media_keys[event.event_id] = media_key
            _LOGGER.debug("Saving media %s (%s)", media_key, item.event_session_id)
            self._diagnostics.increment("fetch_image.save")
            await store.async_save_media(media_key, content)

    async def get_media_from_token(self, event_token: str) -> Media | None:
        """Get media based on the event token.

        Returns:
            The persisted media, or None when the event session is unknown,
            has no media key for the token, or the media cannot be loaded.
        """
        token = EventToken.decode(event_token)
        if not (item := await self._async_load_item(token.event_session_id)):
            self._diagnostics.increment("get_media.invalid_event")
            _LOGGER.debug(
                "No event information found for event id: %s", token.event_session_id
            )
            return None
        media_key = item.media_key_for_token(token)
        if not media_key:
            self._diagnostics.increment("get_media.no_media")
            _LOGGER.debug("No persisted media for event id %s", token)
            return None
        contents = await self._cache_policy.store.async_load_media(media_key)
        if not contents:
            self._diagnostics.increment("get_media.empty")
            # Report the key that was actually requested from the store.
            _LOGGER.debug(
                "Unable to load persisted media for event id: (%s, %s, %s)",
                token.event_session_id,
                token.event_id,
                media_key,
            )
            return None
        assert item.visible_event
        self._diagnostics.increment("get_media.success")
        return Media(contents, item.visible_event.event_image_type)

    async def get_clip_thumbnail_from_token(self, event_token: str) -> Media | None:
        """Get a thumbnail from the event token.

        A previously stored thumbnail is served when it can be loaded.
        Otherwise the clip is transcoded with the policy's transcoder, the
        result is loaded, and the thumbnail key is persisted on the item.

        Returns:
            The thumbnail media, or None when the event is unknown, has no
            media, transcoding is unavailable or fails, or the result cannot
            be loaded.
        """
        self._diagnostics.increment("get_clip")
        token = EventToken.decode(event_token)
        if (
            not (item := await self._async_load_item(token.event_session_id))
            or not item.visible_event
        ):
            self._diagnostics.increment("get_clip.invalid_event")
            _LOGGER.debug(
                "No event information found for event id: %s", token.event_session_id
            )
            return None

        if item.thumbnail_media_key:
            # Load cached thumbnail
            contents = await self._cache_policy.store.async_load_media(
                item.thumbnail_media_key
            )
            if contents:
                self._diagnostics.increment("get_clip.cached")
                return Media(contents, EventImageType.IMAGE_PREVIEW)
            _LOGGER.debug(
                "Thumbnail %s does not exist; transcoding", item.thumbnail_media_key
            )

        # Check for existing primary media
        media_key = item.media_key_for_token(token)
        if not media_key:
            self._diagnostics.increment("get_clip.no_media")
            _LOGGER.debug("No persisted media for event id %s", token)
            return None

        thumbnail_media_key = (
            self._cache_policy.store.get_clip_preview_thumbnail_media_key(
                self._device_id, item.visible_event
            )
        )
        if not self._cache_policy.transcoder or not thumbnail_media_key:
            self._diagnostics.increment("get_clip.no_transcoding")
            _LOGGER.debug("Clip transcoding disabled")
            return None

        try:
            await self._cache_policy.transcoder.transcode_clip(
                media_key, thumbnail_media_key
            )
        except TranscodeException as err:
            self._diagnostics.increment("get_clip.transcode_error")
            _LOGGER.debug("Failure to transcode clip thumbnail: %s", str(err))
            return None

        contents = await self._cache_policy.store.async_load_media(thumbnail_media_key)
        if not contents:
            self._diagnostics.increment("get_clip.load_error")
            # item.thumbnail_media_key is not assigned yet at this point, so
            # log the key that was just requested.
            _LOGGER.debug("Failed to load transcoded clip: %s", thumbnail_media_key)
            return None

        item.thumbnail_media_key = thumbnail_media_key
        await self._async_update_item(item)

        self._diagnostics.increment("get_clip.success")
        return Media(contents, EventImageType.IMAGE_PREVIEW)

    async def async_image_sessions(self) -> list[ImageSession]:
        """Return recent events, newest first, one session per event with media."""
        self._diagnostics.increment("load_image_sessions")

        def _get_events(x: EventMediaModelItem) -> list[ImageEventBase]:
            # Only return events that have successful media fetches
            return [
                y
                for y in x.events.values()
                if x.media_key or y.event_id in x.event_media_keys
            ]

        result = await self._items_with_media()
        events: Iterable[ImageEventBase] = itertools.chain.from_iterable(
            map(_get_events, result)
        )
        event_result = [
            ImageSession(x.event_token, x.timestamp, x.event_type) for x in events
        ]
        event_result.sort(key=lambda x: x.timestamp, reverse=True)
        return event_result

    async def async_clip_preview_sessions(self) -> list[ClipPreviewSession]:
        """Return recent events for a device that supports clips, newest first."""
        self._diagnostics.increment("load_clip_previews")

        def _event_visible(x: ImageEventBase) -> bool:
            return x.event_type in VISIBLE_EVENTS

        def _get_event_session(x: EventMediaModelItem) -> ClipPreviewSession | None:
            assert x.visible_event
            events = sorted(
                filter(_event_visible, x.events.values()),
                key=lambda e: e.timestamp,
            )
            if not events:
                _LOGGER.debug("Partial event in storage")
                return None
            # The earliest visible event supplies the session token and time.
            visible_event = events[0]
            return ClipPreviewSession(
                visible_event.event_token,
                visible_event.timestamp,
                [y.event_type for y in events],
            )

        result = await self._items_with_media()
        clips: Iterable[ClipPreviewSession | None] = map(_get_event_session, result)
        valid_clips: list[ClipPreviewSession] = [x for x in clips if x is not None]
        valid_clips.sort(key=lambda x: x.timestamp, reverse=True)
        return valid_clips

    async def _items_with_media(self) -> list[EventMediaModelItem]:
        """Return items in the model that have media for serving."""
        event_data = await self._async_load()
        return [
            x for x in event_data.values() if x.media_key or x.event_media_keys
        ]

    def set_update_callback(
        self, target: Callable[[EventMessage], Awaitable[None]]
    ) -> None:
        """Register a callback invoked when new messages are received."""
        self._callback = target

    async def async_handle_events(self, event_message: EventMessage) -> None:
        """Handle the EventMessage.

        Records receive latency per event, drops sessions that contain no
        supported event, merges the remaining sessions into the stored model,
        optionally pre-fetches media, awaits the registered callback for
        notifiable events, persists every touched item and finally expires
        old cache entries.
        """
        self._diagnostics.increment("event")
        event_sessions: dict[str, dict[str, ImageEventBase]] | None = (
            event_message.event_sessions
        )
        if not event_sessions:
            return
        _LOGGER.debug("Event Update %s", event_sessions.keys())
        # Both clocks are in seconds; scale by 1000 to get milliseconds.
        recv_latency_ms = int(
            (time.time() - event_message.timestamp.timestamp()) * 1000
        )

        # Notify traits to cache most recent event
        # Iterate over a snapshot since sessions are deleted inside the loop.
        for event_session_id, event_dict in list(event_sessions.items()):
            supported = False
            for event_name, event in event_dict.items():
                if not event.is_expired:
                    self._diagnostics.elapsed(event_name, recv_latency_ms)
                else:
                    self._diagnostics.elapsed(f"{event_name}_expired", recv_latency_ms)
                if event_name not in self._event_traits:
                    self._diagnostics.increment(f"event.unsupported.{event_name}")
                    _LOGGER.debug("Unsupported event trait: %s", event_name)
                    continue
                supported = True

            # Skip any entirely unsupported events
            if not supported:
                del event_sessions[event_session_id]

        model_items = []
        failure = False
        for event_session_id, event_dict in event_sessions.items():
            # Track all related events together with the same session
            if model_item := await self._async_load_item(event_session_id):
                self._diagnostics.increment("event.update")
                model_item.merge_events(event_dict)
            else:
                self._diagnostics.increment("event.new")
                # A new event session
                model_item = EventMediaModelItem(
                    event_session_id=event_session_id,
                    events=event_dict,
                    media_key=None,
                    event_media_keys={},
                    thumbnail_media_key=None,
                    pending_event_keys=set(event_dict.keys()),
                )
            model_items.append(model_item)

            if self._support_fetch and self._cache_policy.fetch:
                self._diagnostics.increment("event.fetch")
                try:
                    await self._fetch_media(model_item)
                except GoogleNestException as err:
                    self._diagnostics.increment("event.fetch_error")
                    failure = True
                    # Use the session of this iteration, not a variable left
                    # over from the filtering loop above.
                    _LOGGER.warning(
                        "Failure when pre-fetching event '%s': %s",
                        event_session_id,
                        str(err),
                    )

        # Send notifications for any undelivered events that have media.
        pending_events: dict[str, ImageEventBase] = {}
        for model_item in model_items:
            # Hold back items still waiting for media, unless the thread ended
            # or a fetch failed during this message.
            if (
                model_item.any_media_key is None
                and self._support_fetch
                and self._cache_policy.fetch
                and not event_message.is_thread_ended
                and not failure
            ):
                continue
            pending_events.update(model_item.pending_events)

        if pending_events:
            _LOGGER.debug("Message contains notifiable events: %s", pending_events)
            event_message = event_message.with_events(
                pending_events.keys(), pending_events
            )
            if self._callback:
                self._diagnostics.increment("event.notify")
                await self._callback(event_message)
        else:
            _LOGGER.debug("Message did not contain notifiable events")

        for model_item in model_items:
            model_item.notified(pending_events.keys())
            await self._async_update_item(model_item)

        await self._expire_cache()

Responsible for handling recent events and fetching associated media.

EventMediaManager( device_id: str, traits: dict[str, typing.Any], event_traits: set[str], diagnostics: google_nest_sdm.diagnostics.Diagnostics)
336    def __init__(
337        self,
338        device_id: str,
339        traits: dict[str, Any],
340        event_traits: set[str],
341        diagnostics: Diagnostics,
342    ) -> None:
343        """Initialize DeviceEventMediaManager."""
344        self._device_id = device_id
345        self._traits = traits
346        self._event_traits = event_traits
347        self._cache_policy = CachePolicy()
348        self._callback: Callable[[EventMessage], Awaitable[None]] | None = None
349        self._support_fetch = (
350            CameraClipPreviewTrait.NAME in traits
351            or CameraEventImageTrait.NAME in traits
352        )
353        self._diagnostics = diagnostics
354        self._lock: asyncio.Lock | None = None

Initialize EventMediaManager.

cache_policy: CachePolicy
356    @property
357    def cache_policy(self) -> CachePolicy:
358        """Return the current CachePolicy."""
359        return self._cache_policy

Return the current CachePolicy.

async def get_media_from_token(self, event_token: str) -> Media | None:
492    async def get_media_from_token(self, event_token: str) -> Media | None:
493        """Get media based on the event token."""
494        token = EventToken.decode(event_token)
495        if not (item := await self._async_load_item(token.event_session_id)):
496            self._diagnostics.increment("get_media.invalid_event")
497            _LOGGER.debug(
498                "No event information found for event id: %s", token.event_session_id
499            )
500            return None
501        media_key = item.media_key_for_token(token)
502        if not media_key:
503            self._diagnostics.increment("get_media.no_media")
504            _LOGGER.debug("No persisted media for event id %s", token)
505            return None
506        contents = await self._cache_policy.store.async_load_media(media_key)
507        if not contents:
508            self._diagnostics.increment("get_media.empty")
509            _LOGGER.debug(
510                "Unable to load persisted media for event id: (%s, %s, %s)",
511                token.event_session_id,
512                token.event_id,
513                item.media_key,
514            )
515            return None
516        assert item.visible_event
517        self._diagnostics.increment("get_media.success")
518        return Media(contents, item.visible_event.event_image_type)

Get media based on the event token.

async def get_clip_thumbnail_from_token(self, event_token: str) -> Media | None:
520    async def get_clip_thumbnail_from_token(self, event_token: str) -> Media | None:
521        """Get a thumbnail from the event token."""
522        self._diagnostics.increment("get_clip")
523        token = EventToken.decode(event_token)
524        if (
525            not (item := await self._async_load_item(token.event_session_id))
526            or not item.visible_event
527        ):
528            self._diagnostics.increment("get_clip.invalid_event")
529            _LOGGER.debug(
530                "No event information found for event id: %s", token.event_session_id
531            )
532            return None
533
534        if item.thumbnail_media_key:
535            # Load cached thumbnail
536            contents = await self._cache_policy.store.async_load_media(
537                item.thumbnail_media_key
538            )
539            if contents:
540                self._diagnostics.increment("get_clip.cached")
541                return Media(contents, EventImageType.IMAGE_PREVIEW)
542            _LOGGER.debug(
543                "Thumbnail %s does not exist; transcoding", item.thumbnail_media_key
544            )
545
546        # Check for existing primary media
547        media_key = item.media_key_for_token(token)
548        if not media_key:
549            self._diagnostics.increment("get_clip.no_media")
550            _LOGGER.debug("No persisted media for event id %s", token)
551            return None
552
553        thumbnail_media_key = (
554            self._cache_policy.store.get_clip_preview_thumbnail_media_key(
555                self._device_id, item.visible_event
556            )
557        )
558        if not self._cache_policy.transcoder or not thumbnail_media_key:
559            self._diagnostics.increment("get_clip.no_transcoding")
560            _LOGGER.debug("Clip transcoding disabled")
561            return None
562
563        try:
564            await self._cache_policy.transcoder.transcode_clip(
565                media_key, thumbnail_media_key
566            )
567        except TranscodeException as err:
568            self._diagnostics.increment("get_clip.transcode_error")
569            _LOGGER.debug("Failure to transcode clip thumbnail: %s", str(err))
570            return None
571
572        contents = await self._cache_policy.store.async_load_media(thumbnail_media_key)
573        if not contents:
574            self._diagnostics.increment("get_clip.load_error")
575            _LOGGER.debug(
576                "Failed to load transcoded clip: %s", item.thumbnail_media_key
577            )
578            return None
579
580        item.thumbnail_media_key = thumbnail_media_key
581        await self._async_update_item(item)
582
583        self._diagnostics.increment("get_clip.success")
584        return Media(contents, EventImageType.IMAGE_PREVIEW)

Get a thumbnail from the event token.

async def async_image_sessions(self) -> list[ImageSession]:
586    async def async_image_sessions(self) -> list[ImageSession]:
587        """Return revent events."""
588        self._diagnostics.increment("load_image_sessions")
589
590        def _get_events(x: EventMediaModelItem) -> list[ImageEventBase]:
591            # Only return events that have successful media fetches
592            return [
593                y
594                for y in x.events.values()
595                if x.media_key or y.event_id in x.event_media_keys
596            ]
597
598        result = await self._items_with_media()
599        events_list = list(map(_get_events, result))
600        events: Iterable[ImageEventBase] = itertools.chain(*events_list)
601
602        def _get_session(x: ImageEventBase) -> ImageSession:
603            return ImageSession(x.event_token, x.timestamp, x.event_type)
604
605        event_result = list(map(_get_session, events))
606        event_result.sort(key=lambda x: x.timestamp, reverse=True)
607        return event_result

Return recent events.

async def async_clip_preview_sessions(self) -> list[ClipPreviewSession]:
609    async def async_clip_preview_sessions(self) -> list[ClipPreviewSession]:
610        """Return revent events for a device that supports clips."""
611        self._diagnostics.increment("load_clip_previews")
612
613        def _event_visible(x: ImageEventBase) -> bool:
614            return x.event_type in VISIBLE_EVENTS
615
616        def _get_event_session(x: EventMediaModelItem) -> ClipPreviewSession | None:
617            assert x.visible_event
618            events = list(filter(_event_visible, x.events.values()))
619            events.sort(key=lambda x: x.timestamp)
620            if not events:
621                _LOGGER.debug("Partial event in storage")
622                return None
623            visible_event = events[0]
624            return ClipPreviewSession(
625                visible_event.event_token,
626                visible_event.timestamp,
627                [y.event_type for y in events],
628            )
629
630        result = await self._items_with_media()
631        clips: Iterable[ClipPreviewSession | None] = iter(
632            map(_get_event_session, result)
633        )
634        valid_clips: list[ClipPreviewSession] = [x for x in clips if x is not None]
635        valid_clips.sort(key=lambda x: x.timestamp, reverse=True)
636        return valid_clips

Return recent events for a device that supports clips.

def set_update_callback( self, target: Callable[[google_nest_sdm.event.EventMessage], Awaitable[NoneType]]) -> None:
650    def set_update_callback(
651        self, target: Callable[[EventMessage], Awaitable[None]]
652    ) -> None:
653        """Register a callback invoked when new messages are received."""
654        self._callback = target

Register a callback invoked when new messages are received.

async def async_handle_events(self, event_message: google_nest_sdm.event.EventMessage) -> None:
656    async def async_handle_events(self, event_message: EventMessage) -> None:
657        """Handle the EventMessage."""
658        self._diagnostics.increment("event")
659        event_sessions: dict[str, dict[str, ImageEventBase]] | None = (
660            event_message.event_sessions
661        )
662        if not event_sessions:
663            return
664        _LOGGER.debug("Event Update %s", event_sessions.keys())
665        recv_latency_ms = int((time.time() - event_message.timestamp.timestamp()) * 100)
666
667        # Notify traits to cache most recent event
668        pairs = list(event_sessions.items())
669        for event_session_id, event_dict in pairs:
670            supported = False
671            for event_name, event in event_dict.items():
672                if not event.is_expired:
673                    self._diagnostics.elapsed(event_name, recv_latency_ms)
674                else:
675                    self._diagnostics.elapsed(f"{event_name}_expired", recv_latency_ms)
676                if event_name not in self._event_traits:
677                    self._diagnostics.increment(f"event.unsupported.{event_name}")
678                    _LOGGER.debug("Unsupported event trait: %s", event_name)
679                    continue
680                supported = True
681
682            # Skip any entirely unsupported events
683            if not supported:
684                del event_sessions[event_session_id]
685
686        model_items = []
687        failure = False
688        for event_session_id, event_dict in event_sessions.items():
689            # Track all related events together with the same session
690            if model_item := await self._async_load_item(event_session_id):
691                self._diagnostics.increment("event.update")
692                model_item.merge_events(event_dict)
693            else:
694                self._diagnostics.increment("event.new")
695                # A new event session
696                model_item = EventMediaModelItem(
697                    event_session_id=event_session_id,
698                    events=event_dict,
699                    media_key=None,
700                    event_media_keys={},
701                    thumbnail_media_key=None,
702                    pending_event_keys=set(event_dict.keys()),
703                )
704            model_items.append(model_item)
705
706            if self._support_fetch and self._cache_policy.fetch:
707                self._diagnostics.increment("event.fetch")
708                try:
709                    await self._fetch_media(model_item)
710                except GoogleNestException as err:
711                    self._diagnostics.increment("event.fetch_error")
712                    failure = True
713                    _LOGGER.warning(
714                        "Failure when pre-fetching event '%s': %s",
715                        event.event_session_id,
716                        str(err),
717                    )
718
719        # Send notifications for any undelivered events that have media.
720        pending_events: dict[str, ImageEventBase] = {}
721        for model_item in model_items:
722            if (
723                model_item.any_media_key is None
724                and self._support_fetch
725                and self._cache_policy.fetch
726                and not event_message.is_thread_ended
727                and not failure
728            ):
729                continue
730            pending_events.update(model_item.pending_events)
731
732        if pending_events:
733            _LOGGER.debug("Message contains notifiable events: %s", pending_events)
734            event_message = event_message.with_events(
735                pending_events.keys(), pending_events
736            )
737            if self._callback:
738                self._diagnostics.increment("event.notify")
739                await self._callback(event_message)
740        else:
741            _LOGGER.debug("Message did not contain notifiable events")
742
743        for model_item in model_items:
744            model_item.notified(pending_events.keys())
745            await self._async_update_item(model_item)
746
747        await self._expire_cache()

Handle the EventMessage.

@dataclass
class ImageSession:
 95@dataclass
 96class ImageSession:
 97    """An object that holds an image-based event."""
 98
 99    event_token: str
100    """A token that can be used to fetch the media for the event."""
101
102    timestamp: datetime.datetime
103    """Timestamp when the event happened."""
104
105    event_type: str
106    """A label for the type of event."""

An object that holds an image based event.

ImageSession(event_token: str, timestamp: datetime.datetime, event_type: str)
event_token: str

A token that can be used to fetch the media for the event.

timestamp: datetime.datetime

Timestamp when the event happened.

event_type: str

A label for the type of event.

@dataclass
class ClipPreviewSession:
109@dataclass
110class ClipPreviewSession:
111    """An object that holds a clip-based event."""
112
113    event_token: str
114    """A token that can be used to fetch the media for the event."""
115
116    timestamp: datetime.datetime
117    """Timestamp when the event happened."""
118
119    event_types: list[str]
120    """Labels for the types of event."""

An object that holds a clip based event.

ClipPreviewSession( event_token: str, timestamp: datetime.datetime, event_types: list[str])
event_token: str

A token that can be used to fetch the media for the event.

timestamp: datetime.datetime

Timestamp when the event happened.

event_types: list[str]

Labels for the types of event.

@dataclass
class Media:
79@dataclass
80class Media:
81    """Represents media related to an event."""
82
83    contents: bytes
84    """Media content."""
85
86    event_image_type: EventImageContentType
87    """Event image content type of the media."""
88
89    @property
90    def content_type(self) -> str:
91        """Content type of the media, taken from `event_image_type`."""
92        return self.event_image_type.content_type

Represents media related to an event.

Media( contents: bytes, event_image_type: google_nest_sdm.event.EventImageContentType)
contents: bytes

Media content.

event_image_type: google_nest_sdm.event.EventImageContentType

Event image content type of the media.

content_type: str
89    @property
90    def content_type(self) -> str:
91        """Content type of the media, taken from `event_image_type`."""
92        return self.event_image_type.content_type

Content type of the media.

class EventMediaStore(abc.ABC):
123class EventMediaStore(ABC):
124    """Interface for external storage."""
125
126    @abstractmethod
127    async def async_load(self) -> dict | None:
128        """Load data."""
129
130    @abstractmethod
131    async def async_save(self, data: dict) -> None:
132        """Save data."""
133
134    @abstractmethod
135    def get_media_key(self, device_id: str, event: ImageEventBase) -> str:
136        """Return the filename to use for the device and event."""
137
138    def get_image_media_key(self, device_id: str, event: ImageEventBase) -> str:
139        """Return the filename for image media (defaults to `get_media_key`)."""
140        return self.get_media_key(device_id, event)
141
142    def get_clip_preview_media_key(self, device_id: str, event: ImageEventBase) -> str:
143        """Return the filename for clip preview media (defaults to `get_media_key`)."""
144        return self.get_media_key(device_id, event)
145
146    def get_clip_preview_thumbnail_media_key(
147        self, device_id: str, event: ImageEventBase
148    ) -> str | None:
149        """Return the filename for the thumbnail of clip preview media (None by default)."""
150        return None
151
152    async def async_load_media(self, media_key: str) -> bytes | None:
153        """Load media content; the base implementation returns None."""
154
155    async def async_save_media(self, media_key: str, content: bytes) -> None:
156        """Write media content; the base implementation does nothing."""
157
158    async def async_remove_media(self, media_key: str) -> None:
159        """Remove media content; the base implementation does nothing."""

Interface for external storage.

@abstractmethod
async def async_load(self) -> dict | None:
126    @abstractmethod
127    async def async_load(self) -> dict | None:
128        """Load data."""

Load data.

@abstractmethod
async def async_save(self, data: dict) -> None:
130    @abstractmethod
131    async def async_save(self, data: dict) -> None:
132        """Save data."""

Save data.

@abstractmethod
def get_media_key(self, device_id: str, event: google_nest_sdm.event.ImageEventBase) -> str:
134    @abstractmethod
135    def get_media_key(self, device_id: str, event: ImageEventBase) -> str:
136        """Return the filename to use for the device and event."""

Return the filename to use for the device and event.

def get_image_media_key(self, device_id: str, event: google_nest_sdm.event.ImageEventBase) -> str:
138    def get_image_media_key(self, device_id: str, event: ImageEventBase) -> str:
139        """Return the filename for image media."""
140        return self.get_media_key(device_id, event)

Return the filename for image media.

def get_clip_preview_media_key(self, device_id: str, event: google_nest_sdm.event.ImageEventBase) -> str:
142    def get_clip_preview_media_key(self, device_id: str, event: ImageEventBase) -> str:
143        """Return the filename for clip preview media."""
144        return self.get_media_key(device_id, event)

Return the filename for clip preview media.

def get_clip_preview_thumbnail_media_key( self, device_id: str, event: google_nest_sdm.event.ImageEventBase) -> str | None:
146    def get_clip_preview_thumbnail_media_key(
147        self, device_id: str, event: ImageEventBase
148    ) -> str | None:
149        """Return the filename for the thumbnail of clip preview media (None by default)."""
150        return None

Return the filename for the thumbnail of clip preview media.

async def async_load_media(self, media_key: str) -> bytes | None:
152    async def async_load_media(self, media_key: str) -> bytes | None:
153        """Load media content; the base implementation returns None."""

Load media content.

async def async_save_media(self, media_key: str, content: bytes) -> None:
155    async def async_save_media(self, media_key: str, content: bytes) -> None:
156        """Write media content; the base implementation does nothing."""

Write media content.

async def async_remove_media(self, media_key: str) -> None:
158    async def async_remove_media(self, media_key: str) -> None:
159        """Remove media content; the base implementation does nothing."""

Remove media content.

@dataclass
class CachePolicy:
216@dataclass
217class CachePolicy:
218    """Policy for how many local objects to cache in memory."""
219
220    event_cache_size: int = DEFAULT_CACHE_SIZE
221    """Number of events to keep in memory per device."""
222
223    fetch: bool = False
224    """Determine if event media should be pre-fetched."""
225
226    store: EventMediaStore = field(default_factory=InMemoryEventMediaStore)
227    """The EventMediaStore object for storing media content."""
228
229    transcoder: Transcoder | None = None
230    """The transcoder for encoding media."""
231
232    @property
233    def event_cache_expire_count(self) -> int:
234        """Expire count: `event_cache_size` scaled by `EXPIRE_CACHE_BATCH_SIZE`, at least 1."""
235        return max(int(self.event_cache_size * EXPIRE_CACHE_BATCH_SIZE), 1)

Policy for how many local objects to cache in memory.

CachePolicy( event_cache_size: int = 2, fetch: bool = False, store: EventMediaStore = <factory>, transcoder: google_nest_sdm.transcoder.Transcoder | None = None)
event_cache_size: int = 2

Number of events to keep in memory per device.

fetch: bool = False

Determine if event media should be pre-fetched.

store: EventMediaStore

The EventMediaStore object for storing media content.

transcoder: google_nest_sdm.transcoder.Transcoder | None = None

The transcoder for encoding media.

event_cache_expire_count: int
232    @property
233    def event_cache_expire_count(self) -> int:
234        """Expire count: `event_cache_size` scaled by `EXPIRE_CACHE_BATCH_SIZE`, at least 1."""
235        return max(int(self.event_cache_size * EXPIRE_CACHE_BATCH_SIZE), 1)

Expire count: event_cache_size scaled by EXPIRE_CACHE_BATCH_SIZE, at least 1.