From 6836dc1c8071cdbc1fb69bd546b936a936110349 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 18 Feb 2023 12:03:11 +0800 Subject: [PATCH] refactor: do some internal refactor and add some logs. --- source/client/src/clientTmq.c | 127 +++++++++++++++++++--------------- 1 file changed, 72 insertions(+), 55 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index d33f78d29d..2646fe30b3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -153,11 +153,9 @@ typedef struct { typedef struct { // subscribe info - char topicName[TSDB_TOPIC_FNAME_LEN]; - char db[TSDB_DB_FNAME_LEN]; - - SArray* vgs; // SArray - + char topicName[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + SArray* vgs; // SArray SSchemaWrapper schema; } SMqClientTopic; @@ -511,7 +509,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT .handle = NULL, }; - tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey, + tscDebug("consumer:0x%" PRIx64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version); // TODO: put into cb @@ -642,13 +640,14 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName, - (int32_t)taosArrayGetSize(pTopic->vgs)); + int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); + tscDebug("consumer:0x%" PRIx64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName, + numOfVgroups); - for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + for (int32_t j = 0; j < numOfVgroups; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName, + tscDebug("consumer:0x%" PRIx64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName, pVg->vgId); if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { @@ -792,34 +791,38 @@ OVER: taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer); } -int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { +int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { STaosQall* qall = taosAllocateQall(); - taosReadAllQitems(tmq->delayedTask, qall); + taosReadAllQitems(pTmq->delayedTask, qall); + + tscDebug("consumer:0x%"PRIx64" handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems); + while (1) { int8_t* pTaskType = NULL; taosGetQitem(qall, (void**)&pTaskType); if (pTaskType == NULL) break; if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { - tmqAskEp(tmq, true); + tmqAskEp(pTmq, true); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); - *pRefId = tmq->refId; + *pRefId = pTmq->refId; - taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &tmq->epTimer); + taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { - tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam); + tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); - *pRefId = tmq->refId; + *pRefId = pTmq->refId; - taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId, tmqMgmt.timer, &tmq->commitTimer); + taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { } else { ASSERT(0); } taosFreeQitem(pTaskType); } + taosFreeQall(qall); return 0; } @@ -947,7 +950,7 @@ static void tmqMgmtInit(void) { } tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl); - if (tmqMgmt.rsetId != 0) { + if (tmqMgmt.rsetId < 0) { tmqInitRes = terrno; } } @@ -1257,7 +1260,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tDecoderClear(&decoder); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); - tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d", + tscDebug("consumer:0x%" PRIx64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d", tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version, rspType); @@ -1280,7 +1283,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); - tscDebug("consumer:%" PRId64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper); + tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper); taosWriteQitem(tmq->mqueue, pRspWrapper); tsem_post(&tmq->rspSem); @@ -1297,10 +1300,12 @@ CREATE_MSG_FAIL: bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { bool set = false; + int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); int32_t topicNumGet = taosArrayGetSize(pRsp->topics); + char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; - tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch, - topicNumGet); + tscDebug("consumer:0x%" PRIx64", update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", + tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur); SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); if (newTopics == NULL) { @@ -1312,19 +1317,19 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { taosArrayDestroy(newTopics); return false; } - int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); + for (int32_t i = 0; i < topicNumCur; i++) { // find old topic SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); if (pTopicCur->vgs) { int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); - tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur); + tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, vgNumCur); for (int32_t j = 0; j < vgNumCur; j++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); char buf[80]; tFormatOffset(buf, 80, &pVgCur->currentOffset); - tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch, + tscDebug("consumer:0x%" PRIx64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal)); } @@ -1340,7 +1345,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN); tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN); - tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName); + tscDebug("consumer:0x%" PRIx64 ", update topic: %s", tmq->consumerId, topic.topicName); int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs); topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); @@ -1366,6 +1371,8 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { } taosArrayPush(newTopics, &topic); } + + // destroy current buffered existed topics info if (tmq->clientTopics) { int32_t sz = taosArrayGetSize(tmq->clientTopics); for (int32_t i = 0; i < sz; i++) { @@ -1373,17 +1380,21 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema); taosArrayDestroy(pTopic->vgs); } + taosArrayDestroy(tmq->clientTopics); } + taosHashCleanup(pHash); tmq->clientTopics = newTopics; - if (taosArrayGetSize(tmq->clientTopics) == 0) + if (taosArrayGetSize(tmq->clientTopics) == 0) { atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC); - else + } else { atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); + } atomic_store_32(&tmq->epoch, epoch); + tscDebug("consumer:0x%" PRIx64 ", update topic info completed", tmq->consumerId); return set; } @@ -1406,8 +1417,8 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { pParam->code = code; if (code != 0) { - tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d, code %x", tmq->consumerId, - pParam->async, code); + tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId, + pParam->async, tstrerror(code)); goto END; } @@ -1416,7 +1427,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { // Epoch will only increase when received newer epoch ep msg SMqRspHead* head = pMsg->pData; int32_t epoch = atomic_load_32(&tmq->epoch); - tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch); + tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch); if (head->epoch <= epoch) { goto END; } @@ -1435,6 +1446,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { code = -1; goto END; } + pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP; pWrapper->epoch = head->epoch; memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); @@ -1463,7 +1475,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); if (epStatus == 1) { int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1); - tscTrace("consumer:%" PRId64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt); + tscTrace("consumer:0x%" PRIx64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt); if (epSkipCnt < 5000) return 0; } atomic_store_32(&tmq->epSkipCnt, 0); @@ -1521,7 +1533,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { .handle = NULL, }; - sendInfo->requestId = generateRequestId(); + sendInfo->requestId = tmq->consumerId; sendInfo->requestObjRefId = 0; sendInfo->param = pParam; sendInfo->fp = tmqAskEpCb; @@ -1611,6 +1623,7 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) { return pRspObj; } +// broadcast the poll request to all related vnodes int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); @@ -1619,7 +1632,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus != TMQ_VG_STATUS__IDLE) { int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); - tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, + tscTrace("consumer:0x%" PRIx64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); continue; /*if (vgSkipCnt < 10000) continue;*/ @@ -1627,7 +1640,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { if (skipCnt < 30000) { continue; } else { - tscDebug("consumer:%" PRId64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId); + tscDebug("consumer:0x%" PRIx64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId); } #endif } @@ -1683,6 +1696,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { .len = msgSize, .handle = NULL, }; + sendInfo->requestId = req.reqId; sendInfo->requestObjRefId = 0; sendInfo->param = pParam; @@ -1690,18 +1704,19 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { sendInfo->msgType = TDMT_VND_TMQ_CONSUME; int64_t transporterId = 0; - /*printf("send poll\n");*/ char offsetFormatBuf[80]; tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset); - tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64, + + tscDebug("consumer:0x%" PRIx64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64, tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); - /*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/ asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + pVg->pollCnt++; tmq->pollCnt++; } } + return 0; } @@ -1739,7 +1754,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } } - tscDebug("consumer:%" PRId64 " handle rsp %p", tmq->consumerId, rspWrapper); + tscDebug("consumer:0x%" PRIx64 " handle rsp %p", tmq->consumerId, rspWrapper); if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { taosFreeQitem(rspWrapper); @@ -1747,7 +1762,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return NULL; } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; - tscDebug("consumer %" PRId64 " actual process poll rsp", tmq->consumerId); + tscDebug("consumer:0x%" PRIx64 " actual process poll rsp", tmq->consumerId); /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/ int32_t consumerEpoch = atomic_load_32(&tmq->epoch); if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { @@ -1766,8 +1781,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); return pRsp; } else { - tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - pollRspWrapper->dataRsp.head.epoch, consumerEpoch); + tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tmq->consumerId, pollRspWrapper->dataRsp.head.epoch, consumerEpoch); tmqFreeRspWrapper(rspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1785,8 +1800,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); return pRsp; } else { - tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - pollRspWrapper->metaRsp.head.epoch, consumerEpoch); + tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); tmqFreeRspWrapper(rspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1816,8 +1831,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); return pRsp; } else { - tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); + tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); tmqFreeRspWrapper(rspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1827,7 +1842,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmqHandleNoPollRsp(tmq, rspWrapper, &reset); taosFreeQitem(rspWrapper); if (pollIfReset && reset) { - tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId); + tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId); tmqPollImpl(tmq, timeout); } } @@ -1838,7 +1853,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { void* rspObj; int64_t startTime = taosGetTimestampMs(); - tscDebug("consumer:%" PRId64 ", start poll at %" PRId64, tmq->consumerId, startTime); + tscDebug("consumer:0x%" PRIx64 ", start poll at %" PRId64, tmq->consumerId, startTime); #if 0 tmqHandleAllDelayedTask(tmq); @@ -1851,7 +1866,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { // in no topic status, delayed task also need to be processed if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { - tscDebug("consumer:%" PRId64 ", poll return since consumer status is init", tmq->consumerId); + tscDebug("consumer:0x%" PRIx64 ", poll return since consumer status is init", tmq->consumerId); return NULL; } @@ -1868,28 +1883,30 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { while (1) { tmqHandleAllDelayedTask(tmq); + if (tmqPollImpl(tmq, timeout) < 0) { - tscDebug("consumer:%" PRId64 " return since poll err", tmq->consumerId); + tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId); /*return NULL;*/ } rspObj = tmqHandleAllRsp(tmq, timeout, false); if (rspObj) { - tscDebug("consumer:%" PRId64 ", return rsp %p", tmq->consumerId, rspObj); + tscDebug("consumer:0x%" PRIx64 ", return rsp %p", tmq->consumerId, rspObj); return (TAOS_RES*)rspObj; } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { - tscDebug("consumer:%" PRId64 ", return null since no committed offset", tmq->consumerId); + tscDebug("consumer:0x%" PRIx64 ", return null since no committed offset", tmq->consumerId); return NULL; } + if (timeout != -1) { int64_t currentTime = taosGetTimestampMs(); int64_t passedTime = currentTime - startTime; if (passedTime > timeout) { - tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, + tscDebug("consumer:0x%" PRIx64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, tmq->consumerId, tmq->epoch, startTime, currentTime); return NULL; } - /*tscInfo("consumer:%" PRId64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/ + /*tscInfo("consumer:0x%" PRIx64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/ /*", left time %" PRId64,*/ /*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - passedTime));*/ tsem_timewait(&tmq->rspSem, (timeout - passedTime));