diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 5d1e95be7c..3e4b25c9e7 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -882,11 +882,13 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { // TODO: alloc mem /*pRsp->*/ /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ +#if 0 if (pRsp->msg.numOfTopics == 0) { /*printf("no data\n");*/ taosFreeQitem(pRsp); goto WRITE_QUEUE_FAIL; } +#endif tscError("tmq recv poll: vg %d, req offset %ld, rsp offset %ld", pParam->pVg->vgId, pRsp->msg.reqOffset, pRsp->msg.rspOffset); @@ -938,7 +940,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { for (int32_t k = 0; k < vgNumCur; k++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k); sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId); - /*printf("epoch %d vg %d build %s\n", epoch, pVgCur->vgId, vgKey);*/ + printf("epoch %d vg %d build %s\n", epoch, pVgCur->vgId, vgKey); taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t)); } break; @@ -952,12 +954,12 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId); int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey)); int64_t offset = pVgEp->offset; - /*printf("epoch %d vg %d offset og to %ld\n", epoch, pVgEp->vgId, offset);*/ + printf("epoch %d vg %d offset og to %ld\n", epoch, pVgEp->vgId, offset); if (pOffset != NULL) { offset = *pOffset; - /*printf("epoch %d vg %d found %s\n", epoch, pVgEp->vgId, vgKey);*/ + printf("epoch %d vg %d found %s\n", epoch, pVgEp->vgId, vgKey); } - /*printf("epoch %d vg %d offset set to %ld\n", epoch, pVgEp->vgId, offset);*/ + printf("epoch %d vg %d offset set to %ld\n", epoch, pVgEp->vgId, offset); SMqClientVg clientVg = { .pollCnt = 0, .currentOffset = offset, @@ -1260,13 +1262,13 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { } // return -int32_t tmqHandleRes(tmq_t* tmq, SMqRspHead* rspHead, bool* pReset) { +int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspHead* rspHead, bool* pReset) { if (rspHead->mqMsgType == TMQ_MSG_TYPE__EP_RSP) { /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/ if (rspHead->epoch > atomic_load_32(&tmq->epoch)) { SMqCMGetSubEpRsp* rspMsg = (SMqCMGetSubEpRsp*)rspHead; tmqUpdateEp(tmq, rspHead->epoch, rspMsg); - tmqClearUnhandleMsg(tmq); + /*tmqClearUnhandleMsg(tmq);*/ *pReset = true; } else { *pReset = false; @@ -1297,6 +1299,11 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ pVg->currentOffset = rspMsg->msg.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + if (rspMsg->msg.numOfTopics == 0) { + taosFreeQitem(rspMsg); + rspHead = NULL; + continue; + } return rspMsg; } else { /*printf("epoch mismatch\n");*/ @@ -1305,7 +1312,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese } else { /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/ bool reset = false; - tmqHandleRes(tmq, rspHead, &reset); + tmqHandleNoPollRsp(tmq, rspHead, &reset); taosFreeQitem(rspHead); if (pollIfReset && reset) { printf("reset and repoll\n"); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 7891c690e3..f418ec2290 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -620,13 +620,13 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) { if (pSub->consumers) { - taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer); + //taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer); // taosArrayDestroy(pSub->consumers); pSub->consumers = NULL; } if (pSub->unassignedVg) { - taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); + //taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); // taosArrayDestroy(pSub->unassignedVg); pSub->unassignedVg = NULL; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 806415ccd9..195afa3848 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -237,7 +237,8 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) { for (int32_t j = 0; j < csz; j++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); if (consumerId == pSubConsumer->consumerId) { - int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo); + int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo); + mInfo("topic %s has %d vg", topicName, pConsumer->epoch); SMqSubTopicEp topicEp; strcpy(topicEp.topic, topicName); topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp)); @@ -419,7 +420,6 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { int32_t vgNum = pSub->vgNum; int32_t vgEachConsumer = vgNum / consumerNum; int32_t imbalanceVg = vgNum % consumerNum; - int32_t imbalanceSolved = 0; // iterate all consumers, set unassignedVgStash for (int32_t i = 0; i < consumerNum; i++) { @@ -446,19 +446,24 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb || (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) || (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) { + SMqConsumerObj* pNewRebConsumer = taosMemoryMalloc(sizeof(SMqConsumerObj)); + ASSERT(pNewRebConsumer); + memcpy(pNewRebConsumer, pRebConsumer, sizeof(SMqConsumerObj)); + pNewRebConsumer->currentTopics = taosArrayDup(pRebConsumer->currentTopics); + pNewRebConsumer->recentRemovedTopics = taosArrayDup(pRebConsumer->recentRemovedTopics); if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) { - pRebConsumer->epoch++; + pNewRebConsumer->epoch++; } if (vgThisConsumerAfterRb != 0) { - atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); + atomic_store_32(&pNewRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); } else { - atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); + atomic_store_32(&pNewRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); } - mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pRebConsumer->consumerId, status, - pRebConsumer->status); + mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pNewRebConsumer->consumerId, status, + pNewRebConsumer->status); - SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer); + SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pNewRebConsumer); sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); mndTransAppendRedolog(pTrans, pConsumerRaw); } @@ -469,7 +474,6 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { if (taosArrayGetSize(pSub->unassignedVg) != 0) { for (int32_t i = 0; i < consumerNum; i++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); - int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); int32_t vgThisConsumerAfterRb; if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 82300f82cb..438a584842 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -264,7 +264,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { fetchOffset = pReq->currentOffset + 1; } - /*printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);*/ + printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); SMqPollRsp rsp = { /*.consumerId = consumerId,*/ @@ -299,8 +299,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // response to user break; } - /*printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, - * pReq->epoch);*/ + printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, + pReq->epoch); /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*pHead = pTopic->pReadhandle->pHead;*/ if (pHead->msgType == TDMT_VND_SUBMIT) { @@ -353,9 +353,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; - /*printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset, - * pHead->msgType,*/ - /*pReq->epoch);*/ + printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch); tmsgSendRsp(pMsg); taosMemoryFree(pHead); return 0; @@ -377,6 +375,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; ((SMqRspHead*)buf)->epoch = pReq->epoch; + rsp.rspOffset = fetchOffset - 1; void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqPollRsp(&abuf, &rsp); @@ -385,7 +384,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - /*printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch);*/ + printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch); /*}*/ return 0; @@ -452,7 +451,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); ASSERT(pTopic->buffer.output[i].task); } - printf("set topic %s to consumer %ld\n", pTopic->topicName, req.consumerId); + printf("set topic %s to consumer %ld on vg %d\n", pTopic->topicName, req.consumerId, pTq->pVnode->vgId); taosArrayPush(pConsumer->topics, pTopic); tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer); tqHandleCommit(pTq->tqMeta, req.consumerId); diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim index cfcb1ac992..afd3f0f9b0 100644 --- a/tests/script/tsim/tmq/basic1.sim +++ b/tests/script/tsim/tmq/basic1.sim @@ -35,7 +35,7 @@ sql connect $dbNamme = d0 print =============== create database , vgroup 1 -sql create database $dbNamme vgroups 1 +sql create database $dbNamme vgroups 10 sql show databases print $data00 $data01 $data02 if $rows != 2 then