Go SDK


The official Go SDK for Togo MQ provides a comprehensive, production-ready client for integrating Togo MQ into your Go applications.

Installation

Requirements

  • Go 1.24 or higher
  • Access to a TogoMQ account (sign up if you don't have one)
  • A valid TogoMQ authentication token (generate one in your dashboard)

Install SDK

The official Togo MQ SDK for Go is available via go get:

go get github.com/TogoMQ/togomq-sdk-go

This will download the latest version of the SDK and all its dependencies.

Install a specific version:

go get github.com/TogoMQ/togomq-sdk-go@v0.2.3

Update to latest version:

go get -u github.com/TogoMQ/togomq-sdk-go

Verify Installation

Create a simple test file to verify the installation:

package main

import (
    "fmt"
    "github.com/TogoMQ/togomq-sdk-go"
)

func main() {
    config := togomq.NewConfig(
        togomq.WithToken("test-token"),
    )

    fmt.Println("TogoMQ SDK installed successfully!")
    fmt.Printf("Config created: %+v\n", config)
}

Save the file as test-install.go and run it:

go run test-install.go

Quick Start

package main

import (
    "context"
    "log"
    "os"

    "github.com/TogoMQ/togomq-sdk-go"
)

func main() {
    // Create configuration
    config := togomq.NewConfig(
        togomq.WithToken(os.Getenv("TOGOMQ_TOKEN")),
    )

    // Create client
    client, err := togomq.NewClient(config)
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }
    defer client.Close()

    // Publish a message
    msg := togomq.NewMessage("test-topic", []byte("Hello, TogoMQ!"))
    resp, err := client.PubBatch(context.Background(), []*togomq.Message{msg})
    if err != nil {
        log.Fatalf("Failed to publish: %v", err)
    }

    log.Printf("Published %d messages successfully!", resp.MessagesReceived)
}

SDK Features

  • 🚀 High Performance - Built on gRPC for efficient communication
  • 📡 Streaming Support - Native support for streaming pub/sub operations
  • 🔒 Secure - TLS encryption and token-based authentication
  • 🎯 Simple API - Easy-to-use client with fluent configuration
  • 📝 Comprehensive Logging - Configurable log levels for debugging
  • Concurrent - Safe for concurrent use with goroutines
  • Well Tested - Comprehensive test coverage

Configuration

The SDK supports flexible configuration with sensible defaults. You only need to provide your authentication token to get started.

Default Configuration

import "github.com/TogoMQ/togomq-sdk-go"

// Create client with defaults (only token is required)
config := togomq.NewConfig(
    togomq.WithToken("your-token-here"),
)

client, err := togomq.NewClient(config)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

Default values:

  • Host: q.togomq.io
  • Port: 5123
  • LogLevel: info
  • Token: (required - no default)

Configuration Options

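| Option   | Default Value | Description                                   |
|----------|---------------|-----------------------------------------------|
| Host     | q.togomq.io   | TogoMQ server hostname                        |
| Port     | 5123          | TogoMQ server port                            |
| LogLevel | info          | Logging level: debug, info, warn, error, none |
| Token    | (required)    | Authentication token from your dashboard      |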

WithHost(host string) - Set custom server hostname:

config := togomq.NewConfig(
    togomq.WithHost("custom.togomq.io"),
    togomq.WithToken("your-token"),
)

WithPort(port int) - Set custom port:

config := togomq.NewConfig(
    togomq.WithPort(9000),
    togomq.WithToken("your-token"),
)

WithLogLevel(level string) - Control logging verbosity:

config := togomq.NewConfig(
    togomq.WithLogLevel("debug"), // debug, info, warn, error, none
    togomq.WithToken("your-token"),
)

Log Levels:

  • debug - All logs including debug information (verbose)
  • info - Informational messages and above (default)
  • warn - Warnings and errors only
  • error - Error messages only
  • none - Disable all logging
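The list above implies an ordering from most to least verbose. A minimal sketch of how such level filtering could work is shown below; `levelRank` and `shouldLog` are hypothetical names for illustration and are not part of the SDK.

```go
package main

import "fmt"

// levelRank orders the documented levels from most to least verbose.
// This table is an illustration, not the SDK's internal representation.
var levelRank = map[string]int{"debug": 0, "info": 1, "warn": 2, "error": 3, "none": 4}

// shouldLog reports whether a message at msgLevel would be emitted when the
// logger is configured at the given level. Unknown level names are never
// emitted, and nothing is emitted when the configured level is "none".
func shouldLog(configured, msgLevel string) bool {
    c, okC := levelRank[configured]
    m, okM := levelRank[msgLevel]
    if !okC || !okM || msgLevel == "none" {
        return false
    }
    return m >= c
}

func main() {
    for _, lvl := range []string{"debug", "info", "warn", "error"} {
        fmt.Printf("configured=warn message=%s emitted=%v\n", lvl, shouldLog("warn", lvl))
    }
}
```

With this ordering, a production setting of "error" suppresses everything except error messages, which matches the descriptions in the list.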

Custom Configuration

Combine multiple options:

config := togomq.NewConfig(
    togomq.WithHost("custom.togomq.io"),
    togomq.WithPort(9000),
    togomq.WithLogLevel("debug"),
    togomq.WithToken("your-token-here"),
)

client, err := togomq.NewClient(config)
if err != nil {
    log.Fatal(err)
}
defer client.Close()
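The `NewConfig(WithX(...), ...)` call shape follows what is commonly called the functional-options pattern. The sketch below shows how defaults and overrides typically interact in that pattern. The `Config` and `Option` types here are stand-ins written for this example and are an assumption about the general approach, not the SDK's source.

```go
package main

import "fmt"

// Config is a stand-in for the SDK's configuration type (hypothetical fields).
type Config struct {
    Host     string
    Port     int
    LogLevel string
    Token    string
}

// Option mutates a Config; each WithX helper returns one.
type Option func(*Config)

func WithHost(host string) Option { return func(c *Config) { c.Host = host } }

func WithPort(port int) Option { return func(c *Config) { c.Port = port } }

func WithLogLevel(level string) Option { return func(c *Config) { c.LogLevel = level } }

func WithToken(token string) Option { return func(c *Config) { c.Token = token } }

// NewConfig starts from the documented defaults and applies options in order,
// so a later option overrides an earlier one for the same field.
func NewConfig(opts ...Option) *Config {
    cfg := &Config{Host: "q.togomq.io", Port: 5123, LogLevel: "info"}
    for _, opt := range opts {
        opt(cfg)
    }
    return cfg
}

func main() {
    cfg := NewConfig(WithToken("your-token"), WithPort(9000))
    fmt.Printf("%s:%d level=%s\n", cfg.Host, cfg.Port, cfg.LogLevel)
}
```

Fields that are not mentioned keep their defaults, which is why only the token needs to be supplied in the simplest case.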

Environment-Specific Configuration

Development:

config := togomq.NewConfig(
    togomq.WithToken(os.Getenv("TOGOMQ_DEV_TOKEN")),
    togomq.WithLogLevel("debug"), // Verbose logging
)

Production:

config := togomq.NewConfig(
    togomq.WithToken(os.Getenv("TOGOMQ_PROD_TOKEN")),
    togomq.WithLogLevel("error"), // Only log errors
)

Publishing Messages

Togo MQ supports three ways to publish messages: individually, in batches, or through a streaming channel.

Important: The topic name is required for all published messages.

Message Structure

type Message struct {
    Topic     string            // Message topic (required)
    Body      []byte            // Message payload
    Variables map[string]string // Custom key-value metadata
    Postpone  int64             // Delay in seconds before message is available
    Retention int64             // How long to keep message (seconds)
}

Creating a message:

// Simple message
msg := togomq.NewMessage("orders", []byte("order-data-here"))

// Message with options
delayedMsg := togomq.NewMessage("orders", []byte("order-data")).
    WithVariables(map[string]string{
        "priority": "high",
        "customer": "12345",
    }).
    WithPostpone(60).      // Delay 60 seconds
    WithRetention(3600)    // Keep for 1 hour

Publishing a Single Message

// Create a message
msg := togomq.NewMessage("user-events", []byte("User logged in"))

// Publish
resp, err := client.PubBatch(context.Background(), []*togomq.Message{msg})
if err != nil {
    log.Fatalf("Failed to publish: %v", err)
}

log.Printf("Published %d messages\n", resp.MessagesReceived)

Publishing a Batch of Messages

// Create multiple messages
messages := []*togomq.Message{
    togomq.NewMessage("orders", []byte("order-1")),
    togomq.NewMessage("orders", []byte("order-2")).
        WithVariables(map[string]string{
            "priority": "high",
            "customer": "12345",
        }),
    togomq.NewMessage("orders", []byte("order-3")).
        WithPostpone(60).
        WithRetention(3600),
}

// Publish all at once
resp, err := client.PubBatch(context.Background(), messages)
if err != nil {
    log.Fatalf("Failed to publish: %v", err)
}

log.Printf("Published %d messages successfully!\n", resp.MessagesReceived)

Publishing via Channel (Streaming)

For high-throughput scenarios:

ctx := context.Background()

// Create a channel for messages
msgChan := make(chan *togomq.Message, 100)

// Start publishing in the background; done is closed when Pub returns
done := make(chan struct{})
go func() {
    defer close(done)
    resp, err := client.Pub(ctx, msgChan)
    if err != nil {
        log.Printf("Publish error: %v", err)
        return
    }
    log.Printf("Published %d messages\n", resp.MessagesReceived)
}()

// Send messages over time
for i := 0; i < 1000; i++ {
    msg := togomq.NewMessage("events", []byte(fmt.Sprintf("event-%d", i)))
    msgChan <- msg
    time.Sleep(10 * time.Millisecond)
}

// Close the channel to signal end of stream, then wait for Pub to finish
close(msgChan)
<-done
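The streaming pattern above relies on two things: closing the channel to mark the end of the stream, and waiting for the background publisher to finish before moving on. The sketch below isolates that handshake using only the standard library. `drain` is a hypothetical stand-in for a streaming publish call, and the 5-second deadline is an arbitrary choice for the example.

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// result carries what the background consumer reports when it finishes.
type result struct {
    count int
    err   error
}

// drain is a hypothetical stand-in for a streaming publish call: it consumes
// messages until the channel is closed or the context is done.
func drain(ctx context.Context, msgs <-chan string) result {
    count := 0
    for {
        select {
        case _, ok := <-msgs:
            if !ok {
                return result{count: count} // channel closed: end of stream
            }
            count++
        case <-ctx.Done():
            return result{count: count, err: ctx.Err()}
        }
    }
}

// streamAll sends n messages, closes the channel, and waits for the consumer
// to report back instead of sleeping for a fixed time.
func streamAll(n int) (int, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    msgs := make(chan string, 100)
    done := make(chan result, 1) // buffered so the goroutine never blocks on send
    go func() { done <- drain(ctx, msgs) }()

send:
    for i := 0; i < n; i++ {
        select {
        case msgs <- fmt.Sprintf("event-%d", i):
        case <-ctx.Done():
            break send // stop producing if the deadline passes
        }
    }
    close(msgs)
    r := <-done
    return r.count, r.err
}

func main() {
    count, err := streamAll(1000)
    fmt.Println(count, err)
}
```

Selecting on `ctx.Done()` while sending keeps the producer from blocking forever if the consumer stops early.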

Message Options

Custom Metadata:

msg := togomq.NewMessage("orders", []byte("order-data")).
    WithVariables(map[string]string{
        "order_id":    "12345",
        "customer_id": "98765",
        "priority":    "high",
        "region":      "us-east",
    })

Delayed Messages:

// Delay 60 seconds
reminder := togomq.NewMessage("reminders", []byte("Send email")).
    WithPostpone(60)

// Delay 1 hour
backup := togomq.NewMessage("scheduled-tasks", []byte("Run backup")).
    WithPostpone(3600)

Message Retention:

// Keep for 1 hour
msg := togomq.NewMessage("temporary-events", []byte("data")).
    WithRetention(3600)

Best Practices

1. Reuse Clients:

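// Good ✅: create the client once, then reuse it for every publish
client, err := togomq.NewClient(config)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

for i := 0; i < 1000; i++ {
    if _, err := client.PubBatch(ctx, messages); err != nil {
        log.Printf("Publish failed: %v", err)
    }
}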

2. Use Batch Publishing:

// Good ✅
messages := []*togomq.Message{msg1, msg2, msg3}
client.PubBatch(ctx, messages)

3. Handle Errors:

resp, err := client.PubBatch(ctx, messages)
if err != nil {
    if togomqErr, ok := err.(*togomq.TogoMQError); ok {
        switch togomqErr.Code {
        case togomq.ErrCodeAuth:
            log.Println("Authentication failed")
        case togomq.ErrCodeConnection:
            log.Println("Connection error - retry")
        }
    }
}

4. Use Timeouts:

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

resp, err := client.PubBatch(ctx, messages)
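When applying the batch publishing advice to a large backlog, it can help to split the slice into fixed-size batches and publish each one in turn. The source does not state a maximum batch size, so the size used below is purely an assumption, and `chunk` is a hypothetical helper rather than an SDK function.

```go
package main

import "fmt"

// chunk splits items into consecutive batches of at most size elements.
// It returns nil when size is not positive. The batches share the backing
// array of items, so no elements are copied.
func chunk[T any](items []T, size int) [][]T {
    if size <= 0 {
        return nil
    }
    var batches [][]T
    for start := 0; start < len(items); start += size {
        end := start + size
        if end > len(items) {
            end = len(items)
        }
        batches = append(batches, items[start:end])
    }
    return batches
}

func main() {
    ids := []int{1, 2, 3, 4, 5, 6, 7}
    for i, batch := range chunk(ids, 3) {
        fmt.Printf("batch %d: %v\n", i, batch)
    }
}
```

Each resulting batch could then be passed to a single batch publish call with its own timeout context.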

Subscribing to Messages

Subscribe to messages published to specific topics using a simple channel-based API.

Important: Topic is required. Use wildcards like "orders.*" or "*" for all topics.

Received Message Structure

type Message struct {
    Topic     string            // Message topic
    UUID      string            // Unique message identifier
    Body      []byte            // Message payload
    Variables map[string]string // Custom metadata
}

Basic Subscription

// Subscribe to specific topic
opts := togomq.NewSubscribeOptions("orders")
msgChan, errChan, err := client.Sub(context.Background(), opts)
if err != nil {
    log.Fatal(err)
}

// Receive messages
for {
    select {
    case msg, ok := <-msgChan:
        if !ok {
            log.Println("Subscription ended")
            return
        }

        log.Printf("Received from %s: %s\n", msg.Topic, string(msg.Body))
        log.Printf("UUID: %s\n", msg.UUID)

        // Access custom variables
        if priority, ok := msg.Variables["priority"]; ok {
            log.Printf("Priority: %s\n", priority)
        }

    case err := <-errChan:
        log.Printf("Subscription error: %v\n", err)
        return
    }
}

Advanced Options

Batch Size - Control how many messages are received at once:

opts := togomq.NewSubscribeOptions("orders").
    WithBatch(10)

Rate Limiting - Limit messages per second:

opts := togomq.NewSubscribeOptions("orders").
    WithSpeedPerSec(100)

Combined Options:

opts := togomq.NewSubscribeOptions("events").
    WithBatch(50).
    WithSpeedPerSec(200)

msgChan, errChan, err := client.Sub(context.Background(), opts)

Wildcard Subscriptions

All topics:

opts := togomq.NewSubscribeOptions("*")
msgChan, errChan, err := client.Sub(ctx, opts)

Pattern matching:

// All order topics (orders.new, orders.updated, etc.)
orderOpts := togomq.NewSubscribeOptions("orders.*")

// All critical events
criticalOpts := togomq.NewSubscribeOptions("*.critical")
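The source does not spell out the exact matching rules for wildcards, so the sketch below is only a local approximation, useful for example when filtering topics in tests. It assumes that a lone `*` matches every topic and that, otherwise, each `*` stands for exactly one dot-separated segment. `matchTopic` is a hypothetical helper, and the server's real behavior may differ (for instance with deeper topic hierarchies).

```go
package main

import (
    "fmt"
    "strings"
)

// matchTopic reports whether topic matches pattern under ASSUMED rules:
// a lone "*" matches every topic; otherwise pattern and topic are split on
// "." and each "*" segment matches exactly one topic segment.
func matchTopic(pattern, topic string) bool {
    if pattern == "*" {
        return true
    }
    p := strings.Split(pattern, ".")
    t := strings.Split(topic, ".")
    if len(p) != len(t) {
        return false
    }
    for i := range p {
        if p[i] != "*" && p[i] != t[i] {
            return false
        }
    }
    return true
}

func main() {
    for _, topic := range []string{"orders.new", "orders.updated", "payments.new"} {
        fmt.Printf("orders.* vs %s: %v\n", topic, matchTopic("orders.*", topic))
    }
}
```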

Context Cancellation

Timeout-based:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

opts := togomq.NewSubscribeOptions("events")
msgChan, errChan, err := client.Sub(ctx, opts)
if err != nil {
    log.Fatal(err)
}

for {
    select {
    case msg, ok := <-msgChan:
        if !ok {
            return // channel closed: subscription ended
        }
        processMessage(msg)
    case err := <-errChan:
        log.Printf("Error: %v\n", err)
        return
    case <-ctx.Done():
        log.Println("Subscription timeout")
        return
    }
}

Manual cancellation:

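ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opts := togomq.NewSubscribeOptions("events")
msgChan, _, err := client.Sub(ctx, opts) // error channel ignored here for brevity
if err != nil {
    log.Fatal(err)
}

// Cancel after 100 messages
count := 0
for msg := range msgChan {
    processMessage(msg)
    count++
    if count >= 100 {
        cancel() // stops the subscription
        return
    }
}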

Concurrent Processing

// Create worker pool
workers := 10
workerChan := make(chan *togomq.Message, 100)

// Start workers
for i := 0; i < workers; i++ {
    go worker(workerChan)
}

// Distribute messages
for {
    select {
    case msg := <-msgChan:
        if msg == nil {
            close(workerChan)
            return
        }
        workerChan <- msg
    case err := <-errChan:
        log.Printf("Error: %v\n", err)
        close(workerChan)
        return
    }
}

func worker(msgChan <-chan *togomq.Message) {
    for msg := range msgChan {
        processMessage(msg)
    }
}
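One detail the worker pool above leaves open is how to wait for in-flight messages once the worker channel is closed. A minimal sketch using `sync.WaitGroup` follows; `processAll` and `handle` are hypothetical names, and plain strings stand in for messages so the example runs without the SDK.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// processAll fans items out to a fixed number of workers and returns how many
// were handled. handle stands in for per-message business logic.
func processAll(items []string, workers int, handle func(string)) int {
    jobs := make(chan string, len(items))
    var processed atomic.Int64
    var wg sync.WaitGroup

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for item := range jobs {
                handle(item)
                processed.Add(1)
            }
        }()
    }

    for _, item := range items {
        jobs <- item
    }
    close(jobs) // no more work: lets every worker's range loop end
    wg.Wait()   // block until in-flight items are finished
    return int(processed.Load())
}

func main() {
    items := []string{"a", "b", "c", "d", "e"}
    n := processAll(items, 3, func(s string) { _ = s })
    fmt.Println("processed:", n)
}
```

Waiting on the group before returning means a shutdown does not abandon messages that a worker has already picked up.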

Best Practices

1. Monitor Error Channel:

for {
    select {
    case msg := <-msgChan:
        processMessage(msg)
    case err := <-errChan:
        log.Printf("Error: %v\n", err)
        return
    }
}

2. Handle Nil Messages:

case msg := <-msgChan:
    if msg == nil {
        return
    }
    processMessage(msg)

3. Graceful Shutdown:

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
    <-sigChan
    cancel()
}()

msgChan, errChan, err := client.Sub(ctx, opts)

Error Handling

The SDK provides detailed error information for graceful failure handling and debugging.

Error Structure

type TogoMQError struct {
    Code    string // Error code for programmatic handling
    Message string // Human-readable error message
    Err     error  // Original underlying error (if any)
}

Type assertion:

resp, err := client.PubBatch(ctx, messages)
if err != nil {
    if togomqErr, ok := err.(*togomq.TogoMQError); ok {
        log.Printf("Code: %s, Message: %s\n", togomqErr.Code, togomqErr.Message)
    }
}

Error Codes

| Error Code           | Description               | Common Causes                 |
|----------------------|---------------------------|-------------------------------|
| ErrCodeConnection    | Connection/network errors | Network issues, firewall, DNS |
| ErrCodeAuth          | Authentication failures   | Invalid/expired/revoked token |
| ErrCodeValidation    | Invalid input/config      | Missing topic, invalid data   |
| ErrCodePublish       | Publishing errors         | Server-side failures          |
| ErrCodeSubscribe     | Subscription errors       | Server-side failures          |
| ErrCodeStream        | Streaming errors          | Stream interrupted            |
| ErrCodeConfiguration | Config errors             | Invalid host/port             |
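The codes in the table can be grouped by whether a retry is likely to help. The retry examples in this guide skip authentication and validation errors and retry connection errors; the treatment of the remaining codes below is an assumption made for this sketch. The string constants are stand-ins, since the actual values behind the SDK's `ErrCode...` constants are not shown in this document.

```go
package main

import "fmt"

// Stand-in code values for illustration; the SDK's real constants may hold
// different strings.
const (
    codeConnection    = "connection"
    codeAuth          = "auth"
    codeValidation    = "validation"
    codePublish       = "publish"
    codeSubscribe     = "subscribe"
    codeStream        = "stream"
    codeConfiguration = "configuration"
)

// isRetryable reports whether retrying is plausibly useful for a code.
// Auth and validation follow the guidance in this guide; treating
// configuration as permanent and the rest as transient is an assumption.
func isRetryable(code string) bool {
    switch code {
    case codeAuth, codeValidation, codeConfiguration:
        return false // fixing the token, input, or config is required first
    case codeConnection, codeStream, codePublish, codeSubscribe:
        return true // may succeed on a later attempt
    default:
        return false // unknown codes: fail fast rather than loop
    }
}

func main() {
    for _, c := range []string{codeConnection, codeAuth, codeStream, "unknown"} {
        fmt.Printf("%-12s retryable=%v\n", c, isRetryable(c))
    }
}
```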

Handling Errors

Basic:

resp, err := client.PubBatch(ctx, messages)
if err != nil {
    log.Printf("Failed: %v\n", err)
    return
}

Detailed:

if togomqErr, ok := err.(*togomq.TogoMQError); ok {
    switch togomqErr.Code {
    case togomq.ErrCodeAuth:
        log.Println("Authentication failed - check token")
    case togomq.ErrCodeConnection:
        log.Println("Connection error - will retry")
    case togomq.ErrCodeValidation:
        log.Printf("Validation error: %s\n", togomqErr.Message)
    default:
        log.Printf("Error (%s): %s\n", togomqErr.Code, togomqErr.Message)
    }
}

Common Scenarios

Authentication errors:

if togomqErr.Code == togomq.ErrCodeAuth {
    log.Println("Authentication failed!")
    log.Println("- Token is invalid")
    log.Println("- Token has been revoked")
    log.Println("- Token has expired")
}

Connection errors with retry:

if togomqErr.Code == togomq.ErrCodeConnection {
    for attempt := 1; attempt <= 3; attempt++ {
        time.Sleep(time.Duration(attempt) * time.Second)
        resp, err = client.PubBatch(ctx, messages)
        if err == nil {
            break
        }
    }
}

Context timeout:

// errors.Is also matches when the deadline error has been wrapped
if errors.Is(err, context.DeadlineExceeded) {
    log.Println("Operation timed out!")
}
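A direct `==` comparison only matches when the deadline error is returned unwrapped. The sketch below shows the difference using a hypothetical `slowCall` that wraps the context error with `%w`. Whether the SDK wraps timeouts this way is not stated in the source, so treat this as a general Go illustration rather than a description of the SDK.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// slowCall is a hypothetical operation that takes longer than its caller
// allows and wraps the context error, as a client library might.
func slowCall(ctx context.Context) error {
    select {
    case <-time.After(time.Second):
        return nil
    case <-ctx.Done():
        return fmt.Errorf("publish: %w", ctx.Err())
    }
}

// timedOut runs slowCall with a 10ms deadline and reports the result of a
// direct comparison and of errors.Is against context.DeadlineExceeded.
func timedOut() (equal bool, viaIs bool) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
    defer cancel()

    err := slowCall(ctx)
    return err == context.DeadlineExceeded, errors.Is(err, context.DeadlineExceeded)
}

func main() {
    equal, viaIs := timedOut()
    fmt.Printf("== comparison: %v, errors.Is: %v\n", equal, viaIs)
}
```

Because `errors.Is` walks the wrap chain, it keeps working if an intermediate layer adds context to the error.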

Best Practices

1. Always Check Errors:

// Good ✅
resp, err := client.PubBatch(ctx, messages)
if err != nil {
    log.Printf("Error: %v\n", err)
    return
}

2. Implement Retry Logic:

func publishWithRetry(client *togomq.Client, messages []*togomq.Message, maxRetries int) error {
    var lastErr error // remembered so it can be returned after the final attempt
    for attempt := 0; attempt < maxRetries; attempt++ {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        _, err := client.PubBatch(ctx, messages)
        cancel()

        if err == nil {
            return nil
        }
        lastErr = err

        // Don't retry auth/validation errors
        if togomqErr, ok := err.(*togomq.TogoMQError); ok {
            if togomqErr.Code == togomq.ErrCodeAuth ||
                togomqErr.Code == togomq.ErrCodeValidation {
                return err
            }
        }

        // Exponential backoff: 1s, 2s, 4s, ...
        if attempt < maxRetries-1 {
            backoff := time.Duration(1<<uint(attempt)) * time.Second
            time.Sleep(backoff)
        }
    }
    return lastErr
}

3. Log Error Details:

if togomqErr, ok := err.(*togomq.TogoMQError); ok {
    log.Printf("Error - Code: %s, Message: %s, Underlying: %v\n",
        togomqErr.Code, togomqErr.Message, togomqErr.Err)
}

4. Monitor Error Rates:

var totalRequests, failedRequests int64

func publish(ctx context.Context, client *togomq.Client, messages []*togomq.Message) {
    total := atomic.AddInt64(&totalRequests, 1)

    if _, err := client.PubBatch(ctx, messages); err != nil {
        // Use the values returned by AddInt64 to avoid unsynchronized reads
        failed := atomic.AddInt64(&failedRequests, 1)
        errorRate := float64(failed) / float64(total)
        if errorRate > 0.05 {
            log.Printf("WARNING: High error rate: %.2f%%\n", errorRate*100)
        }
    }
}
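The retry helper in item 2 doubles the delay on every attempt, which grows quickly when the retry count is large. A common refinement is to cap the delay. The sketch below computes such a schedule; `backoffSeconds` is a hypothetical helper and the 30-second cap is an arbitrary value chosen for the example.

```go
package main

import "fmt"

// backoffSeconds returns the wait before retry number attempt (0-based),
// doubling from 1 second and never exceeding maxSeconds.
func backoffSeconds(attempt, maxSeconds int) int {
    if attempt < 0 {
        attempt = 0
    }
    if attempt >= 30 {
        return maxSeconds // avoid overflowing the shift for very large attempts
    }
    delay := 1 << attempt
    if delay > maxSeconds {
        return maxSeconds
    }
    return delay
}

func main() {
    for attempt := 0; attempt < 8; attempt++ {
        fmt.Printf("attempt %d: wait %ds\n", attempt, backoffSeconds(attempt, 30))
    }
}
```

Adding random jitter on top of the capped delay is a further option when many clients may retry at the same moment.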

Best Practices

1. Reuse Clients

Create one client per application and reuse it:

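// Good ✅
client, err := togomq.NewClient(config)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

for i := 0; i < 1000; i++ {
    if _, err := client.PubBatch(ctx, messages); err != nil {
        log.Printf("Publish failed: %v", err)
    }
}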

2. Always Close Connections

client, err := togomq.NewClient(config)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// Use client...

3. Handle Errors

resp, err := client.PubBatch(ctx, messages)
if err != nil {
    if togomqErr, ok := err.(*togomq.TogoMQError); ok {
        log.Printf("Error (%s): %s\n", togomqErr.Code, togomqErr.Message)
    }
    return
}
log.Printf("Published %d messages\n", resp.MessagesReceived)

4. Use Batch Publishing

// Good ✅
messages := []*togomq.Message{msg1, msg2, msg3}
client.PubBatch(ctx, messages)

5. Validate Topics

Always provide valid topic names:

// Good ✅
msg := togomq.NewMessage("orders", []byte("data"))

// Bad ❌
msg := togomq.NewMessage("", []byte("data")) // Empty topic will fail

6. Use Contexts with Timeouts

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

resp, err := client.PubBatch(ctx, messages)

7. Set Log Levels

Control logging based on environment:

// Development
config := togomq.NewConfig(
    togomq.WithToken(token),
    togomq.WithLogLevel("debug"),
)

// Production
config := togomq.NewConfig(
    togomq.WithToken(token),
    togomq.WithLogLevel("error"),
)

Complete Examples

Publishing Example

package main

import (
    "context"
    "fmt"
    "log"
    "os"

    "github.com/TogoMQ/togomq-sdk-go"
)

func main() {
    // Create client
    config := togomq.NewConfig(
        togomq.WithToken(os.Getenv("TOGOMQ_TOKEN")),
        togomq.WithLogLevel("debug"),
    )

    client, err := togomq.NewClient(config)
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }
    defer client.Close()

    // Create messages
    messages := make([]*togomq.Message, 100)
    for i := 0; i < 100; i++ {
        messages[i] = togomq.NewMessage(
            "events",
            []byte(fmt.Sprintf("event-%d", i)),
        ).WithVariables(map[string]string{
            "index": fmt.Sprintf("%d", i),
            "type":  "test",
        })
    }

    // Publish batch
    ctx := context.Background()
    resp, err := client.PubBatch(ctx, messages)
    if err != nil {
        log.Fatalf("Failed to publish: %v", err)
    }

    log.Printf("Published %d messages successfully!\n", resp.MessagesReceived)
}

Subscribing Example

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"

    "github.com/TogoMQ/togomq-sdk-go"
)

func main() {
    // Graceful shutdown handler
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

    // Create client
    config := togomq.NewConfig(
        togomq.WithToken(os.Getenv("TOGOMQ_TOKEN")),
    )

    client, err := togomq.NewClient(config)
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }
    defer client.Close()

    // Create context with cancellation
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Subscribe with options
    opts := togomq.NewSubscribeOptions("orders.*").
        WithBatch(10).
        WithSpeedPerSec(100)

    msgChan, errChan, err := client.Sub(ctx, opts)
    if err != nil {
        log.Fatalf("Failed to subscribe: %v", err)
    }

    log.Println("Subscribed, waiting for messages...")

    for {
        select {
        case msg, ok := <-msgChan:
            if !ok {
                log.Println("Subscription ended")
                return
            }

            log.Printf("[%s] %s: %s\n", msg.UUID, msg.Topic, string(msg.Body))

            // Process message
            processMessage(msg)

        case err := <-errChan:
            log.Printf("Subscription error: %v\n", err)
            return

        case <-sigChan:
            log.Println("Shutting down gracefully...")
            cancel()
            return
        }
    }
}

func processMessage(msg *togomq.Message) {
    // Your business logic
}

Resources


Check out the FAQ for common questions about Togo MQ.