Source code for gntplib.connections

#!/usr/bin/env python3

# File: gntplib/connections.py
# Author: Hadi Cahyadi <cumulus13@gmail.com>
# Date: 2025-12-25
# Description: GNTP connection and client implementations.
# License: MIT

"""GNTP connection and client implementations.

This module provides synchronous connection handling and client logic
for communicating with GNTP servers.
"""

import socket
from typing import Optional, Callable, Any, Tuple, Generator

from .constants import (
    DEFAULT_PORT,
    DEFAULT_TIMEOUT,
    MESSAGE_DELIMITER,
    MESSAGE_DELIMITER_SIZE,
    MAX_MESSAGE_SIZE
)
from .exceptions import GNTPConnectionError, GNTPError
from .packers import MessagePackerFactory
from .requests import parse_response
from .keys import Key, Algorithm as KeyAlgorithm
from .ciphers import Cipher, Algorithm as CipherAlgorithm

__all__ = [
    'BaseGNTPConnection',
    'GNTPConnection',
    'GNTPClient'
]


[docs] class BaseGNTPConnection: """Abstract base class for GNTP connections. Defines the interface for GNTP server connections. Subclasses must implement write_message, read_message, and close methods. Attributes: final_callback: Callback when connection completes socket_callback: Optional callback for socket events """
[docs] def __init__( self, final_callback: Optional[Callable] = None, socket_callback: Optional[Any] = None ): """Initialize base connection. Args: final_callback: Called when operation completes socket_callback: Optional socket event callback """ self.final_callback = final_callback self.socket_callback = socket_callback
[docs] def on_ok_message(self, message: bytes) -> None: """Handle -OK response from server. Args: message: Response message bytes """ try: response = parse_response(message, '-OK') # If socket callback is registered, wait for callback message if self.socket_callback is not None: self.read_message(self.on_callback_message) finally: # Close if no callback expected if self.socket_callback is None: self.close() # Call final callback if no socket callback if self.socket_callback is None and self.final_callback is not None: self.final_callback(response)
[docs] def on_callback_message(self, message: bytes) -> None: """Handle -CALLBACK response from server. Args: message: Callback message bytes """ try: response = parse_response(message, '-CALLBACK') callback_result = self.socket_callback(response) finally: self.close() # Call final callback with result if self.final_callback is not None: self.final_callback(callback_result)
[docs] def write_message(self, message: bytes) -> None: """Send message to server. Subclasses must implement this method. Args: message: Message bytes to send """ raise NotImplementedError
[docs] def read_message(self, callback: Callable[[bytes], None]) -> None: """Read message from server. Subclasses must implement this method. Args: callback: Function to call with received message """ raise NotImplementedError
[docs] def close(self) -> None: """Close the connection. Subclasses must implement this method. """ raise NotImplementedError
[docs] class GNTPConnection(BaseGNTPConnection): """Synchronous GNTP connection implementation. Provides blocking socket communication with GNTP server. Attributes: sock: TCP socket connection """
[docs] def __init__( self, address: Tuple[str, int], timeout: float, final_callback: Optional[Callable] = None, socket_callback: Optional[Any] = None ): """Initialize GNTP connection. Args: address: (host, port) tuple timeout: Connection timeout in seconds final_callback: Called when operation completes socket_callback: Optional socket event callback Raises: GNTPConnectionError: If connection fails """ super().__init__(final_callback, socket_callback) try: self.sock = socket.create_connection(address, timeout=timeout) except socket.error as e: raise GNTPConnectionError( f"Failed to connect to {address[0]}:{address[1]}: {e}" )
[docs] def write_message(self, message: bytes) -> None: """Send message to GNTP server. Args: message: Message bytes to send Raises: GNTPConnectionError: If send fails """ try: self.sock.sendall(message) except socket.error as e: raise GNTPConnectionError(f"Failed to send message: {e}")
[docs] def read_message(self, callback: Callable[[bytes], None]) -> None: """Read message from server and call callback. Args: callback: Function to call with received message Raises: GNTPConnectionError: If read fails """ try: message = next(generate_messages(self.sock)) callback(message) except StopIteration: raise GNTPConnectionError("Connection closed by server") except socket.error as e: raise GNTPConnectionError(f"Failed to read message: {e}")
[docs] def close(self) -> None: """Close the socket connection.""" if self.sock: try: self.sock.close() except Exception: pass finally: self.sock = None
[docs] def generate_messages( sock: socket.socket, buffer_size: int = 1024 ) -> Generator[bytes, None, None]: """Generate complete GNTP messages from socket. Reads from socket and yields complete messages terminated by double CRLF (\\r\\n\\r\\n). Args: sock: Socket to read from buffer_size: Read buffer size in bytes Yields: Complete GNTP message bytes Raises: GNTPError: If message exceeds maximum size """ buffer = b'' while True: chunk = sock.recv(buffer_size) if not chunk: break buffer += chunk # Look for message delimiter delimiter_pos = buffer.find(MESSAGE_DELIMITER) # Check message size limits if delimiter_pos < 0 and len(buffer) >= MAX_MESSAGE_SIZE: raise GNTPError(f"Message too large: {len(buffer)} bytes") if delimiter_pos > MAX_MESSAGE_SIZE - MESSAGE_DELIMITER_SIZE: raise GNTPError(f"Message too large: {delimiter_pos} bytes") # Yield complete message if delimiter_pos >= 0: end_pos = delimiter_pos + MESSAGE_DELIMITER_SIZE yield buffer[:end_pos] buffer = buffer[end_pos:]
[docs] class GNTPClient: """Synchronous GNTP client. Handles request processing with optional authentication and encryption. Attributes: address: (host, port) of GNTP server timeout: Connection timeout in seconds connection_class: Connection class to use packer_factory: Factory for creating message packers """
[docs] def __init__( self, host: str = 'localhost', port: int = DEFAULT_PORT, timeout: float = DEFAULT_TIMEOUT, password: Optional[str] = None, key_hashing: Optional[KeyAlgorithm] = None, encryption: Optional[CipherAlgorithm] = None, connection_class: Optional[type] = None ): """Initialize GNTP client. Args: host: GNTP server hostname port: GNTP server port timeout: Connection timeout in seconds password: Optional password for authentication key_hashing: Key hashing algorithm (default: SHA256) encryption: Optional encryption algorithm connection_class: Connection class (default: GNTPConnection) Raises: GNTPError: If encryption key size is too large for hashing Example: >>> from gntplib import keys, ciphers >>> client = GNTPClient( ... host='localhost', ... password='secret', ... key_hashing=keys.SHA256, ... encryption=ciphers.AES ... ) """ self.address = (host, port) self.timeout = timeout self.connection_class = connection_class or GNTPConnection # Set default key hashing if password and key_hashing is None: from .keys import SHA256 key_hashing = SHA256 # Validate encryption vs hashing key size if encryption and key_hashing: if encryption.key_size > key_hashing.key_size: raise GNTPError( f"Key hashing size ({key_hashing.algorithm_id}:" f"{key_hashing.key_size}) must be at least encryption " f"key size ({encryption.algorithm_id}:" f"{encryption.key_size})" ) # Create packer factory self.packer_factory = MessagePackerFactory( password, key_hashing, encryption )
[docs] def process_request( self, request: Any, callback: Optional[Callable] = None, **kwargs ) -> None: """Process a GNTP request. Serializes the request, sends it to the server, and handles the response. Args: request: Request object to send callback: Optional callback when operation completes **kwargs: Additional arguments passed to connection Example: >>> from gntplib import RegisterRequest, Event >>> request = RegisterRequest('MyApp', None, [Event('test')]) >>> client.process_request(request) """ # Pack the message packer = self.packer_factory.create() message = packer.pack(request) # Create connection conn = self._connect(callback, **kwargs) # Send request conn.write_message(message) # Read response conn.read_message(conn.on_ok_message)
def _connect( self, final_callback: Optional[Callable] = None, **kwargs ) -> BaseGNTPConnection: """Create connection to GNTP server. Args: final_callback: Callback when operation completes **kwargs: Additional connection arguments Returns: Connection instance """ return self.connection_class( self.address, self.timeout, final_callback, **kwargs )
[docs] def __repr__(self) -> str: """Return string representation.""" return ( f"GNTPClient(host={self.address[0]!r}, " f"port={self.address[1]}, " f"timeout={self.timeout})" )