Python SDK


The official Python SDK for TogoMQ provides a Pythonic client for integrating TogoMQ into your Python applications.

Installation

Requirements

  • Python 3.9 or higher
  • Access to a TogoMQ account (sign up if you don't have one)
  • A valid TogoMQ authentication token (generate one in your dashboard)

Install SDK

Install the SDK using pip:

pip install togomq-sdk

Verify Installation

Create a simple test file named test_install.py:

from togomq import Client, Config, Message

# Create client
config = Config(token="test-token")
client = Client(config)

print("TogoMQ SDK installed successfully!")
client.close()

Run: python test_install.py

Quick Start

from togomq import Client, Config, Message

# Create client with your token
config = Config(token="your-token-here")
client = Client(config)

try:
    # Publish a message
    messages = [Message("orders", b"Hello TogoMQ!")]
    response = client.pub_batch(messages)
    print(f"Published {response.messages_received} messages")
finally:
    client.close()

SDK Features

  • 🚀 High Performance - Built on gRPC for efficient communication
  • 📡 Streaming Support - Native support for streaming pub/sub operations
  • 🔒 Secure - TLS encryption and token-based authentication
  • 🎯 Pythonic API - Idiomatic Python with type hints
  • 📝 Type Safety - Full type annotations for IDE autocomplete
  • Thread-Safe - Safe for concurrent use with threading
  • Well Tested - Comprehensive pytest test coverage
  • 🐍 Modern Python - Python 3.9+ support

Configuration

The SDK supports flexible configuration with sensible defaults. You only need to provide your authentication token to get started.

Default Configuration

from togomq import Config, Client

# Create client with defaults (only token is required)
config = Config(token="your-token-here")
client = Client(config)

Default values:

  • host: q.togomq.io
  • port: 5123
  • log_level: info
  • token: (required, no default)
  • use_tls: True

Configuration Options

Option     Default Value  Description
host       q.togomq.io    TogoMQ server hostname
port       5123           TogoMQ server port
log_level  info           Logging level: debug, info, warn, error, none
token      (required)     Authentication token from your dashboard
use_tls    True           Whether to use TLS encryption

Custom Configuration

config = Config(
    token="your-token-here",
    host="custom.togomq.io",
    port=9000,
    log_level="debug",
    use_tls=True,
)
client = Client(config)
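
Custom values can be checked locally before they reach the client. The following is a minimal sketch, not part of the SDK: check_settings is a hypothetical helper, the log levels are the ones listed in the options table, and the port range is the standard TCP range rather than a documented TogoMQ limit.

```python
# Log levels taken from the configuration options table.
LOG_LEVELS = {"debug", "info", "warn", "error", "none"}


def check_settings(host: str, port: int, log_level: str) -> None:
    """Raise ValueError if a setting is obviously invalid (hypothetical helper)."""
    if not host:
        raise ValueError("host must not be empty")
    if not 1 <= port <= 65535:
        # Assumption: any valid TCP port is acceptable to the server.
        raise ValueError(f"port out of range: {port}")
    if log_level not in LOG_LEVELS:
        raise ValueError(f"unknown log level: {log_level!r}")
```

Calling check_settings("custom.togomq.io", 9000, "debug") before building the Config surfaces typos as a plain ValueError at startup.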

Environment-Based Configuration

Development:

import os

config = Config(
    token=os.getenv("TOGOMQ_DEV_TOKEN"),
    log_level="debug",  # Verbose logging
)

Production:

config = Config(
    token=os.getenv("TOGOMQ_PROD_TOKEN"),
    log_level="error",  # Only log errors
)
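
The two environment snippets can be folded into one lookup so that a missing token fails early. This is a sketch under assumptions: config_kwargs and the environment names "development" and "production" are illustrative, while the variable names and log levels are the ones shown in the snippets above.

```python
import os
from typing import Dict, Mapping

# Hypothetical mapping: environment name -> (token variable, log level).
_PROFILES = {
    "development": ("TOGOMQ_DEV_TOKEN", "debug"),
    "production": ("TOGOMQ_PROD_TOKEN", "error"),
}


def config_kwargs(env: str, environ: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return keyword arguments for Config(...) for the given environment."""
    try:
        token_var, log_level = _PROFILES[env]
    except KeyError:
        raise ValueError(f"unknown environment: {env!r}") from None
    token = environ.get(token_var)
    if not token:
        # Fail early instead of passing token=None to the client.
        raise RuntimeError(f"{token_var} is not set")
    return {"token": token, "log_level": log_level}
```

A caller would then write something like Config(**config_kwargs("production")).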

Context Manager

Use a with statement for automatic cleanup:

with Client(config) as client:
    client.pub_batch(messages)
# Connection automatically closed

Publishing Messages

TogoMQ provides flexible methods for publishing messages: individually, in batches, or through streaming generators.

{danger} Important: The topic name is required for all published messages.

Message Structure

from typing import Dict

class Message:
    topic: str                 # Message topic (required)
    body: bytes                # Message payload
    variables: Dict[str, str]  # Custom key-value metadata
    postpone: int              # Delay in seconds before message is available
    retention: int             # How long to keep message (seconds)

Creating a message:

from togomq import Message

# Simple message
msg = Message("orders", b"order-data-here")

# Message with options
msg = Message("orders", b"order-data").with_variables({
    "priority": "high",
    "customer": "12345",
}).with_postpone(60).with_retention(3600)
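
Because body is bytes and variables maps strings to strings, structured data has to be converted before it goes into a Message. The sketch below assumes JSON as the wire format, which is a choice made here and not something the SDK requires; encode_body and string_variables are hypothetical helper names.

```python
import json
from typing import Any, Dict, Mapping


def encode_body(payload: Any) -> bytes:
    """Serialize a JSON-compatible object to compact UTF-8 bytes."""
    return json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8")


def string_variables(values: Mapping[str, Any]) -> Dict[str, str]:
    """Convert metadata keys and values to strings, matching Dict[str, str]."""
    return {str(key): str(value) for key, value in values.items()}
```

The results can be passed straight through, for example Message("orders", encode_body(order)).with_variables(string_variables(meta)).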

Publishing a Batch of Messages

from togomq import Client, Config, Message

# Create client
config = Config(token="your-token")
client = Client(config)

try:
    # Create messages - topic is required
    messages = [
        Message("orders", b"order-1"),
        Message("orders", b"order-2").with_variables({
            "priority": "high",
            "customer": "12345",
        }),
        Message("orders", b"order-3")
            .with_postpone(60)      # Delay 60 seconds
            .with_retention(3600),  # Keep for 1 hour
    ]

    # Publish
    response = client.pub_batch(messages)
    print(f"Published {response.messages_received} messages")
finally:
    client.close()

Publishing via Generator (Streaming)

For high-throughput scenarios:

from typing import Generator

def message_generator() -> Generator[Message, None, None]:
    for i in range(1000):
        msg = Message("events", f"event-{i}".encode())
        yield msg

# Publish streaming
with Client(config) as client:
    response = client.pub(message_generator())
    print(f"Published {response.messages_received} messages")
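
A generator is most useful when its source is also lazy, for example a large file read line by line. The following sketch shows only the payload side and uses no SDK calls; iter_payloads is a hypothetical helper, and wrapping each payload in a Message is left to the caller.

```python
from typing import Iterable, Iterator


def iter_payloads(lines: Iterable[str]) -> Iterator[bytes]:
    """Yield one UTF-8 payload per non-empty line, without buffering the input."""
    for line in lines:
        text = line.strip()
        if text:
            yield text.encode("utf-8")
```

A generator expression such as (Message("events", body) for body in iter_payloads(source)) could then be handed to client.pub().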

Message Options

Custom Metadata:

msg = Message("orders", b"order-data").with_variables({
    "order_id": "12345",
    "customer_id": "98765",
    "priority": "high",
    "region": "us-east",
})

Delayed Messages:

# Delay 60 seconds
msg = Message("reminders", b"Send email").with_postpone(60)

# Delay 1 hour
msg = Message("scheduled-tasks", b"Run backup").with_postpone(3600)
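
Since with_postpone takes a delay in seconds, a message that should become available at a specific time needs that time converted to a delay first. A minimal sketch, assuming both datetimes are timezone-aware; seconds_until is a hypothetical helper.

```python
import math
from datetime import datetime, timezone
from typing import Optional


def seconds_until(when: datetime, now: Optional[datetime] = None) -> int:
    """Whole seconds from now until `when`, rounded up and never negative."""
    if now is None:
        now = datetime.now(timezone.utc)
    return max(0, math.ceil((when - now).total_seconds()))
```

The result can be passed directly, for example with_postpone(seconds_until(run_at)). A time in the past yields 0, meaning no delay.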

Message Retention:

# Keep for 1 hour
msg = Message("temporary-events", b"data").with_retention(3600)

Best Practices

1. Use Context Manager:

# Good ✅
with Client(config) as client:
    client.pub_batch(messages)

2. Batch Publishing:

# Good ✅
messages = [msg1, msg2, msg3]
client.pub_batch(messages)

3. Handle Errors:

from togomq import TogoMQError, ErrorCode

try:
    response = client.pub_batch(messages)
except TogoMQError as e:
    if e.code == ErrorCode.AUTH:
        print("Authentication failed")
    elif e.code == ErrorCode.CONNECTION:
        print("Connection error")

Subscribing to Messages

Subscribe to messages published to specific topics using generator-based iteration.

{danger} Important: Topic is required. Use wildcards like "orders.*" or "*" for all topics.

Received Message Structure

from typing import Dict

class Message:
    topic: str                 # Message topic
    uuid: str                  # Unique message identifier
    body: bytes                # Message payload
    variables: Dict[str, str]  # Custom key-value metadata

Basic Subscription

from togomq import Client, Config, SubscribeOptions

# Create client
config = Config(token="your-token")
client = Client(config)

try:
    # Subscribe to specific topic (topic is required)
    options = SubscribeOptions("orders")
    msg_gen, err_gen = client.sub(options)

    # Receive messages
    for msg in msg_gen:
        print(f"Received from {msg.topic}: {msg.body.decode()}")
        print(f"UUID: {msg.uuid}")

        # Access variables
        if "priority" in msg.variables:
            print(f"Priority: {msg.variables['priority']}")
finally:
    client.close()

Advanced Subscription Options

Batch Size - Control how many messages are received at once:

options = SubscribeOptions("orders").with_batch(10)

Rate Limiting - Limit messages per second:

options = SubscribeOptions("orders").with_speed_per_sec(100)

Combined Options:

# Defaults: batch = 0 (server default of 1000), speed per second = 0 (unlimited)
options = (SubscribeOptions("orders.*")
    .with_batch(10)
    .with_speed_per_sec(100))

msg_gen, err_gen = client.sub(options)

Wildcard Subscriptions

All topics:

# Subscribe to all topics using "*" wildcard
options = SubscribeOptions("*")
msg_gen, err_gen = client.sub(options)

Pattern matching:

# All order topics (orders.new, orders.updated, etc.)
options = SubscribeOptions("orders.*")
msg_gen, err_gen = client.sub(options)

# All critical events
options = SubscribeOptions("*.critical")
msg_gen, err_gen = client.sub(options)
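
For local tests it can help to predict which topics a pattern would select. The sketch below approximates the wildcard with shell-style globbing from the standard library. This is an assumption: the server's matching rules are not specified here and may differ, for instance in whether "*" spans several dot-separated segments. topic_matches is a hypothetical helper.

```python
from fnmatch import fnmatchcase


def topic_matches(pattern: str, topic: str) -> bool:
    """Approximate wildcard matching with case-sensitive shell-style globbing."""
    # Note: fnmatch also gives "?" and "[...]" special meaning.
    return fnmatchcase(topic, pattern)
```

Under this approximation "orders.*" selects "orders.new" but not "orders" or "payments.new".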

Best Practices

1. Monitor Error Generator:

msg_gen, err_gen = client.sub(options)

for msg in msg_gen:
    try:
        process_message(msg)
    except Exception as e:
        # Handle message processing errors
        print(f"Error processing {msg.uuid}: {e}")

# Check the error generator (reached only after the message loop ends)
for err in err_gen:
    print(f"Subscription error: {err}")
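
In the sequential form above, the error loop starts only once the message generator is exhausted. One way to see errors while messages are still arriving is to drain the error generator on a background thread. This is a sketch with a hypothetical watch_errors helper; whether the SDK's generators may be consumed from separate threads is an assumption to verify.

```python
import threading
from typing import Callable, Iterable


def watch_errors(
    err_gen: Iterable[Exception],
    on_error: Callable[[Exception], None],
) -> threading.Thread:
    """Consume err_gen on a daemon thread, passing each error to on_error."""

    def _drain() -> None:
        for err in err_gen:
            on_error(err)

    watcher = threading.Thread(target=_drain, daemon=True)
    watcher.start()
    return watcher
```

The main thread keeps iterating msg_gen as before and can call join() on the returned thread during shutdown.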

2. Use Context Manager:

with Client(config) as client:
    options = SubscribeOptions("orders")
    msg_gen, err_gen = client.sub(options)

    for msg in msg_gen:
        process_message(msg)

3. Thread-Safe Processing:

import threading

def process_worker(msg_queue):
    for msg in msg_queue:
        process_message(msg)

# Start worker thread
worker = threading.Thread(target=process_worker, args=(msg_gen,))
worker.start()
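
A Python generator must not be iterated by several threads at once, so scaling beyond one worker usually means a single reader feeding a queue. The following sketch uses only the standard library; fan_out and its parameters are hypothetical, and any iterable of messages works as input.

```python
import queue
import threading
from typing import Any, Callable, Iterable

_STOP = object()  # Sentinel telling a worker to exit.


def fan_out(messages: Iterable[Any], handler: Callable[[Any], None], workers: int = 4) -> None:
    """Read messages on the calling thread and process them on worker threads."""
    work: "queue.Queue[Any]" = queue.Queue(maxsize=workers * 2)

    def _worker() -> None:
        while True:
            item = work.get()
            if item is _STOP:
                return
            try:
                handler(item)
            except Exception as exc:
                # Keep the worker alive so the bounded queue keeps draining.
                print(f"handler failed: {exc}")

    threads = [threading.Thread(target=_worker) for _ in range(workers)]
    for thread in threads:
        thread.start()
    for msg in messages:
        work.put(msg)  # Blocks when workers fall behind (backpressure).
    for _ in threads:
        work.put(_STOP)
    for thread in threads:
        thread.join()
```

The bounded queue keeps the reader from running far ahead of slow handlers. Processing order across workers is not preserved.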

Error Handling

The SDK provides detailed error information for graceful failure handling and debugging.

Error Structure

from togomq import TogoMQError, ErrorCode

try:
    response = client.pub_batch(messages)
except TogoMQError as e:
    # Check error type
    print(f"Error code: {e.code}")
    print(f"Error message: {e.message}")

Error Codes

Error Code               Description                Common Causes
ErrorCode.CONNECTION     Connection/network errors  Network issues, firewall, DNS
ErrorCode.AUTH           Authentication failures    Invalid/expired/revoked token
ErrorCode.VALIDATION     Invalid input/config       Missing topic, invalid data
ErrorCode.PUBLISH        Publishing errors          Server-side failures
ErrorCode.SUBSCRIBE      Subscription errors        Server-side failures
ErrorCode.STREAM         Streaming errors           Stream interrupted
ErrorCode.CONFIGURATION  Config errors              Invalid host/port

Handling Errors

Basic:

try:
    response = client.pub_batch(messages)
except TogoMQError as e:
    print(f"Failed: {e}")

Detailed:

from togomq import TogoMQError, ErrorCode

try:
    response = client.pub_batch(messages)
except TogoMQError as e:
    if e.code == ErrorCode.AUTH:
        print("Authentication failed - check token")
    elif e.code == ErrorCode.CONNECTION:
        print("Connection error - will retry")
    elif e.code == ErrorCode.VALIDATION:
        print(f"Validation error: {e.message}")
    else:
        print(f"Error ({e.code}): {e.message}")

Retry Logic

import time

from togomq import TogoMQError, ErrorCode

def publish_with_retry(client, messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            return client.pub_batch(messages)
        except TogoMQError as e:
            # Don't retry auth/validation errors
            if e.code in [ErrorCode.AUTH, ErrorCode.VALIDATION]:
                raise

            if attempt < max_retries - 1:
                backoff = 2 ** attempt
                print(f"Retry {attempt + 1} after {backoff}s")
                time.sleep(backoff)
            else:
                raise
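
The same pattern can be written without SDK imports so that it is testable in isolation. This sketch adds a delay cap and an injectable sleep function; retry, is_retryable, base, and cap are hypothetical names, and the 30-second cap is an arbitrary choice.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry(
    fn: Callable[[], T],
    is_retryable: Callable[[Exception], bool],
    max_retries: int = 3,
    base: float = 1.0,
    cap: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn up to max_retries times with capped exponential backoff."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as exc:
            is_last = attempt == max_retries - 1
            if is_last or not is_retryable(exc):
                raise
            # Delays grow as base, 2*base, 4*base, ... up to cap.
            sleep(min(cap, base * 2 ** attempt))
    raise ValueError("max_retries must be at least 1")
```

With the SDK, fn could be a lambda wrapping client.pub_batch(messages), and is_retryable could reject exceptions whose code is ErrorCode.AUTH or ErrorCode.VALIDATION.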

Best Practices

1. Reuse Clients

Create one client per application and reuse it:

# Good ✅
client = Client(config)
try:
    for _ in range(1000):
        client.pub_batch(messages)
finally:
    client.close()

2. Use Context Managers

Prefer a with statement for automatic cleanup:

# Good ✅
with Client(config) as client:
    client.pub_batch(messages)

3. Handle Errors

Always check and handle errors:

try:
    response = client.pub_batch(messages)
    print(f"Published {response.messages_received} messages")
except TogoMQError as e:
    print(f"Error: {e}")

4. Batch Messages

Use pub_batch() for better performance:

# Good ✅
messages = [msg1, msg2, msg3]
client.pub_batch(messages)
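
When the message list is very large, splitting it into fixed-size chunks keeps each pub_batch() call bounded. A minimal sketch: chunked is a hypothetical helper, and a suitable chunk size is an assumption to tune, since no batch limit is stated here.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of items, each with at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])
```

Each chunk can then be published in turn, for example by looping over chunked(messages, 500) and calling client.pub_batch(batch).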

5. Monitor Subscriptions

Handle both message and error generators:

msg_gen, err_gen = client.sub(options)

for msg in msg_gen:
    process_message(msg)

# Check the error generator once the message loop ends
for err in err_gen:
    handle_error(err)

6. Set Log Levels

Control logging based on environment:

# Development
config = Config(token="...", log_level="debug")

# Production
config = Config(token="...", log_level="error")

Complete Examples

Publishing Example

#!/usr/bin/env python3

import os
from togomq import Client, Config, Message

def main():
    # Create client
    config = Config(token=os.environ['TOGOMQ_TOKEN'])

    with Client(config) as client:
        # Create messages
        messages = []
        for i in range(100):
            msg = Message(
                topic="events",
                body=f"event-{i}".encode()
            ).with_variables({
                "index": str(i),
                "type": "test"
            })
            messages.append(msg)

        # Publish batch
        try:
            response = client.pub_batch(messages)
            print(f"Published {response.messages_received} messages")
        except Exception as e:
            print(f"Error: {e}")

if __name__ == "__main__":
    main()

Subscribing Example

#!/usr/bin/env python3

import os
import signal
from togomq import Client, Config, SubscribeOptions

shutdown = False

def signal_handler(sig, frame):
    global shutdown
    print("\nShutting down gracefully...")
    shutdown = True

def main():
    signal.signal(signal.SIGINT, signal_handler)

    config = Config(token=os.environ['TOGOMQ_TOKEN'])

    with Client(config) as client:
        # Subscribe with options
        options = (SubscribeOptions("orders.*")
            .with_batch(10)
            .with_speed_per_sec(100))

        msg_gen, err_gen = client.sub(options)

        print("Subscribed, waiting for messages...")

        try:
            for msg in msg_gen:
                if shutdown:
                    break

                print(f"[{msg.uuid}] {msg.topic}: {msg.body.decode()}")
                # Process message

        except Exception as e:
            print(f"Subscription error: {e}")

if __name__ == "__main__":
    main()
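
The module-level shutdown flag can be replaced by a threading.Event, which avoids the global statement and can be shared with worker threads. This is a sketch with a hypothetical consume_until_stopped helper. As in the example above, the stop request is noticed only when the next message arrives.

```python
import threading
from typing import Any, Callable, Iterable


def consume_until_stopped(
    messages: Iterable[Any],
    handler: Callable[[Any], None],
    stop: threading.Event,
) -> int:
    """Handle messages until `stop` is set; return how many were handled."""
    handled = 0
    for msg in messages:
        if stop.is_set():
            break
        handler(msg)
        handled += 1
    return handled

# Typical wiring (not executed here):
#   stop = threading.Event()
#   signal.signal(signal.SIGINT, lambda sig, frame: stop.set())
```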

Resources


{success} Check out the FAQ for common questions about TogoMQ.