The official Go SDK for Togo MQ provides a comprehensive, production-ready client for integrating Togo MQ into your Go applications.
The official Togo MQ SDK for Go is available via go get:
go get github.com/TogoMQ/togomq-sdk-go
This will download the latest version of the SDK and all its dependencies.
Install a specific version:
go get github.com/TogoMQ/togomq-sdk-go@v0.2.3
Update to latest version:
go get -u github.com/TogoMQ/togomq-sdk-go
Create a simple test file to verify the installation:
package main
import (
"fmt"
"github.com/TogoMQ/togomq-sdk-go"
)
// main is an installation smoke test: it builds a Config with a placeholder
// token and prints it, which only succeeds if the SDK package resolves.
func main() {
	cfg := togomq.NewConfig(togomq.WithToken("test-token"))

	fmt.Println("TogoMQ SDK installed successfully!")
	fmt.Printf("Config created: %+v\n", cfg)
}
Run: go run test-install.go
package main
import (
"context"
"log"
"os"
"github.com/TogoMQ/togomq-sdk-go"
)
// main is the quick-start example: it reads the token from the TOGOMQ_TOKEN
// environment variable, connects, and publishes one message to "test-topic".
func main() {
	// Configuration and client. Only the token is supplied; every other
	// option keeps its default.
	client, err := togomq.NewClient(togomq.NewConfig(
		togomq.WithToken(os.Getenv("TOGOMQ_TOKEN")),
	))
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	defer client.Close()

	// A single message is published as a batch of one.
	batch := []*togomq.Message{
		togomq.NewMessage("test-topic", []byte("Hello, TogoMQ!")),
	}
	resp, err := client.PubBatch(context.Background(), batch)
	if err != nil {
		log.Fatalf("Failed to publish: %v", err)
	}

	log.Printf("Published %d messages successfully!", resp.MessagesReceived)
}
The SDK supports flexible configuration with sensible defaults. You only need to provide your authentication token to get started.
import "github.com/TogoMQ/togomq-sdk-go"
// Create client with defaults (only token is required)
config := togomq.NewConfig(
togomq.WithToken("your-token-here"),
)
client, err := togomq.NewClient(config)
if err != nil {
log.Fatal(err)
}
defer client.Close()
Default values:
| Option | Default Value | Description |
|---|---|---|
| Host | `q.togomq.io` | TogoMQ server hostname |
| Port | `5123` | TogoMQ server port |
| LogLevel | `info` | Logging level: debug, info, warn, error, none |
| Token | (required) | Authentication token from your dashboard |
WithHost(host string) - Set custom server hostname:
config := togomq.NewConfig(
togomq.WithHost("custom.togomq.io"),
togomq.WithToken("your-token"),
)
WithPort(port int) - Set custom port:
config := togomq.NewConfig(
togomq.WithPort(9000),
togomq.WithToken("your-token"),
)
WithLogLevel(level string) - Control logging verbosity:
config := togomq.NewConfig(
togomq.WithLogLevel("debug"), // debug, info, warn, error, none
togomq.WithToken("your-token"),
)
Log Levels:
- debug - All logs including debug information (verbose)
- info - Informational messages and above (default)
- warn - Warnings and errors only
- error - Error messages only
- none - Disable all logging

