Source code for gntplib.lib

#!/usr/bin/env python3

# File: gntplib/lib.py
# Author: Hadi Cahyadi <cumulus13@gmail.com>
# Date: 2025-12-25
# Description: Main library implementation for GNTP protocol.
# License: MIT

"""Main library implementation for GNTP protocol.

This module provides the high-level Publisher and Subscriber classes
for sending and receiving GNTP notifications.
"""

from typing import List, Optional, Union, Callable, Any, Tuple

from .models import Event, Notification, Resource, SocketCallback, URLCallback
from .requests import RegisterRequest, NotifyRequest, SubscribeRequest, Response
from .connections import GNTPClient
from .constants import DEFAULT_PORT, DEFAULT_TTL
from .exceptions import GNTPError, GNTPValidationError

__all__ = [
    'publish',
    'subscribe',
    'Publisher',
    'Subscriber',
    # Deprecated aliases
    'notify',
    'Notifier'
]


[docs] def publish( app_name: str, event_name: str, title: str, text: str = '' ) -> None: """Quick publish: Register and send notification in one call. This is a convenience function that registers an application with a single event type and immediately sends a notification. Args: app_name: Name of the application event_name: Name of the notification event title: Notification title text: Notification message (default: '') Example: >>> from gntplib import publish >>> publish('MyApp', 'Alert', 'Warning', 'System overheating') """ publisher = Publisher(app_name, [Event(event_name)]) publisher.register() publisher.publish(event_name, title, text)
def notify(app_name: str, event_name: str, title: str, text: str = '') -> None: """Deprecated: Use publish() instead.""" import warnings warnings.warn( 'notify() is deprecated, use publish() instead', DeprecationWarning, stacklevel=2 ) publish(app_name, event_name, title, text)
[docs] def subscribe( id_: str, name: str, hub: Union[str, Tuple[str, int]], password: str, port: int = DEFAULT_PORT ) -> int: """Quick subscribe: Send subscription request and return TTL. Args: id_: Unique subscriber ID name: Subscriber name hub: Hub address (hostname or (host, port) tuple) password: Hub password port: Subscriber port (default: 23053) Returns: Subscription TTL in seconds Example: >>> from gntplib import subscribe >>> ttl = subscribe('sub-123', 'MySubscriber', 'hub.example.com', 'secret') """ subscriber = Subscriber(id_, name, hub, password, port=port) subscriber.subscribe() return subscriber.ttl
[docs] def coerce_to_events(items: List[Union[str, Tuple[str, bool], Event]]) -> List[Event]: """Convert various event definitions to Event instances. Args: items: List of event definitions - str: Event name (enabled by default) - (str, bool): (Event name, enabled flag) - Event: Event instance (used as-is) Returns: List of Event instances Example: >>> events = coerce_to_events(['event1', ('event2', False)]) """ results = [] for item in items: if isinstance(item, str): results.append(Event(item, enabled=True)) elif isinstance(item, tuple): name, enabled = item results.append(Event(name, enabled=enabled)) elif isinstance(item, Event): results.append(item) else: raise GNTPValidationError( f"Invalid event definition: {item}. " f"Expected str, (str, bool), or Event" ) return results
[docs] def coerce_to_callback( gntp_callback: Optional[Union[str, SocketCallback]] = None, **socket_callback_options ) -> Optional[Union[URLCallback, SocketCallback]]: """Convert callback specification to callback instance. Args: gntp_callback: URL string or SocketCallback instance **socket_callback_options: Options for creating SocketCallback Returns: URLCallback, SocketCallback, or None Raises: GNTPError: If both gntp_callback and options are provided """ if gntp_callback is not None: if socket_callback_options: raise GNTPError( "Cannot specify both gntp_callback and socket_callback_options" ) if isinstance(gntp_callback, str): return URLCallback(gntp_callback) else: return gntp_callback if socket_callback_options: return SocketCallback(**socket_callback_options) return None
class BaseApp: """Base class for GNTP applications. Provides common functionality for Publisher and Subscriber. Attributes: custom_headers: List of (key, value) for custom X- headers app_specific_headers: List of (key, value) for Data- headers gntp_client: GNTP client instance """ def __init__( self, custom_headers: Optional[List[Tuple[str, Any]]] = None, app_specific_headers: Optional[List[Tuple[str, Any]]] = None, gntp_client_class: Optional[type] = None, **kwargs ): """Initialize base app. Args: custom_headers: Custom X- headers app_specific_headers: App Data- headers gntp_client_class: Client class (default: GNTPClient) **kwargs: Additional client arguments """ self.custom_headers = custom_headers or [] self.app_specific_headers = app_specific_headers or [] if gntp_client_class is None: gntp_client_class = GNTPClient self.gntp_client = gntp_client_class(**kwargs)
[docs] class Publisher(BaseApp): """GNTP notification publisher. Registers an application and sends notifications to GNTP servers. Attributes: name: Application name icon: Optional application icon events: List of notification event definitions """
[docs] def __init__( self, name: str, event_defs: List[Union[str, Tuple[str, bool], Event]], icon: Optional[Union[str, Resource]] = None, custom_headers: Optional[List[Tuple[str, Any]]] = None, app_specific_headers: Optional[List[Tuple[str, Any]]] = None, gntp_client_class: Optional[type] = None, **kwargs ): """Initialize publisher. Args: name: Application name event_defs: Event definitions (str, tuple, or Event instances) icon: Optional app icon (URL or Resource) custom_headers: Custom X- headers app_specific_headers: App Data- headers gntp_client_class: Client class (default: GNTPClient) **kwargs: Additional client arguments Raises: GNTPValidationError: If no events defined Example: >>> from gntplib import Publisher, Event, Resource >>> events = [Event('update', 'Update Available')] >>> icon = Resource.from_file('icon.png') >>> pub = Publisher('MyApp', events, icon=icon) """ self.name = name self.icon = self._coerce_to_resource(icon) self.events = coerce_to_events(event_defs) if not self.events: raise GNTPValidationError( "At least one notification event type must be defined" ) super().__init__( custom_headers, app_specific_headers, gntp_client_class, **kwargs )
[docs] def register(self, callback: Optional[Callable[[Response], None]] = None) -> None: """Register this publisher with the GNTP server. Must be called before publishing notifications. Args: callback: Optional callback called with server response Example: >>> pub = Publisher('MyApp', [Event('test')]) >>> pub.register() """ request = RegisterRequest( self.name, self.icon, self.events, self.custom_headers, self.app_specific_headers ) self.gntp_client.process_request(request, callback)
[docs] def publish( self, name: str, title: str, text: str = '', id_: Optional[str] = None, sticky: bool = False, priority: int = 0, icon: Optional[Union[str, Resource]] = None, coalescing_id: Optional[str] = None, callback: Optional[Callable] = None, gntp_callback: Optional[Union[str, SocketCallback]] = None, **socket_callback_options ) -> None: """Send a notification. Args: name: Event name (must be registered) title: Notification title text: Notification message (default: '') id_: Unique notification ID sticky: Keep until dismissed (default: False) priority: Priority -2 to 2 (default: 0) icon: Optional icon (URL or Resource) coalescing_id: ID for grouping notifications callback: Completion callback gntp_callback: URL or SocketCallback for notification events **socket_callback_options: Options for SocketCallback Example: >>> pub.publish( ... 'update', ... 'New Version', ... 'Version 2.0 is available', ... priority=1, ... sticky=True ... ) """ notification = Notification( name, title, text, id_=id_, sticky=sticky, priority=priority, icon=self._coerce_to_resource(icon), coalescing_id=coalescing_id, callback=coerce_to_callback(gntp_callback, **socket_callback_options) ) request = NotifyRequest( self.name, notification, self.custom_headers, self.app_specific_headers ) self.gntp_client.process_request( request, callback, socket_callback=notification.socket_callback )
def _coerce_to_resource( self, value: Optional[Union[str, Resource]] ) -> Optional[Resource]: """Convert value to Resource if needed.""" if value is None or isinstance(value, Resource): return value if isinstance(value, str): return Resource(url=value) return value
[docs] def __repr__(self) -> str: """Return string representation.""" return f"Publisher(name={self.name!r}, events={len(self.events)})"
class Notifier(Publisher): """Deprecated: Use Publisher instead.""" def __init__(self, *args, **kwargs): import warnings warnings.warn( 'Notifier is deprecated, use Publisher instead', DeprecationWarning, stacklevel=2 ) super().__init__(*args, **kwargs) def notify(self, *args, **kwargs): """Deprecated: Use publish() instead.""" import warnings warnings.warn( 'notify() method is deprecated, use publish() instead', DeprecationWarning, stacklevel=2 ) self.publish(*args, **kwargs)
[docs] class Subscriber(BaseApp): """GNTP notification subscriber. Subscribes to receive notifications from a GNTP hub. Attributes: id_: Unique subscriber ID name: Subscriber name hub: Hub (host, port) tuple password: Hub password port: Subscriber port ttl: Subscription time-to-live """
[docs] def __init__( self, id_: str, name: str, hub: Union[str, Tuple[str, int]], password: str, port: int = DEFAULT_PORT, custom_headers: Optional[List[Tuple[str, Any]]] = None, app_specific_headers: Optional[List[Tuple[str, Any]]] = None, gntp_client_class: Optional[type] = None, **kwargs ): """Initialize subscriber. Args: id_: Unique subscriber ID name: Subscriber name hub: Hub address (hostname or (host, port) tuple) password: Hub password port: Subscriber port (default: 23053) custom_headers: Custom X- headers app_specific_headers: App Data- headers gntp_client_class: Client class (default: GNTPClient) **kwargs: Additional client arguments Example: >>> sub = Subscriber( ... 'sub-123', ... 'MySubscriber', ... 'hub.example.com', ... 'secret' ... ) """ self.id_ = id_ self.name = name # Parse hub address if isinstance(hub, str): self.hub = (hub, DEFAULT_PORT) else: self.hub = hub self.password = password self.port = port self.ttl = DEFAULT_TTL # Initialize with hub connection settings super().__init__( custom_headers, app_specific_headers, gntp_client_class, host=self.hub[0], port=self.hub[1], password=self.password, **kwargs )
[docs] def subscribe( self, callback: Optional[Callable[[Response], None]] = None ) -> None: """Send subscription request to hub. Updates ttl attribute with value from server response. Args: callback: Optional callback (default: updates ttl) Example: >>> sub.subscribe() >>> print(f"Subscription TTL: {sub.ttl} seconds") """ request = SubscribeRequest( self.id_, self.name, self.port, self.custom_headers, self.app_specific_headers ) self.gntp_client.process_request( request, callback or self.store_ttl )
[docs] def store_ttl(self, response: Response) -> None: """Store TTL from subscription response. Args: response: Response from SUBSCRIBE request """ ttl_str = response.headers.get('Subscription-TTL', str(DEFAULT_TTL)) try: self.ttl = int(ttl_str) except ValueError: self.ttl = DEFAULT_TTL
[docs] def __repr__(self) -> str: """Return string representation.""" return ( f"Subscriber(id={self.id_!r}, name={self.name!r}, " f"hub={self.hub[0]}:{self.hub[1]})" )