Source code for gntplib.async_gntp

#!/usr/bin/env python3

# File: gntplib/async_gntp.py
# Author: Hadi Cahyadi <cumulus13@gmail.com>
# Date: 2025-12-25
# Description: Asynchronous GNTP implementation using Tornado
# License: MIT

"""Asynchronous GNTP implementation using Tornado.

This module provides async/await based GNTP communication using the
Tornado framework. Requires Python 3.7+ and Tornado 5.0+.

Example:
    >>> import asyncio
    >>> from gntplib.async_gntp import AsyncPublisher, AsyncResource
    >>> from gntplib import Event
    >>> 
    >>> async def main():
    ...     icon = AsyncResource('https://example.com/icon.png')
    ...     pub = AsyncPublisher('MyApp', [Event('test')], icon=icon)
    ...     pub.register()
    ...     pub.publish('test', 'Hello', 'World')
    >>> 
    >>> asyncio.run(main())
"""

import logging
import socket
import asyncio
from typing import List, Optional, Tuple, Any, Union

try:
    from tornado import httpclient, ioloop, iostream
    TORNADO_AVAILABLE = True
except ImportError:
    TORNADO_AVAILABLE = False
    httpclient = ioloop = iostream = None

from .lib import Publisher as SyncPublisher, Subscriber as SyncSubscriber
from .models import Resource, Event
from .connections import BaseGNTPConnection, GNTPClient
from .requests import RegisterRequest, NotifyRequest
from .constants import MESSAGE_DELIMITER, DEFAULT_PORT
from .exceptions import GNTPError

__all__ = [
    'AsyncPublisher',
    'AsyncSubscriber',
    'AsyncGNTPConnection',
    'AsyncGNTPClient',
    'AsyncResource',
    'fetch_async_resources_in_parallel',
    'collect_async_resources'
]

logger = logging.getLogger(__name__)


def _check_tornado():
    """Check if Tornado is available."""
    if not TORNADO_AVAILABLE:
        raise ImportError(
            "Tornado is required for async functionality. "
            "Install with: pip install tornado"
        )