Combine multiple options:
config := togomq.NewConfig(
togomq.WithHost("custom.togomq.io"),
togomq.WithPort(9000),
togomq.WithLogLevel("debug"),
togomq.WithToken("your-token-here"),
)
client, err := togomq.NewClient(config)
if err != nil {
log.Fatal(err)
}
defer client.Close()
Development:
config := togomq.NewConfig(
togomq.WithToken(os.Getenv("TOGOMQ_DEV_TOKEN")),
togomq.WithLogLevel("debug"), // Verbose logging
)
Production:
config := togomq.NewConfig(
togomq.WithToken(os.Getenv("TOGOMQ_PROD_TOKEN")),
togomq.WithLogLevel("error"), // Only log errors
)
Togo MQ provides flexible methods for publishing messages: individual, batches, or streaming channels.
{danger} Important: The topic name is required for all published messages.
// Message is the value handed to the publish calls (for example
// client.PubBatch). Topic must be set on every published message; the
// remaining fields are optional.
type Message struct {
Topic string // Message topic (required)
Body []byte // Message payload
Variables map[string]string // Custom key-value metadata
Postpone int64 // Delay in seconds before message is available
Retention int64 // How long to keep message (seconds)
}
Creating a message:
// Simple message
msg := togomq.NewMessage("orders", []byte("order-data-here"))
// Message with options
msg := togomq.NewMessage("orders", []byte("order-data")).
WithVariables(map[string]string{
"priority": "high",
"customer": "12345",
}).
WithPostpone(60). // Delay 60 seconds
WithRetention(3600) // Keep for 1 hour
// Create a message
msg := togomq.NewMessage("user-events", []byte("User logged in"))
// Publish
resp, err := client.PubBatch(context.Background(), []*togomq.Message{msg})
if err != nil {
log.Fatalf("Failed to publish: %v", err)
}
log.Printf("Published %d messages\n", resp.MessagesReceived)
// Create multiple messages
messages := []*togomq.Message{
togomq.NewMessage("orders", []byte("order-1")),
togomq.NewMessage("orders", []byte("order-2")).
WithVariables(map[string]string{
"priority": "high",
"customer": "12345",
}),
togomq.NewMessage("orders", []byte("order-3")).
WithPostpone(60).
WithRetention(3600),
}
// Publish all at once
resp, err := client.PubBatch(context.Background(), messages)
if err != nil {
log.Fatalf("Failed to publish: %v", err)
}
log.Printf("Published %d messages successfully!\n", resp.MessagesReceived)
For high-throughput scenarios:
ctx := context.Background()

// Create a channel for messages
msgChan := make(chan *togomq.Message, 100)

// done is closed when the background publisher returns, whether it
// succeeded or failed. It replaces a fixed sleep as the way to wait for
// the stream to finish.
done := make(chan struct{})

// Start publishing in background
go func() {
	defer close(done)

	resp, err := client.Pub(ctx, msgChan)
	if err != nil {
		log.Printf("Publish error: %v", err)
		return
	}
	log.Printf("Published %d messages\n", resp.MessagesReceived)
}()

// Send messages over time
send:
for i := 0; i < 1000; i++ {
	msg := togomq.NewMessage("events", []byte(fmt.Sprintf("event-%d", i)))
	select {
	case msgChan <- msg:
	case <-done:
		// The publisher has already returned (for example after an
		// error), so nothing drains msgChan any more. Stop producing
		// instead of blocking forever once the buffer is full.
		break send
	}
	time.Sleep(10 * time.Millisecond)
}

// Close channel to signal end of stream
close(msgChan)

// Wait until the publisher has actually finished.
<-done
Custom Metadata:
msg := togomq.NewMessage("orders", []byte("order-data")).
WithVariables(map[string]string{
"order_id": "12345",
"customer_id": "98765",
"priority": "high",
"region": "us-east",
})
Delayed Messages:
// Delay 60 seconds
msg := togomq.NewMessage("reminders", []byte("Send email")).
WithPostpone(60)
// Delay 1 hour
msg := togomq.NewMessage("scheduled-tasks", []byte("Run backup")).
WithPostpone(3600)
Message Retention:
// Keep for 1 hour
msg := togomq.NewMessage("temporary-events", []byte("data")).
WithRetention(3600)
1. Reuse Clients:
// Good ✅ - create the client once and reuse it for every publish.
client, err := togomq.NewClient(config)
if err != nil {
	// Stop here: calling Close on a client that failed to construct is
	// not safe.
	log.Fatal(err)
}
defer client.Close()

