Exploring the Pub-Sub Design System with Golang

Summary

A publisher-subscriber (pub-sub) system is a design pattern in which a broker handles communication between its members. A publisher sends a message to a topic, and the broker ensures all subscribers to that topic receive it. Subscribers do not need to know about the publishers and vice versa. This blog post describes a basic implementation of a pub-sub system in Go.

Why Pub-Sub?

An alternative is the request-response pattern, in which the sender must know each receiver and typically waits for its reply. In a distributed system this couples components tightly. Pub-sub systems favor decoupling: publishers and subscribers can be added or scaled independently, without updating or overloading the business logic of the system.

Language-agnostic development is another benefit. A publisher written in one language or framework can broadcast to multiple subscribers, irrespective of their programming language. The only requirements are a valid topic and a message body structured in a predefined format that any subscriber can parse.
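As a rough illustration of a "predefined format", here is a minimal sketch that wraps a topic and a body in a JSON envelope. The Envelope type, its field names, and the Encode/Decode helpers are assumptions made for this example; they are not part of the implementation described later in this post.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Envelope is a hypothetical wire format: the type and its field names
// are assumptions for this sketch, not part of the post's code.
type Envelope struct {
	Topic string `json:"topic"`
	Body  string `json:"body"`
}

// Encode serialises an envelope to JSON so that a subscriber written in
// any language with a JSON parser can read it.
func Encode(e Envelope) (string, error) {
	raw, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(raw), nil
}

// Decode parses a JSON payload back into an envelope.
func Decode(payload string) (Envelope, error) {
	var e Envelope
	err := json.Unmarshal([]byte(payload), &e)
	return e, err
}

func main() {
	payload, err := Encode(Envelope{Topic: "sports", Body: "match postponed"})
	if err != nil {
		panic(err)
	}
	fmt.Println(payload) // {"topic":"sports","body":"match postponed"}

	decoded, err := Decode(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(decoded.Topic, decoded.Body) // sports match postponed
}
```

JSON is only one option here; any format that both sides agree on would serve the same purpose.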

Some valid use cases for a pub-sub system are:

  • Chat applications
  • Distributed logging
  • Distributed caching
  • Event notification

Show me the code? See the full code in this gist: pub_sub.go

Playground? Try it out

Implementation

We will use the code from this repo as a development base. You can read about how I got here in my Golang development setup blog post. This is its file structure:

├── Dockerfile
├── LICENSE
├── Readme.md
├── docker-compose.yml
├── go.mod
└── main.go

Subscriber

We'll begin by defining the subscriber struct. It will have a name and id for identification, and a slice of topics it is subscribed to.

type Subscriber struct {
	Name   string
	ID     int // int, to match the broker's map[int]*Subscriber keys
	Topics []string
}

Next we implement a "receive" method that is called when a message is received. For our purposes, we'll just print the message and the current time.

func (c *Subscriber) Receive(message string) {
  fmt.Printf("[%s]%s received message: %s\n", time.Now().Format("2006/01/02 15:04:05"), c.Name, message)
}

Broker

As described earlier, the broker receives messages from publishers and transmits them to subscribers. To do this, it needs a mapping of topics to subscribers. This is captured in the Broker struct.

type Broker struct {
  Subscribers map[int]*Subscriber
  TopicMap map[string][]int
}

// NewBroker creates a new broker instance with empty subscriberId-subscriber and topic-subscriberId slice maps.
// It receives a slice of topics to be initialized with and returns a pointer to the new broker.
func NewBroker(topics []string) *Broker {
	subscribers := make(map[int]*Subscriber)
	topicMap := make(map[string][]int)

	for _, topic := range topics {
		var a []int
		topicMap[topic] = a
	}
	return &Broker{TopicMap: topicMap, Subscribers: subscribers}
}

We defined the Broker struct and a function to create a new broker instance in the preceding code block. The interesting part is that upon initialization, a map of topics is created, with each topic mapped to an empty slice which would contain the ids of subscribers listening to that topic.

Next let's write a register method that takes a subscriber and registers or subscribes it to the various topics in which it is interested. The register function also adds the subscriber to the subscriberId-subscriber map.

func (b *Broker) Register(s Subscriber) {
	// Add the subscriber to the subscriberId-subscriber map
	b.Subscribers[s.ID] = &s

	// Subscribe the subscriber to each topic
	for _, topic := range s.Topics {
		b.TopicMap[topic] = append(b.TopicMap[topic], s.ID)
	}
}
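To make the effect of Register concrete, here is a small self-contained sketch that rebuilds the broker and prints the topic map after two registrations. It assumes the subscriber ID is an int so that it matches the map key type; the "weather" topic is a made-up example that was not passed to NewBroker.

```go
package main

import "fmt"

type Subscriber struct {
	Name   string
	ID     int // assumed to be an int so it can key the Subscribers map
	Topics []string
}

type Broker struct {
	Subscribers map[int]*Subscriber
	TopicMap    map[string][]int
}

// NewBroker pre-creates an entry (a nil slice) for every known topic.
func NewBroker(topics []string) *Broker {
	topicMap := make(map[string][]int)
	for _, topic := range topics {
		topicMap[topic] = nil
	}
	return &Broker{TopicMap: topicMap, Subscribers: make(map[int]*Subscriber)}
}

// Register stores the subscriber and appends its ID to each of its topics.
func (b *Broker) Register(s Subscriber) {
	b.Subscribers[s.ID] = &s
	for _, topic := range s.Topics {
		b.TopicMap[topic] = append(b.TopicMap[topic], s.ID)
	}
}

func main() {
	b := NewBroker([]string{"sports", "politics"})
	b.Register(Subscriber{Name: "Subscriber0", ID: 0, Topics: []string{"sports", "politics"}})
	b.Register(Subscriber{Name: "Subscriber1", ID: 1, Topics: []string{"sports", "weather"}})

	fmt.Println(b.TopicMap["sports"])   // [0 1]
	fmt.Println(b.TopicMap["politics"]) // [0]
	fmt.Println(b.TopicMap["weather"])  // [1]
}
```

Because appending to a missing map entry starts from a nil slice, a subscriber can register for a topic the broker was not initialized with, and the topic is created on the fly.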

Finally for the broker, we would need a function to broadcast messages to a topic. So let's write that!

func (b *Broker) Broadcast(topic string, message string) {
	// Get the list of subscribers subscribed to the topic
	subscriberIds := b.TopicMap[topic]

	// Send the message to each subscriber
	for _, subscriberId := range subscriberIds {
		b.Subscribers[subscriberId].Receive(message)
	}
}
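The fan-out can be checked in isolation with a sketch like the one below. It is a variation on the method above, not a replacement: the Inbox field, the returned delivery count, and the guard against unregistered IDs are assumptions added so the behavior is easy to inspect.

```go
package main

import "fmt"

// Subscriber mirrors the post's struct, with a hypothetical Inbox slice
// added so that deliveries can be inspected instead of printed.
type Subscriber struct {
	Name  string
	ID    int
	Inbox []string
}

// Receive records the message in the subscriber's inbox.
func (s *Subscriber) Receive(message string) {
	s.Inbox = append(s.Inbox, message)
}

type Broker struct {
	Subscribers map[int]*Subscriber
	TopicMap    map[string][]int
}

// Broadcast delivers message to every subscriber of topic and reports
// how many deliveries were made.
func (b *Broker) Broadcast(topic, message string) int {
	delivered := 0
	for _, id := range b.TopicMap[topic] {
		s, ok := b.Subscribers[id]
		if !ok {
			continue // skip IDs that have no registered subscriber
		}
		s.Receive(message)
		delivered++
	}
	return delivered
}

func main() {
	alice := &Subscriber{Name: "alice", ID: 0}
	bob := &Subscriber{Name: "bob", ID: 1}
	b := &Broker{
		Subscribers: map[int]*Subscriber{0: alice, 1: bob},
		TopicMap:    map[string][]int{"sports": {0, 1}, "art": {1}},
	}

	fmt.Println(b.Broadcast("sports", "kick-off at 8")) // 2
	fmt.Println(b.Broadcast("art", "new exhibition"))   // 1
	fmt.Println(b.Broadcast("weather", "sunny"))        // 0 (unknown topic)
	fmt.Println(len(alice.Inbox), len(bob.Inbox))       // 1 2
}
```

Broadcasting to a topic nobody subscribes to is a no-op, since ranging over the nil slice returned for a missing map key does nothing.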

Publisher

To test our setup, we will define a Publisher struct with a single member, a broker, and a Publish method that takes a topic and a message and sends them via the broker.

type Publisher struct {
	broker *Broker
}

func (p *Publisher) Publish(topic string, message string) {
	// Prefix the message with its topic and hand it to the broker
	p.broker.Broadcast(topic, fmt.Sprintf("Topic: %s, Message: %s", topic, message))
}

Working Example

Now that we have the pieces of our pub-sub system in place, let's create a news publishing system where several feeds are subscribed to various categories (topics) of news. I decided to use the following topics: sports, politics, religion, art. First, let's store these topics in a slice, unsurprisingly called topics, and manually create a slice containing three subscribers.

	// slice of topics we want to use
	topics := []string{"sports", "politics", "religion", "art"}

	// manually instantiate subscribers
	subscribers := []Subscriber{
		{
			Name:   "Subscriber0",
			ID:     0,
			Topics: []string{"sports", "politics"},
		},
		{
			Name:   "Subscriber1",
			ID:     1,
			Topics: []string{"religion", "politics", "sports"},
		},
		{
			Name:   "Subscriber2",
			ID:     2,
			Topics: []string{"art"},
		},
	}

The next step is to create a newsBroker and register our subscribers.

	// create a new broker and register subscribers
	newsBroker := NewBroker(topics)
	for _, subscriber := range subscribers {
		newsBroker.Register(subscriber)
	}

We need a publisher to create the news, so we will instantiate a new publisher. The news will be a random string of characters, generated by a handy randomString function I found on Stack Overflow.

	// create a new publisher
	newsPublisher := Publisher{broker: newsBroker}

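// randomString must be declared at package level, outside main.
// rand here is math/rand; rand.Seed and rand.Read are deprecated as of
// Go 1.20, so newer code may prefer crypto/rand.
// https://stackoverflow.com/a/65607935/12227177
func randomString(length int) string {
	rand.Seed(time.Now().UnixNano())
	b := make([]byte, length)
	rand.Read(b)
	// hex-encode the bytes and keep the first `length` characters
	return fmt.Sprintf("%x", b)[:length]
}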

To simulate periodic news generation, we will loop through the slice of topics and start a goroutine for each one that periodically generates a random string and broadcasts it to the topic.

	// create a ticker for each topic
	for i, topic := range topics {
		go func(topic string, idx int) {
			// idx starts at 1, so the intervals are 5s, 10s, 15s, ...
			ticker := time.NewTicker(time.Duration(idx*5) * time.Second)
			// publish on every tick; this loop never exits on its own
			for range ticker.C {
				newsPublisher.Publish(topic, randomString(10))
			}
		}(topic, i+1)
	}

What we are doing above is creating a ticker for each topic, and then sending a random string to the topic at a staggered interval based on the index of the topic. So the first topic 'sports' will be sent every 5 seconds, the second every 10 seconds, the third every 15 seconds, and so on.
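The interval arithmetic can be checked with a short sketch. The intervalFor helper is a hypothetical name introduced here; it takes the zero-based position of a topic in the slice and applies the same (index + 1) * 5 seconds rule used above.

```go
package main

import (
	"fmt"
	"time"
)

// intervalFor returns the publishing interval for the topic at zero-based
// position idx: 5s for the first topic, 10s for the second, and so on.
// The helper name is an assumption made for this sketch.
func intervalFor(idx int) time.Duration {
	return time.Duration((idx+1)*5) * time.Second
}

func main() {
	topics := []string{"sports", "politics", "religion", "art"}
	for i, topic := range topics {
		fmt.Printf("%s: every %v\n", topic, intervalFor(i))
	}
	// Output:
	// sports: every 5s
	// politics: every 10s
	// religion: every 15s
	// art: every 20s
}
```

The + 1 matters: time.NewTicker panics when given a non-positive duration, which is what the first topic would get if the raw index 0 were used.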

But how do we know when to stop? We can use a context to send a signal to the goroutine when we want to stop. Modifying the code in the block above:

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// create a ticker for each topic
	for i, topic := range topics {
		go func(ctx context.Context, topic string, idx int) {
			ticker := time.NewTicker(time.Duration(idx*5) * time.Second)
			defer ticker.Stop()
			for {
				// wait for whichever comes first: cancellation or the next tick
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					newsPublisher.Publish(topic, randomString(10))
				}
			}
		}(ctx, topic, i+1)
	}

	time.Sleep(time.Minute * 1)
	cancel() // signal the goroutines to stop
	fmt.Println("Quitting")
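Note that cancel() only signals the goroutines; it does not wait for them to return. If you want to be sure every publisher has stopped before printing "Quitting", one option is a sync.WaitGroup. The sketch below is an illustration under assumptions: runPublishers is a hypothetical helper, it counts ticks per topic instead of calling newsPublisher.Publish, and it uses millisecond intervals so that it finishes quickly.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runPublishers starts one ticking goroutine per topic, lets them run for
// runFor, cancels them, and waits until all of them have returned.
// It returns how many ticks each topic saw. Counting ticks stands in for
// the call to newsPublisher.Publish in the post.
func runPublishers(topics []string, tick, runFor time.Duration) map[string]int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var (
		wg     sync.WaitGroup
		mu     sync.Mutex // guards counts, which all goroutines write to
		counts = make(map[string]int)
	)

	for _, topic := range topics {
		wg.Add(1)
		go func(topic string) {
			defer wg.Done()
			ticker := time.NewTicker(tick)
			defer ticker.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					mu.Lock()
					counts[topic]++
					mu.Unlock()
				}
			}
		}(topic)
	}

	time.Sleep(runFor)
	cancel()  // ask every goroutine to stop
	wg.Wait() // block until they have all returned
	return counts
}

func main() {
	counts := runPublishers([]string{"sports", "art"}, 10*time.Millisecond, 105*time.Millisecond)
	fmt.Println("ticks per topic:", counts)
	fmt.Println("Quitting")
}
```

For a one-minute demo the difference is mostly cosmetic, but in a longer-lived program waiting like this avoids exiting while a publish is still in flight.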

You can find the playground here: https://go.dev/play/p/NxIUF7RTCyt

Result

Running the program for 1 minute yields the following printout:

[2022/07/15 22:48:24]Subscriber0 received message: Topic: sports, Message: 92050a2f81
[2022/07/15 22:48:24]Subscriber1 received message: Topic: sports, Message: 92050a2f81

[2022/07/15 22:48:29]Subscriber0 received message: Topic: politics, Message: 35a30f9c34
[2022/07/15 22:48:29]Subscriber1 received message: Topic: politics, Message: 35a30f9c34

[2022/07/15 22:48:29]Subscriber0 received message: Topic: sports, Message: 663d5ede81
[2022/07/15 22:48:29]Subscriber1 received message: Topic: sports, Message: 663d5ede81

[2022/07/15 22:48:34]Subscriber0 received message: Topic: sports, Message: 26816a9ee5
[2022/07/15 22:48:34]Subscriber1 received message: Topic: sports, Message: 26816a9ee5
[2022/07/15 22:48:34]Subscriber1 received message: Topic: religion, Message: b93da27cb9

[2022/07/15 22:48:39]Subscriber0 received message: Topic: politics, Message: 6f9202ae1b
[2022/07/15 22:48:39]Subscriber1 received message: Topic: politics, Message: 6f9202ae1b

[2022/07/15 22:48:39]Subscriber2 received message: Topic: art, Message: 0b77423277

[2022/07/15 22:48:39]Subscriber0 received message: Topic: sports, Message: 85a87ef98d
[2022/07/15 22:48:39]Subscriber1 received message: Topic: sports, Message: 85a87ef98d

[2022/07/15 22:48:44]Subscriber0 received message: Topic: sports, Message: 262a1871bd
[2022/07/15 22:48:44]Subscriber1 received message: Topic: sports, Message: 262a1871bd

[2022/07/15 22:48:49]Subscriber0 received message: Topic: sports, Message: e13a8690d7
[2022/07/15 22:48:49]Subscriber1 received message: Topic: sports, Message: e13a8690d7
[2022/07/15 22:48:49]Subscriber1 received message: Topic: religion, Message: 262e86d835
Quitting