From 7b5edade957dd3c2d5348637ecf7d164a459eb8d Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 20 Feb 2025 19:50:37 +0800 Subject: [PATCH 1/4] fix:[TD-337958]send batch metadata & remove data if subscribe only meta --- source/dnode/vnode/src/tq/tqRead.c | 16 +++ source/dnode/vnode/src/tq/tqScan.c | 4 +- source/dnode/vnode/src/tq/tqUtil.c | 171 +++++++++++++++++++---------- 3 files changed, 134 insertions(+), 57 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index d650ae9751..ee705ed374 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1098,6 +1098,22 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block *pSubmitTbDataRet = pSubmitTbData; } + if (fetchMeta == ONLY_META) { + if (pSubmitTbData->pCreateTbReq != NULL) { + if (pRsp->createTableReq == NULL){ + pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES); + if (pRsp->createTableReq == NULL){ + return terrno; + } + } + if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL){ + return terrno; + } + pSubmitTbData->pCreateTbReq = NULL; + } + return 0; + } + int32_t sversion = pSubmitTbData->sver; int64_t uid = pSubmitTbData->uid; pReader->lastBlkUid = uid; diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 70a165906e..9d1b99f483 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -339,6 +339,9 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int bool tmp = (pSubmitTbData->flags & pRequest->sourceExcluded) != 0; TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS); + if (pHandle->fetchMeta == ONLY_META){ + goto END; + } int32_t blockNum = taosArrayGetSize(pBlocks) == 0 ? 1 : taosArrayGetSize(pBlocks); if (pRsp->withTbName) { @@ -347,7 +350,6 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int TSDB_CHECK_CODE(code, lino, END); } - tmp = (pHandle->fetchMeta == ONLY_META && pSubmitTbData->pCreateTbReq == NULL); TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS); for (int32_t i = 0; i < blockNum; i++) { if (taosArrayGetSize(pBlocks) == 0){ diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 9866528446..4d908b5d57 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -226,6 +226,82 @@ static void tDeleteCommon(void* parm) {} taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : \ (pRequest->rawData == 1 ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP) +static int32_t buildBatchMeta(SMqBatchMetaRsp *btMetaRsp, int16_t type, int32_t bodyLen, void* body){ + int32_t code = 0; + + if (!btMetaRsp->batchMetaReq) { + btMetaRsp->batchMetaReq = taosArrayInit(4, POINTER_BYTES); + TQ_NULL_GO_TO_END(btMetaRsp->batchMetaReq); + btMetaRsp->batchMetaLen = taosArrayInit(4, sizeof(int32_t)); + TQ_NULL_GO_TO_END(btMetaRsp->batchMetaLen); + } + + SMqMetaRsp tmpMetaRsp = {0}; + tmpMetaRsp.resMsgType = type; + tmpMetaRsp.metaRspLen = bodyLen; + tmpMetaRsp.metaRsp = body; + uint32_t len = 0; + tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code); + if (TSDB_CODE_SUCCESS != code) { + tqError("tmq extract meta from log, tEncodeMqMetaRsp error"); + goto END; + } + int32_t tLen = sizeof(SMqRspHead) + len; + void* tBuf = taosMemoryCalloc(1, tLen); + TQ_NULL_GO_TO_END(tBuf); + void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead)); + SEncoder encoder = {0}; + tEncoderInit(&encoder, metaBuff, len); + code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp); + tEncoderClear(&encoder); + + if (code < 0) { + tqError("tmq extract meta from log, tEncodeMqMetaRsp error"); + goto END; + } + TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaReq, &tBuf)); + TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaLen, &tLen)); + +END: + return code; +} + +static int32_t buildBatchCreateContent(SMqDataRsp *taosxRsp, void** pBuf, int32_t *len){ + int32_t code = 0; + SVCreateTbBatchReq pReq = {0}; + pReq.nReqs = taosArrayGetSize(taosxRsp->createTableReq); + pReq.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq)); + TQ_NULL_GO_TO_END(pReq.pArray); + for (int i = 0; i < taosArrayGetSize(taosxRsp->createTableReq); i++){ + void *createTableReq = taosArrayGetP(taosxRsp->createTableReq, i); + TQ_NULL_GO_TO_END(taosArrayPush(pReq.pArray, createTableReq)); + } + tEncodeSize(tEncodeSVCreateTbBatchReq, &pReq, *len, code); + if (code < 0) { + goto END; + } + len += sizeof(SMsgHead); + *pBuf = taosMemoryMalloc(*len); + TQ_NULL_GO_TO_END(pBuf); + SEncoder coder = {0}; + tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *len); + code = tEncodeSVCreateTbBatchReq(&coder, &pReq); + tEncoderClear(&coder); + +END: + taosArrayDestroy(pReq.pArray); + return code; +} + +#define SEND_BATCH_META_RSP \ +tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);\ +code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);\ +goto END; + +#define SEND_DATA_RSP \ +tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);\ +code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, POLL_RSP_TYPE(pRequest, taosxRsp), vgId);\ +goto END; static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* offset) { int32_t vgId = TD_VID(pTq->pVnode); @@ -261,6 +337,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, uint64_t st = taosGetTimestampMs(); int totalRows = 0; int32_t totalMetaRows = 0; + int32_t totalAutoMetaRows = 0; while (1) { int32_t savedEpoch = atomic_load_32(&pHandle->epoch); if (savedEpoch > pRequest->epoch) { @@ -271,15 +348,10 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) { - if (totalMetaRows > 0) { - tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer); - code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); - goto END; + if (totalMetaRows > 0 || totalAutoMetaRows > 0) { + SEND_BATCH_META_RSP } - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, - POLL_RSP_TYPE(pRequest, taosxRsp), vgId); - goto END; + SEND_DATA_RSP } SWalCont* pHead = &pHandle->pWalReader->pHead->head; @@ -288,11 +360,11 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, // process meta if (pHead->msgType != TDMT_VND_SUBMIT) { + if (totalAutoMetaRows > 0){ + SEND_BATCH_META_RSP + } if (totalRows > 0) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, - POLL_RSP_TYPE(pRequest, taosxRsp), vgId); - goto END; + SEND_DATA_RSP } if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) { @@ -318,53 +390,20 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId); goto END; } - - if (!btMetaRsp.batchMetaReq) { - btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES); - TQ_NULL_GO_TO_END(btMetaRsp.batchMetaReq); - btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t)); - TQ_NULL_GO_TO_END(btMetaRsp.batchMetaLen); - } + code = buildBatchMeta(&btMetaRsp, pHead->msgType, pHead->bodyLen, pHead->body); fetchVer++; - - SMqMetaRsp tmpMetaRsp = {0}; - tmpMetaRsp.resMsgType = pHead->msgType; - tmpMetaRsp.metaRspLen = pHead->bodyLen; - tmpMetaRsp.metaRsp = pHead->body; - uint32_t len = 0; - tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code); - if (TSDB_CODE_SUCCESS != code) { - tqError("tmq extract meta from log, tEncodeMqMetaRsp error"); + if (code != 0){ continue; } - int32_t tLen = sizeof(SMqRspHead) + len; - void* tBuf = taosMemoryCalloc(1, tLen); - TQ_NULL_GO_TO_END(tBuf); - void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead)); - SEncoder encoder = {0}; - tEncoderInit(&encoder, metaBuff, len); - code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp); - tEncoderClear(&encoder); - - if (code < 0) { - tqError("tmq extract meta from log, tEncodeMqMetaRsp error"); - continue; - } - TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf)); - TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaLen, &tLen)); totalMetaRows++; if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) { - tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer); - code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); - goto END; + SEND_BATCH_META_RSP } continue; } if (totalMetaRows > 0) { - tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer); - code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); - goto END; + SEND_BATCH_META_RSP } // process data @@ -376,17 +415,37 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest)); + if (pHandle->fetchMeta == ONLY_META){ + int32_t len = 0; + void *pBuf = NULL; + code = buildBatchCreateContent(&taosxRsp, &pBuf, &len); + if (code == 0){ + code = buildBatchMeta(&btMetaRsp, TDMT_VND_CREATE_TABLE, len, pBuf); + } + taosMemoryFree(pBuf); + taosArrayDestroyP(taosxRsp.createTableReq, NULL); + taosxRsp.createTableReq = NULL; + fetchVer++; + if (code != 0){ + continue; + } + totalAutoMetaRows ++; + if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) { + SEND_BATCH_META_RSP + } + continue; + } + if ((pRequest->rawData == 0 && totalRows >= pRequest->minPollRows) || (taosGetTimestampMs() - st > pRequest->timeout) || (pRequest->rawData != 0 && (taosArrayGetSize(taosxRsp.blockData) > pRequest->minPollRows || terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) { -// tqDebug("start to send rsp, block num:%d, totalRows:%d, createTableNum:%d, terrno:%d", -// (int)taosArrayGetSize(taosxRsp.blockData), totalRows, taosxRsp.createTableNum, terrno); - tqOffsetResetToLog(&taosxRsp.rspOffset, terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT ? fetchVer : fetchVer + 1); - code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, - POLL_RSP_TYPE(pRequest, taosxRsp), vgId); - if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){terrno = 0;} - goto END; + if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){ + terrno = 0; + } else{ + fetchVer++; + } + SEND_DATA_RSP } else { fetchVer++; } From e594c2bbd585f49261d8ab9c43e77e1973029d72 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 21 Feb 2025 11:49:17 +0800 Subject: [PATCH 2/4] feat:[TD-33798]modify metadata logic for tmq --- source/common/src/tglobal.c | 2 +- source/dnode/vnode/src/tq/tqScan.c | 22 +- source/dnode/vnode/src/tq/tqUtil.c | 22 +- tests/parallel_test/cases.task | 1 - tests/parallel_test/cases_tdengine.task | 2 +- .../7-tmq/{tmq_ts-5473.py => tmq_c_test.py} | 8 + tests/system-test/7-tmq/tmq_taosx.py | 48 +-- tests/system-test/7-tmq/tmq_ts-5776.py | 39 --- utils/test/c/CMakeLists.txt | 8 + utils/test/c/tmq_td33798.c | 312 ++++++++++++++++++ 10 files changed, 375 insertions(+), 89 deletions(-) rename tests/system-test/7-tmq/{tmq_ts-5473.py => tmq_c_test.py} (81%) delete mode 100644 tests/system-test/7-tmq/tmq_ts-5776.py create mode 100644 utils/test/c/tmq_td33798.c diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 2fd16b4f67..6369358008 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -188,7 +188,7 @@ char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/"; // tmq int32_t tmqMaxTopicNum = 20; -int32_t tmqRowSize = 4096; +int32_t tmqRowSize = 1000; // query int32_t tsQueryPolicy = 1; bool tsQueryTbNotExistAsEmpty = false; diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 9d1b99f483..549a47d006 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -405,14 +405,16 @@ static void preProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest, } int64_t uid = pSubmitTbData->uid; - if (taosHashGet(pRequest->uidHash, &uid, LONG_BYTES) != NULL) { - tqDebug("poll rawdata split,uid:%" PRId64 " is already exists", uid); - terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT; - return; - } else { - int32_t code = taosHashPut(pRequest->uidHash, &uid, LONG_BYTES, &uid, LONG_BYTES); - if (code != 0){ - tqError("failed to add table uid to hash, code:%d, uid:%"PRId64, code, uid); + if (pRequest->rawData) { + if (taosHashGet(pRequest->uidHash, &uid, LONG_BYTES) != NULL) { + tqDebug("poll rawdata split,uid:%" PRId64 " is already exists", uid); + terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT; + return; + } else { + int32_t code = taosHashPut(pRequest->uidHash, &uid, LONG_BYTES, &uid, LONG_BYTES); + if (code != 0) { + tqError("failed to add table uid to hash, code:%d, uid:%" PRId64, code, uid); + } } } @@ -455,9 +457,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData } code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver, rawList); TSDB_CHECK_CODE(code, lino, END); - if (pRequest->rawData) { - preProcessSubmitMsg(pHandle, pRequest, &rawList); - } + preProcessSubmitMsg(pHandle, pRequest, &rawList); // data could not contains same uid data in rawdata mode if (pRequest->rawData != 0 && terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){ goto END; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 4d908b5d57..7dc9a88164 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -266,7 +266,7 @@ END: return code; } -static int32_t buildBatchCreateContent(SMqDataRsp *taosxRsp, void** pBuf, int32_t *len){ +static int32_t buildCreateTbBatchReqBinary(SMqDataRsp *taosxRsp, void** pBuf, int32_t *len){ int32_t code = 0; SVCreateTbBatchReq pReq = {0}; pReq.nReqs = taosArrayGetSize(taosxRsp->createTableReq); @@ -280,7 +280,7 @@ static int32_t buildBatchCreateContent(SMqDataRsp *taosxRsp, void** pBuf, int32_ if (code < 0) { goto END; } - len += sizeof(SMsgHead); + *len += sizeof(SMsgHead); *pBuf = taosMemoryMalloc(*len); TQ_NULL_GO_TO_END(pBuf); SEncoder coder = {0}; @@ -337,7 +337,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, uint64_t st = taosGetTimestampMs(); int totalRows = 0; int32_t totalMetaRows = 0; - int32_t totalAutoMetaRows = 0; while (1) { int32_t savedEpoch = atomic_load_32(&pHandle->epoch); if (savedEpoch > pRequest->epoch) { @@ -348,7 +347,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) { - if (totalMetaRows > 0 || totalAutoMetaRows > 0) { + if (totalMetaRows > 0) { SEND_BATCH_META_RSP } SEND_DATA_RSP @@ -360,9 +359,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, // process meta if (pHead->msgType != TDMT_VND_SUBMIT) { - if (totalAutoMetaRows > 0){ - SEND_BATCH_META_RSP - } if (totalRows > 0) { SEND_DATA_RSP } @@ -402,7 +398,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, continue; } - if (totalMetaRows > 0) { + if (totalMetaRows > 0 && pHandle->fetchMeta != ONLY_META) { SEND_BATCH_META_RSP } @@ -415,10 +411,10 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest)); - if (pHandle->fetchMeta == ONLY_META){ + if (pHandle->fetchMeta == ONLY_META && taosArrayGetSize(taosxRsp.createTableReq) > 0){ int32_t len = 0; void *pBuf = NULL; - code = buildBatchCreateContent(&taosxRsp, &pBuf, &len); + code = buildCreateTbBatchReqBinary(&taosxRsp, &pBuf, &len); if (code == 0){ code = buildBatchMeta(&btMetaRsp, TDMT_VND_CREATE_TABLE, len, pBuf); } @@ -429,8 +425,10 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (code != 0){ continue; } - totalAutoMetaRows ++; - if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) { + totalMetaRows++; + if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || + (taosGetTimestampMs() - st > pRequest->timeout) || + (!pRequest->enableBatchMeta && !pRequest->useSnapshot)) { SEND_BATCH_META_RSP } continue; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 0d14722aaf..b56dabe7cf 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -455,7 +455,6 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5466.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td33504.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5473.py -,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5776.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5906.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-32187.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-33225.py diff --git a/tests/parallel_test/cases_tdengine.task b/tests/parallel_test/cases_tdengine.task index e52fe68957..18c50322ad 100644 --- a/tests/parallel_test/cases_tdengine.task +++ b/tests/parallel_test/cases_tdengine.task @@ -298,7 +298,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5466.py -,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5473.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_c_test.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-32187.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-33225.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py diff --git a/tests/system-test/7-tmq/tmq_ts-5473.py b/tests/system-test/7-tmq/tmq_c_test.py similarity index 81% rename from tests/system-test/7-tmq/tmq_ts-5473.py rename to tests/system-test/7-tmq/tmq_c_test.py index ad08fa559c..a2ed4aa708 100644 --- a/tests/system-test/7-tmq/tmq_ts-5473.py +++ b/tests/system-test/7-tmq/tmq_c_test.py @@ -29,6 +29,14 @@ class TDTestCase: tdLog.info(cmdStr) os.system(cmdStr) + cmdStr = '%s/build/bin/tmq_ts5776'%(buildPath) + tdLog.info(cmdStr) + os.system(cmdStr) + + cmdStr = '%s/build/bin/tmq_td33798'%(buildPath) + tdLog.info(cmdStr) + os.system(cmdStr) + return def stop(self): diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 845e7229a8..053b29c013 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -677,34 +677,34 @@ class TDTestCase: tdSql.execute(f'drop database d1') def run(self): - self.consume_TS_5067_Test() - self.consumeTest() - self.consume_ts_4544() - self.consume_ts_4551() - self.consume_TS_4540_Test() - self.consume_td_31283() + # self.consume_TS_5067_Test() + # self.consumeTest() + # self.consume_ts_4544() + # self.consume_ts_4551() + # self.consume_TS_4540_Test() + # self.consume_td_31283() tdSql.prepare() self.checkWal1VgroupOnlyMeta() - self.checkWal1Vgroup() - self.checkSnapshot1Vgroup() - - self.checkWal1VgroupTable() - self.checkSnapshot1VgroupTable() - - self.checkWalMultiVgroups() - self.checkWalMultiVgroupsRawData() - self.checkSnapshotMultiVgroups() - - self.checkWalMultiVgroupsWithDropTable() - - self.checkSnapshotMultiVgroupsWithDropTable() - - self.checkSnapshot1VgroupBtmeta() - self.checkSnapshot1VgroupTableBtmeta() - self.checkSnapshotMultiVgroupsBtmeta() - self.checkSnapshotMultiVgroupsWithDropTableBtmeta() + # self.checkWal1Vgroup() + # self.checkSnapshot1Vgroup() + # + # self.checkWal1VgroupTable() + # self.checkSnapshot1VgroupTable() + # + # self.checkWalMultiVgroups() + # self.checkWalMultiVgroupsRawData() + # self.checkSnapshotMultiVgroups() + # + # self.checkWalMultiVgroupsWithDropTable() + # + # self.checkSnapshotMultiVgroupsWithDropTable() + # + # self.checkSnapshot1VgroupBtmeta() + # self.checkSnapshot1VgroupTableBtmeta() + # self.checkSnapshotMultiVgroupsBtmeta() + # self.checkSnapshotMultiVgroupsWithDropTableBtmeta() def stop(self): tdSql.close() diff --git a/tests/system-test/7-tmq/tmq_ts-5776.py b/tests/system-test/7-tmq/tmq_ts-5776.py deleted file mode 100644 index 738d69701f..0000000000 --- a/tests/system-test/7-tmq/tmq_ts-5776.py +++ /dev/null @@ -1,39 +0,0 @@ - -import taos -import sys -import time -import socket -import os -import threading - -from util.log import * -from util.sql import * -from util.cases import * -from util.dnodes import * -from util.common import * -from taos.tmq import * -sys.path.append("./7-tmq") -from tmqCommon import * - -class TDTestCase: - updatecfgDict = {'debugFlag': 135, 'asynclog': 0} - def init(self, conn, logSql, replicaVar=1): - self.replicaVar = int(replicaVar) - tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor()) - #tdSql.init(conn.cursor(), logSql) # output sql.txt file - - def run(self): - buildPath = tdCom.getBuildPath() - cmdStr = '%s/build/bin/tmq_ts5776'%(buildPath) - tdLog.info(cmdStr) - os.system(cmdStr) - - return - - def stop(self): - tdSql.close() - tdLog.success(f"{__file__} successfully executed") - -tdCases.addLinux(__file__, TDTestCase()) -tdCases.addWindows(__file__, TDTestCase()) diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index d73f91aad3..1b2716b8e5 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -8,6 +8,7 @@ add_executable(tmq_td32526 tmq_td32526.c) add_executable(tmq_td32187 tmq_td32187.c) add_executable(tmq_ts5776 tmq_ts5776.c) add_executable(tmq_td32471 tmq_td32471.c) +add_executable(tmq_td33798 tmq_td33798.c) add_executable(tmq_write_raw_test tmq_write_raw_test.c) add_executable(write_raw_block_test write_raw_block_test.c) add_executable(sml_test sml_test.c) @@ -81,6 +82,13 @@ target_link_libraries( PUBLIC common PUBLIC os ) +target_link_libraries( + tmq_td33798 + PUBLIC ${TAOS_LIB} + PUBLIC util + PUBLIC common + PUBLIC os +) target_link_libraries( tmq_td32526 PUBLIC ${TAOS_LIB} diff --git a/utils/test/c/tmq_td33798.c b/utils/test/c/tmq_td33798.c new file mode 100644 index 0000000000..a029b17b61 --- /dev/null +++ b/utils/test/c/tmq_td33798.c @@ -0,0 +1,312 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "cJSON.h" +#include "taos.h" +#include "tmsg.h" +#include "types.h" + +bool batchMeta = false; +int32_t consumeIndex = 0; +static TAOS* use_db() { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return NULL; + } + + TAOS_RES* pRes = taos_query(pConn, "use db_taosx"); + if (taos_errno(pRes) != 0) { + printf("error in use db_taosx, reason:%s\n", taos_errstr(pRes)); + return NULL; + } + taos_free_result(pRes); + return pConn; +} + +void checkBatchMeta(TAOS_RES* msg){ + char* result = tmq_get_json_meta(msg); + printf("meta result: %s\n", result); + switch (consumeIndex) { + case 0: + ASSERT(strcmp(result, "{\"tmq_meta_version\":\"1.0\",\"metas\":[{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false,\"encode\":\"delta-d\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":\"zstd\",\"level\":\"medium\"}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]},{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]},{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct1\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]},{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct11\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]},{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct10\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]},{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[{\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]},{\"tableName\":\"ct3\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}]}]}") == 0); + break; + default: + ASSERT(0); + break; + } + + tmq_free_json_meta(result); +} + +void checkNonBatchMeta(TAOS_RES* msg){ + char* result = tmq_get_json_meta(msg); + printf("meta result: %s\n", result); + switch (consumeIndex) { + case 0: + ASSERT(strcmp(result, "{\"type\":\"create\",\"tableType\":\"super\",\"tableName\":\"st1\",\"columns\":[{\"name\":\"ts\",\"type\":9,\"isPrimarykey\":false,\"encode\":\"delta-i\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c1\",\"type\":4,\"isPrimarykey\":false,\"encode\":\"simple8b\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c2\",\"type\":6,\"isPrimarykey\":false,\"encode\":\"delta-d\",\"compress\":\"lz4\",\"level\":\"medium\"},{\"name\":\"c3\",\"type\":8,\"length\":16,\"isPrimarykey\":false,\"encode\":\"disabled\",\"compress\":\"zstd\",\"level\":\"medium\"}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}") == 0); + break; + case 1: + ASSERT(strcmp(result, "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct0\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}") == 0); + break; + case 2: + ASSERT(strcmp(result, "{\"tmq_meta_version\":\"1.0\",\"metas\":[{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct1\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}]}") == 0); + break; + case 3: + ASSERT(strcmp(result, "{\"tmq_meta_version\":\"1.0\",\"metas\":[{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct11\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}]}") == 0); + break; + case 4: + ASSERT(strcmp(result, "{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct10\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[]}") == 0); + break; + case 5: + ASSERT(strcmp(result, "{\"tmq_meta_version\":\"1.0\",\"metas\":[{\"type\":\"create\",\"tableType\":\"child\",\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}],\"createList\":[{\"tableName\":\"ct2\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]},{\"tableName\":\"ct3\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}]}]}") == 0); + break; + default: + ASSERT(0); + break; + } + + tmq_free_json_meta(result); +} + +static void msg_process(TAOS_RES* msg) { + printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg)); + printf("db: %s\n", tmq_get_db_name(msg)); + printf("vg: %d\n", tmq_get_vgroup_id(msg)); + TAOS* pConn = use_db(); + ASSERT (tmq_get_res_type(msg) == TMQ_RES_TABLE_META); + if (batchMeta){ + checkBatchMeta(msg); + } else { + checkNonBatchMeta(msg); + } + taos_close(pConn); +} + +int buildDatabase(TAOS* pConn, TAOS_RES* pRes) { + pRes = taos_query(pConn, + "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " + "nchar(8), t4 bool)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct1 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct11 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists ct10 using st1 tags(1000, \"ttt\", true)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + taosSsleep(1); + pRes = taos_query(pConn, "insert into ct1 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a') ct2 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a') ct3 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + + pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + return 0; +} + +int32_t init_env() { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx"); + if (taos_errno(pRes) != 0) { + printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists db_taosx vgroups 1 wal_retention_period 3600"); + if (taos_errno(pRes) != 0) { + printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop topic if exists topic_db"); + if (taos_errno(pRes) != 0) { + printf("error in drop topic, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists abc1"); + if (taos_errno(pRes) != 0) { + printf("error in drop db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1 wal_retention_period 3600"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + buildDatabase(pConn, pRes); + + taos_close(pConn); + return 0; +} + +int32_t create_topic() { + printf("create topic\n"); + TAOS_RES* pRes; + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + pRes = taos_query(pConn, "create topic topic_db only meta as database abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to create topic topic_db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + taos_close(pConn); + return 0; +} + +void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + printf("commit %d tmq %p param %p\n", code, tmq, param); +} + +tmq_t* build_consumer() { + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", batchMeta ? "batch" : "nonbatch"); + tmq_conf_set(conf, "client.id", "my app 1"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.consume.excluded", "1"); + if (batchMeta) { + tmq_conf_set(conf, "msg.enable.batchmeta", "1"); + } + + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + assert(tmq); + tmq_conf_destroy(conf); + return tmq; +} + +tmq_list_t* build_topic_list() { + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "topic_db"); + return topic_list; +} + +void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { + int32_t code; + + if ((code = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); + printf("subscribe err\n"); + return; + } + while (1) { + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 5000); + if (tmqmessage) { + msg_process(tmqmessage); + consumeIndex++; + taos_free_result(tmqmessage); + } else { + break; + } + } + + code = tmq_consumer_close(tmq); + if (code) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + +int main(int argc, char* argv[]) { + if (init_env() < 0) { + return -1; + } + create_topic(); + + tmq_list_t* topic_list = build_topic_list(); + tmq_t* tmq = build_consumer(); + basic_consume_loop(tmq, topic_list); + + batchMeta = true; + consumeIndex = 0; + tmq = build_consumer(); + basic_consume_loop(tmq, topic_list); + + tmq_list_destroy(topic_list); +} From b5e673f727a8b11374e594d6ffb4ac5bf3e3aad1 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 21 Feb 2025 19:05:18 +0800 Subject: [PATCH 3/4] feat:[TD-33798]modify metadata logic for tmq --- tests/parallel_test/cases.task | 1 - tests/system-test/7-tmq/tmq_taosx.py | 48 ++++++++++++++-------------- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 6215d7d433..696693c051 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -470,7 +470,6 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5466.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td33504.py -,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5473.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5906.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-32187.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-33225.py diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 053b29c013..845e7229a8 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -677,34 +677,34 @@ class TDTestCase: tdSql.execute(f'drop database d1') def run(self): - # self.consume_TS_5067_Test() - # self.consumeTest() - # self.consume_ts_4544() - # self.consume_ts_4551() - # self.consume_TS_4540_Test() - # self.consume_td_31283() + self.consume_TS_5067_Test() + self.consumeTest() + self.consume_ts_4544() + self.consume_ts_4551() + self.consume_TS_4540_Test() + self.consume_td_31283() tdSql.prepare() self.checkWal1VgroupOnlyMeta() - # self.checkWal1Vgroup() - # self.checkSnapshot1Vgroup() - # - # self.checkWal1VgroupTable() - # self.checkSnapshot1VgroupTable() - # - # self.checkWalMultiVgroups() - # self.checkWalMultiVgroupsRawData() - # self.checkSnapshotMultiVgroups() - # - # self.checkWalMultiVgroupsWithDropTable() - # - # self.checkSnapshotMultiVgroupsWithDropTable() - # - # self.checkSnapshot1VgroupBtmeta() - # self.checkSnapshot1VgroupTableBtmeta() - # self.checkSnapshotMultiVgroupsBtmeta() - # self.checkSnapshotMultiVgroupsWithDropTableBtmeta() + self.checkWal1Vgroup() + self.checkSnapshot1Vgroup() + + self.checkWal1VgroupTable() + self.checkSnapshot1VgroupTable() + + self.checkWalMultiVgroups() + self.checkWalMultiVgroupsRawData() + self.checkSnapshotMultiVgroups() + + self.checkWalMultiVgroupsWithDropTable() + + self.checkSnapshotMultiVgroupsWithDropTable() + + self.checkSnapshot1VgroupBtmeta() + self.checkSnapshot1VgroupTableBtmeta() + self.checkSnapshotMultiVgroupsBtmeta() + self.checkSnapshotMultiVgroupsWithDropTableBtmeta() def stop(self): tdSql.close() From e87541efaadc655ae55be51ad8f1bcecff39a85d Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 24 Feb 2025 16:18:27 +0800 Subject: [PATCH 4/4] feat:[TD-33798]modify metadata logic for tmq --- source/dnode/vnode/src/tq/tqUtil.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 7dc9a88164..bc8b8504c4 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -389,7 +389,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, code = buildBatchMeta(&btMetaRsp, pHead->msgType, pHead->bodyLen, pHead->body); fetchVer++; if (code != 0){ - continue; + goto END; } totalMetaRows++; if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) { @@ -423,7 +423,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, taosxRsp.createTableReq = NULL; fetchVer++; if (code != 0){ - continue; + goto END; } totalMetaRows++; if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) ||