for i := 0; i < 1000; i++ {
	if _, err := client.PubBatch(ctx, messages); err != nil {
		log.Printf("Failed to publish: %v", err)
	}
}
2. Use Batch Publishing:
// Good ✅
messages := []*togomq.Message{msg1, msg2, msg3}
client.PubBatch(ctx, messages)
3. Handle Errors:
resp, err := client.PubBatch(ctx, messages)
if err != nil {
if togomqErr, ok := err.(*togomq.TogoMQError); ok {
switch togomqErr.Code {
case togomq.ErrCodeAuth:
log.Println("Authentication failed")
case togomq.ErrCodeConnection:
log.Println("Connection error - retry")
}
}
}
4. Use Timeouts:
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
resp, err := client.PubBatch(ctx, messages)
Subscribe to messages published to specific topics using a simple channel-based API.
{danger} Important: Topic is required. Use a wildcard such as "orders.*" to match a group of topics, or "*" to match all topics.
// Message is the value delivered on the message channel returned by
// client.Sub.
// NOTE(review): only the fields relevant to subscribers are listed here; the
// publishing section shows a Message with Postpone and Retention as well.
// Presumably this is the same SDK type - confirm against the SDK source.
type Message struct {
Topic string // Message topic
UUID string // Unique message identifier
Body []byte // Message payload
Variables map[string]string // Custom metadata
}
// Subscribe to specific topic
opts := togomq.NewSubscribeOptions("orders")
msgChan, errChan, err := client.Sub(context.Background(), opts)
if err != nil {
log.Fatal(err)
}
// Receive messages
for {
select {
case msg, ok := <-msgChan:
if !ok {
log.Println("Subscription ended")
return
}
log.Printf("Received from %s: %s\n", msg.Topic, string(msg.Body))
log.Printf("UUID: %s\n", msg.UUID)
// Access custom variables
if priority, ok := msg.Variables["priority"]; ok {
log.Printf("Priority: %s\n", priority)
}
case err := <-errChan:
log.Printf("Subscription error: %v\n", err)
return
}
}
Batch Size - Control messages received at once:
opts := togomq.NewSubscribeOptions("orders").
WithBatch(10)
Rate Limiting - Limit messages per second:
opts := togomq.NewSubscribeOptions("orders").
WithSpeedPerSec(100)
Combined Options:
opts := togomq.NewSubscribeOptions("events").
WithBatch(50).
WithSpeedPerSec(200)
msgChan, errChan, err := client.Sub(context.Background(), opts)
All topics:
opts := togomq.NewSubscribeOptions("*")
msgChan, errChan, err := client.Sub(ctx, opts)
Pattern matching:
// All order topics (orders.new, orders.updated, etc.)
opts := togomq.NewSubscribeOptions("orders.*")
// All critical events
opts := togomq.NewSubscribeOptions("*.critical")
Timeout-based:
// Bound the whole subscription to 30 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

opts := togomq.NewSubscribeOptions("events")
msgChan, errChan, err := client.Sub(ctx, opts)
if err != nil {
	log.Printf("Failed to subscribe: %v", err)
	return
}

for {
	select {
	case msg, ok := <-msgChan:
		if !ok {
			// The message channel was closed: the subscription is over.
			// Without this check the loop would spin on nil messages.
			log.Println("Subscription ended")
			return
		}
		processMessage(msg)
	case err := <-errChan:
		log.Printf("Error: %v\n", err)
		return
	case <-ctx.Done():
		log.Println("Subscription timeout")
		return
	}
}
Manual cancellation:
ctx, cancel := context.WithCancel(context.Background())
// Calling cancel more than once is harmless, so the deferred call simply
// guarantees the context is released on every exit path.
defer cancel()

// The subscription must be opened with the cancellable context; cancelling
// ctx is what stops it.
opts := togomq.NewSubscribeOptions("events")
msgChan, errChan, err := client.Sub(ctx, opts)
if err != nil {
	log.Printf("Failed to subscribe: %v", err)
	return
}

