The official Python SDK for Togo MQ provides a comprehensive, Pythonic client for integrating Togo MQ into your Python applications.
Install the SDK using pip:
pip install togomq-sdk
Create a simple test file:
from togomq import Client, Config, Message
# Create client
config = Config(token="test-token")
client = Client(config)
print("TogoMQ SDK installed successfully!")
client.close()
Run: python test_install.py
from togomq import Client, Config, Message
# Create client with your token
config = Config(token="your-token-here")
client = Client(config)
try:
# Publish a message
messages = [Message("orders", b"Hello TogoMQ!")]
response = client.pub_batch(messages)
print(f"Published {response.messages_received} messages")
finally:
client.close()
The SDK supports flexible configuration with sensible defaults. You only need to provide your authentication token to get started.
from togomq import Config, Client
# Create client with defaults (only token is required)
config = Config(token="your-token-here")
client = Client(config)
Default values:
q.togomq.io5123infoTrue| Option | Default Value | Description |
|---|---|---|
| host | q.togomq.io |
TogoMQ server hostname |
| port | 5123 |
TogoMQ server port |
| log_level | info |
Logging level: debug, info, warn, error, none |
| token | (required) | Authentication token from your dashboard |
| use_tls | True |
Whether to use TLS encryption |
config = Config(
token="your-token-here",
host="custom.togomq.io",
port=9000,
log_level="debug",
use_tls=True,
)
client = Client(config)
Development:
import os
config = Config(
token=os.getenv("TOGOMQ_DEV_TOKEN"),
log_level="debug", # Verbose logging
)
Production:
config = Config(
token=os.getenv("TOGOMQ_PROD_TOKEN"),
log_level="error", # Only log errors
)
Use with statement for automatic cleanup:
with Client(config) as client:
client.pub_batch(messages)
# Connection automatically closed
Togo MQ provides flexible methods for publishing messages: individual, batches, or streaming generators.
{danger} Important: The topic name is required for all published messages.
class Message:
topic: str # Message topic (required)
body: bytes # Message payload
variables: Dict[str, str] # Custom key-value metadata
postpone: int # Delay in seconds before message is available
retention: int # How long to keep message (seconds)
Creating a message:
from togomq import Message
# Simple message
msg = Message("orders", b"order-data-here")
# Message with options
msg = Message("orders", b"order-data").with_variables({
"priority": "high",
"customer": "12345",
}).with_postpone(60).with_retention(3600)
from togomq import Client, Config, Message
# Create client
config = Config(token="your-token")
client = Client(config)
try:
# Create messages - topic is required
messages = [
Message("orders", b"order-1"),
Message("orders", b"order-2").with_variables({
"priority": "high",
"customer": "12345",
}),
Message("orders", b"order-3")
.with_postpone(60) # Delay 60 seconds
.with_retention(3600), # Keep for 1 hour
]
# Publish
response = client.pub_batch(messages)
print(f"Published {response.messages_received} messages")
finally:
client.close()
For high-throughput scenarios:
from typing import Generator
def message_generator() -> Generator[Message, None, None]:
for i in range(1000):
msg = Message("events", f"event-{i}".encode())
yield msg
# Publish streaming
with Client(config) as client:
response = client.pub(message_generator())
print(f"Published {response.messages_received} messages")
Custom Metadata:
msg = Message("orders", b"order-data").with_variables({
"order_id": "12345",
"customer_id": "98765",
"priority": "high",
"region": "us-east",
})
Delayed Messages:
# Delay 60 seconds
msg = Message("reminders", b"Send email").with_postpone(60)
# Delay 1 hour
msg = Message("scheduled-tasks", b"Run backup").with_postpone(3600)
Message Retention:
# Keep for 1 hour
msg = Message("temporary-events", b"data").with_retention(3600)
1. Use Context Manager:
# Good ✅
with Client(config) as client:
client.pub_batch(messages)
2. Batch Publishing:
# Good ✅
messages = [msg1, msg2, msg3]
client.pub_batch(messages)
3. Handle Errors:
from togomq import TogoMQError, ErrorCode
try:
response = client.pub_batch(messages)
except TogoMQError as e:
if e.code == ErrorCode.AUTH:
print("Authentication failed")
elif e.code == ErrorCode.CONNECTION:
print("Connection error")
Subscribe to messages published to specific topics using generator-based iteration.
{danger} Important: Topic is required. Use wildcards like
"orders.*"or"*"for all topics.
class Message:
topic: str # Message topic
uuid: str # Unique message identifier
body: bytes # Message payload
variables: Dict[str, str] # Custom key-value metadata
from togomq import Client, Config, SubscribeOptions
# Create client
config = Config(token="your-token")
client = Client(config)
try:
# Subscribe to specific topic (topic is required)
options = SubscribeOptions("orders")
msg_gen, err_gen = client.sub(options)
# Receive messages
for msg in msg_gen:
print(f"Received from {msg.topic}: {msg.body.decode()}")
print(f"UUID: {msg.uuid}")
# Access variables
if "priority" in msg.variables:
print(f"Priority: {msg.variables['priority']}")
finally:
client.close()
Batch Size - Control messages received at once:
options = SubscribeOptions("orders").with_batch(10)
Rate Limiting - Limit messages per second:
options = SubscribeOptions("orders").with_speed_per_sec(100)
Combined Options:
# Default: Batch = 0 (server default 1000), SpeedPerSec = 0 (unlimited)
options = (SubscribeOptions("orders.*")
.with_batch(10)
.with_speed_per_sec(100))
msg_gen, err_gen = client.sub(options)
All topics:
# Subscribe to all topics using "*" wildcard
options = SubscribeOptions("*")
msg_gen, err_gen = client.sub(options)
Pattern matching:
# All order topics (orders.new, orders.updated, etc.)
options = SubscribeOptions("orders.*")
msg_gen, err_gen = client.sub(options)
# All critical events
options = SubscribeOptions("*.critical")
msg_gen, err_gen = client.sub(options)
1. Monitor Error Generator:
msg_gen, err_gen = client.sub(options)
for msg in msg_gen:
try:
process_message(msg)
except Exception as e:
# Handle message processing errors
print(f"Error processing {msg.uuid}: {e}")
# Check error generator
for err in err_gen:
print(f"Subscription error: {err}")
2. Use Context Manager:
with Client(config) as client:
options = SubscribeOptions("orders")
msg_gen, err_gen = client.sub(options)
for msg in msg_gen:
process_message(msg)
3. Thread-Safe Processing:
import threading
def process_worker(msg_queue):
for msg in msg_queue:
process_message(msg)
# Start worker thread
worker = threading.Thread(target=process_worker, args=(msg_gen,))
worker.start()
The SDK provides detailed error information for graceful failure handling and debugging.
from togomq import TogoMQError, ErrorCode
try:
response = client.pub_batch(messages)
except TogoMQError as e:
# Check error type
print(f"Error code: {e.code}")
print(f"Error message: {e.message}")
| Error Code | Description | Common Causes |
|---|---|---|
ErrorCode.CONNECTION |
Connection/network errors | Network issues, firewall, DNS |
ErrorCode.AUTH |
Authentication failures | Invalid/expired/revoked token |
ErrorCode.VALIDATION |
Invalid input/config | Missing topic, invalid data |
ErrorCode.PUBLISH |
Publishing errors | Server-side failures |
ErrorCode.SUBSCRIBE |
Subscription errors | Server-side failures |
ErrorCode.STREAM |
Streaming errors | Stream interrupted |
ErrorCode.CONFIGURATION |
Config errors | Invalid host/port |
Basic:
try:
response = client.pub_batch(messages)
except TogoMQError as e:
print(f"Failed: {e}")
Detailed:
from togomq import TogoMQError, ErrorCode
try:
response = client.pub_batch(messages)
except TogoMQError as e:
if e.code == ErrorCode.AUTH:
print("Authentication failed - check token")
elif e.code == ErrorCode.CONNECTION:
print("Connection error - will retry")
elif e.code == ErrorCode.VALIDATION:
print(f"Validation error: {e.message}")
else:
print(f"Error ({e.code}): {e.message}")
import time
def publish_with_retry(client, messages, max_retries=3):
for attempt in range(max_retries):
try:
return client.pub_batch(messages)
except TogoMQError as e:
# Don't retry auth/validation errors
if e.code in [ErrorCode.AUTH, ErrorCode.VALIDATION]:
raise
if attempt < max_retries - 1:
backoff = 2 ** attempt
print(f"Retry {attempt + 1} after {backoff}s")
time.sleep(backoff)
else:
raise
Create one client per application and reuse it:
# Good ✅
client = Client(config)
try:
for _ in range(1000):
client.pub_batch(messages)
finally:
client.close()
Prefer with statement for automatic cleanup:
# Good ✅
with Client(config) as client:
client.pub_batch(messages)
Always check and handle errors:
try:
response = client.pub_batch(messages)
print(f"Published {response.messages_received} messages")
except TogoMQError as e:
print(f"Error: {e}")
Use pub_batch() for better performance:
# Good ✅
messages = [msg1, msg2, msg3]
client.pub_batch(messages)
Handle both message and error generators:
msg_gen, err_gen = client.sub(options)
for msg in msg_gen:
process_message(msg)
# Always check error generator
for err in err_gen:
handle_error(err)
Control logging based on environment:
# Development
config = Config(token="...", log_level="debug")
# Production
config = Config(token="...", log_level="error")
#!/usr/bin/env python3
import os
from togomq import Client, Config, Message
def main():
# Create client
config = Config(token=os.environ['TOGOMQ_TOKEN'])
with Client(config) as client:
# Create messages
messages = []
for i in range(100):
msg = Message(
topic="events",
body=f"event-{i}".encode()
).with_variables({
"index": str(i),
"type": "test"
})
messages.append(msg)
# Publish batch
try:
response = client.pub_batch(messages)
print(f"Published {response.messages_received} messages")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()
#!/usr/bin/env python3
import os
import signal
from togomq import Client, Config, SubscribeOptions
shutdown = False
def signal_handler(sig, frame):
global shutdown
print("\nShutting down gracefully...")
shutdown = True
def main():
signal.signal(signal.SIGINT, signal_handler)
config = Config(token=os.environ['TOGOMQ_TOKEN'])
with Client(config) as client:
# Subscribe with options
options = (SubscribeOptions("orders.*")
.with_batch(10)
.with_speed_per_sec(100))
msg_gen, err_gen = client.sub(options)
print("Subscribed, waiting for messages...")
try:
for msg in msg_gen:
if shutdown:
break
print(f"[{msg.uuid}] {msg.topic}: {msg.body.decode()}")
# Process message
except Exception as e:
print(f"Subscription error: {e}")
if __name__ == "__main__":
main()
{success} Check out the FAQ for common questions about Togo MQ.