婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av

主頁 > 知識庫 > 詳解Redis用鏈表實現消息隊列

詳解Redis用鏈表實現消息隊列

熱門標簽:小紅書怎么地圖標注店 百度商家地圖標注怎么做 太原營銷外呼系統 玄武湖地圖標注 竹間科技AI電銷機器人 西藏教育智能外呼系統價格 地圖標注費用 地圖標注如何即時生效 最簡單的百度地圖標注

前言

Redis鏈表經常會被用于消息隊列的服務,以完成多程序之間的消息交換。個人認為redis消息隊列有一個好處,就是可以實現分布式和共享,就和memcache作為mysql的緩存和mysql自帶的緩存一樣。

鏈表實現消息隊列

Redis鏈表支持前后插入以及前后取出,所以如果往尾部插入元素,往頭部取出元素,這就是一種消息隊列,也可以說是消費者/生產者模型。可以利用lpush和rpop來實現。但是有一個問題,如果鏈表中沒有數據,那么消費者將要在while循環中調用rpop,這樣以來就浪費cpu資源,好在Redis提供一種阻塞版pop命令brpop或者blpop,用法為brpop/blpop list timeout, 當鏈表為空的時候,brpop/blpop將阻塞,直到設置超時時間到或者list插入一個元素。

用法如下:

charles@charles-Aspire-4741:~/mydir/mylib/redis$ ./src/redis-cli
127.0.0.1:6379> lpush list hello
(integer) 1
127.0.0.1:6379> brpop list 0
1) "list"
2) "hello"
127.0.0.1:6379> brpop list 0
//阻塞在這里
/* ---------------------------------------------------- */
//當我在另一個客戶端lpush一個元素之后,客戶端輸出為
127.0.0.1:6379> brpop list 0
1) "list"
2) "world"
(50.60s)//阻塞的時間

當鏈表為空的時候,brpop是阻塞的,等待超時時間到或者另一個客戶端lpush一個元素。接下來,看下源碼是如何實現阻塞brpop命令的。要實現客戶端阻塞,只需要服務器不給客戶端發送消息,那么客戶端就會阻塞在read調用中,等待消息到達。這是很好實現的,關鍵是如何判斷這個客戶端阻塞的鏈表有數據到達以及通知客戶端解除阻塞?Redis的做法是,將阻塞的鍵以及阻塞在這個鍵上的客戶端鏈表存儲在一個字典中,然后每當向數據庫插入一個鏈表時,就判斷這個新插入的鏈表是否有客戶端阻塞,有的話,就解除這個阻塞的客戶端,并且發送剛插入鏈表元素給客戶端,客戶端就這樣解除阻塞。

先看下有關數據結構,以及server和client有關屬性

//阻塞狀態
typedef struct blockingState {
 /* Generic fields. */
 mstime_t timeout;  /* 超時時間 */
 /* REDIS_BLOCK_LIST */
 dict *keys;    /* The keys we are waiting to terminate a blocking
        * operation such as BLPOP. Otherwise NULL. */
 robj *target;   /* The key that should receive the element,
        * for BRPOPLPUSH. */
 /* REDIS_BLOCK_WAIT */
 int numreplicas;  /* Number of replicas we are waiting for ACK. */
 long long reploffset; /* Replication offset to reach. */
} blockingState;
//繼續列表
typedef struct readyList {
 redisDb *db;//就緒鍵所在的數據庫
 robj *key;//就緒鍵
} readyList;
//客戶端有關屬性
typedef struct redisClient {
 int btype;    /* Type of blocking op if REDIS_BLOCKED. */
 blockingState bpop;  /* blocking state */
}
//服務器有關屬性
struct redisServer {
  /* Blocked clients */
 unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
 list *unblocked_clients; /* list of clients to unblock before next loop */
 list *ready_keys;  /* List of readyList structures for BLPOP  co */
}
//數據庫有關屬性
typedef struct redisDb {
  //keys->redisCLient映射
  dict *blocking_keys;  /* Keys with clients waiting for data (BLPOP) */
 dict *ready_keys;   /* Blocked keys that received a PUSH */
}redisDB

