google_nest_sdm.event_media
Libraries related to providing a device level interface for event related media.
An EventMediaManager is associated with a single device and manages the
state for events and the lifecycle of media for those events. The manager is
invoked by the subscriber when new events arrive and it handles any fetching
related to the media, as well as transcoding video clips of needed. The
CachePolicy settings determine lifecycle options such as how many events
to keep around in the underlying store.
1"""Libraries related to providing a device level interface for event related media. 2 3An `EventMediaManager` is associated with a single device and manages the 4state for events and the lifecycle of media for those events. The manager is 5invoked by the subscriber when new events arrive and it handles any fetching 6related to the media, as well as transcoding video clips of needed. The 7`CachePolicy` settings determine lifecycle options such as how many events 8to keep around in the underlying store. 9""" 10 11from __future__ import annotations 12 13import asyncio 14import datetime 15import itertools 16import logging 17import time 18from abc import ABC, abstractmethod 19from collections import OrderedDict 20from collections.abc import Iterable 21from dataclasses import dataclass, field 22from typing import Any, Awaitable, Callable, cast 23 24from mashumaro import DataClassDictMixin 25from mashumaro.types import SerializationStrategy 26from mashumaro.config import BaseConfig 27 28from .camera_traits import ( 29 CameraClipPreviewTrait, 30 CameraEventImageTrait, 31 EventImageContentType, 32) 33from .diagnostics import EVENT_MEDIA_DIAGNOSTICS as DIAGNOSTICS 34from .diagnostics import Diagnostics 35from .event import ( 36 CameraClipPreviewEvent, 37 EventImageType, 38 EventMessage, 39 EventToken, 40 ImageEventBase, 41 session_event_image_type, 42 CameraPersonEvent, 43 CameraMotionEvent, 44 CameraSoundEvent, 45 DoorbellChimeEvent, 46) 47from .exceptions import GoogleNestException, TranscodeException 48from .transcoder import Transcoder 49 50__all__ = [ 51 "EventMediaManager", 52 "ImageSession", 53 "ClipPreviewSession", 54 "Media", 55 "EventMediaStore", 56 "CachePolicy", 57] 58 59_LOGGER = logging.getLogger(__name__) 60 61# Should be large enough for processing, but not too large to be a size issue 62SNAPSHOT_WIDTH_PX = 1600 63 64DEFAULT_CACHE_SIZE = 2 65 66# Events are collapsed in the order shown here 67VISIBLE_EVENTS = [ 68 DoorbellChimeEvent.NAME, 69 CameraPersonEvent.NAME, 70 CameraMotionEvent.NAME, 71 CameraSoundEvent.NAME, 72] 73 74# Percentage of items to delete when bulk purging from the cache 75EXPIRE_CACHE_BATCH_SIZE = 0.05 76 77 78@dataclass 79class Media: 80 """Represents media related to an event.""" 81 82 contents: bytes 83 """Media content.""" 84 85 event_image_type: EventImageContentType 86 """Content event image type of the media.""" 87 88 @property 89 def content_type(self) -> str: 90 """Content type of the media.""" 91 return self.event_image_type.content_type 92 93 94@dataclass 95class ImageSession: 96 """An object that holds an image based event.""" 97 98 event_token: str 99 """A token that can be used to fetch the media for the event.""" 100 101 timestamp: datetime.datetime 102 """Timestamp when the event happened.""" 103 104 event_type: str 105 """A label for the type of event.""" 106 107 108@dataclass 109class ClipPreviewSession: 110 """An object that holds a clip based event.""" 111 112 event_token: str 113 """A token that can be used to fetch the media for the event.""" 114 115 timestamp: datetime.datetime 116 """Timestamp when the event happened.""" 117 118 event_types: list[str] 119 """A label for the type of event.""" 120 121 122class EventMediaStore(ABC): 123 """Interface for external storage.""" 124 125 @abstractmethod 126 async def async_load(self) -> dict | None: 127 """Load data.""" 128 129 @abstractmethod 130 async def async_save(self, data: dict) -> None: 131 """Save data.""" 132 133 @abstractmethod 134 def get_media_key(self, device_id: str, event: ImageEventBase) -> str: 135 """Return the filename to use for the device and event.""" 136 137 def get_image_media_key(self, device_id: str, event: ImageEventBase) -> str: 138 """Return the filename for image media.""" 139 return self.get_media_key(device_id, event) 140 141 def get_clip_preview_media_key(self, device_id: str, event: ImageEventBase) -> str: 142 """Return the filename for clip preview media.""" 143 return self.get_media_key(device_id, event) 144 145 def get_clip_preview_thumbnail_media_key( 146 self, device_id: str, event: ImageEventBase 147 ) -> str | None: 148 """Return the filename for thumbnail for clip preview media.""" 149 return None 150 151 async def async_load_media(self, media_key: str) -> bytes | None: 152 """Load media content.""" 153 154 async def async_save_media(self, media_key: str, content: bytes) -> None: 155 """Write media content.""" 156 157 async def async_remove_media(self, media_key: str) -> None: 158 """Remove media content.""" 159 160 161class InMemoryEventMediaStore(EventMediaStore): 162 """An in memory implementation of EventMediaStore.""" 163 164 def __init__(self) -> None: 165 self._data: dict | None = None 166 self._media: dict[str, bytes] = {} 167 168 async def async_load(self) -> dict | None: 169 """Load data.""" 170 return self._data 171 172 async def async_save(self, data: dict) -> None: 173 """Save data.""" 174 self._data = data 175 176 def get_media_key(self, device_id: str, event: ImageEventBase) -> str: 177 """Return the media key to use for the device and event.""" 178 suffix = "jpg" if event.event_image_type == EventImageType.IMAGE else "mp4" 179 return f"{device_id}_{event.timestamp}_{event.event_session_id}.{suffix}" 180 181 def get_image_media_key(self, device_id: str, event: ImageEventBase) -> str: 182 """Return the media key to use for the device and event.""" 183 return ( 184 f"{device_id}_{event.timestamp}_{event.event_session_id}_" 185 f"{event.event_id}.jpg" 186 ) 187 188 def get_clip_preview_media_key(self, device_id: str, event: ImageEventBase) -> str: 189 """Return the media key to use for the device and event.""" 190 return f"{device_id}_{event.timestamp}_{event.event_session_id}.mp4" 191 192 def get_clip_preview_thumbnail_media_key( 193 self, device_id: str, event: ImageEventBase 194 ) -> str | None: 195 """Return the media key to use for the clip preview thumbnail.""" 196 return f"{device_id}_{event.timestamp}_{event.event_session_id}_thumb.jpg" 197 198 async def async_load_media(self, media_key: str) -> bytes | None: 199 """Load media content.""" 200 with DIAGNOSTICS.timer("load_media"): 201 return self._media.get(media_key) 202 203 async def async_save_media(self, media_key: str, content: bytes) -> None: 204 """Remove media content.""" 205 with DIAGNOSTICS.timer("save_media"): 206 self._media[media_key] = content 207 208 async def async_remove_media(self, media_key: str) -> None: 209 """Remove media content.""" 210 with DIAGNOSTICS.timer("remove_media"): 211 if media_key in self._media: 212 del self._media[media_key] 213 214 215@dataclass 216class CachePolicy: 217 """Policy for how many local objects to cache in memory.""" 218 219 event_cache_size: int = DEFAULT_CACHE_SIZE 220 """Number of events to keep in memory per device.""" 221 222 fetch: bool = False 223 """Determine if event media should be pre-fetched.""" 224 225 store: EventMediaStore = field(default_factory=InMemoryEventMediaStore) 226 """The EventMediaStore object for storing media content.""" 227 228 transcoder: Transcoder | None = None 229 """The transcoder for encoding media.""" 230 231 @property 232 def event_cache_expire_count(self) -> int: 233 """Number of events to keep in memory per device.""" 234 return max(int(self.event_cache_size * EXPIRE_CACHE_BATCH_SIZE), 1) 235 236 237class ImageEventSerializationStrategy(SerializationStrategy): 238 """Serialization strategy for ImageEventBase.""" 239 240 def serialize(self, value: dict[str, ImageEventBase]) -> dict[str, Any]: 241 """Serialize ImageEventBase.""" 242 return dict((k, v.as_dict()) for k, v in value.items()) 243 244 def deserialize(self, value: dict[str, Any]) -> dict[str, ImageEventBase]: 245 """Deserialize ImageEventBase.""" 246 events: dict[str, ImageEventBase] = {} 247 for event_type, event_data in value.items(): 248 # Propagate timestamps to child nodes 249 if (timestamp := event_data.get("timestamp")) and ( 250 data := event_data.get("event_data") 251 ): 252 data["timestamp"] = timestamp 253 if event := ImageEventBase.parse_event_dict(event_data): 254 events[event_type] = event 255 256 # Link events to other events in the session 257 event_image_type = session_event_image_type(events.values()) 258 for event in events.values(): 259 event.event_image_type = event_image_type 260 return events 261 262 263@dataclass 264class EventMediaModelItem(DataClassDictMixin): 265 """Structure used to persist the event in EventMediaStore.""" 266 267 event_session_id: str 268 events: dict[str, ImageEventBase] = field(default_factory=dict) 269 media_key: str | None = field(default=None) 270 event_media_keys: dict[str, str] = field(default_factory=dict) 271 thumbnail_media_key: str | None = field(default=None) 272 pending_event_keys: set[str] = field(default_factory=set) 273 274 @property 275 def visible_event(self) -> ImageEventBase | None: 276 """Get the primary visible event for this item.""" 277 for event_type in VISIBLE_EVENTS: 278 if event := self.events.get(event_type): 279 return event 280 return None 281 282 def merge_events(self, new_events: dict[str, ImageEventBase]) -> None: 283 """Merge new incoming events with the existing set.""" 284 new_keys = new_events.keys() - self.events.keys() 285 self.events.update(new_events) 286 self.pending_event_keys |= new_keys 287 288 @property 289 def pending_events(self) -> dict[str, ImageEventBase]: 290 """Return all associated events with this record.""" 291 return { 292 key: value 293 for key, value in self.events.items() 294 if key in self.pending_event_keys 295 } 296 297 def notified(self, event_keys: Iterable[str]) -> None: 298 """Mark the specified events as notified.""" 299 self.pending_event_keys = self.pending_event_keys - set(event_keys) 300 301 def media_key_for_token(self, token: EventToken) -> str | None: 302 """Return media key for the specified event token.""" 303 if token.event_id: 304 if token.event_id in self.event_media_keys: 305 return self.event_media_keys[token.event_id] 306 # Fallback to legacy single event per session 307 return self.media_key 308 309 @property 310 def any_media_key(self) -> str | None: 311 """Return any media item for compatibility with legacy APIs.""" 312 if self.media_key: 313 return self.media_key 314 if self.event_media_keys.values(): 315 return next(iter(self.event_media_keys.values())) 316 return None 317 318 @property 319 def all_media_keys(self) -> list[str]: 320 """Return all media items for purging media keys.""" 321 keys = [self.media_key, self.thumbnail_media_key] + list( 322 self.event_media_keys.values() 323 ) 324 return [key for key in keys if key is not None] 325 326 class Config(BaseConfig): 327 serialization_strategy = { 328 dict[str, ImageEventBase]: ImageEventSerializationStrategy(), 329 } 330 331 332class EventMediaManager: 333 """Responsible for handling recent events and fetching associated media.""" 334 335 def __init__( 336 self, 337 device_id: str, 338 traits: dict[str, Any], 339 event_traits: set[str], 340 diagnostics: Diagnostics, 341 ) -> None: 342 """Initialize DeviceEventMediaManager.""" 343 self._device_id = device_id 344 self._traits = traits 345 self._event_traits = event_traits 346 self._cache_policy = CachePolicy() 347 self._callback: Callable[[EventMessage], Awaitable[None]] | None = None 348 self._support_fetch = ( 349 CameraClipPreviewTrait.NAME in traits 350 or CameraEventImageTrait.NAME in traits 351 ) 352 self._diagnostics = diagnostics 353 self._lock: asyncio.Lock | None = None 354 355 @property 356 def cache_policy(self) -> CachePolicy: 357 """Return the current CachePolicy.""" 358 return self._cache_policy 359 360 @cache_policy.setter 361 def cache_policy(self, value: CachePolicy) -> None: 362 """Update the CachePolicy.""" 363 self._cache_policy = value 364 365 async def _async_load(self) -> OrderedDict[str, EventMediaModelItem]: 366 """Load the device specific data from the store.""" 367 store_data = await self._cache_policy.store.async_load() 368 event_data: OrderedDict[str, EventMediaModelItem] = OrderedDict() 369 if store_data: 370 device_data = store_data.get(self._device_id, []) 371 for item_data in device_data: 372 try: 373 item = EventMediaModelItem.from_dict(item_data) 374 except Exception as err: 375 _LOGGER.debug("Failed to parse event item: %s", str(err)) 376 raise err 377 event_data[item.event_session_id] = item 378 return event_data 379 380 async def _async_update(self, event_data: dict[str, EventMediaModelItem]) -> None: 381 """Save the device specific model to the store.""" 382 # Event order is preserved so popping from the oldest entry works 383 device_data: list[dict[str, Any]] = [] 384 for item in event_data.values(): 385 device_data.append(item.to_dict()) 386 387 # Read data from the store and update information for this device 388 store_data = await self._cache_policy.store.async_load() 389 if not store_data: 390 store_data = {} 391 store_data[self._device_id] = device_data 392 await self._cache_policy.store.async_save(store_data) 393 394 async def _async_load_item( 395 self, event_session_id: str 396 ) -> EventMediaModelItem | None: 397 """Load the specific item from the store.""" 398 event_data = await self._async_load() 399 return event_data.get(event_session_id) 400 401 async def _async_update_item(self, item: EventMediaModelItem) -> None: 402 """Update the specific item in the store.""" 403 if not self._lock: 404 self._lock = asyncio.Lock() 405 async with self._lock: 406 event_data = await self._async_load() 407 event_data[item.event_session_id] = item 408 await self._async_update(event_data) 409 410 async def _expire_cache(self) -> None: 411 """Garbage collect any items from the cache.""" 412 if not self._lock: 413 self._lock = asyncio.Lock() 414 async with self._lock: 415 event_data = await self._async_load() 416 _LOGGER.debug("Checking cache size %s", len(event_data)) 417 if len(event_data) <= self._cache_policy.event_cache_size: 418 return 419 _LOGGER.debug( 420 "Expiring cache %s", self._cache_policy.event_cache_expire_count 421 ) 422 # Bulk pop items 423 for i in range(0, self._cache_policy.event_cache_expire_count): 424 (key, old_item) = event_data.popitem(last=False) 425 _LOGGER.debug( 426 "Expiring media %s (%s)", 427 old_item.all_media_keys, 428 old_item.event_session_id, 429 ) 430 for media_key in old_item.all_media_keys: 431 await self._cache_policy.store.async_remove_media(media_key) 432 await self._async_update(event_data) 433 434 async def _fetch_media(self, item: EventMediaModelItem) -> None: 435 """Fetch media from the server in response to a pubsub event.""" 436 store = self._cache_policy.store 437 if CameraClipPreviewTrait.NAME in self._traits: 438 self._diagnostics.increment("fetch_clip") 439 if ( 440 item.media_key 441 or not item.visible_event 442 or not ( 443 clip_event := cast( 444 CameraClipPreviewEvent, 445 item.events.get(CameraClipPreviewEvent.NAME), 446 ) 447 ) 448 or clip_event.is_expired 449 ): 450 self._diagnostics.increment("fetch_clip.skip") 451 return 452 clip_preview_trait: CameraClipPreviewTrait = self._traits[ 453 CameraClipPreviewTrait.NAME 454 ] 455 event_image = await clip_preview_trait.generate_event_image( 456 clip_event.preview_url 457 ) 458 if event_image: 459 content = await event_image.contents() 460 # Caller will persist the media key assignment 461 media_key = store.get_clip_preview_media_key( 462 self._device_id, item.visible_event 463 ) 464 item.media_key = media_key 465 _LOGGER.debug("Saving media %s (%s)", media_key, item.event_session_id) 466 self._diagnostics.increment("fetch_clip.save") 467 await store.async_save_media(media_key, content) 468 return 469 470 if CameraEventImageTrait.NAME not in self._traits: 471 return 472 self._diagnostics.increment("fetch_image") 473 474 event_image_trait: CameraEventImageTrait = self._traits[ 475 CameraEventImageTrait.NAME 476 ] 477 for event in item.events.values(): 478 if event.event_id in item.event_media_keys or event.is_expired: 479 self._diagnostics.increment("fetch_image.skip") 480 continue 481 event_image = await event_image_trait.generate_image(event.event_id) 482 content = await event_image.contents(width=SNAPSHOT_WIDTH_PX) 483 484 # Caller will persist the media key assignment 485 media_key = store.get_image_media_key(self._device_id, event) 486 item.event_media_keys[event.event_id] = media_key 487 _LOGGER.debug("Saving media %s (%s)", media_key, item.event_session_id) 488 self._diagnostics.increment("fetch_image.save") 489 await store.async_save_media(media_key, content) 490 491 async def get_media_from_token(self, event_token: str) -> Media | None: 492 """Get media based on the event token.""" 493 token = EventToken.decode(event_token) 494 if not (item := await self._async_load_item(token.event_session_id)): 495 self._diagnostics.increment("get_media.invalid_event") 496 _LOGGER.debug( 497 "No event information found for event id: %s", token.event_session_id 498 ) 499 return None 500 media_key = item.media_key_for_token(token) 501 if not media_key: 502 self._diagnostics.increment("get_media.no_media") 503 _LOGGER.debug("No persisted media for event id %s", token) 504 return None 505 contents = await self._cache_policy.store.async_load_media(media_key) 506 if not contents: 507 self._diagnostics.increment("get_media.empty") 508 _LOGGER.debug( 509 "Unable to load persisted media for event id: (%s, %s, %s)", 510 token.event_session_id, 511 token.event_id, 512 item.media_key, 513 ) 514 return None 515 assert item.visible_event 516 self._diagnostics.increment("get_media.success") 517 return Media(contents, item.visible_event.event_image_type) 518 519 async def get_clip_thumbnail_from_token(self, event_token: str) -> Media | None: 520 """Get a thumbnail from the event token.""" 521 self._diagnostics.increment("get_clip") 522 token = EventToken.decode(event_token) 523 if ( 524 not (item := await self._async_load_item(token.event_session_id)) 525 or not item.visible_event 526 ): 527 self._diagnostics.increment("get_clip.invalid_event") 528 _LOGGER.debug( 529 "No event information found for event id: %s", token.event_session_id 530 ) 531 return None 532 533 if item.thumbnail_media_key: 534 # Load cached thumbnail 535 contents = await self._cache_policy.store.async_load_media( 536 item.thumbnail_media_key 537 ) 538 if contents: 539 self._diagnostics.increment("get_clip.cached") 540 return Media(contents, EventImageType.IMAGE_PREVIEW) 541 _LOGGER.debug( 542 "Thumbnail %s does not exist; transcoding", item.thumbnail_media_key 543 ) 544 545 # Check for existing primary media 546 media_key = item.media_key_for_token(token) 547 if not media_key: 548 self._diagnostics.increment("get_clip.no_media") 549 _LOGGER.debug("No persisted media for event id %s", token) 550 return None 551 552 thumbnail_media_key = ( 553 self._cache_policy.store.get_clip_preview_thumbnail_media_key( 554 self._device_id, item.visible_event 555 ) 556 ) 557 if not self._cache_policy.transcoder or not thumbnail_media_key: 558 self._diagnostics.increment("get_clip.no_transcoding") 559 _LOGGER.debug("Clip transcoding disabled") 560 return None 561 562 try: 563 await self._cache_policy.transcoder.transcode_clip( 564 media_key, thumbnail_media_key 565 ) 566 except TranscodeException as err: 567 self._diagnostics.increment("get_clip.transcode_error") 568 _LOGGER.debug("Failure to transcode clip thumbnail: %s", str(err)) 569 return None 570 571 contents = await self._cache_policy.store.async_load_media(thumbnail_media_key) 572 if not contents: 573 self._diagnostics.increment("get_clip.load_error") 574 _LOGGER.debug( 575 "Failed to load transcoded clip: %s", item.thumbnail_media_key 576 ) 577 return None 578 579 item.thumbnail_media_key = thumbnail_media_key 580 await self._async_update_item(item) 581 582 self._diagnostics.increment("get_clip.success") 583 return Media(contents, EventImageType.IMAGE_PREVIEW) 584 585 async def async_image_sessions(self) -> list[ImageSession]: 586 """Return revent events.""" 587 self._diagnostics.increment("load_image_sessions") 588 589 def _get_events(x: EventMediaModelItem) -> list[ImageEventBase]: 590 # Only return events that have successful media fetches 591 return [ 592 y 593 for y in x.events.values() 594 if x.media_key or y.event_id in x.event_media_keys 595 ] 596 597 result = await self._items_with_media() 598 events_list = list(map(_get_events, result)) 599 events: Iterable[ImageEventBase] = itertools.chain(*events_list) 600 601 def _get_session(x: ImageEventBase) -> ImageSession: 602 return ImageSession(x.event_token, x.timestamp, x.event_type) 603 604 event_result = list(map(_get_session, events)) 605 event_result.sort(key=lambda x: x.timestamp, reverse=True) 606 return event_result 607 608 async def async_clip_preview_sessions(self) -> list[ClipPreviewSession]: 609 """Return revent events for a device that supports clips.""" 610 self._diagnostics.increment("load_clip_previews") 611 612 def _event_visible(x: ImageEventBase) -> bool: 613 return x.event_type in VISIBLE_EVENTS 614 615 def _get_event_session(x: EventMediaModelItem) -> ClipPreviewSession | None: 616 assert x.visible_event 617 events = list(filter(_event_visible, x.events.values())) 618 events.sort(key=lambda x: x.timestamp) 619 if not events: 620 _LOGGER.debug("Partial event in storage") 621 return None 622 visible_event = events[0] 623 return ClipPreviewSession( 624 visible_event.event_token, 625 visible_event.timestamp, 626 [y.event_type for y in events], 627 ) 628 629 result = await self._items_with_media() 630 clips: Iterable[ClipPreviewSession | None] = iter( 631 map(_get_event_session, result) 632 ) 633 valid_clips: list[ClipPreviewSession] = [x for x in clips if x is not None] 634 valid_clips.sort(key=lambda x: x.timestamp, reverse=True) 635 return valid_clips 636 637 async def _items_with_media(self) -> list[EventMediaModelItem]: 638 """Return items in the model that have media for serving.""" 639 640 def _filter(x: EventMediaModelItem) -> bool: 641 """Return events already fetched or that could be fetched.""" 642 if x.media_key or x.event_media_keys: 643 return True 644 return False 645 646 event_data = await self._async_load() 647 return list(filter(_filter, event_data.values())) 648 649 def set_update_callback( 650 self, target: Callable[[EventMessage], Awaitable[None]] 651 ) -> None: 652 """Register a callback invoked when new messages are received.""" 653 self._callback = target 654 655 async def async_handle_events(self, event_message: EventMessage) -> None: 656 """Handle the EventMessage.""" 657 self._diagnostics.increment("event") 658 event_sessions: dict[str, dict[str, ImageEventBase]] | None = ( 659 event_message.event_sessions 660 ) 661 if not event_sessions: 662 return 663 _LOGGER.debug("Event Update %s", event_sessions.keys()) 664 recv_latency_ms = int((time.time() - event_message.timestamp.timestamp()) * 100) 665 666 # Notify traits to cache most recent event 667 pairs = list(event_sessions.items()) 668 for event_session_id, event_dict in pairs: 669 supported = False 670 for event_name, event in event_dict.items(): 671 if not event.is_expired: 672 self._diagnostics.elapsed(event_name, recv_latency_ms) 673 else: 674 self._diagnostics.elapsed(f"{event_name}_expired", recv_latency_ms) 675 if event_name not in self._event_traits: 676 self._diagnostics.increment(f"event.unsupported.{event_name}") 677 _LOGGER.debug("Unsupported event trait: %s", event_name) 678 continue 679 supported = True 680 681 # Skip any entirely unsupported events 682 if not supported: 683 del event_sessions[event_session_id] 684 685 model_items = [] 686 failure = False 687 for event_session_id, event_dict in event_sessions.items(): 688 # Track all related events together with the same session 689 if model_item := await self._async_load_item(event_session_id): 690 self._diagnostics.increment("event.update") 691 model_item.merge_events(event_dict) 692 else: 693 self._diagnostics.increment("event.new") 694 # A new event session 695 model_item = EventMediaModelItem( 696 event_session_id=event_session_id, 697 events=event_dict, 698 media_key=None, 699 event_media_keys={}, 700 thumbnail_media_key=None, 701 pending_event_keys=set(event_dict.keys()), 702 ) 703 model_items.append(model_item) 704 705 if self._support_fetch and self._cache_policy.fetch: 706 self._diagnostics.increment("event.fetch") 707 try: 708 await self._fetch_media(model_item) 709 except GoogleNestException as err: 710 self._diagnostics.increment("event.fetch_error") 711 failure = True 712 _LOGGER.warning( 713 "Failure when pre-fetching event '%s': %s", 714 event.event_session_id, 715 str(err), 716 ) 717 718 # Send notifications for any undelivered events that have media. 719 pending_events: dict[str, ImageEventBase] = {} 720 for model_item in model_items: 721 if ( 722 model_item.any_media_key is None 723 and self._support_fetch 724 and self._cache_policy.fetch 725 and not event_message.is_thread_ended 726 and not failure 727 ): 728 continue 729 pending_events.update(model_item.pending_events) 730 731 if pending_events: 732 _LOGGER.debug("Message contains notifiable events: %s", pending_events) 733 event_message = event_message.with_events( 734 pending_events.keys(), pending_events 735 ) 736 if self._callback: 737 self._diagnostics.increment("event.notify") 738 await self._callback(event_message) 739 else: 740 _LOGGER.debug("Message did not contain notifiable events") 741 742 for model_item in model_items: 743 model_item.notified(pending_events.keys()) 744 await self._async_update_item(model_item) 745 746 await self._expire_cache()
333class EventMediaManager: 334 """Responsible for handling recent events and fetching associated media.""" 335 336 def __init__( 337 self, 338 device_id: str, 339 traits: dict[str, Any], 340 event_traits: set[str], 341 diagnostics: Diagnostics, 342 ) -> None: 343 """Initialize DeviceEventMediaManager.""" 344 self._device_id = device_id 345 self._traits = traits 346 self._event_traits = event_traits 347 self._cache_policy = CachePolicy() 348 self._callback: Callable[[EventMessage], Awaitable[None]] | None = None 349 self._support_fetch = ( 350 CameraClipPreviewTrait.NAME in traits 351 or CameraEventImageTrait.NAME in traits 352 ) 353 self._diagnostics = diagnostics 354 self._lock: asyncio.Lock | None = None 355 356 @property 357 def cache_policy(self) -> CachePolicy: 358 """Return the current CachePolicy.""" 359 return self._cache_policy 360 361 @cache_policy.setter 362 def cache_policy(self, value: CachePolicy) -> None: 363 """Update the CachePolicy.""" 364 self._cache_policy = value 365 366 async def _async_load(self) -> OrderedDict[str, EventMediaModelItem]: 367 """Load the device specific data from the store.""" 368 store_data = await self._cache_policy.store.async_load() 369 event_data: OrderedDict[str, EventMediaModelItem] = OrderedDict() 370 if store_data: 371 device_data = store_data.get(self._device_id, []) 372 for item_data in device_data: 373 try: 374 item = EventMediaModelItem.from_dict(item_data) 375 except Exception as err: 376 _LOGGER.debug("Failed to parse event item: %s", str(err)) 377 raise err 378 event_data[item.event_session_id] = item 379 return event_data 380 381 async def _async_update(self, event_data: dict[str, EventMediaModelItem]) -> None: 382 """Save the device specific model to the store.""" 383 # Event order is preserved so popping from the oldest entry works 384 device_data: list[dict[str, Any]] = [] 385 for item in event_data.values(): 386 device_data.append(item.to_dict()) 387 388 # Read data from the store and update information for this device 389 store_data = await self._cache_policy.store.async_load() 390 if not store_data: 391 store_data = {} 392 store_data[self._device_id] = device_data 393 await self._cache_policy.store.async_save(store_data) 394 395 async def _async_load_item( 396 self, event_session_id: str 397 ) -> EventMediaModelItem | None: 398 """Load the specific item from the store.""" 399 event_data = await self._async_load() 400 return event_data.get(event_session_id) 401 402 async def _async_update_item(self, item: EventMediaModelItem) -> None: 403 """Update the specific item in the store.""" 404 if not self._lock: 405 self._lock = asyncio.Lock() 406 async with self._lock: 407 event_data = await self._async_load() 408 event_data[item.event_session_id] = item 409 await self._async_update(event_data) 410 411 async def _expire_cache(self) -> None: 412 """Garbage collect any items from the cache.""" 413 if not self._lock: 414 self._lock = asyncio.Lock() 415 async with self._lock: 416 event_data = await self._async_load() 417 _LOGGER.debug("Checking cache size %s", len(event_data)) 418 if len(event_data) <= self._cache_policy.event_cache_size: 419 return 420 _LOGGER.debug( 421 "Expiring cache %s", self._cache_policy.event_cache_expire_count 422 ) 423 # Bulk pop items 424 for i in range(0, self._cache_policy.event_cache_expire_count): 425 (key, old_item) = event_data.popitem(last=False) 426 _LOGGER.debug( 427 "Expiring media %s (%s)", 428 old_item.all_media_keys, 429 old_item.event_session_id, 430 ) 431 for media_key in old_item.all_media_keys: 432 await self._cache_policy.store.async_remove_media(media_key) 433 await self._async_update(event_data) 434 435 async def _fetch_media(self, item: EventMediaModelItem) -> None: 436 """Fetch media from the server in response to a pubsub event.""" 437 store = self._cache_policy.store 438 if CameraClipPreviewTrait.NAME in self._traits: 439 self._diagnostics.increment("fetch_clip") 440 if ( 441 item.media_key 442 or not item.visible_event 443 or not ( 444 clip_event := cast( 445 CameraClipPreviewEvent, 446 item.events.get(CameraClipPreviewEvent.NAME), 447 ) 448 ) 449 or clip_event.is_expired 450 ): 451 self._diagnostics.increment("fetch_clip.skip") 452 return 453 clip_preview_trait: CameraClipPreviewTrait = self._traits[ 454 CameraClipPreviewTrait.NAME 455 ] 456 event_image = await clip_preview_trait.generate_event_image( 457 clip_event.preview_url 458 ) 459 if event_image: 460 content = await event_image.contents() 461 # Caller will persist the media key assignment 462 media_key = store.get_clip_preview_media_key( 463 self._device_id, item.visible_event 464 ) 465 item.media_key = media_key 466 _LOGGER.debug("Saving media %s (%s)", media_key, item.event_session_id) 467 self._diagnostics.increment("fetch_clip.save") 468 await store.async_save_media(media_key, content) 469 return 470 471 if CameraEventImageTrait.NAME not in self._traits: 472 return 473 self._diagnostics.increment("fetch_image") 474 475 event_image_trait: CameraEventImageTrait = self._traits[ 476 CameraEventImageTrait.NAME 477 ] 478 for event in item.events.values(): 479 if event.event_id in item.event_media_keys or event.is_expired: 480 self._diagnostics.increment("fetch_image.skip") 481 continue 482 event_image = await event_image_trait.generate_image(event.event_id) 483 content = await event_image.contents(width=SNAPSHOT_WIDTH_PX) 484 485 # Caller will persist the media key assignment 486 media_key = store.get_image_media_key(self._device_id, event) 487 item.event_media_keys[event.event_id] = media_key 488 _LOGGER.debug("Saving media %s (%s)", media_key, item.event_session_id) 489 self._diagnostics.increment("fetch_image.save") 490 await store.async_save_media(media_key, content) 491 492 async def get_media_from_token(self, event_token: str) -> Media | None: 493 """Get media based on the event token.""" 494 token = EventToken.decode(event_token) 495 if not (item := await self._async_load_item(token.event_session_id)): 496 self._diagnostics.increment("get_media.invalid_event") 497 _LOGGER.debug( 498 "No event information found for event id: %s", token.event_session_id 499 ) 500 return None 501 media_key = item.media_key_for_token(token) 502 if not media_key: 503 self._diagnostics.increment("get_media.no_media") 504 _LOGGER.debug("No persisted media for event id %s", token) 505 return None 506 contents = await self._cache_policy.store.async_load_media(media_key) 507 if not contents: 508 self._diagnostics.increment("get_media.empty") 509 _LOGGER.debug( 510 "Unable to load persisted media for event id: (%s, %s, %s)", 511 token.event_session_id, 512 token.event_id, 513 item.media_key, 514 ) 515 return None 516 assert item.visible_event 517 self._diagnostics.increment("get_media.success") 518 return Media(contents, item.visible_event.event_image_type) 519 520 async def get_clip_thumbnail_from_token(self, event_token: str) -> Media | None: 521 """Get a thumbnail from the event token.""" 522 self._diagnostics.increment("get_clip") 523 token = EventToken.decode(event_token) 524 if ( 525 not (item := await self._async_load_item(token.event_session_id)) 526 or not item.visible_event 527 ): 528 self._diagnostics.increment("get_clip.invalid_event") 529 _LOGGER.debug( 530 "No event information found for event id: %s", token.event_session_id 531 ) 532 return None 533 534 if item.thumbnail_media_key: 535 # Load cached thumbnail 536 contents = await self._cache_policy.store.async_load_media( 537 item.thumbnail_media_key 538 ) 539 if contents: 540 self._diagnostics.increment("get_clip.cached") 541 return Media(contents, EventImageType.IMAGE_PREVIEW) 542 _LOGGER.debug( 543 "Thumbnail %s does not exist; transcoding", item.thumbnail_media_key 544 ) 545 546 # Check for existing primary media 547 media_key = item.media_key_for_token(token) 548 if not media_key: 549 self._diagnostics.increment("get_clip.no_media") 550 _LOGGER.debug("No persisted media for event id %s", token) 551 return None 552 553 thumbnail_media_key = ( 554 self._cache_policy.store.get_clip_preview_thumbnail_media_key( 555 self._device_id, item.visible_event 556 ) 557 ) 558 if not self._cache_policy.transcoder or not thumbnail_media_key: 559 self._diagnostics.increment("get_clip.no_transcoding") 560 _LOGGER.debug("Clip transcoding disabled") 561 return None 562 563 try: 564 await self._cache_policy.transcoder.transcode_clip( 565 media_key, thumbnail_media_key 566 ) 567 except TranscodeException as err: 568 self._diagnostics.increment("get_clip.transcode_error") 569 _LOGGER.debug("Failure to transcode clip thumbnail: %s", str(err)) 570 return None 571 572 contents = await self._cache_policy.store.async_load_media(thumbnail_media_key) 573 if not contents: 574 self._diagnostics.increment("get_clip.load_error") 575 _LOGGER.debug( 576 "Failed to load transcoded clip: %s", item.thumbnail_media_key 577 ) 578 return None 579 580 item.thumbnail_media_key = thumbnail_media_key 581 await self._async_update_item(item) 582 583 self._diagnostics.increment("get_clip.success") 584 return Media(contents, EventImageType.IMAGE_PREVIEW) 585 586 async def async_image_sessions(self) -> list[ImageSession]: 587 """Return revent events.""" 588 self._diagnostics.increment("load_image_sessions") 589 590 def _get_events(x: EventMediaModelItem) -> list[ImageEventBase]: 591 # Only return events that have successful media fetches 592 return [ 593 y 594 for y in x.events.values() 595 if x.media_key or y.event_id in x.event_media_keys 596 ] 597 598 result = await self._items_with_media() 599 events_list = list(map(_get_events, result)) 600 events: Iterable[ImageEventBase] = itertools.chain(*events_list) 601 602 def _get_session(x: ImageEventBase) -> ImageSession: 603 return ImageSession(x.event_token, x.timestamp, x.event_type) 604 605 event_result = list(map(_get_session, events)) 606 event_result.sort(key=lambda x: x.timestamp, reverse=True) 607 return event_result 608 609 async def async_clip_preview_sessions(self) -> list[ClipPreviewSession]: 610 """Return revent events for a device that supports clips.""" 611 self._diagnostics.increment("load_clip_previews") 612 613 def _event_visible(x: ImageEventBase) -> bool: 614 return x.event_type in VISIBLE_EVENTS 615 616 def _get_event_session(x: EventMediaModelItem) -> ClipPreviewSession | None: 617 assert x.visible_event 618 events = list(filter(_event_visible, x.events.values())) 619 events.sort(key=lambda x: x.timestamp) 620 if not events: 621 _LOGGER.debug("Partial event in storage") 622 return None 623 visible_event = events[0] 624 return ClipPreviewSession( 625 visible_event.event_token, 626 visible_event.timestamp, 627 [y.event_type for y in events], 628 ) 629 630 result = await self._items_with_media() 631 clips: Iterable[ClipPreviewSession | None] = iter( 632 map(_get_event_session, result) 633 ) 634 valid_clips: list[ClipPreviewSession] = [x for x in clips if x is not None] 635 valid_clips.sort(key=lambda x: x.timestamp, reverse=True) 636 return valid_clips 637 638 async def _items_with_media(self) -> list[EventMediaModelItem]: 639 """Return items in the model that have media for serving.""" 640 641 def _filter(x: EventMediaModelItem) -> bool: 642 """Return events already fetched or that could be fetched.""" 643 if x.media_key or x.event_media_keys: 644 return True 645 return False 646 647 event_data = await self._async_load() 648 return list(filter(_filter, event_data.values())) 649 650 def set_update_callback( 651 self, target: Callable[[EventMessage], Awaitable[None]] 652 ) -> None: 653 """Register a callback invoked when new messages are received.""" 654 self._callback = target 655 656 async def async_handle_events(self, event_message: EventMessage) -> None: 657 """Handle the EventMessage.""" 658 self._diagnostics.increment("event") 659 event_sessions: dict[str, dict[str, ImageEventBase]] | None = ( 660 event_message.event_sessions 661 ) 662 if not event_sessions: 663 return 664 _LOGGER.debug("Event Update %s", event_sessions.keys()) 665 recv_latency_ms = int((time.time() - event_message.timestamp.timestamp()) * 100) 666 667 # Notify traits to cache most recent event 668 pairs = list(event_sessions.items()) 669 for event_session_id, event_dict in pairs: 670 supported = False 671 for event_name, event in event_dict.items(): 672 if not event.is_expired: 673 self._diagnostics.elapsed(event_name, recv_latency_ms) 674 else: 675 self._diagnostics.elapsed(f"{event_name}_expired", recv_latency_ms) 676 if event_name not in self._event_traits: 677 self._diagnostics.increment(f"event.unsupported.{event_name}") 678 _LOGGER.debug("Unsupported event trait: %s", event_name) 679 continue 680 supported = True 681 682 # Skip any entirely unsupported events 683 if not supported: 684 del event_sessions[event_session_id] 685 686 model_items = [] 687 failure = False 688 for event_session_id, event_dict in event_sessions.items(): 689 # Track all related events together with the same session 690 if model_item := await self._async_load_item(event_session_id): 691 self._diagnostics.increment("event.update") 692 model_item.merge_events(event_dict) 693 else: 694 self._diagnostics.increment("event.new") 695 # A new event session 696 model_item = EventMediaModelItem( 697 event_session_id=event_session_id, 698 events=event_dict, 699 media_key=None, 700 event_media_keys={}, 701 thumbnail_media_key=None, 702 pending_event_keys=set(event_dict.keys()), 703 ) 704 model_items.append(model_item) 705 706 if self._support_fetch and self._cache_policy.fetch: 707 self._diagnostics.increment("event.fetch") 708 try: 709 await self._fetch_media(model_item) 710 except GoogleNestException as err: 711 self._diagnostics.increment("event.fetch_error") 712 failure = True 713 _LOGGER.warning( 714 "Failure when pre-fetching event '%s': %s", 715 event.event_session_id, 716 str(err), 717 ) 718 719 # Send notifications for any undelivered events that have media. 720 pending_events: dict[str, ImageEventBase] = {} 721 for model_item in model_items: 722 if ( 723 model_item.any_media_key is None 724 and self._support_fetch 725 and self._cache_policy.fetch 726 and not event_message.is_thread_ended 727 and not failure 728 ): 729 continue 730 pending_events.update(model_item.pending_events) 731 732 if pending_events: 733 _LOGGER.debug("Message contains notifiable events: %s", pending_events) 734 event_message = event_message.with_events( 735 pending_events.keys(), pending_events 736 ) 737 if self._callback: 738 self._diagnostics.increment("event.notify") 739 await self._callback(event_message) 740 else: 741 _LOGGER.debug("Message did not contain notifiable events") 742 743 for model_item in model_items: 744 model_item.notified(pending_events.keys()) 745 await self._async_update_item(model_item) 746 747 await self._expire_cache()
Responsible for handling recent events and fetching associated media.
336 def __init__( 337 self, 338 device_id: str, 339 traits: dict[str, Any], 340 event_traits: set[str], 341 diagnostics: Diagnostics, 342 ) -> None: 343 """Initialize DeviceEventMediaManager.""" 344 self._device_id = device_id 345 self._traits = traits 346 self._event_traits = event_traits 347 self._cache_policy = CachePolicy() 348 self._callback: Callable[[EventMessage], Awaitable[None]] | None = None 349 self._support_fetch = ( 350 CameraClipPreviewTrait.NAME in traits 351 or CameraEventImageTrait.NAME in traits 352 ) 353 self._diagnostics = diagnostics 354 self._lock: asyncio.Lock | None = None
Initialize DeviceEventMediaManager.
356 @property 357 def cache_policy(self) -> CachePolicy: 358 """Return the current CachePolicy.""" 359 return self._cache_policy
Return the current CachePolicy.
492 async def get_media_from_token(self, event_token: str) -> Media | None: 493 """Get media based on the event token.""" 494 token = EventToken.decode(event_token) 495 if not (item := await self._async_load_item(token.event_session_id)): 496 self._diagnostics.increment("get_media.invalid_event") 497 _LOGGER.debug( 498 "No event information found for event id: %s", token.event_session_id 499 ) 500 return None 501 media_key = item.media_key_for_token(token) 502 if not media_key: 503 self._diagnostics.increment("get_media.no_media") 504 _LOGGER.debug("No persisted media for event id %s", token) 505 return None 506 contents = await self._cache_policy.store.async_load_media(media_key) 507 if not contents: 508 self._diagnostics.increment("get_media.empty") 509 _LOGGER.debug( 510 "Unable to load persisted media for event id: (%s, %s, %s)", 511 token.event_session_id, 512 token.event_id, 513 item.media_key, 514 ) 515 return None 516 assert item.visible_event 517 self._diagnostics.increment("get_media.success") 518 return Media(contents, item.visible_event.event_image_type)
Get media based on the event token.
520 async def get_clip_thumbnail_from_token(self, event_token: str) -> Media | None: 521 """Get a thumbnail from the event token.""" 522 self._diagnostics.increment("get_clip") 523 token = EventToken.decode(event_token) 524 if ( 525 not (item := await self._async_load_item(token.event_session_id)) 526 or not item.visible_event 527 ): 528 self._diagnostics.increment("get_clip.invalid_event") 529 _LOGGER.debug( 530 "No event information found for event id: %s", token.event_session_id 531 ) 532 return None 533 534 if item.thumbnail_media_key: 535 # Load cached thumbnail 536 contents = await self._cache_policy.store.async_load_media( 537 item.thumbnail_media_key 538 ) 539 if contents: 540 self._diagnostics.increment("get_clip.cached") 541 return Media(contents, EventImageType.IMAGE_PREVIEW) 542 _LOGGER.debug( 543 "Thumbnail %s does not exist; transcoding", item.thumbnail_media_key 544 ) 545 546 # Check for existing primary media 547 media_key = item.media_key_for_token(token) 548 if not media_key: 549 self._diagnostics.increment("get_clip.no_media") 550 _LOGGER.debug("No persisted media for event id %s", token) 551 return None 552 553 thumbnail_media_key = ( 554 self._cache_policy.store.get_clip_preview_thumbnail_media_key( 555 self._device_id, item.visible_event 556 ) 557 ) 558 if not self._cache_policy.transcoder or not thumbnail_media_key: 559 self._diagnostics.increment("get_clip.no_transcoding") 560 _LOGGER.debug("Clip transcoding disabled") 561 return None 562 563 try: 564 await self._cache_policy.transcoder.transcode_clip( 565 media_key, thumbnail_media_key 566 ) 567 except TranscodeException as err: 568 self._diagnostics.increment("get_clip.transcode_error") 569 _LOGGER.debug("Failure to transcode clip thumbnail: %s", str(err)) 570 return None 571 572 contents = await self._cache_policy.store.async_load_media(thumbnail_media_key) 573 if not contents: 574 self._diagnostics.increment("get_clip.load_error") 575 _LOGGER.debug( 576 "Failed to load transcoded clip: %s", item.thumbnail_media_key 577 ) 578 return None 579 580 item.thumbnail_media_key = thumbnail_media_key 581 await self._async_update_item(item) 582 583 self._diagnostics.increment("get_clip.success") 584 return Media(contents, EventImageType.IMAGE_PREVIEW)
Get a thumbnail from the event token.
586 async def async_image_sessions(self) -> list[ImageSession]: 587 """Return revent events.""" 588 self._diagnostics.increment("load_image_sessions") 589 590 def _get_events(x: EventMediaModelItem) -> list[ImageEventBase]: 591 # Only return events that have successful media fetches 592 return [ 593 y 594 for y in x.events.values() 595 if x.media_key or y.event_id in x.event_media_keys 596 ] 597 598 result = await self._items_with_media() 599 events_list = list(map(_get_events, result)) 600 events: Iterable[ImageEventBase] = itertools.chain(*events_list) 601 602 def _get_session(x: ImageEventBase) -> ImageSession: 603 return ImageSession(x.event_token, x.timestamp, x.event_type) 604 605 event_result = list(map(_get_session, events)) 606 event_result.sort(key=lambda x: x.timestamp, reverse=True) 607 return event_result
Return revent events.
609 async def async_clip_preview_sessions(self) -> list[ClipPreviewSession]: 610 """Return revent events for a device that supports clips.""" 611 self._diagnostics.increment("load_clip_previews") 612 613 def _event_visible(x: ImageEventBase) -> bool: 614 return x.event_type in VISIBLE_EVENTS 615 616 def _get_event_session(x: EventMediaModelItem) -> ClipPreviewSession | None: 617 assert x.visible_event 618 events = list(filter(_event_visible, x.events.values())) 619 events.sort(key=lambda x: x.timestamp) 620 if not events: 621 _LOGGER.debug("Partial event in storage") 622 return None 623 visible_event = events[0] 624 return ClipPreviewSession( 625 visible_event.event_token, 626 visible_event.timestamp, 627 [y.event_type for y in events], 628 ) 629 630 result = await self._items_with_media() 631 clips: Iterable[ClipPreviewSession | None] = iter( 632 map(_get_event_session, result) 633 ) 634 valid_clips: list[ClipPreviewSession] = [x for x in clips if x is not None] 635 valid_clips.sort(key=lambda x: x.timestamp, reverse=True) 636 return valid_clips
Return revent events for a device that supports clips.
650 def set_update_callback( 651 self, target: Callable[[EventMessage], Awaitable[None]] 652 ) -> None: 653 """Register a callback invoked when new messages are received.""" 654 self._callback = target
Register a callback invoked when new messages are received.
656 async def async_handle_events(self, event_message: EventMessage) -> None: 657 """Handle the EventMessage.""" 658 self._diagnostics.increment("event") 659 event_sessions: dict[str, dict[str, ImageEventBase]] | None = ( 660 event_message.event_sessions 661 ) 662 if not event_sessions: 663 return 664 _LOGGER.debug("Event Update %s", event_sessions.keys()) 665 recv_latency_ms = int((time.time() - event_message.timestamp.timestamp()) * 100) 666 667 # Notify traits to cache most recent event 668 pairs = list(event_sessions.items()) 669 for event_session_id, event_dict in pairs: 670 supported = False 671 for event_name, event in event_dict.items(): 672 if not event.is_expired: 673 self._diagnostics.elapsed(event_name, recv_latency_ms) 674 else: 675 self._diagnostics.elapsed(f"{event_name}_expired", recv_latency_ms) 676 if event_name not in self._event_traits: 677 self._diagnostics.increment(f"event.unsupported.{event_name}") 678 _LOGGER.debug("Unsupported event trait: %s", event_name) 679 continue 680 supported = True 681 682 # Skip any entirely unsupported events 683 if not supported: 684 del event_sessions[event_session_id] 685 686 model_items = [] 687 failure = False 688 for event_session_id, event_dict in event_sessions.items(): 689 # Track all related events together with the same session 690 if model_item := await self._async_load_item(event_session_id): 691 self._diagnostics.increment("event.update") 692 model_item.merge_events(event_dict) 693 else: 694 self._diagnostics.increment("event.new") 695 # A new event session 696 model_item = EventMediaModelItem( 697 event_session_id=event_session_id, 698 events=event_dict, 699 media_key=None, 700 event_media_keys={}, 701 thumbnail_media_key=None, 702 pending_event_keys=set(event_dict.keys()), 703 ) 704 model_items.append(model_item) 705 706 if self._support_fetch and self._cache_policy.fetch: 707 self._diagnostics.increment("event.fetch") 708 try: 709 await self._fetch_media(model_item) 710 except GoogleNestException as err: 711 self._diagnostics.increment("event.fetch_error") 712 failure = True 713 _LOGGER.warning( 714 "Failure when pre-fetching event '%s': %s", 715 event.event_session_id, 716 str(err), 717 ) 718 719 # Send notifications for any undelivered events that have media. 720 pending_events: dict[str, ImageEventBase] = {} 721 for model_item in model_items: 722 if ( 723 model_item.any_media_key is None 724 and self._support_fetch 725 and self._cache_policy.fetch 726 and not event_message.is_thread_ended 727 and not failure 728 ): 729 continue 730 pending_events.update(model_item.pending_events) 731 732 if pending_events: 733 _LOGGER.debug("Message contains notifiable events: %s", pending_events) 734 event_message = event_message.with_events( 735 pending_events.keys(), pending_events 736 ) 737 if self._callback: 738 self._diagnostics.increment("event.notify") 739 await self._callback(event_message) 740 else: 741 _LOGGER.debug("Message did not contain notifiable events") 742 743 for model_item in model_items: 744 model_item.notified(pending_events.keys()) 745 await self._async_update_item(model_item) 746 747 await self._expire_cache()
Handle the EventMessage.
95@dataclass 96class ImageSession: 97 """An object that holds an image based event.""" 98 99 event_token: str 100 """A token that can be used to fetch the media for the event.""" 101 102 timestamp: datetime.datetime 103 """Timestamp when the event happened.""" 104 105 event_type: str 106 """A label for the type of event."""
An object that holds an image based event.
109@dataclass 110class ClipPreviewSession: 111 """An object that holds a clip based event.""" 112 113 event_token: str 114 """A token that can be used to fetch the media for the event.""" 115 116 timestamp: datetime.datetime 117 """Timestamp when the event happened.""" 118 119 event_types: list[str] 120 """A label for the type of event."""
An object that holds a clip based event.
79@dataclass 80class Media: 81 """Represents media related to an event.""" 82 83 contents: bytes 84 """Media content.""" 85 86 event_image_type: EventImageContentType 87 """Content event image type of the media.""" 88 89 @property 90 def content_type(self) -> str: 91 """Content type of the media.""" 92 return self.event_image_type.content_type
Represents media related to an event.
123class EventMediaStore(ABC): 124 """Interface for external storage.""" 125 126 @abstractmethod 127 async def async_load(self) -> dict | None: 128 """Load data.""" 129 130 @abstractmethod 131 async def async_save(self, data: dict) -> None: 132 """Save data.""" 133 134 @abstractmethod 135 def get_media_key(self, device_id: str, event: ImageEventBase) -> str: 136 """Return the filename to use for the device and event.""" 137 138 def get_image_media_key(self, device_id: str, event: ImageEventBase) -> str: 139 """Return the filename for image media.""" 140 return self.get_media_key(device_id, event) 141 142 def get_clip_preview_media_key(self, device_id: str, event: ImageEventBase) -> str: 143 """Return the filename for clip preview media.""" 144 return self.get_media_key(device_id, event) 145 146 def get_clip_preview_thumbnail_media_key( 147 self, device_id: str, event: ImageEventBase 148 ) -> str | None: 149 """Return the filename for thumbnail for clip preview media.""" 150 return None 151 152 async def async_load_media(self, media_key: str) -> bytes | None: 153 """Load media content.""" 154 155 async def async_save_media(self, media_key: str, content: bytes) -> None: 156 """Write media content.""" 157 158 async def async_remove_media(self, media_key: str) -> None: 159 """Remove media content."""
Interface for external storage.
134 @abstractmethod 135 def get_media_key(self, device_id: str, event: ImageEventBase) -> str: 136 """Return the filename to use for the device and event."""
Return the filename to use for the device and event.
138 def get_image_media_key(self, device_id: str, event: ImageEventBase) -> str: 139 """Return the filename for image media.""" 140 return self.get_media_key(device_id, event)
Return the filename for image media.
142 def get_clip_preview_media_key(self, device_id: str, event: ImageEventBase) -> str: 143 """Return the filename for clip preview media.""" 144 return self.get_media_key(device_id, event)
Return the filename for clip preview media.
146 def get_clip_preview_thumbnail_media_key( 147 self, device_id: str, event: ImageEventBase 148 ) -> str | None: 149 """Return the filename for thumbnail for clip preview media.""" 150 return None
Return the filename for thumbnail for clip preview media.
216@dataclass 217class CachePolicy: 218 """Policy for how many local objects to cache in memory.""" 219 220 event_cache_size: int = DEFAULT_CACHE_SIZE 221 """Number of events to keep in memory per device.""" 222 223 fetch: bool = False 224 """Determine if event media should be pre-fetched.""" 225 226 store: EventMediaStore = field(default_factory=InMemoryEventMediaStore) 227 """The EventMediaStore object for storing media content.""" 228 229 transcoder: Transcoder | None = None 230 """The transcoder for encoding media.""" 231 232 @property 233 def event_cache_expire_count(self) -> int: 234 """Number of events to keep in memory per device.""" 235 return max(int(self.event_cache_size * EXPIRE_CACHE_BATCH_SIZE), 1)
Policy for how many local objects to cache in memory.