From 13e14d0f55b52b5ccdacd94d1d072c8c5ab356ae Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 17 Apr 2024 10:48:22 +0800 Subject: [PATCH 1/4] fix(stream): add return. --- source/libs/stream/src/streamExec.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d55382be83..93ede2707b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -591,6 +591,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) { if (ret == EXEC_AFTER_IDLE) { ASSERT(pInput == NULL && numOfBlocks == 0); setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL); + return 0; } else { if (pInput == NULL) { ASSERT(numOfBlocks == 0); From 69cb630e7fc6d1f79f184440f75b08a365ff518e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 17 Apr 2024 11:32:48 +0800 Subject: [PATCH 2/4] fix(tsdb): update the compare func. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 76ec4da24c..560c2750c3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -121,11 +121,7 @@ int32_t pkCompEx(SRowKey* p1, SRowKey* p2) { return ret > 0 ? 1 : -1; } } else { - if (p1->pks[0].val == p2->pks[0].val) { - return 0; - } else { - return tValueCompare(&p1->pks[0], &p2->pks[0]); - } + return tValueCompare(&p1->pks[0], &p2->pks[0]); } } } From 2ce7d18e94c53f20cee9f3d73ab7247ed0a1a7b1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 17 Apr 2024 11:35:03 +0800 Subject: [PATCH 3/4] refactor: do some internal refactor. --- source/common/src/tdatablock.c | 6 ------ source/dnode/vnode/src/tq/tqPush.c | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index cfb7a36480..09e13939a4 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1524,8 +1524,6 @@ void* blockDataDestroy(SSDataBlock* pBlock) { } if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) { - uInfo("1====free pk:%p, %p pBlock", pBlock->info.pks[0].pData, pBlock); - uInfo("2====free pk:%p, %p pBlock", pBlock->info.pks[1].pData, pBlock); taosMemoryFreeClear(pBlock->info.pks[0].pData); taosMemoryFreeClear(pBlock->info.pks[1].pData); } @@ -1705,10 +1703,6 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { p->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); p->nData = pDataBlock->info.pks[1].nData; memcpy(p->pData, pDataBlock->info.pks[1].pData, p->nData); - uInfo("===========clone block, with varchar, %p, 0---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[0].pData, pDataBlock, pDataBlock->info.pks[0].pData); - uInfo("===========clone block, with varchar, %p, 1---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[1].pData, pDataBlock, pDataBlock->info.pks[1].pData); - } else { - uInfo("===========clone block without varchar pk, %p, src:%p", pBlock, pDataBlock); } if (copyData) { diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 9a38776386..71e6771370 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -45,7 +45,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) { // 1. the vnode has already been restored. // 2. the vnode should be the leader. // 3. the stream is not suspended yet. - if ((!tsDisableStream) && (numOfTasks > 0) /* && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)*/) { + if ((!tsDisableStream) && (numOfTasks > 0)) { tqScanWalAsync(pTq, true); } From 34b718bf4dded4d11c48ac56d246138d595ddcc4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 17 Apr 2024 13:51:49 +0800 Subject: [PATCH 4/4] fix(test): update test cases. --- tests/system-test/8-stream/stream_basic.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/system-test/8-stream/stream_basic.py b/tests/system-test/8-stream/stream_basic.py index ff16bee787..c32f9a3166 100644 --- a/tests/system-test/8-stream/stream_basic.py +++ b/tests/system-test/8-stream/stream_basic.py @@ -130,6 +130,8 @@ class TDTestCase: sql = "select count(*) from sta" # loop wait max 60s to check count is ok tdLog.info("loop wait result ...") + time.sleep(5) + tdSql.checkDataLoop(0, 0, 100000, sql, loopCount=120, waitTime=0.5) time.sleep(5)