From 7f2aae69d4bcea30e0e275aed1e9df64f66a524b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 13 Mar 2023 01:54:29 +0800 Subject: [PATCH] fix(tmq): fix error in taosx --- source/client/src/clientTmq.c | 47 +++++++++++++++++++----------- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 35 ++++++++++------------ source/dnode/vnode/src/tq/tqRead.c | 6 ++-- utils/test/c/tmqSim.c | 22 ++++++++------ 5 files changed, 63 insertions(+), 49 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index e708b64a92..55d0453fbf 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -150,6 +150,7 @@ typedef struct { int32_t epoch; SMqClientVg* vgHandle; SMqClientTopic* topicHandle; + uint64_t reqId; union { SMqDataRsp dataRsp; SMqMetaRsp metaRsp; @@ -458,8 +459,6 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { pVg->epSet = *pBuf->pEpSet; } - // update the offset value. - pVg->committedOffset = pVg->currentOffset; tscDebug("consumer:0x%" PRIx64 " subKey:%s vgId:%d, commit offset success. ordinal:%d/%d", pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, index + 1, numOfVgroups); } @@ -699,6 +698,9 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, j + 1, numOfVgroups); continue; } + + // update the offset value. + pVg->committedOffset = pVg->currentOffset; } else { tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d", tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups); @@ -1261,8 +1263,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pParam); if (code != 0) { - tscWarn("consumer:0x%"PRIx64" msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%"PRIx64, tmq->consumerId, vgId, - epoch, tstrerror(code), requestId); + tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId, + vgId, epoch, tstrerror(code), requestId); if (pMsg->pData) taosMemoryFree(pMsg->pData); if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet); @@ -1281,8 +1283,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP; - /*pRspWrapper->vgHandle = pVg;*/ - /*pRspWrapper->topicHandle = pTopic;*/ taosWriteQitem(tmq->mqueue, pRspWrapper); tsem_post(&tmq->rspSem); } @@ -1304,8 +1304,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } if (msgEpoch != tmqEpoch) { - tscWarn("consumer:0x%"PRIx64" mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%"PRIx64, tmq->consumerId, vgId, - msgEpoch, tmqEpoch, requestId); + tscWarn("consumer:0x%" PRIx64 " mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%" PRIx64, + tmq->consumerId, vgId, msgEpoch, tmqEpoch, requestId); } // handle meta rsp @@ -1322,6 +1322,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { pRspWrapper->tmqRspType = rspType; pRspWrapper->vgHandle = pVg; pRspWrapper->topicHandle = pTopic; + pRspWrapper->reqId = requestId; if (rspType == TMQ_MSG_TYPE__POLL_RSP) { SDecoder decoder; @@ -1350,13 +1351,11 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); + taosWriteQitem(tmq->mqueue, pRspWrapper); tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue, total in queue:%d, reqId:0x%" PRIx64, tmq->consumerId, tmq->mqueue->numOfItems, requestId); - - taosWriteQitem(tmq->mqueue, pRspWrapper); tsem_post(&tmq->rspSem); - return 0; CREATE_MSG_FAIL: @@ -1763,6 +1762,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } } + tscDebug("consumer:0x%"PRIx64" handle rsp, type:%d", tmq->consumerId, rspWrapper->tmqRspType); + if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { taosFreeQitem(rspWrapper); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; @@ -1779,7 +1780,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); if (pollRspWrapper->dataRsp.blockNum == 0) { - tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d", tmq->consumerId, pVg->vgId); + tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, + pollRspWrapper->reqId); taosFreeQitem(pollRspWrapper); rspWrapper = NULL; continue; @@ -1789,8 +1791,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { char buf[80]; tFormatOffset(buf, 80, &pVg->currentOffset); SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper); - tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d", tmq->consumerId, - pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum); + tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%"PRIx64, tmq->consumerId, + pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, pollRspWrapper->reqId); taosFreeQitem(pollRspWrapper); return pRsp; @@ -1824,17 +1826,19 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; - /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/ int32_t consumerEpoch = atomic_load_32(&tmq->epoch); + if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; - /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, - * rspMsg->msg.rspOffset);*/ pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + if (pollRspWrapper->taosxRsp.blockNum == 0) { taosFreeQitem(pollRspWrapper); rspWrapper = NULL; + + tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, + pollRspWrapper->reqId); continue; } @@ -1845,8 +1849,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper); } + taosFreeQitem(pollRspWrapper); + + char buf[80]; + tFormatOffset(buf, 80, &pVg->currentOffset); + tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%"PRIx64, tmq->consumerId, pVg->vgId, + buf, pollRspWrapper->dataRsp.blockNum, pollRspWrapper->reqId); return pRsp; + } else { tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); @@ -1854,6 +1865,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); } } else { + tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId); + bool reset = false; tmqHandleNoPollRsp(tmq, rspWrapper, &reset); taosFreeQitem(rspWrapper); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 6cb54c6c18..a45d6d1dec 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -141,7 +141,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); // tqRead int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset); -int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); +int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a6919d97a9..76a6a8a840 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -330,15 +330,16 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co .contLen = tlen, .code = 0, }; + tmsgSendRsp(&rsp); char buf1[80] = {0}; char buf2[80] = {0}; tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s", - TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); + tqDebug("taosx rsp, vgId:%d, consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s", + TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); return 0; } @@ -600,7 +601,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* if (metaRsp.metaRspLen > 0) { code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); - tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64 + tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",version:%" PRId64, consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.version); @@ -617,7 +618,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* offset = taosxRsp.rspOffset; } - tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64 + tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 ",version:%" PRId64, consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version); @@ -637,7 +638,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* // todo refactor: this is not correct. int32_t savedEpoch = atomic_load_32(&pHandle->epoch); if (savedEpoch > pRequest->epoch) { - tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64 + tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 ", found new consumer epoch %d, discard req epoch %d", consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); break; @@ -645,9 +646,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - if (tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp) < 0) { - code = -1; - } + code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp); tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); return code; @@ -669,11 +668,10 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest->subKey); return -1; } - if (taosxRsp.blockNum > 0 /* threshold */) { + + if (taosxRsp.blockNum > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - if (tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp) < 0) { - code = -1; - } + code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp); tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); return code; @@ -741,21 +739,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { taosRUnLockLatch(&pTq->pushLock); taosWLockLatch(&pTq->pushLock); - int32_t savedEpoch = pHandle->epoch; - // 3. update the epoch value -// while (savedEpoch < reqEpoch) { + int32_t savedEpoch = pHandle->epoch; + if (savedEpoch < reqEpoch) { tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch); pHandle->epoch = reqEpoch; -// savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch); -// } - + } taosWUnLockLatch(&pTq->pushLock); char buf[80]; tFormatOffset(buf, 80, &reqOffset); - tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s", consumerId, - req.epoch, pHandle->subKey, vgId, buf); + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, + consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); return extractDataForMq(pTq, pHandle, &req, pMsg); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index bf73cca925..5bb08df2a0 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -183,14 +183,15 @@ end: return tbSuid == realTbSuid; } -int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) { +int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) { int32_t code = 0; + taosThreadMutexLock(&pHandle->pWalReader->mutex); int64_t offset = *fetchOffset; while (1) { if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) { - tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return", + tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return", pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset); *fetchOffset = offset - 1; code = -1; @@ -241,6 +242,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea offset++; } } + END: taosThreadMutexUnlock(&pHandle->pWalReader->mutex); return code; diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index 1acf50a7d8..ce4bafb79b 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -386,7 +386,7 @@ void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) { pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows; pInfo->numOfVgroups++; - taosFprintfFile(g_fp, "consume id %d, add one new vogroup id: %d\n", pInfo->consumerId, vgroupId); + taosFprintfFile(g_fp, "consume id %d, add new vgroupId:%d\n", pInfo->consumerId, vgroupId); if (pInfo->numOfVgroups > MAX_VGROUP_CNT) { taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId, pInfo->numOfVgroups, vgroupId); @@ -578,18 +578,25 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn char buf[1024]; int32_t totalRows = 0; - // printf("topic: %s\n", tmq_get_topic_name(msg)); int32_t vgroupId = tmq_get_vgroup_id(msg); const char* dbName = tmq_get_db_name(msg); taosFprintfFile(g_fp, "consumerId: %d, msg index:%d\n", pInfo->consumerId, msgIndex); - taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table", - tmq_get_topic_name(msg), vgroupId); + int32_t index = 0; + for (index = 0; index < pInfo->numOfVgroups; index++) { + if (vgroupId == pInfo->rowsOfPerVgroups[index][0]) { + break; + } + } + + taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId:%d, currentRows:%d\n", dbName != NULL ? dbName : "invalid table", + tmq_get_topic_name(msg), vgroupId, pInfo->rowsOfPerVgroups[index][1]); while (1) { TAOS_ROW row = taos_fetch_row(msg); - - if (row == NULL) break; + if (row == NULL) { + break; + } TAOS_FIELD* fields = taos_fetch_fields(msg); int32_t numOfFields = taos_field_count(msg); @@ -607,7 +614,6 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn #endif dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision); - taos_print_row(buf, row, fields, numOfFields); if (0 != g_stConfInfo.showRowFlag) { @@ -621,7 +627,6 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn } addRowsToVgroupId(pInfo, vgroupId, totalRows); - return totalRows; } @@ -817,7 +822,6 @@ void loop_consume(SThreadInfo* pInfo) { } taos_free_result(tmqMsg); - totalMsgs++; int64_t currentPrintTime = taosGetTimestampMs();