diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 76384fbe6a..33324552dc 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1377,7 +1377,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN); tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN); - tscDebug("consumer:0x%" PRIx64 ", update topic:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); + tscDebug("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); for (int32_t j = 0; j < vgNumGet; j++) { @@ -1447,14 +1447,14 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); if (pTopicCur->vgs) { int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); - tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, vgNumCur); + tscDebug("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur); for (int32_t j = 0; j < vgNumCur; j++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); char buf[80]; tFormatOffset(buf, 80, &pVgCur->currentOffset); - tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, + tscDebug("consumer:0x%" PRIx64 ", doUpdateLocalEp current vg, epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, tmq->epoch, pVgCur->vgId, vgKey, buf); SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows}; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 00684652f0..7f082c748b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -553,8 +553,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg // remove if it has been register in the push manager, and return one empty block to consumer // tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); - taosHashRemove(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t)); - + int32_t ret = taosHashRemove(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t)); + tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); if(pHandle->msg != NULL) { rpcFreeCont(pHandle->msg->pCont); taosMemoryFree(pHandle->msg); @@ -1069,22 +1069,24 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); taosWLockLatch(&pTq->lock); - void *pIter = taosHashIterate(pTq->pPushMgr, NULL); - while(pIter){ - STqHandle* pHandle = *(STqHandle**)pIter; - tqDebug("vgId:%d start set submit for pHandle:%p", vgId, pHandle); - if(ASSERT(pHandle->msg != NULL)){ - tqError("pHandle->msg should not be null"); - break; - }else{ - SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info}; - tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); - taosMemoryFree(pHandle->msg); - pHandle->msg = NULL; + if(taosHashGetSize(pTq->pPushMgr) > 0){ + void *pIter = taosHashIterate(pTq->pPushMgr, NULL); + while(pIter){ + STqHandle* pHandle = *(STqHandle**)pIter; + tqDebug("vgId:%d start set submit for pHandle:%p, consume id:0x%"PRIx64, vgId, pHandle, pHandle->consumerId); + if(ASSERT(pHandle->msg != NULL)){ + tqError("pHandle->msg should not be null"); + break; + }else{ + SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info}; + tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; + } + pIter = taosHashIterate(pTq->pPushMgr, pIter); } - pIter = taosHashIterate(pTq->pPushMgr, pIter); + taosHashClear(pTq->pPushMgr); } - taosHashClear(pTq->pPushMgr); // unlock taosWUnLockLatch(&pTq->lock); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 2398ef41f4..da8dc1d379 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -190,8 +190,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); pHandle->msg->contLen = pMsg->contLen; - tqDebug("data is over, register to handle:%p, pCont:%p, len:%d", pHandle, pHandle->msg->pCont, pHandle->msg->contLen); - taosHashPut(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t), &pHandle, POINTER_BYTES); + int32_t ret = taosHashPut(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t), &pHandle, POINTER_BYTES); + tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); + taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); return code; diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index f2de219f4e..d98a45f0d3 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -232,7 +232,7 @@ void saveConfigToLogFile() { taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]); } taosFprintfFile(g_fp, "\n"); - taosFprintfFile(g_fp, " expect rows: %" PRIx64 "\n", g_stConfInfo.stThreads[i].expectMsgCnt); + taosFprintfFile(g_fp, " expect rows: %" PRId64 "\n", g_stConfInfo.stThreads[i].expectMsgCnt); } char tmpString[128];