diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index e66ffe5521..3433f98d38 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -223,10 +223,10 @@ typedef struct SStoreTqReader { bool (*tqReaderCurrentBlockConsumed)(); struct SWalReader* (*tqReaderGetWalReader)(); // todo remove it - int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it +// int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it int32_t (*tqReaderSetSubmitMsg)(); // todo remove it - bool (*tqReaderNextBlockFilterOut)(); +// bool (*tqReaderNextBlockFilterOut)(); } SStoreTqReader; typedef struct SStoreSnapshotFn { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 7a737fbb5d..110cac152d 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -368,24 +368,11 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con } } -// todo ignore the error in wal? bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) { SWalReader* pWalReader = pReader->pWalReader; - SSDataBlock* pDataBlock = NULL; uint64_t st = taosGetTimestampMs(); while (1) { - // try next message in wal file - if (walNextValidMsg(pWalReader) < 0) { - return false; - } - - void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); - int64_t ver = pWalReader->pHead->head.version; - - tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver); - pReader->nextBlk = 0; int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < numOfBlocks) { tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen, @@ -400,33 +387,32 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) { tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid); SSDataBlock* pRes = NULL; int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL); - if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) { - if (pDataBlock == NULL) { - pDataBlock = createOneDataBlock(pRes, true); - } else { - blockDataMerge(pDataBlock, pRes); - } + if (code == TSDB_CODE_SUCCESS) { + return true; } } else { pReader->nextBlk += 1; tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid); } } + tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); pReader->msg.msgStr = NULL; - if (pDataBlock != NULL) { - blockDataCleanup(pReader->pResBlock); - copyDataBlock(pReader->pResBlock, pDataBlock); - blockDataDestroy(pDataBlock); - return true; - } else { - qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id); - } - if (taosGetTimestampMs() - st > 1000) { return false; } + + // try next message in wal file + if (walNextValidMsg(pWalReader) < 0) { + return false; + } + + void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); + int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); + int64_t ver = pWalReader->pHead->head.version; + tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver); + pReader->nextBlk = 0; } } diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index aff7e4642b..16abe69def 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -128,12 +128,12 @@ void initTqAPI(SStoreTqReader* pTq) { pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed; pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it - pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it +// pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it pTq->tqGetResultBlock = tqGetResultBlock; - pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut; +// pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut; pTq->tqGetResultBlockTime = tqGetResultBlockTime; pTq->tqGetStreamExecProgress = tqGetStreamExecInfo; diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index c547385d70..a4508bbb9f 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -359,8 +359,46 @@ class TDTestCase: finally: consumer.close() + def consume_TS_4540_Test(self): + tdSql.execute(f'create database if not exists test') + tdSql.execute(f'use test') + tdSql.execute(f'CREATE STABLE `test`.`b` ( `time` TIMESTAMP , `task_id` NCHAR(1000) ) TAGS( `key` NCHAR(1000))') + tdSql.execute(f"insert into `test`.b1 using `test`.`b`(key) tags('1') (time, task_id) values ('2024-03-04 12:50:01.000', '32') `test`.b2 using `test`.`b`(key) tags('2') (time, task_id) values ('2024-03-04 12:50:01.000', '43') `test`.b3 using `test`.`b`(key) tags('3') (time, task_id) values ('2024-03-04 12:50:01.000', '123456')") + + tdSql.execute(f'create topic tt as select tbname,task_id,key from b') + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["tt"]) + except TmqError: + tdLog.exit(f"subscribe error") + + try: + while True: + res = consumer.poll(1) + if not res: + break + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + print(data) + if data != [('b1', '32', '1')] and data != [('b2', '43', '2')] and data != [('b3', '123456', '3')]: + tdLog.exit(f"index = 0 table b1 error") + + finally: + consumer.close() + def run(self): self.consumeTest() + self.consume_TS_4540_Test() tdSql.prepare() self.checkWal1VgroupOnlyMeta()