必須對上述的數據結構足夠了解,否則很難看懂下面的代碼,因為這些代碼需要操作上述的數據結構。先從brpop命令執行函數開始分析,brpop命令執行函數為

void brpopCommand(redisClient *c) {
 blockingPopGenericCommand(c,REDIS_TAIL);
}
//++++++++++++++++++++++++++++++++++++++++++++++++++
void blockingPopGenericCommand(redisClient *c, int where) {
 robj *o;
 mstime_t timeout;
 int j;
 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],timeout,UNIT_SECONDS)
  != REDIS_OK) return;//將超時時間保存在timeout中
 for (j = 1; j  c->argc-1; j++) {
  o = lookupKeyWrite(c->db,c->argv[j]);//在數據庫中查找操作的鏈表
  if (o != NULL) {//如果不為空
   if (o->type != REDIS_LIST) {//不是鏈表類型
    addReply(c,shared.wrongtypeerr);//報錯
    return;
   } else {
    if (listTypeLength(o) != 0) {//鏈表不為空
     /* Non empty list, this is like a non normal [LR]POP. */
     char *event = (where == REDIS_HEAD) ? "lpop" : "rpop";
     robj *value = listTypePop(o,where);//從鏈表中pop出一個元素
     redisAssert(value != NULL);
     //給客戶端發送pop出來的元素信息
     addReplyMultiBulkLen(c,2);
     addReplyBulk(c,c->argv[j]);
     addReplyBulk(c,value);
     decrRefCount(value);
     notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,
          c->argv[j],c->db->id);
     if (listTypeLength(o) == 0) {//如果鏈表為空,從數據庫刪除鏈表
      dbDelete(c->db,c->argv[j]);
      notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
           c->argv[j],c->db->id);
     }
     /* 省略一部分 */
    }
   }
  }
 }
  /* 如果鏈表為空,則阻塞客戶端 */
  blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}

從源碼可以看出,brpop可以操作多個鏈表變量,例如brpop list1 list2 0,但是只能輸出第一個有元素的鏈表。如果list1沒有元素,而list2有元素,則輸出list2的元素;如果兩個都有元素,則輸出list1的元素;如果都沒有元素,則等待其中某個鏈表插入一個元素,之后在2返回。最后調用blockForyKeys阻塞

void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
 dictEntry *de;
 list *l;
 int j;
 c->bpop.timeout = timeout;//超時時間賦值給客戶端blockingState屬性
 c->bpop.target = target;//這屬性適用于brpoplpush命令的輸入對象,如果是brpop, //則target為空
 if (target != NULL) incrRefCount(target);//不為空,增加引用計數
 for (j = 0; j  numkeys; j++) {
  /* 將阻塞的key存入c.bpop.keys字典中 */
  if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
  incrRefCount(keys[j]);
  /* And in the other "side", to map keys -> clients */
  //將阻塞的key和客戶端添加進c->db->blocking_keys
  de = dictFind(c->db->blocking_keys,keys[j]);
  if (de == NULL) {
   int retval;
   /* For every key we take a list of clients blocked for it */
   l = listCreate();
   retval = dictAdd(c->db->blocking_keys,keys[j],l);
   incrRefCount(keys[j]);
   redisAssertWithInfo(c,keys[j],retval == DICT_OK);
  } else {
   l = dictGetVal(de);
  }
  listAddNodeTail(l,c);//添加到阻塞鍵的客戶點鏈表中
 }
 blockClient(c,REDIS_BLOCKED_LIST);//設置客戶端阻塞標志
}

blockClient函數只是簡單的設置客戶端屬性,如下

void blockClient(redisClient *c, int btype) {
 c->flags |= REDIS_BLOCKED;//設置標志
 c->btype = btype;//阻塞操作類型
 server.bpop_blocked_clients++;
}

由于這個函數之后,brpop命令執行函數就結束了,由于沒有給客戶端發送消息,所以客戶端就阻塞在read調用中。那么如何解開客戶端的阻塞了?

插入一個元素解阻塞