// Cancel after 100 messages
count := 0
for {
	select {
	case msg, ok := <-msgChan:
		if !ok {
			log.Println("Subscription ended")
			return
		}
		processMessage(msg)
		count++
		if count >= 100 {
			cancel()
			return
		}
	case err := <-errChan:
		log.Printf("Error: %v\n", err)
		return
	}
}
// Create worker pool
workers := 10
workerChan := make(chan *togomq.Message, 100)
// Start workers
for i := 0; i < workers; i++ {
go worker(workerChan)
}
// Distribute messages
for {
select {
case msg := <-msgChan:
if msg == nil {
close(workerChan)
return
}
workerChan <- msg
case err := <-errChan:
log.Printf("Error: %v\n", err)
close(workerChan)
return
}
}
// worker hands each message received on msgChan to processMessage and
// returns once msgChan has been closed and drained.
func worker(msgChan <-chan *togomq.Message) {
	for {
		msg, open := <-msgChan
		if !open {
			return
		}
		processMessage(msg)
	}
}
1. Monitor Error Channel:
// Watch the message channel and the error channel together so that a
// subscription failure is never missed.
for {
	select {
	case msg, ok := <-msgChan:
		if !ok {
			// The message channel was closed: the subscription has ended.
			// Returning here keeps the loop from spinning on nil messages.
			return
		}
		processMessage(msg)
	case err := <-errChan:
		log.Printf("Error: %v\n", err)
		return
	}
}
2. Handle Nil Messages:
case msg := <-msgChan:
if msg == nil {
return
}
processMessage(msg)
3. Graceful Shutdown:
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
<-sigChan
cancel()
}()
msgChan, errChan, err := client.Sub(ctx, opts)
The SDK provides detailed error information for graceful failure handling and debugging.
// TogoMQError is the structured error type of the SDK. Callers recover it
// from a returned error with a type assertion (err.(*togomq.TogoMQError))
// and branch on Code, comparing it with the togomq.ErrCode* values.
type TogoMQError struct {
Code string // Error code for programmatic handling
Message string // Human-readable error message
Err error // Original underlying error (if any)
}
Type assertion:
resp, err := client.PubBatch(ctx, messages)
if err != nil {
if togomqErr, ok := err.(*togomq.TogoMQError); ok {
log.Printf("Code: %s, Message: %s\n", togomqErr.Code, togomqErr.Message)
}
}
| Error Code | Description | Common Causes |
|---|---|---|
| ErrCodeConnection | Connection/network errors | Network issues, firewall, DNS |
| ErrCodeAuth | Authentication failures | Invalid/expired/revoked token |
| ErrCodeValidation | Invalid input/config | Missing topic, invalid data |
| ErrCodePublish | Publishing errors | Server-side failures |
| ErrCodeSubscribe | Subscription errors | Server-side failures |
| ErrCodeStream | Streaming errors | Stream interrupted |
| ErrCodeConfiguration | Config errors | Invalid host/port |
Basic:
resp, err := client.PubBatch(ctx, messages)
if err != nil {
log.Printf("Failed: %v\n", err)
return
}
Detailed:
if togomqErr, ok := err.(*togomq.TogoMQError); ok {
switch togomqErr.Code {
case togomq.ErrCodeAuth:
log.Println("Authentication failed - check token")
case togomq.ErrCodeConnection:
log.Println("Connection error - will retry")
case togomq.ErrCodeValidation:
log.Printf("Validation error: %s\n", togomqErr.Message)
default:
log.Printf("Error (%s): %s\n", togomqErr.Code, togomqErr.Message)
}
}
Authentication errors:
if togomqErr.Code == togomq.ErrCodeAuth {
log.Println("Authentication failed!")
log.Println("- Token is invalid")
log.Println("- Token has been revoked")
log.Println("- Token has expired")
}
Connection errors with retry:
if togomqErr.Code == togomq.ErrCodeConnection {
for attempt := 1; attempt <= 3; attempt++ {
time.Sleep(time.Duration(attempt) * time.Second)
resp, err = client.PubBatch(ctx, messages)
if err == nil {
break
}
}
}
Context timeout:
// errors.Is matches context.DeadlineExceeded both when it is returned
// directly and when it has been wrapped by another error; a plain ==
// comparison only catches the first case.
if errors.Is(err, context.DeadlineExceeded) {
	log.Println("Operation timed out!")
}
1. Always Check Errors:
// Good ✅
resp, err := client.PubBatch(ctx, messages)
if err != nil {
log.Printf("Error: %v\n", err)
return
}
2. Implement Retry Logic:
// publishWithRetry publishes messages with client.PubBatch, making up to
// maxRetries attempts with a 10-second timeout each.
//
// Authentication and validation errors are returned immediately because a
// retry cannot fix them. After any other failure it sleeps with exponential
// backoff (1s, 2s, 4s, ...) before the next attempt. A maxRetries below 1 is
// treated as 1, so the messages are always tried at least once.
//
// It returns nil as soon as one attempt succeeds, otherwise the error of the
// last attempt.
func publishWithRetry(client *togomq.Client, messages []*togomq.Message, maxRetries int) error {
	if maxRetries < 1 {
		maxRetries = 1
	}

	// lastErr is declared outside the loop so the error of the final attempt
	// is still in scope when the attempts are exhausted.
	var lastErr error
	for attempt := 0; attempt < maxRetries; attempt++ {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		_, lastErr = client.PubBatch(ctx, messages)
		// Release the per-attempt context now; a defer inside the loop
		// would only run when the function returns.
		cancel()
		if lastErr == nil {
			return nil
		}

		// Don't retry auth/validation errors
		if togomqErr, ok := lastErr.(*togomq.TogoMQError); ok {
			if togomqErr.Code == togomq.ErrCodeAuth ||
				togomqErr.Code == togomq.ErrCodeValidation {
				return lastErr
			}
		}

		// Exponential backoff, skipped after the final attempt.
		if attempt < maxRetries-1 {
			backoff := time.Duration(1<<uint(attempt)) * time.Second
			time.Sleep(backoff)
		}
	}
	return lastErr
}
3. Log Error Details:
if togomqErr, ok := err.(*togomq.TogoMQError); ok {
log.Printf("Error - Code: %s, Message: %s, Underlying: %v\n",
togomqErr.Code, togomqErr.Message, togomqErr.Err)
}
4. Monitor Error Rates:
// totalRequests and failedRequests count publish attempts and failed
// attempts. They may be updated from several goroutines, so they must only
// be accessed through sync/atomic.
var totalRequests, failedRequests int64

