使用channel实现发布订阅模式的核心在于维护订阅者列表并解耦发布者与订阅者。1. 通过map存储主题与订阅者channel的对应关系,实现订阅和取消订阅操作;2. 发布消息时遍历订阅者列表,并用goroutine发送以防止阻塞;3. 防止channel阻塞可采用带缓冲的channel、加锁控制或丢弃策略;4. 缓冲大小应根据发布与订阅速度差异选择,通常从保守值开始调整;5. 处理订阅者掉线可通过超时机制检测并移除无效channel,结合心跳检测提升可靠性;6. 若需保证顺序性,可通过单channel串行分发或为消息添加序列号排序实现。
发布订阅模式在Go语言中,利用channel可以优雅地实现。核心在于维护一个订阅者列表(通常是channel的slice或map),当事件发生时,将消息发送到所有订阅者的channel中。这种方式能有效解耦发布者和订阅者,构建灵活的事件驱动架构。
解决方案
package main import ( "fmt" "sync" "time" ) // EventData 事件数据结构 type EventData struct { Topic string Data interface{} } // Broker 发布者/代理 type Broker struct { mu sync.RWMutex subscribers map[string][]chan EventData quit chan bool } // NewBroker 创建一个新的 Broker 实例 func NewBroker() *Broker { return &Broker{ subscribers: make(map[string][]chan EventData), quit: make(chan bool), } } // Subscribe 订阅特定主题 func (b *Broker) Subscribe(topic string, ch chan EventData) { b.mu.Lock() defer b.mu.Unlock() b.subscribers[topic] = append(b.subscribers[topic], ch) } // Unsubscribe 取消订阅 func (b *Broker) Unsubscribe(topic string, ch chan EventData) { b.mu.Lock() defer b.mu.Unlock() if _, ok := b.subscribers[topic]; !ok { return } var newSubs []chan EventData for _, sub := range b.subscribers[topic] { if sub != ch { newSubs = append(newSubs, sub) } } b.subscribers[topic] = newSubs // 如果该主题没有订阅者,则删除该主题 if len(b.subscribers[topic]) == 0 { delete(b.subscribers, topic) } } // Publish 发布事件 func (b *Broker) Publish(event EventData) { b.mu.RLock() defer b.mu.RUnlock() if subs, ok := b.subscribers[event.Topic]; ok { for _, ch := range subs { // 使用 goroutine 防止阻塞发布者 go func(channel chan EventData) { select { case channel <- event: // 发送事件到channel default: fmt.Println("Channel full, dropping message") // 处理channel阻塞的情况 } }(ch) } } } // Start 启动 Broker func (b *Broker) Start() { <-b.quit // 等待退出信号 } // Stop 停止 Broker func (b *Broker) Stop() { close(b.quit) } func main() { broker := NewBroker() go broker.Start() defer broker.Stop() // 创建订阅者 subscriber1 := make(chan EventData, 10) // 带缓冲的channel subscriber2 := make(chan EventData, 10) // 订阅主题 broker.Subscribe("topicA", subscriber1) broker.Subscribe("topicB", subscriber2) // 发布事件 broker.Publish(EventData{Topic: "topicA", Data: "Message for topicA"}) broker.Publish(EventData{Topic: "topicB", Data: "Message for topicB"}) broker.Publish(EventData{Topic: "topicA", Data: "Another message for topicA"}) time.Sleep(time.Second) // 等待消息处理 // 从订阅者接收消息 select { case msg := <-subscriber1: fmt.Printf("Subscriber 1 received: %+vn", msg) default: fmt.Println("Subscriber 1 received nothing") } select { case msg := <-subscriber1: fmt.Printf("Subscriber 1 received: %+vn", msg) default: fmt.Println("Subscriber 1 received nothing") } select { case msg := <-subscriber2: fmt.Printf("Subscriber 2 received: %+vn", msg) default: fmt.Println("Subscriber 2 received nothing") } // 取消订阅 broker.Unsubscribe("topicA", subscriber1) // 再次发布事件,subscriber1 不应该再收到 broker.Publish(EventData{Topic: "topicA", Data: "This should not be received by subscriber1"}) time.Sleep(time.Second) select { case msg := <-subscriber1: fmt.Printf("Subscriber 1 received: %+vn", msg) default: fmt.Println("Subscriber 1 received nothing") // 预期输出 } }
使用channel实现发布订阅,最怕的就是channel阻塞。上面的代码里,我加了
select
语句,就是为了防止这种情况。如果channel满了,就直接丢弃消息,当然,更好的做法是加一些重试机制或者把消息持久化到队列里,但这会增加复杂度,看具体场景需求吧。
立即学习“go语言免费学习笔记(深入)”;
Golang channel的缓冲大小如何选择?
Channel的缓冲大小直接影响到发布订阅模式的性能和可靠性。太小容易阻塞,太大又浪费内存。一般来说,需要根据发布者和订阅者的速度差异来调整。如果发布速度远大于订阅速度,可以适当增加缓冲大小。另外,还可以考虑使用动态调整缓冲大小的策略,但这会引入额外的复杂性。我的经验是,先用一个相对保守的值,比如10或者100,然后根据实际运行情况调整。
如何处理订阅者掉线或崩溃的情况?
这是个很现实的问题。如果订阅者掉线了,broker继续往它的channel里发消息,会导致goroutine泄漏。解决这个问题,可以在
Publish
方法里加一个超时机制,如果一段时间内无法向channel发送消息,就认为订阅者已经掉线,然后从订阅者列表中移除。另外,订阅者在重新连接后,需要重新订阅。更健壮的方案,可以考虑使用心跳检测,定期检查订阅者是否存活。
如何保证消息的顺序性?
在发布订阅模式中,保证消息的顺序性是一个挑战。因为消息是并发发送到多个订阅者的,很难保证所有订阅者都按照相同的顺序接收到消息。如果对顺序性有严格要求,可以考虑使用单一的channel来发送消息,然后由一个goroutine负责将消息分发到各个订阅者。但这样做会降低并发度。另一种方法是为每个消息添加一个序列号,订阅者收到消息后,按照序列号进行排序。
评论(已关闭)
评论已关闭