[docs] class AsyncPublisher(SyncPublisher): """Asynchronous GNTP notification publisher. Same as Publisher but uses AsyncGNTPClient for non-blocking operations. Requires Tornado. Example: >>> import asyncio >>> from gntplib.async_gntp import AsyncPublisher >>> from gntplib import Event >>> >>> async def main(): ... pub = AsyncPublisher('MyApp', [Event('test')]) ... pub.register() ... pub.publish('test', 'Title', 'Message') >>> >>> asyncio.run(main()) """
[docs] def __init__( self, name: str, event_defs: List[Union[str, Tuple[str, bool], Event]], icon: Optional[Any] = None, io_loop: Optional['ioloop.IOLoop'] = None, **kwargs ): """Initialize async publisher. Args: name: Application name event_defs: Event definitions icon: Optional app icon (can be AsyncResource) io_loop: Tornado IOLoop (default: current loop) **kwargs: Additional client arguments Raises: ImportError: If Tornado is not installed """ _check_tornado() io_loop = io_loop or ioloop.IOLoop.current() super().__init__( name, event_defs, icon, gntp_client_class=AsyncGNTPClient, io_loop=io_loop, **kwargs )
[docs] class AsyncSubscriber(SyncSubscriber): """Asynchronous GNTP notification subscriber. Same as Subscriber but uses AsyncGNTPClient for non-blocking operations. Requires Tornado. Example: >>> import asyncio >>> from gntplib.async_gntp import AsyncSubscriber >>> >>> async def main(): ... sub = AsyncSubscriber('id', 'name', 'hub.example.com', 'pass') ... sub.subscribe() >>> >>> asyncio.run(main()) """
[docs] def __init__( self, id_: str, name: str, hub: Union[str, Tuple[str, int]], password: str, port: int = DEFAULT_PORT, io_loop: Optional['ioloop.IOLoop'] = None, **kwargs ): """Initialize async subscriber. Args: id_: Unique subscriber ID name: Subscriber name hub: Hub address password: Hub password port: Subscriber port (default: 23053) io_loop: Tornado IOLoop (default: current loop) **kwargs: Additional client arguments Raises: ImportError: If Tornado is not installed """ _check_tornado() io_loop = io_loop or ioloop.IOLoop.current() super().__init__( id_, name, hub, password, port=port, gntp_client_class=AsyncGNTPClient, io_loop=io_loop, **kwargs )
[docs] class AsyncGNTPConnection(BaseGNTPConnection): """Asynchronous GNTP connection using Tornado IOStream. Provides non-blocking socket communication with GNTP server. Attributes: stream: Tornado IOStream for async socket operations """
[docs] def __init__( self, address: Tuple[str, int], timeout: float, final_callback: Any, socket_callback: Optional[Any] = None, io_loop: Optional['ioloop.IOLoop'] = None ): """Initialize async connection. Args: address: (host, port) tuple timeout: Connection timeout in seconds final_callback: Callback when operation completes socket_callback: Optional socket event callback io_loop: Tornado IOLoop (default: current loop) Raises: ImportError: If Tornado is not installed """ _check_tornado() super().__init__(final_callback, socket_callback) sock = socket.create_connection(address, timeout=timeout) self.stream = iostream.IOStream(sock, io_loop=io_loop)
[docs] def write_message(self, message: bytes) -> None: """Send message to server asynchronously. Args: message: Message bytes to send """ self.stream.write(message)
[docs] def read_message(self, callback: Any) -> None: """Read message from server asynchronously. Args: callback: Function to call with received message """ self.stream.read_until(MESSAGE_DELIMITER, callback)
[docs] def close(self) -> None: """Close the stream.""" if self.stream is not None: self.stream.close() self.stream = None
[docs] class AsyncGNTPClient(GNTPClient): """Asynchronous GNTP client using Tornado. Extends GNTPClient with async resource fetching capabilities. Attributes: io_loop: Tornado IOLoop instance """
[docs] def __init__( self, io_loop: Optional['ioloop.IOLoop'] = None, **kwargs ): """Initialize async client. Args: io_loop: Tornado IOLoop (default: current loop) **kwargs: Additional arguments for GNTPClient Raises: ImportError: If Tornado is not installed """ _check_tornado() super().__init__(connection_class=AsyncGNTPConnection, **kwargs) self.io_loop = io_loop or ioloop.IOLoop.current()
[docs] async def process_request( self, request: Any, callback: Any, **kwargs ) -> None: """Process request asynchronously. If request contains AsyncResource instances, fetches them before sending the request. Args: request: GNTP request to process callback: Callback when operation completes **kwargs: Additional connection arguments """ # Fetch async resources first async_resources = collect_async_resources(request) if async_resources: await fetch_async_resources_in_parallel(async_resources) # Process request normally super().process_request( request, callback, io_loop=self.io_loop, **kwargs )
[docs] class AsyncResource(Resource): """Resource that will be fetched asynchronously from URL. Use this class when you want to fetch icons or images from remote URLs asynchronously before sending the notification. Attributes: url: URL to fetch resource from data: Resource data (populated after fetch) """
[docs] def __init__(self, url: str): """Initialize async resource. Args: url: URL to fetch resource from Example: >>> icon = AsyncResource('https://example.com/icon.png') """ super().__init__(data=None) self.url = url
[docs] def __repr__(self) -> str: """Return string representation.""" status = f"{len(self.data)} bytes" if self.data else "not fetched" return f"AsyncResource(url={self.url!r}, {status})"
[docs] async def fetch_async_resources_in_parallel( async_resources: List[AsyncResource] ) -> List[AsyncResource]: """Fetch multiple AsyncResource URLs in parallel. Uses asyncio.gather to fetch all resources concurrently. Failed fetches are logged but don't stop the function. Args: async_resources: List of AsyncResource instances Returns: Same list with data populated Example: >>> resources = [ ... AsyncResource('https://example.com/icon1.png'), ... AsyncResource('https://example.com/icon2.png') ... ] >>> await fetch_async_resources_in_parallel(resources) """ _check_tornado() http_client = httpclient.AsyncHTTPClient() async def fetch_one(resource: AsyncResource) -> Tuple[AsyncResource, Any]: """Fetch a single resource with error handling.""" try: response = await http_client.fetch(resource.url) return resource, response except Exception as e: logger.warning(f"Failed to fetch {resource.url!r}: {e}") return resource, None # Fetch all resources concurrently fetch_tasks = [fetch_one(resource) for resource in async_resources] results = await asyncio.gather(*fetch_tasks, return_exceptions=False) # Process results for resource, response in results: if response is None: logger.warning(f"Failed to fetch {resource.url!r}") resource.data = None elif hasattr(response, 'error') and response.error: logger.warning(f"Failed to fetch {resource.url!r}: {response.error}") resource.data = None else: resource.data = response.body return async_resources
[docs] def collect_async_resources(request: Any) -> List[AsyncResource]: """Collect all AsyncResource instances from a request. Traverses the request object to find all AsyncResource instances that need to be fetched before the request can be sent. Args: request: GNTP request object Returns: List of unique AsyncResource instances Example: >>> from gntplib import Event >>> from gntplib.requests import RegisterRequest >>> icon = AsyncResource('https://example.com/icon.png') >>> request = RegisterRequest('MyApp', icon, [Event('test')]) >>> resources = collect_async_resources(request) """ resources = [] # Collect from different request types if isinstance(request, RegisterRequest): resources = [request.app_icon] + [e.icon for e in request.events] elif isinstance(request, NotifyRequest): resources = [request.notification.icon] # Collect from custom headers resources.extend([ value for _, value in request.custom_headers if isinstance(value, AsyncResource) ]) # Collect from app-specific headers resources.extend([ value for _, value in request.app_specific_headers if isinstance(value, AsyncResource) ]) # Return unique AsyncResource instances only unique_resources = list(set( r for r in resources if isinstance(r, AsyncResource) )) return unique_resources
# Deprecated alias class AsyncNotifier(AsyncPublisher): """Deprecated: Use AsyncPublisher instead.""" def __init__(self, *args, **kwargs): import warnings warnings.warn( 'AsyncNotifier is deprecated, use AsyncPublisher instead', DeprecationWarning, stacklevel=2 ) super().__init__(*args, **kwargs) # Deprecated alias class AsyncIcon(AsyncResource): """Deprecated: Use AsyncResource instead.""" def __init__(self, url: str): import warnings warnings.warn( 'AsyncIcon is deprecated, use AsyncResource instead', DeprecationWarning, stacklevel=2 ) super().__init__(url)