// publish sends messages with client.PubBatch and keeps a running failure
// rate. Each failure is logged, and a warning is logged when more than 5% of
// all attempts so far have failed.
func publish(client *togomq.Client, messages []*togomq.Message) {
	atomic.AddInt64(&totalRequests, 1)

	// No caller-supplied context is available in this signature, so the call
	// runs without a deadline.
	if _, err := client.PubBatch(context.Background(), messages); err != nil {
		log.Printf("Failed to publish: %v", err)

		// Read both counters atomically. Every failure is counted after its
		// own attempt, so loading the total after bumping the failure count
		// keeps the ratio at or below 1.
		failed := atomic.AddInt64(&failedRequests, 1)
		total := atomic.LoadInt64(&totalRequests)

		errorRate := float64(failed) / float64(total)
		if errorRate > 0.05 {
			log.Printf("WARNING: High error rate: %.2f%%\n", errorRate*100)
		}
	}
}
Create one client per application and reuse it:
// Good ✅ - one client for the whole application, shared by every publish.
client, err := togomq.NewClient(config)
if err != nil {
	// Do not continue with a client that failed to construct; Close would
	// be called on it by the defer below.
	log.Fatal(err)
}
defer client.Close()

for i := 0; i < 1000; i++ {
	if _, err := client.PubBatch(ctx, messages); err != nil {
		log.Printf("Failed to publish: %v", err)
	}
}
client, err := togomq.NewClient(config)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// Use client...
resp, err := client.PubBatch(ctx, messages)
if err != nil {
if togomqErr, ok := err.(*togomq.TogoMQError); ok {
log.Printf("Error (%s): %s\n", togomqErr.Code, togomqErr.Message)
}
return
}
log.Printf("Published %d messages\n", resp.MessagesReceived)
// Good ✅
messages := []*togomq.Message{msg1, msg2, msg3}
client.PubBatch(ctx, messages)
Always provide valid topic names:
// Good ✅
msg := togomq.NewMessage("orders", []byte("data"))
// Bad ❌
msg := togomq.NewMessage("", []byte("data")) // Empty topic will fail
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
resp, err := client.PubBatch(ctx, messages)
Control logging based on environment:
// Development
config := togomq.NewConfig(
togomq.WithToken(token),
togomq.WithLogLevel("debug"),
)
// Production
config := togomq.NewConfig(
togomq.WithToken(token),
togomq.WithLogLevel("error"),
)
package main
import (
"context"
"fmt"
"log"
"os"
"github.com/TogoMQ/togomq-sdk-go"
)
// main is the complete publisher example: it connects with the token from
// TOGOMQ_TOKEN (debug logging enabled), builds 100 messages for the "events"
// topic, each tagged with its index and a type, and publishes them in one
// batch.
func main() {
	// Create client
	client, err := togomq.NewClient(togomq.NewConfig(
		togomq.WithToken(os.Getenv("TOGOMQ_TOKEN")),
		togomq.WithLogLevel("debug"),
	))
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	defer client.Close()

	// Create messages
	const batchSize = 100
	messages := make([]*togomq.Message, 0, batchSize)
	for i := 0; i < batchSize; i++ {
		payload := []byte(fmt.Sprintf("event-%d", i))
		vars := map[string]string{
			"index": fmt.Sprintf("%d", i),
			"type":  "test",
		}
		messages = append(messages, togomq.NewMessage("events", payload).WithVariables(vars))
	}

	// Publish batch
	resp, err := client.PubBatch(context.Background(), messages)
	if err != nil {
		log.Fatalf("Failed to publish: %v", err)
	}

	log.Printf("Published %d messages successfully!\n", resp.MessagesReceived)
}
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"github.com/TogoMQ/togomq-sdk-go"
)
// main is the complete subscriber example: it subscribes to "orders.*" with
// a batch size of 10 and a limit of 100 messages per second, then logs and
// processes messages until the subscription ends, an error arrives, or the
// process receives an interrupt or SIGTERM.
func main() {
	// Graceful shutdown handler
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

	// Create client
	client, err := togomq.NewClient(togomq.NewConfig(
		togomq.WithToken(os.Getenv("TOGOMQ_TOKEN")),
	))
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	defer client.Close()

	// Create context with cancellation
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Subscribe with options
	incoming, failures, err := client.Sub(ctx,
		togomq.NewSubscribeOptions("orders.*").
			WithBatch(10).
			WithSpeedPerSec(100),
	)
	if err != nil {
		log.Fatalf("Failed to subscribe: %v", err)
	}
	log.Println("Subscribed, waiting for messages...")

	for {
		select {
		case <-signals:
			log.Println("Shutting down gracefully...")
			cancel()
			return

		case subErr := <-failures:
			log.Printf("Subscription error: %v\n", subErr)
			return

		case msg, open := <-incoming:
			if open {
				log.Printf("[%s] %s: %s\n", msg.UUID, msg.Topic, string(msg.Body))
				// Process message
				processMessage(msg)
				continue
			}
			log.Println("Subscription ended")
			return
		}
	}
}
// processMessage is the hook where the application handles one received
// message. It is an intentionally empty placeholder in this example.
func processMessage(msg *togomq.Message) {
// Your business logic
}
{success} Check out the FAQ for common questions about Togo MQ.