ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Redis 원정대 - 4 : Redis Pub/Sub
    NoSQL 2023. 3. 3. 03:56

    개요

    Redis 는 Channel 을 이용해서 Pub/Sub 시스템 기능을 제공한다.
    간단히 설명하면, Pub Client 에서 Channel 에 데이터를 푸시하면
    Sub Client 에서는 해당 데이터를 수신하는 구조이다.


    내부 구조 Internals(데이터 구조 Data Structures)

    Redis Pub/Sub 시스템은 레디스 서버와 클라이언트에 채널과 패턴을 등록한다.
    채널은 dict(Hash table)에 저장되고 패턴은 Linked List에 저장된다.


    server.pubsub_channels

    http://redisgate.kr/redis/command/pubsub_intro.php


    dict 및 dictEntry 는 Redis 에서 사용하는 해시 테이블이다.

    • dictEntry의 key 필드 : channel 을 가리킨다.
    • dictEntry의 value 필드 : 리스트(linked list)를 가리킨다.
      • 한 channel 을 기준으로 여러 클라이언트가 Subscribe 할 수 있다.
      • 각 노드는 구독한 클라이언트를 가리킨다.(O(n) 방식으로 push)

     

    PUBLISH channel message 명령을 사용했다 가정하면

    1. Hash table에서 channel 을 찾는다.
    2. channel 의 Value 즉, 리스트에 저장되어 있는 클라이언트들에게 하나씩 메시지를 보낸다.
    3. 이후에는, 패턴을 등록한 클라이언트들에게 메시지를 보낸다.

    server.pubsub_patterns

    패턴을 저장하는 Linked List 구조는 다음과 같다.

    http://redisgate.kr/redis/command/pubsub_intro.php

    리스트의 각 노드는 pubsubPattern 구조체를 가리키고, 이 구조체는 클라이언트와 패턴을 가진다.
    PUBLISH channel message 명령을 사용했다 가정하면

    1. channel 명으로 클라이언트에 메시지를 보낸 다음,
    2. channel에 해당하는 패턴을 찾아 해당 클라이언트에 메시지를 보낸다.

    client.pubsub_channels

    http://redisgate.kr/redis/command/pubsub_intro.php

    클라이언트 구조체의 pubsub_channels 필드도 채널을 저장하는 dict 구조체를 가리킨다.

    • dictEntry의 key 필드 : channel을 가리킨다.
    • dictEntry의 Value : Null

     

    Subscribe 명령이 수행되면 클라이언트 구조체에 채널을 저장하고, 서버 구조체에도 채널을 저장한다.
    클라이언트 구조체는 클라이언트가 해당 채널을 등록하고, 클라이언트를 pubsub 모드로 전환하는 역할을 한다.

    • normal 모드면 client buffer가 무제한이다.
    • pubsub 모드이면 hard limit은 32mb, soft limit는 8mb로 제한한다.

     

    Unsubscribe 또는 punsubscribe 명령이 수행되어
    클라이언트 구조체에 채널이나 패턴이 하나도 없으면 클라이언트를 pubsub 모드에서 normal 모드로 전환한다.


    client.pubsub_patterns

    http://redisgate.kr/redis/command/pubsub_intro.php

    패턴을 저장하는 리스트를 가리킨다.  
    리스트의 각 노드는 서버와 달리 pattern 구조체를 바로 가리킨다.

    Psubscribe 명령이 수행되면 클라이언트 구조체인 리스트에 패턴을 저장하고,
    서버 구조체에도 패턴을 저장하며, 클라이언트를 pubsub 모드로 전환하는 역할도 한다.


    FUNCTIONS

    레디스 pubsub는 여섯 개의 명령으로 구성되어 있다.

    메시지를 받는 subscribe와 psubscribe가 있고,
    메시지 받기를 중지하는 unsubscribe와 punsubscribe가 있다.  
    메시지를 보내는 publish가 있고, 등록된 채널 리스트와 개수를 조회하고 패턴 개수를 조회하는 pubsub 명령이 있다.


    SUBSCRIBE

    등록한 채널을 삭제해서 더 이상 메시지를 받지 않도록 한다.

    http://redisgate.kr/redis/command/pubsub_intro.php


    SUBSCRIBE 명령이 수행되면 서버 구조체(redisServer struct)와 클라이언트 구조체(redisClient struct)에 채널을 등록한다.

    1. 클라이언트 구조체에 등록하는 과정,
      dictAdd(client->pubsub_channels,channel,NULL)를 실행하여
      redisClient.pubsub_channels의 dict 구조체에 채널명을 등록한다.
    2. 서버 구조체에 등록하는 과정,
      dictFind(server.pubsub_channels,channel)를 수행하여
      채널이 없으면 list = listCreate()를 수행하여 리스트를 생성한다.
      1. Channel이 이미 등록되어 있으면, list = dictGetVal(dictEntry)를 수행해서 리스트를 얻어오고, 4번으로 간다.
    3.  dictAdd(server.pubsub_channels,channel,list)을 수행해서 채널과 리스트를 dict에 등록한다
    4. listAddNodeTail(list,client)를 수행해서 리스트에 클라이언트를 등록한다.

    PSUBSCRIBE

    패턴을 등록해서 메시지를 받는 명령이다.

    http://redisgate.kr/redis/command/pubsub_intro.php

    PSUBSCRIBE 명령이 수행되면 서버와 클라이언트의 리스트(Linked List)에 패턴을 등록한다.

    1. listSearchKey(client->pubsub_patterns,pattern)로 패턴이 이미 있는지 확인한다.
      없으면 listAddNodeTail(client->pubsub_patterns,pattern)를 수행해서 클라이언트 리스트에 패턴을 등록한다.
    2. listAddNodeTail(server.pubsub_patterns,pat)로 서버에 패턴을 등록하는데,
      클라이언트에는 패턴만 등록하는 반면, 서버에는 pubsubPattern 구조체에 client와 pattern을 담아서 같이 등록한다.
      (패턴을 등록한 클라이언트에 메시지를 보내기 위해서이다.)

    UNSUBSCRIBE

    등록한 채널을 삭제해서 더 이상 메시지를 받지 않도록 한다.

    http://redisgate.kr/redis/command/pubsub_intro.php

    인수로 채널을 입력하지 않으면  pubsubUnsubscribeAllChannels()를 호출해서 클라이언트에 등록된 모든 채널을 삭제한다.

    pubsubUnsubscribeAllChannels()에서 while loop를 돌면서
    pubsubUnsubscribeChannel()를 호출해서 dictNect()를 이용해서 채널을 하나씩 삭제한다.

    채널을 입력하면 입력한 채널 개수만큼 while loop를 돌면서 pubsubUnsubscribeChannel()를 호출해서 채널을 하나씩 삭제한다.

    1. dictDelete(client->pubsub_channels,channel)를 수행해서 클라이언트에 채널을 삭제
    2. dictEntry = dictFind(server.pubsub_channels,channel)로 서버에서 dictEntry를 구한다.
    3. list = dictGetVal(dictEntry)로 value에 저장된 list를 가져온다.
    4. listNode = listSearchKey(list,client)를 수행해서 리스트 노드를 얻어온다.
    5. listDelNode(list,listNode)를 수행해서 리스트에서 노드를 삭제한다.
    6. 리스트에 노드가 하나도 없으면  dictDelete(server.pubsub_channels,channel)를 수행해서 리스트를 삭제한다.

    PUNSUBSCRIBE

    등록한 패턴을 삭제해서 더 이상 메시지를 받지 않도록 한다.

    http://redisgate.kr/redis/command/pubsub_intro.php

    인수로 패턴을 입력하지 않으면  pubsubUnsubscribeAllPatterns()를 호출해서 클라이언트에 등록된 모든 패턴을 삭제한다.

    pubsubUnsubscribeAllPatterns()에서 while loop를 돌면서 
    pubsubUnsubscribePattern()를 호출해서 listDelNode()를 이용해서 채널을 하나씩 삭제한다.

    채널을 입력하면 입력한 채널 개수만큼 while loop를 돌면서 pubsubUnsubscribePattern()를 호출해서 채널을 하나씩 삭제한다.

    1. 먼저 listSearchKey(client->pubsub_patterns,pattern)을 실행해서 클라이언트에서 패턴을 찾는다.
    2. listDelNode(client->pubsub_patterns,listNode)를 수행해서 패턴을 삭제한다.
    3. 다음 listSearchKey(server->pubsub_patterns,pubsubPattern)을 실행해서 서버에서 패턴을 찾는다.
    4. listDelNode(server->pubsub_patterns,listNode)를 수행해서 패턴을 삭제한다.

    PUBLISH

    PUBLISH는 메시지를 보내는 명령이다.

    http://redisgate.kr/redis/command/pubsub_intro.php

    PUBLISH는 크게 두 부분으로 구성되어 있다.

    • 서버 내에 클라이언트에게 메시지를 보내는 것
    • 서버가 클러스터 모드이면, 다른 서버들에게 전달해서 메시지를 각 서버에 접속해 있는 클라이언트들에게 메시지를 보내는 것

     

    서버 내에서 동작 방식

    1. 먼저 서버에서 해당 채널을 찾는다. 
      dictFind(server.pubsub_channels,channel).
    2. listNext(listNode)로 노드가 없을 때까지 While Loop를 돌면서
      등록된 클라이언트에 addReplyBulk(client,message)로 메시지를 보낸다.
    3. listLength(server.pubsub_patterns)으로 서버에 등록된 패턴이 있는지 확인한다.
    4. listNext(listNode)로 노드가 없을 때까지 While Loop를 돌면서 stringmatchlen() 으로 비교해서 맞으면
      클라이언트에 addReplyBulk(pubsubPattern->client,message)로 메시지를 보낸다.

     

    클러스터에서의 동작 방식

    • clusterPropagatePublish()부터는 cluster.c 에 있다.
    • 클러스터의 서버들에 대한 정보가 dict에 저장되어 있고,
      dictNext()로 클러스터 서버 정보를 얻어서 채널과 메시지를 보낸다.
    • 해당 서버는 채널과 메시지를 받아서
      서버 내에서 동작 방식과정을 거처 클라이언트들에게 메시지를 보낸다.

    PUBSUB

    http://redisgate.kr/redis/command/pubsub_intro.php

    • Subcommand가 channels 이면 server.pubsub_channels에서 dictNext()로 이동하면서 채널을 보여준다.
      패턴이 입력되었으면 stringmatchlen()으로 패턴과 비교해서 맞는 채널들만 보여준다.
    • Subcommand가 numsub 면 해당 채널에 등록된 클라이언트 개수를 listLength(list)로 구해서 보여준다.
      즉, 특정 채널을 구독하고 있는 subscriber의 갯수를 확인할 수 있다.
      (pubsub numsub ch1)
    • Subcommand가 numpat 면 리스트에 등록된 패턴 개수를 listLength(list)로 구해서 보여준다.
      즉, pattern subscription의 subscriber 갯수를 확인할 수 있다.
      (pubsub numpat)

    Scaling Pub/Sub

    Redis Cluster 와 pub-sub 을 함께 사용시 고려할 점

    https://www.youtube.com/watch?v=6G22a5Iooqk&t=1s&ab_channel=Redis

    • Redis Cluster 와 pub-sub 을 함께 사용한다면, Pub구조 특성상, 다른 클러스터 노드에 채널과 메시지를 정보를 보낸다.
    • 즉, 쓰기 부하가 여럿 발생하게 되는 구조이며 이는 곧, 클러스터의 노드의 개수가 많으면 성능이 감퇴되는 역효과가 발생한다.

     

    https://www.youtube.com/watch?v=6G22a5Iooqk&t=1s&ab_channel=Redis

    • 계속 증가하는 메시지 수 및 점점 늘어나는 클라이언트에 대해서 지원 가능해야한다.
    • 자동으로 레디스 클러스터 서버 수를 늘리거나 줄일 수 있어야 한다.
    • 모든 redis pubsub 기능(다중 구독, 패턴 등) 지원

    Sharded Pub/Sub

    Sharded PubSub 은 기존 Redis Cluster 에서의 PubSub 의 단점을 해결하기 위해서 도입된 기술이다.

    https://www.youtube.com/watch?v=M3U4587ZAS8&ab_channel=Redis

    Slot 에 Key를 할당하는데 사용되는 것 과 동일한 알고리즘으로
    Slot 에 Channel 이 할당되는 Sharded Pub/Sub 이 도입되었다.

    Shard Channel 이 해시되는 Slot 을 소유한 노드에 Shard 메시지를 보낼 수 있다.
    클라이언트 또한, 각 메시지를 수신받기 위해 Sharding 된 여러 노드들을 구독할 수 있다.
    SSUBSCRIBE, SUNSUBSCRIBE 및 SPUBLISH는 샤드된 Pub/Sub를 구현하는 데 사용된다.


    Sharded PubSub 이전 Cluster PubSub

    Redis Cluster 에서의 Pub/Sub 에서 한대의 서버에 publish 하면,
    해당 노드는 모든 노드에 publish 를 brodcast 한다.
    Primary에 subscribe 를 하든, Replica 에 subscribe 를 하든 상관없다.(물론 publish 도)

    이로 인해, 어떤 노드에 subscribe를 하든 해당 broadcast 되는 메시지를 받을 수 있지만
    모든 채널을 확인해야 하는 이슈가 있기 때문에, 보통 메시지 전달은 채널수 + 채널에 붙은 클라이언트 수 만큼 루프를 돌아야 한다.


    Sharded PubSub

    key를 crc16으로 Hash 해서 해당 키가 속한 slot 을 처리하는 서버로만 메시지를 전달하게 되는데(-MOVED 를 이용),
    이 특성을 이용해서 Pub/Sub도 하나의 Shard 군에서만 처리하자이다.
    Shard 군이기에 해당 slot 을 가진 primary 와 replica 는 모두 전달받게 된다.(레플리카 개수 따라)

    참고로 pattern 형식은 전체 노드를 subscribe 하지 않고서는 Sharded PuSub 을 지원할 수 없다. 
    패턴의 hash 값이 어떤 slot에 들어갈지를 알 수 없기 때문이다.
    

     

    pusbsubtype

    typedef struct pubsubtype {
        int shard;
        dict *(*clientPubSubChannels)(client*);
        int (*subscriptionCount)(client*);
        dict **serverPubSubChannels;
        robj **subscribeMsg;
        robj **unsubscribeMsg;
    }pubsubtype;
    
    pubsubtype pubSubType = {
        .shard = 0,
        .clientPubSubChannels = getClientPubSubChannels,
        .subscriptionCount = clientSubscriptionsCount,
        .serverPubSubChannels = &server.pubsub_channels,
        .subscribeMsg = &shared.subscribebulk,
        .unsubscribeMsg = &shared.unsubscribebulk,
    };
     
    /*
     * Pub/Sub type for shard level channels bounded to a slot.
     */
    pubsubtype pubSubShardType = {
        .shard = 1,
        .clientPubSubChannels = getClientPubSubShardChannels,
        .subscriptionCount = clientShardSubscriptionsCount,
        .serverPubSubChannels = &server.pubsubshard_channels,
        .subscribeMsg = &shared.ssubscribebulk,
        .unsubscribeMsg = &shared.sunsubscribebulk
    };
    
    • 이전 버전의 pubsub 과 v7 에서의 pubsub 을 지원하기 위해 추상화된 pubsubtype 구조체가 생겼다.
    • 추가로 구조체 pubsubtype 을 구현한 2개의 타입이 생겼다.
      • pubSubType
      • pubSubShardType

     

    그리고 이를 지원하기 위해 서버와 클라이언트에 변수가 추가되었다.

     

    server.pubsubshard_channels

    dict *pubsubshard_channels;  /* Map channels to list of subscribed clients */
    int cluster_allow_pubsubshard_when_down; /* Is pubsubshard allowed when the cluster is down, doesn't affect pubsub global. */
    
    server.pubsub_channels = dictCreate(&keylistDictType);
    server.pubsub_patterns = dictCreate(&keylistDictType);
    server.pubsubshard_channels = dictCreate(&keylistDictType);
    
    • pubsubshard_channels 채널을 저장하는 dict 구조체를 가리키는 변수가 추가되었다.

     

    dict* getClientPubSubChannels(client *c) {
        return c->pubsub_channels;
    }
     
    dict* getClientPubSubShardChannels(client *c) {
        return c->pubsubshard_channels;
    }
    
    • getClientPubSubChannels 와 getClientPubSubShardChannels 는 아래와 같이 서로 다른 변수를 전달한다.

     

    /* Return the number of channels + patterns a client is subscribed to. */
    int clientSubscriptionsCount(client *c) {
        return dictSize(c->pubsub_channels) + listLength(c->pubsub_patterns);
    }
     
    /* Return the number of shard level channels a client is subscribed to. */
    int clientShardSubscriptionsCount(client *c) {
        return dictSize(c->pubsubshard_channels);
    }
    
    • clientSubscriptionsCount 와 clientShardSubscriptionsCount 는 위와 같다.

     

    /* SSUBSCRIBE channel [channel ...] */
    void ssubscribeCommand(client *c) {
        if (c->flags & CLIENT_DENY_BLOCKING) {
            addReplyError(c, "SSUBSCRIBE isn't allowed for a DENY BLOCKING client");
            return;
        }
     
        for (int j = 1; j < c->argc; j++) {
            if (server.cluster_enabled &
                (dictFind(*pubSubShardType.serverPubSubChannels, c->argv[j]) == NULL)) {
                slotToChannelAdd(c->argv[j]->ptr);
            }
            pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
        }
        c->flags |= CLIENT_PUBSUB;
    }
    
    • SSUBSCRIBE 명령어가 추가로 생겼으며 동작은 아래와 같다.
      • cluster_enabled 상태(클러스터 모드)에서 구독하려는 채널이 등록되어있지 않다면
        slotToChannelAdd 명령을 통해서 cluster 의 slot_to_channels 에 추가하게 된다.

     

    int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
        dictEntry *de;
        list *clients = NULL;
        int retval = 0;
     
        if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) {
            retval = 1;
            incrRefCount(channel);
    			
    				// 채널 조회 
            de = dictFind(*type.serverPubSubChannels, channel);
    				// 없으면, dict 에 추가 
            if (de == NULL) {
                clients = listCreate();
                dictAdd(*type.serverPubSubChannels, channel, clients);
                incrRefCount(channel);
            } else {
                clients = dictGetVal(de);
            }
            listAddNodeTail(clients,c);
        }
        addReplyPubsubSubscribed(c,channel,type);
        return retval;
    }
    
    1. 클라이언트 구조체에 등록하는 과정,
      dictAdd(type.clientPubSubChannels(c),channel,NULL)를 실행하여
      redisClient.pubsub_channels의 dict 구조체에 채널명을 등록한다.
    2. dictFind(*type.serverPubSubChannels, channel)를 수행한다.
      1. 채널이 등록되어 있지 않다면 list = listCreate()를 수행하여 리스트를 생성하고 dictAdd(server.pubsub_channels,channel,list)을 수행해서
        redisClient.pubsub_channels의 dict 구조체에 채널과 리스트를 등록한다.
      2. 채널이 이미 등록되어 있으면, clients = dictGetVal(de)를 수행해서 리스트를 얻어오고, 4번으로 간다.
    3. listAddNodeTail(client, c)를 수행해서 리스트에 클라이언트를 등록한다.
    4. addReplyPubsubSubscribed(c,channel,type);

     

    void spublishCommand(client *c) {
        int receivers = pubsubPublishMessageInternal(c->argv[1], c->argv[2], pubSubShardType);
        if (server.cluster_enabled) {
            clusterPropagatePublishShard(c->argv[1], c->argv[2]);
        } else {
            forceCommandPropagation(c,PROPAGATE_REPL);
        }
        addReplyLongLong(c,receivers);
    }
    
    /* PUBLISH <channel> <message> */
    void publishCommand(client *c) {
        if (server.sentinel_mode) {
            sentinelPublishCommand(c);
            return;
        }
     
        int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
        if (server.cluster_enabled)
            clusterPropagatePublish(c->argv[1],c->argv[2]);
        else
            forceCommandPropagation(c,PROPAGATE_REPL);
        addReplyLongLong(c,receivers);
    }
    
    • SPUBLISH 라는 명령어도 추가로 생겼으며 동작은 아래와 같다.
    • spublishCommand 의 clusterPropagatePublishShard 는 해당 shard 에만 메세지를 보내는 메서드다.
      (이 부분이 가장 큰 차이입니다.)
    • 형태는 거의 비슷하지만 센티널 모드를 고려하지 않는 점 + 샤드 펍섭 전용 메서드

     

    int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {
        int receivers = 0;
        dictEntry *de;
        dictIterator *di;
        listNode *ln;
        listIter li;
     
        /* Send to clients listening for that channel */
        de = dictFind(*type.serverPubSubChannels, channel);
        if (de) {
            list *list = dictGetVal(de);
            listNode *ln;
            listIter li;
     
            listRewind(list,&li);
            while ((ln = listNext(&li)) != NULL) {
                client *c = ln->value;
                addReplyPubsubMessage(c,channel,message);
                updateClientMemUsage(c);
                receivers++;
            }
        }
     
        if (type.shard) {
            return receivers;
        }
     
        di = dictGetIterator(server.pubsub_patterns);
        if (di) {
            channel = getDecodedObject(channel);
            while((de = dictNext(di)) != NULL) {
                robj *pattern = dictGetKey(de);
                list *clients = dictGetVal(de);
                if (!stringmatchlen((char*)pattern->ptr,
                                    sdslen(pattern->ptr),
                                    (char*)channel->ptr,
                                    sdslen(channel->ptr),0)) continue;
     
                listRewind(clients,&li);
                while ((ln = listNext(&li)) != NULL) {
                    client *c = listNodeValue(ln);
                    addReplyPubsubPatMessage(c,pattern,channel,message);
                    updateClientMemUsage(c);
                    receivers++;
                }
            }
            decrRefCount(channel);
            dictReleaseIterator(di);
        }
    

    pubsubPublishMessageInternal 는 세 번째 파라매터로 pubsubtype 을 받는다.
    단, pattern 을 사용하는 방식은 여기 코드에서도 나오지만 shard type을 지원하지 않는다.
    이는 pattern형식은 전체 노드를 subscribe 하지 않고서는 지원할 수 없기 때문이다.
    (패턴의 hash 값이 어떤 slot에 들어갈지를 알 수 없음.)

     

    void clusterPropagatePublishShard(robj *channel, robj *message) {
        list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself);
        if (listLength(nodes_for_slot) != 0) {
            listIter li;
            listNode *ln;
            listRewind(nodes_for_slot, &li);
            while((ln = listNext(&li))) {
                clusterNode *node = listNodeValue(ln);
                if (node != myself) {
                    clusterSendPublish(node->link, channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD);
                }
            }
        }
        listRelease(nodes_for_slot);
    }
    
    • channel 이 속한 cluster 로 clusterSendPublish를 통해서 전달되게 된다.
    • 여기서 clusterGetNodesServingMySlots 에서는 해당 slot 을 가진 primary 와 replica 에게 모두 전달한다.

     

    REDIS_NO_SANITIZE("bounds")
    void clusterSendPublish(clusterLink *link, robj *channel, robj *message, uint16_t type) {
        unsigned char *payload;
        clusterMsg buf[1];
        clusterMsg *hdr = (clusterMsg*) buf;
        uint32_t totlen;
        uint32_t channel_len, message_len;
     
        channel = getDecodedObject(channel);
        message = getDecodedObject(message);
        channel_len = sdslen(channel->ptr);
        message_len = sdslen(message->ptr);
     
        clusterBuildMessageHdr(hdr,type);
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
     
        hdr->data.publish.msg.channel_len = htonl(channel_len);
        hdr->data.publish.msg.message_len = htonl(message_len);
        hdr->totlen = htonl(totlen);
     
        /* Try to use the local buffer if possible */
        if (totlen < sizeof(buf)) {
            payload = (unsigned char*)buf;
        } else {
            payload = zmalloc(totlen);
            memcpy(payload,hdr,sizeof(*hdr));
            hdr = (clusterMsg*) payload;
        }
        memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
        memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
            message->ptr,sdslen(message->ptr));
     
        if (link)
            clusterSendMessage(link,payload,totlen);
        else
            clusterBroadcastMessage(payload,totlen);
     
        decrRefCount(channel);
        decrRefCount(message);
        if (payload != (unsigned char*)buf) zfree(payload);
    }
    
    • clusterSendPublish 함수는 publish 메세지를 해당 slot 을 가진 노드에 전달한다.

    client.pubsubshard_channels

    dict *pubsubshard_channels;  /* shard level channels a client is interested in (SSUBSCRIBE) */
    

    'NoSQL' 카테고리의 다른 글

    MongoDB Persistence  (0) 2023.04.06
    Redis 원정대 - 3 : Redis Pipelining  (0) 2023.03.02
    Redis 원정대 - 2 : Redis Client-Cache  (0) 2023.03.02
    Redis 원정대 - 1 : NoSQL 과 Redis  (2) 2023.02.28
Designed by Tistory.