You cannot select more than 25 topics.
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							129 lines
						
					
					
						
							2.8 KiB
						
					
					
				
			
		
		
		
			
			
			
				
					
				
				
					
				
			
		
		
	
	
							129 lines
						
					
					
						
							2.8 KiB
						
					
					
				
								package pubsub
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"strings"
							 | 
						|
									"sync"
							 | 
						|
									"time"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								// Message is one published item: the topic it was published under
								// together with its payload bytes.
								type Message struct {
									// Topic is the topic name passed to Publish.
									Topic string
									// Payload is the message body, carried as-is.
									Payload []byte
								}
							 | 
						|
								
							 | 
						|
								type (
							 | 
						|
									subscriber  chan Message      // 订阅者为一个管道
							 | 
						|
									topicFilter func(string) bool // 主题为一个过滤器
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								// 发布者对象
							 | 
						|
								type Publisher struct {
							 | 
						|
									m           sync.RWMutex               // 读写锁
							 | 
						|
									buffer      int                        // 订阅队列的缓存大小
							 | 
						|
									timeout     time.Duration              // 发布超时时间
							 | 
						|
									subscribers map[subscriber]topicFilter // 所有订阅者信息
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// 构建一个发布者对象,可以设置发布超时时间和缓存队列的长度
							 | 
						|
								func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
							 | 
						|
									return &Publisher{
							 | 
						|
										buffer:      buffer,
							 | 
						|
										timeout:     publishTimeout,
							 | 
						|
										subscribers: make(map[subscriber]topicFilter),
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// 添加一个新的订阅者,订阅过滤筛选后的主题
							 | 
						|
								func (p *Publisher) subscribeTopic(filter topicFilter) chan Message {
							 | 
						|
									ch := make(chan Message, p.buffer)
							 | 
						|
									p.m.Lock()
							 | 
						|
									p.subscribers[ch] = filter
							 | 
						|
									p.m.Unlock()
							 | 
						|
									return ch
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (p *Publisher) Subscribe(topic string) chan Message {
							 | 
						|
									var messages chan Message
							 | 
						|
									if topic != "" {
							 | 
						|
										if strings.Contains(topic, "*") {
							 | 
						|
											sps := strings.Split(topic, "*")
							 | 
						|
											if len(sps) >= 2 && len(sps[0]) > 0 && len(sps[1]) >= 0 {
							 | 
						|
												messages = p.subscribeTopic(func(t string) bool {
							 | 
						|
													flag := true
							 | 
						|
													for _, sp := range sps {
							 | 
						|
														if !strings.Contains(t, sp) {
							 | 
						|
															flag = false
							 | 
						|
															break
							 | 
						|
														}
							 | 
						|
													}
							 | 
						|
													return flag
							 | 
						|
												})
							 | 
						|
											} else {
							 | 
						|
												messages = p.subscribeTopic(nil)
							 | 
						|
											}
							 | 
						|
										} else {
							 | 
						|
											messages = p.subscribeTopic(func(t string) bool {
							 | 
						|
												if t == topic {
							 | 
						|
													return true
							 | 
						|
												}
							 | 
						|
												return false
							 | 
						|
											})
							 | 
						|
										}
							 | 
						|
									} else {
							 | 
						|
										messages = p.subscribeTopic(nil)
							 | 
						|
									}
							 | 
						|
									return messages
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// 退出订阅
							 | 
						|
								func (p *Publisher) Unsubscribe(sub chan Message) {
							 | 
						|
									p.m.Lock()
							 | 
						|
									defer p.m.Unlock()
							 | 
						|
									delete(p.subscribers, sub)
							 | 
						|
									close(sub)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// 发送主题,可以容忍一定的超时
							 | 
						|
								func (p *Publisher) sendTopic(sub subscriber, filter topicFilter, msg Message, wg *sync.WaitGroup) {
							 | 
						|
									defer wg.Done()
							 | 
						|
									// 没有订阅该主题
							 | 
						|
									if filter != nil && !filter(msg.Topic) {
							 | 
						|
										return
							 | 
						|
									}
							 | 
						|
									select {
							 | 
						|
									case sub <- msg:
							 | 
						|
									case <-time.After(p.timeout):
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// 发布一个主题
							 | 
						|
								func (p *Publisher) Publish(topic string, payload []byte) {
							 | 
						|
									p.m.RLock()
							 | 
						|
									defer p.m.RUnlock()
							 | 
						|
									var wg sync.WaitGroup
							 | 
						|
									msg := Message{
							 | 
						|
										Topic:   topic,
							 | 
						|
										Payload: payload,
							 | 
						|
									}
							 | 
						|
									for sub, filter := range p.subscribers {
							 | 
						|
										wg.Add(1)
							 | 
						|
										go p.sendTopic(sub, filter, msg, &wg)
							 | 
						|
									}
							 | 
						|
									wg.Wait()
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// 关闭发布者对象,同时关闭所有的订阅者管道
							 | 
						|
								func (p *Publisher) Close() {
							 | 
						|
									p.m.Lock()
							 | 
						|
									defer p.m.Unlock()
							 | 
						|
									for sub := range p.subscribers {
							 | 
						|
										delete(p.subscribers, sub)
							 | 
						|
										close(sub)
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								var PubSub *Publisher
							 | 
						|
								
							 | 
						|
								func init() {
							 | 
						|
									PubSub = NewPublisher(100*time.Millisecond, 1)
							 | 
						|
								}
							 | 
						|
								
							 |