任何指令的執行函數都是在processCommand函數中調用call函數,然后在call函數中調用命令執行函數,lpush也一樣。當執行完lpush之后,此時鏈表不為空,回到processCommand調用中,執行以下語句

if (listLength(server.ready_keys))
   handleClientsBlockedOnLists();

這兩行代碼是先檢查server.ready_keys是否為空,如果不為空,說明已經有一些就緒的鏈表,此時可以判斷是否有客戶端阻塞在這個鍵值上,如果有,則喚醒;現在問題又來了,這個server.ready_keys在哪更新鏈表了?

原來是在dbAdd函數中,當往數據庫中添加的值類型為REDIS-LIST時,這時就要調用signalListAsReady函數將鏈表指針添加進server.ready_keys:

//db.c
void dbAdd(redisDb *db, robj *key, robj *val) {
 sds copy = sdsdup(key->ptr);
 int retval = dictAdd(db->dict, copy, val);//將數據添加進數據庫
 redisAssertWithInfo(NULL,key,retval == REDIS_OK);
 //判斷是否為鏈表類型,如果是,調用有鏈表已經ready函數
 if (val->type == REDIS_LIST) signalListAsReady(db, key);
 if (server.cluster_enabled) slotToKeyAdd(key);
 }
//t_list.c
void signalListAsReady(redisDb *db, robj *key) {
 readyList *rl;
 /* 沒有客戶端阻塞在這個鍵上,則直接返回. */
 if (dictFind(db->blocking_keys,key) == NULL) return;
 /* 這個鍵已近被喚醒了,所以沒必要重新入隊 */
 if (dictFind(db->ready_keys,key) != NULL) return;
 /* Ok, 除了上述兩情況,把這個鍵放入server.ready_keys */
 rl = zmalloc(sizeof(*rl));
 rl->key = key;
 rl->db = db;
 incrRefCount(key);
 listAddNodeTail(server.ready_keys,rl);//添加鏈表末尾
 /* We also add the key in the db->ready_keys dictionary in order
  * to avoid adding it multiple times into a list with a simple O(1)
  * check. */
 incrRefCount(key);
 //同時將這個阻塞鍵放入db->ready_keys
 redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}

OK,這時server.ready_keys上已經有就緒鍵了,這時就調用processCommand函數中的handleClientsBlockedOnLists()函數來處理阻塞客戶端,在這個函數中,

void handleClientsBlockedOnLists(void) {
 while(listLength(server.ready_keys) != 0) {
  list *l;
  /* 將server.ready_keys賦給一個新的list,再將server.ready_keys清空 */
  l = server.ready_keys;
  server.ready_keys = listCreate();
  /* 迭代每一個就緒的每一個readyList */
  while(listLength(l) != 0) {
   listNode *ln = listFirst(l);//獲取第一個就緒readyList
   readyList *rl = ln->value;
   /* 從rl所屬的數據庫中刪除rl */
   dictDelete(rl->db->ready_keys,rl->key);
   /* 查詢rl所屬的數據庫查找rl->key ,給阻塞客戶端回復rl->key鏈表中的第一個元素*/
   robj *o = lookupKeyWrite(rl->db,rl->key);
   if (o != NULL  o->type == REDIS_LIST) {
    dictEntry *de;
    /* 在rl->db->blocking_keys查找阻塞在rl->key的客戶端鏈表 */
    de = dictFind(rl->db->blocking_keys,rl->key);
    if (de) {
     list *clients = dictGetVal(de);//轉換為客戶端鏈表
     int numclients = listLength(clients);
     while(numclients--) {//給每個客戶端發送消息
      listNode *clientnode = listFirst(clients);
      redisClient *receiver = clientnode->value;//阻塞的客戶端
      robj *dstkey = receiver->bpop.target;//brpoplpush命令目的鏈表
      int where = (receiver->lastcmd 
          receiver->lastcmd->proc == blpopCommand) ?
         REDIS_HEAD : REDIS_TAIL;//獲取取出的方向
      robj *value = listTypePop(o,where);//取出就緒鏈表的元素
      if (value) {
       /* Protect receiver->bpop.target, that will be
        * freed by the next unblockClient()
        * call. */
       if (dstkey) incrRefCount(dstkey);
       unblockClient(receiver);//設置客戶端為非阻塞狀態
       if (serveClientBlockedOnList(receiver,
        rl->key,dstkey,rl->db,value,
        where) == REDIS_ERR)
       {
        /* If we failed serving the client we need
         * to also undo the POP operation. */
         listTypePush(o,value,where);
       }//給客戶端回復鏈表中的元素內容
       if (dstkey) decrRefCount(dstkey);
       decrRefCount(value);
      } else {
       break;
      }
     }
    }
    //如果鏈表為空,則從數據庫中刪除
    if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
    /* We don't call signalModifiedKey() as it was already called
     * when an element was pushed on the list. */
   }
   /* 回收rl */
   decrRefCount(rl->key);
   zfree(rl);
   listDelNode(l,ln);
  }
  listRelease(l); /* We have the new list on place at this point. */
 }
}

