婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av

主頁 > 知識(shí)庫 > 關(guān)于redigo中PubSub的一點(diǎn)小坑分析

關(guān)于redigo中PubSub的一點(diǎn)小坑分析

熱門標(biāo)簽:百度商家地圖標(biāo)注怎么做 地圖標(biāo)注如何即時(shí)生效 地圖標(biāo)注費(fèi)用 最簡單的百度地圖標(biāo)注 太原營銷外呼系統(tǒng) 玄武湖地圖標(biāo)注 西藏教育智能外呼系統(tǒng)價(jià)格 小紅書怎么地圖標(biāo)注店 竹間科技AI電銷機(jī)器人

前言

最近在用 golang 做一些 redis 相關(guān)的操作,選用了 redigo 這個(gè)第三方庫。然后在使用 Pub/Sub 的時(shí)候,卻發(fā)現(xiàn)了一個(gè)小坑……

Redis Client

首先,我們來初始化一個(gè)帶連接池的 Redis Client:

import (
	"github.com/gomodule/redigo/redis"
)

type RedisClient struct {
	pool *redis.Pool
}

func NewRedisClient(addr string, db int, passwd string) *RedisClient {
	pool := redis.Pool{
		MaxIdle:  10,
		IdleTimeout: 300 * time.Second,
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", addr, redis.DialPassword(passwd), redis.DialDatabase(db))
			if err != nil {
				return nil, err
			}
			return c, nil
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			if time.Since(t)  time.Minute {
				return nil
			}
			_, err := c.Do("PING")
			return err
		},
	}
	log.Printf("new redis pool at %s", addr)
	client := RedisClient{
		pool: pool,
	}
	return client
}

Publish

然后我們可以簡單的實(shí)現(xiàn)一個(gè) publish 方法:

func (r *RedisClient) Publish(channel, message string) (int, error) {
	c := r.pool.Get()
	defer c.Close()
	n, err := redis.Int(c.Do("PUBLISH", channel, message))
	if err != nil {
		return 0, fmt.Errorf("redis publish %s %s, err: %v", channel, message, err)
	}
	return n, nil
}

Subscribe

接下來就是一個(gè)稍微復(fù)雜點(diǎn)的帶有心跳的 subscribe 方法:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
	psc := redis.PubSubConn{Conn: r.pool.Get()}
	defer psc.Close()
	log.Printf("redis pubsub subscribe channel: %v", channel)
	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		return err
	}
	done := make(chan error, 1)
	// start a new goroutine to receive message
	go func() {
		for {
			switch msg := psc.Receive().(type) {
			case error:
				done - fmt.Errorf("redis pubsub receive err: %v", msg)
				return
			case redis.Message:
				if err := consume(msg); err != nil {
					done - err
					return
				}
			case redis.Subscription:
				if msg.Count == 0 {
					// all channels are unsubscribed
					done - nil
					return
				}
			}
		}
	}()

	// health check
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case -ctx.Done():
			if err := psc.Unsubscribe(); err != nil {
				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
			}
			return nil
		case err := -done:
			return err
		case -tick.C:
			if err := psc.Ping(""); err != nil {
				return err
			}
		}
	}

	return nil
}

最后,我們寫一個(gè)簡單地 main 函數(shù)來調(diào)用 publish subscribe:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
	psc := redis.PubSubConn{Conn: r.pool.Get()}
	defer psc.Close()
	log.Printf("redis pubsub subscribe channel: %v", channel)
	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		return err
	}
	done := make(chan error, 1)
	// start a new goroutine to receive message
	go func() {
		for {
			switch msg := psc.Receive().(type) {
			case error:
				done - fmt.Errorf("redis pubsub receive err: %v", msg)
				return
			case redis.Message:
				if err := consume(msg); err != nil {
					done - err
					return
				}
			case redis.Subscription:
				if msg.Count == 0 {
					// all channels are unsubscribed
					done - nil
					return
				}
			}
		}
	}()

	// health check
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case -ctx.Done():
			if err := psc.Unsubscribe(); err != nil {
				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
			}
			return nil
		case err := -done:
			return err
		case -tick.C:
			if err := psc.Ping(""); err != nil {
				return err
			}
		}
	}

	return nil
}


咋一看之下,好像并沒有什么異常?然而,如果我們這時(shí)候去看 redis 的 tcp 連接,就可以發(fā)現(xiàn)一些貓膩:

