From 0cbfc79b90beadbd306139f413a979b5b748a315 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 7 Apr 2022 16:27:35 +0800 Subject: [PATCH] add multi topic test --- source/dnode/vnode/src/inc/tqInt.h | 2 +- source/dnode/vnode/src/tq/tq.c | 59 ++++++++++++++++++++++++++---- tests/script/jenkins/basic.txt | 2 +- tests/test/c/tmqSim.c | 2 +- 4 files changed, 54 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index bb42151cf3..6c4825e0f2 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -67,7 +67,7 @@ extern "C" { } \ } -#define TQ_BUFFER_SIZE 8 +#define TQ_BUFFER_SIZE 4 #define TQ_BUCKET_MASK 0xFF #define TQ_BUCKET_SIZE 256 diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 602e9047b3..fcea9b93c1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -421,22 +421,62 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int32_t tqProcessRebReq(STQ* pTq, char* msg) { SMqMVRebReq req = {0}; + terrno = TSDB_CODE_SUCCESS; tDecodeSMqMVRebReq(msg, &req); vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId ,req.newConsumerId); STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId); ASSERT(pConsumer); - pConsumer->consumerId = req.newConsumerId; - tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer); - tqHandleCommit(pTq->tqMeta, req.newConsumerId); - tqHandlePurge(pTq->tqMeta, req.oldConsumerId); - terrno = TSDB_CODE_SUCCESS; + ASSERT(pConsumer->consumerId == req.oldConsumerId); + int32_t numOfTopics = taosArrayGetSize(pConsumer->topics); + if (numOfTopics == 1) { + STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0); + ASSERT(strcmp(pTopic->topicName, req.topic) == 0); + STqConsumer* pNewConsumer = tqHandleGet(pTq->tqMeta, req.newConsumerId); + if (pNewConsumer == NULL) { + pConsumer->consumerId = req.newConsumerId; + tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer); + tqHandleCommit(pTq->tqMeta, req.newConsumerId); + tqHandlePurge(pTq->tqMeta, req.oldConsumerId); + return 0; + } else { + taosArrayPush(pNewConsumer->topics, pTopic); + } + } else { + for (int32_t i = 0; i < numOfTopics; i++) { + STqTopic* pTopic = taosArrayGet(pConsumer->topics, i); + if (strcmp(pTopic->topicName, req.topic) == 0) { + STqConsumer* pNewConsumer = tqHandleGet(pTq->tqMeta, req.newConsumerId); + if (pNewConsumer == NULL) { + pNewConsumer = taosMemoryCalloc(1, sizeof(STqConsumer)); + if (pNewConsumer == NULL) { + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; + return -1; + } + strcpy(pNewConsumer->cgroup, pConsumer->cgroup); + pNewConsumer->topics = taosArrayInit(0, sizeof(STqTopic)); + pNewConsumer->consumerId = req.newConsumerId; + pNewConsumer->epoch = 0; + + taosArrayPush(pNewConsumer->topics, pTopic); + tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer); + tqHandleCommit(pTq->tqMeta, req.newConsumerId); + return 0; + } + ASSERT(pNewConsumer->consumerId == req.newConsumerId); + taosArrayPush(pNewConsumer->topics, pTopic); + break; + } + } + // + } return 0; } int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { SMqSetCVgReq req = {0}; tDecodeSMqSetCVgReq(msg, &req); + bool create = false; vDebug("vg %d set to consumer %ld", req.vgId, req.consumerId); STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.consumerId); @@ -450,6 +490,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { pConsumer->topics = taosArrayInit(0, sizeof(STqTopic)); pConsumer->consumerId = req.consumerId; pConsumer->epoch = 0; + create = true; } STqTopic* pTopic = taosMemoryCalloc(1, sizeof(STqTopic)); @@ -483,10 +524,12 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); ASSERT(pTopic->buffer.output[i].task); } - /*printf("set topic %s to consumer %ld on vg %d\n", pTopic->topicName, req.consumerId, pTq->pVnode->vgId);*/ + vDebug("set topic %s to consumer %ld on vg %d", pTopic->topicName, req.consumerId, pTq->pVnode->vgId); taosArrayPush(pConsumer->topics, pTopic); - tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer); - tqHandleCommit(pTq->tqMeta, req.consumerId); + if (create) { + tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer); + tqHandleCommit(pTq->tqMeta, req.consumerId); + } terrno = TSDB_CODE_SUCCESS; return 0; } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 5a3ee003f0..576bf7de12 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -42,7 +42,7 @@ ./test.sh -f tsim/tmq/basic.sim ./test.sh -f tsim/tmq/basic1.sim ./test.sh -f tsim/tmq/oneTopic.sim -#./test.sh -f tsim/tmq/multiTopic.sim +./test.sh -f tsim/tmq/multiTopic.sim # --- stable ./test.sh -f tsim/stable/disk.sim diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 38264331c1..d7ae7668c8 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -226,7 +226,7 @@ void loop_consume(tmq_t* tmq) { int32_t totalRows = 0; int32_t skipLogNum = 0; while (running) { - tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 3000); + tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 4000); if (tmqMsg) { totalMsgs++;