從這個源碼可知,如果有兩個客戶端,同時阻塞在一個鏈表上面,那么如果鏈表插入一個元素之后,只有先阻塞的那個客戶端收到消息,后面阻塞的那個客戶端繼續阻塞,這也是先阻塞先服務的思想。handleClientsBlockedOnLists函數調用了unblockClient(receiver) ,該函數功能為接觸客戶端阻塞標志,以及找到db阻塞在key上的客戶端鏈表,并將接觸阻塞的客戶端從鏈表刪除。然后調用serveClientBlockOnList給客戶端回復剛在鏈表插入的元素。

int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
{
 robj *argv[3];
 if (dstkey == NULL) {
  /* Propagate the [LR]POP operation. */
  argv[0] = (where == REDIS_HEAD) ? shared.lpop :
           shared.rpop;
  argv[1] = key;
  propagate((where == REDIS_HEAD) ?
   server.lpopCommand : server.rpopCommand,
   db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
  /* BRPOP/BLPOP */
  addReplyMultiBulkLen(receiver,2);
  addReplyBulk(receiver,key);
  addReplyBulk(receiver,value);
 } else {
  /* BRPOPLPUSH */
   /* 省略 */
 }
}

propagate函數主要是將命令信息發送給aof和slave。函數中省略部分是brpoplpush list list1 0命令的目的鏈表list1非空時,將從list鏈表pop出來的元素插入list1中。當給客戶端發送消息之后,客戶端就從read函數調用中返回,變為不阻塞。

通過超時時間解阻塞

如果鏈表一直沒有數據插入,那么客戶端將會一直阻塞下去,這肯定是不行的,所以brpop還支持超時阻塞,即阻塞時間超過一定值之后,服務器返回一個空值,這樣客戶端就解脫阻塞了。

對于時間超時,都放在了100ms執行一次的時間事件中;超時解脫阻塞函數也在serverCron中;在serverCron->clientsCron->clientsCronHandleTimeout

int clientsCronHandleTimeout(redisClient *c, mstime_t now_ms) {
 time_t now = now_ms/1000;
 //..........
 else if (c->flags  REDIS_BLOCKED) {
  /* Blocked OPS timeout is handled with milliseconds resolution.
   * However note that the actual resolution is limited by
   * server.hz. */
  if (c->bpop.timeout != 0  c->bpop.timeout  now_ms) {
   /* Handle blocking operation specific timeout. */
   replyToBlockedClientTimedOut(c);
   unblockClient(c);
  }
 }
 //.............

把這個函數不相干的代碼刪除,主要部分先判斷這個客戶端是否阻塞,如果是,超時時間是否到期,如果是,則調用replyToBlockedClientTimedOut給客戶端回復一個空回復,以及接觸客戶端阻塞。

總結

鏈表消息隊列實現暫時分析到這了,大家都學會了嗎?希望這篇文章給大家能帶來一定的幫助,如果有疑問可以留言交流。

您可能感興趣的文章:
  • SpringBoot利用redis集成消息隊列的方法
  • PHP使用php-resque庫配合Redis實現MQ消息隊列的教程
  • Java利用Redis實現消息隊列的示例代碼
  • phpredis提高消息隊列的實時性方法(推薦)
  • PHP基于Redis消息隊列實現發布微博的方法
  • php+redis消息隊列實現搶購功能
  • 深入理解redis分布式鎖和消息隊列
  • 詳解redis是如何實現隊列消息的ack
  • PHP+Redis 消息隊列 實現高并發下注冊人數統計的實例
  • redis中隊列消息實現應用解耦的方法

標簽:香港 澳門 贛州 唐山 林芝 景德鎮 廣東 揚州

巨人網絡通訊聲明:本文標題《詳解Redis用鏈表實現消息隊列》,本文關鍵詞  詳解,Redis,用鏈,表,實現,;如發現本文內容存在版權問題,煩請提供相關信息告之我們,我們將及時溝通與處理。本站內容系統采集于網絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《詳解Redis用鏈表實現消息隊列》相關的同類信息!
  • 本頁收集關于詳解Redis用鏈表實現消息隊列的相關信息資訊供網民參考!
  • 推薦文章
    婷婷综合国产,91蜜桃婷婷狠狠久久综合9色 ,九九九九九精品,国产综合av
    日韩一级二级三级| 久久激五月天综合精品| 三级欧美韩日大片在线看| 精品国产人成亚洲区| 国产成人久久精品77777最新版本 国产成人鲁色资源国产91色综 | 久久综合九色综合欧美就去吻 | 亚洲第一狼人社区| 久久综合成人精品亚洲另类欧美| 粗大黑人巨茎大战欧美成人| 亚洲欧洲精品天堂一级| 欧美高清激情brazzers| 国产福利一区二区三区| 亚洲一区二区三区在线| 中文字幕第一区二区| 日韩一级视频免费观看在线| 成人午夜电影小说| 国产一区二区三区美女| 亚洲线精品一区二区三区八戒| 国产日韩影视精品| 亚洲精品一区二区三区蜜桃下载| 69久久夜色精品国产69蝌蚪网| 色先锋久久av资源部| 成人综合婷婷国产精品久久蜜臀| 蜜臀久久99精品久久久画质超高清| 免费观看成人av| 91超碰这里只有精品国产| 欧美精品一区二区久久婷婷| 日韩一区日韩二区| 日韩一本二本av| 日韩一区二区三区电影在线观看| 亚洲第一电影网| 欧美日韩精品综合在线| 国内外成人在线视频| 精品免费日韩av| 在线影视一区二区三区| 粉嫩高潮美女一区二区三区| 亚洲一区二区三区四区在线免费观看| 免费日本视频一区| 国产人成亚洲第一网站在线播放| 国内精品嫩模私拍在线| 紧缚奴在线一区二区三区| 久久se这里有精品| 久久99久久久久久久久久久| 久久九九久精品国产免费直播| 久久伊人中文字幕| 亚洲一区二区美女| 91丨porny丨最新| 亚洲三级视频在线观看| 午夜精品一区二区三区电影天堂| 国产在线观看免费一区| 91精品国产黑色紧身裤美女| 中文乱码免费一区二区 | 免费在线看成人av| 欧美理论电影在线| 香蕉成人啪国产精品视频综合网| 91色乱码一区二区三区| 国产精品免费aⅴ片在线观看| 极品尤物av久久免费看| 日韩欧美国产小视频| 毛片一区二区三区| 久久久国产一区二区三区四区小说| 日韩av成人高清| 99精品视频免费在线观看| 中文字幕在线免费不卡| 99久久精品国产网站| 亚洲欧洲制服丝袜| 欧美一区二区三区在线观看 | 在线免费观看日本一区| 亚洲三级电影全部在线观看高清| 99精品黄色片免费大全| 亚洲欧美乱综合| 欧美精品一区二区三区四区 | 欧美一区二区三区在| 久久国产尿小便嘘嘘尿| 中文字幕免费不卡| 欧美日韩夫妻久久| 成人高清视频在线| 国产精品一卡二卡在线观看| 国产亲近乱来精品视频| 欧美猛男男办公室激情| 国产精品77777| 日韩中文字幕不卡| 视频一区二区不卡| 一区二区三区精品视频| 久久综合久久鬼色中文字| 欧美日韩色一区| 91在线观看一区二区| 国产资源精品在线观看| 日韩中文欧美在线| 美腿丝袜亚洲三区| 秋霞午夜av一区二区三区| 亚洲一区二区欧美激情| 一区二区三区电影在线播| 日本一区二区三级电影在线观看| 337p亚洲精品色噜噜噜| 91 com成人网| 精品国产百合女同互慰| 日韩欧美在线一区二区三区| 99精品视频中文字幕| 欧洲一区二区av| 欧美精品一区二区三区蜜臀| 91一区在线观看| 色婷婷亚洲婷婷| 26uuu成人网一区二区三区| 久久麻豆一区二区| 综合色天天鬼久久鬼色| 午夜欧美2019年伦理| 狠狠色狠狠色综合系列| 成人av电影在线观看| 91黄色在线观看| 日韩无一区二区| 中文字幕亚洲一区二区av在线| 亚洲与欧洲av电影| 丁香另类激情小说| 91精选在线观看| 日韩一区中文字幕| 国产自产视频一区二区三区| 国产精品一区二区在线观看不卡 | 亚洲综合一二区| 91福利在线播放| 91精品国产一区二区人妖| 中文天堂在线一区| 国产免费成人在线视频| 午夜不卡av免费| 色偷偷久久一区二区三区| 91精品久久久久久久久99蜜臂| 中文字幕久久午夜不卡| 免费在线看成人av| 日韩欧美黄色影院| 国产麻豆91精品| 国产精品天天看| 91尤物视频在线观看| 中文欧美字幕免费| 99久久久久久| 一区二区三区国产精品| 91尤物视频在线观看| 国产精品妹子av| 欧日韩精品视频| 国模套图日韩精品一区二区| 色视频欧美一区二区三区| 亚洲欧美日韩国产综合在线 | 免费观看成人鲁鲁鲁鲁鲁视频| 欧洲国产伦久久久久久久| 图片区小说区区亚洲影院| 欧美色综合久久| 日本va欧美va欧美va精品| 精品国产成人在线影院 | 日韩欧美国产午夜精品| 午夜精品免费在线观看| xfplay精品久久| 欧美性受极品xxxx喷水| 日本欧美韩国一区三区| 久久久天堂av| 91精品国产入口| 欧洲视频一区二区| 91麻豆精品一区二区三区| 久久国产福利国产秒拍| 国产精品网站在线| 国产婷婷精品av在线| 在线播放视频一区| 欧美私人免费视频| 99精品久久免费看蜜臀剧情介绍| 偷拍自拍另类欧美| 蜜桃视频在线一区| 免费高清成人在线| 日韩av中文字幕一区二区三区| 亚洲欧洲www| 亚洲蜜臀av乱码久久精品| 中文字幕av不卡| 亚洲女人小视频在线观看| 日韩一区中文字幕| 一区2区3区在线看| 天堂影院一区二区| 热久久国产精品| 成人精品gif动图一区| 国产福利一区在线| 91在线国内视频| 欧美四级电影网| 亚洲已满18点击进入久久| 中文字幕在线不卡| 午夜精品免费在线| 国产99久久久国产精品| 国产精品亚洲第一 | 色老汉一区二区三区| www.欧美亚洲| 欧美浪妇xxxx高跟鞋交| 欧美一区二区三区系列电影| 不卡视频免费播放| 成人黄色小视频在线观看| 色久优优欧美色久优优| 欧美图片一区二区三区| 欧美电影免费观看高清完整版在线| 日韩欧美久久久| 亚洲一二三区视频在线观看| 亚洲精品福利视频网站| 开心九九激情九九欧美日韩精美视频电影 | 在线精品亚洲一区二区不卡| 国产v日产∨综合v精品视频| 久久99精品国产麻豆不卡| 国产.欧美.日韩|