$sudo netstat -antp | grep redis
tcp  0  0 0.0.0.0:6379   0.0.0.0:*    LISTEN  940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55010  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55015  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55009  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55005  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55012  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55011  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55013  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55007  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55006  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55014  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:54972  ESTABLISHED 940/redis-server 0. 

竟然是每一次 subscribe 就新建了一個(gè)連接,而 connection pool 似乎沒有什么作用。

更進(jìn)一步地調(diào)試,我們發(fā)現(xiàn)在 defer psc.Close() 的時(shí)候就卡住了,也就是上面的 10 個(gè) goroutine 其實(shí)并沒有正常退出。

Concurrent

排查許久之后,終于定位到了問題!引用 redigo 的說明:

Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.

For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph.

也就是說,雖然一個(gè)連接可以在不同的 goroutine 并發(fā)調(diào)用 Receive() 和 Subscribe()(subscribe調(diào)用了send和flush) ,但是卻不能再有其他并發(fā)操作(比如 Close())。

其他相似的問題還可以參考 issue

Fix

知道了上面的原因之后,我們稍微修改一下 defer psc.Close() 的位置即可解決問題:

	// start a new goroutine to receive message
	go func() {
		// IMPORTANT!
		defer psc.Close()
		for {
			switch msg := psc.Receive().(type) {
			case error:

總結(jié)

以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問大家可以留言交流,謝謝大家對(duì)腳本之家的支持。

您可能感興趣的文章:
  • go實(shí)現(xiàn)redigo的簡單操作

標(biāo)簽:廣東 香港 唐山 贛州 澳門 林芝 揚(yáng)州 景德鎮(zhèn)

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《關(guān)于redigo中PubSub的一點(diǎn)小坑分析》,本文關(guān)鍵詞  關(guān)于,redigo,中,PubSub,的,一點(diǎn),;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《關(guān)于redigo中PubSub的一點(diǎn)小坑分析》相關(guān)的同類信息!
  • 本頁收集關(guān)于關(guān)于redigo中PubSub的一點(diǎn)小坑分析的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章
    婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av
    蜜臀久久99精品久久久久宅男| 国产喂奶挤奶一区二区三区| 免费观看在线色综合| 国产精品网站导航| 99re8在线精品视频免费播放| 亚洲美女在线国产| 欧美军同video69gay| 欧美精三区欧美精三区| 成人app软件下载大全免费| 国产精品一卡二| 国产乱码精品一区二区三区av| 国产精品激情偷乱一区二区∴| 色婷婷亚洲一区二区三区| 精品欧美一区二区在线观看| 天天av天天翘天天综合网 | 日韩av中文在线观看| 亚洲精品伦理在线| 国产欧美一二三区| 激情六月婷婷综合| 136国产福利精品导航| 日韩欧美一区电影| 日本乱人伦一区| 精品一区二区三区在线视频| 五月婷婷久久综合| 久久精品亚洲乱码伦伦中文| 国产无一区二区| 亚洲综合男人的天堂| 蜜臀精品一区二区三区在线观看| 亚洲一区在线视频观看| 欧美变态tickling挠脚心| 2020日本不卡一区二区视频| 国产99久久久国产精品| 精品一区二区三区视频| 欧美男同性恋视频网站| 欧美精品一级二级| 91色在线porny| 精品久久久久99| 欧美美女视频在线观看| 精品国产免费久久| 日韩美女一区二区三区| 91女神在线视频| 国产精品久久久久久久久免费相片| 国产毛片精品国产一区二区三区| 91精品啪在线观看国产60岁| 亚洲黄一区二区三区| 成人黄色国产精品网站大全在线免费观看| 精品女同一区二区| 天堂成人免费av电影一区| 亚洲成人精品一区二区| 日韩一区二区在线观看视频播放| 亚洲最新视频在线观看| 久久99国内精品| 日韩一级成人av| 五月综合激情婷婷六月色窝| 国产日本欧美一区二区| 精品一区二区综合| 精品欧美一区二区久久| 大尺度一区二区| 欧美三级日韩在线| 中文字幕日韩一区二区| 美女精品一区二区| 欧美人牲a欧美精品| 日本伦理一区二区| 精品一区二区三区在线播放| 欧美亚洲一区三区| 国产a视频精品免费观看| 欧美一个色资源| 国产综合成人久久大片91| 久久九九全国免费| 亚洲18影院在线观看| 久久伊人中文字幕| 日韩激情av在线| 国产综合色产在线精品| 欧洲色大大久久| 水野朝阳av一区二区三区| 色久综合一二码| 三级久久三级久久| 99视频一区二区| 色综合久久久久综合体桃花网| 色综合亚洲欧洲| 成人激情免费视频| 91美女在线看| 色视频欧美一区二区三区| 色综合天天综合网天天看片| 精品日韩欧美一区二区| 在线影院国内精品| 不卡一区二区在线| 欧美无乱码久久久免费午夜一区 | 久国产精品韩国三级视频| 一区二区三区免费观看| 亚洲超碰精品一区二区| 日本精品视频一区二区三区| 69成人精品免费视频| 中文字幕免费不卡| 精品无人码麻豆乱码1区2区 | 久久久99免费| 国产精品少妇自拍| 99久久久久久99| 色综合天天综合网国产成人综合天| 美国十次了思思久久精品导航| 久久99久久久久久久久久久| 欧美一级久久久| 国产欧美日韩精品一区| 午夜在线成人av| 韩国一区二区在线观看| 亚洲国产日韩在线一区模特| 夜夜嗨av一区二区三区| 日韩中文字幕一区二区三区| 激情久久五月天| 国产精品日产欧美久久久久| 欧美韩国日本不卡| 欧美日韩一区二区在线观看视频| 亚洲日本va午夜在线影院| 奇米影视7777精品一区二区| 久久精品久久99精品久久| 欧美日韩免费高清一区色橹橹| 亚洲精选视频在线| 国产成人午夜高潮毛片| 欧美亚洲一区二区在线| 欧美人与性动xxxx| 亚洲成av人片在线观看无码| 欧美美女网站色| 成人一区二区在线观看| 亚洲蜜臀av乱码久久精品蜜桃| 91精品国产麻豆国产自产在线 | 国产视频一区不卡| 国产精品亚洲视频| 91亚洲精华国产精华精华液| 亚洲宅男天堂在线观看无病毒| 激情五月激情综合网| 2020国产精品自拍| 在线看国产一区| 成人av在线观| 色妞www精品视频| 国产亚洲综合在线| av在线一区二区三区| 欧美亚洲另类激情小说| 欧美日韩中字一区| 亚洲欧美另类久久久精品| 91麻豆国产福利在线观看| 免费不卡在线观看| 亚洲国产一区视频| 国产精品美女久久久久aⅴ国产馆 国产精品美女久久久久av爽李琼 国产精品美女久久久久高潮 | 精品国产91久久久久久久妲己| 欧美日韩国产精品成人| 欧美一级高清片在线观看| 国产成人免费视频网站高清观看视频| 国内久久精品视频| 7777精品伊人久久久大香线蕉的| 久久精品国产第一区二区三区| 色欧美片视频在线观看| 激情综合色播激情啊| 精品国产髙清在线看国产毛片 | 欧美一区二区三区爱爱| 一本色道综合亚洲| 蜜臀久久久久久久| 在线亚洲精品福利网址导航| 日本三级亚洲精品| 亚洲综合激情小说| 色偷偷成人一区二区三区91| 天天综合网 天天综合色| 欧美图片一区二区三区| 国产欧美精品国产国产专区| 欧美人xxxx| 91精品在线免费观看| 国产精品一区二区在线播放 | 色综合久久久网| 久久99在线观看| gogo大胆日本视频一区| 欧美在线啊v一区| 国产精品美女久久久久久久久| 欧美一区二区久久久| 免播放器亚洲一区| 国产九色精品成人porny| 大白屁股一区二区视频| 美女一区二区视频| 国产日本亚洲高清| 亚洲桃色在线一区| 亚洲国产精品精华液网站| 亚洲一区二区在线视频| 韩国女主播成人在线观看| 蜜桃av一区二区| 久久久亚洲高清| 国产毛片一区二区| 久久久久久久久99精品| 国产精品国产精品国产专区不片| 久久久五月婷婷| 日韩免费在线观看| 久久99精品一区二区三区| 男女视频一区二区| 午夜精品久久久久久久久久| 精品一区二区在线免费观看| 久久av老司机精品网站导航| 久久99精品久久久| 欧洲亚洲国产日韩| 久久人人97超碰com| 国产欧美日韩不卡| 日韩精品一级二级| 欧美精品乱人伦久久久久久| 亚洲精品国产成人久久av盗摄|