From 0e6486e3ec46e13b9578d333b76c28f69fd11a8e Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 15 Sep 2023 14:28:45 +0800 Subject: [PATCH 01/20] test: sql.py remove successful print and add diff case --- tests/pytest/util/sql.py | 27 +++++++++++++++++---------- tests/system-test/2-query/diff.py | 27 +++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 91aac1929f..eb64f7f316 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -257,7 +257,7 @@ class TDSql: return self.cursor.istype(col, dataType) - def checkData(self, row, col, data): + def checkData(self, row, col, data, show = False): if row >= self.queryRows: caller = inspect.getframeinfo(inspect.stack()[1][0]) args = (caller.filename, caller.lineno, self.sql, row+1, self.queryRows) @@ -275,8 +275,8 @@ class TDSql: if isinstance(data,str) : if (len(data) >= 28): if self.queryResult[row][col] == _parse_ns_timestamp(data): - # tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{pd.to_datetime(resultData)} == expect:{data}") - tdLog.info("check successfully") + if(show): + tdLog.info("check successfully") else: caller = inspect.getframeinfo(inspect.stack()[1][0]) args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data) @@ -284,7 +284,8 @@ class TDSql: else: if self.queryResult[row][col].astimezone(datetime.timezone.utc) == _parse_datetime(data).astimezone(datetime.timezone.utc): # tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}") - tdLog.info("check successfully") + if(show): + tdLog.info("check successfully") else: caller = inspect.getframeinfo(inspect.stack()[1][0]) args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data) @@ -317,7 +318,8 @@ class TDSql: if data == self.queryResult[row][col]: success = True if success: - tdLog.info("check successfully") + if(show): + tdLog.info("check successfully") else: caller = inspect.getframeinfo(inspect.stack()[1][0]) args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data) @@ -328,7 +330,8 @@ class TDSql: delt_data = data-datetime.datetime.fromtimestamp(0,data.tzinfo) delt_result = self.queryResult[row][col] - datetime.datetime.fromtimestamp(0,self.queryResult[row][col].tzinfo) if delt_data == delt_result: - tdLog.info("check successfully") + if(show): + tdLog.info("check successfully") else: caller = inspect.getframeinfo(inspect.stack()[1][0]) args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data) @@ -341,16 +344,19 @@ class TDSql: if str(self.queryResult[row][col]) == str(data): # tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}") - tdLog.info("check successfully") + if(show): + tdLog.info("check successfully") return elif isinstance(data, float): if abs(data) >= 1 and abs((self.queryResult[row][col] - data) / data) <= 0.000001: # tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}") - tdLog.info("check successfully") + if(show): + tdLog.info("check successfully") elif abs(data) < 1 and abs(self.queryResult[row][col] - data) <= 0.000001: # tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.queryResult[row][col]} == expect:{data}") - tdLog.info("check successfully") + if(show): + tdLog.info("check successfully") else: caller = inspect.getframeinfo(inspect.stack()[1][0]) @@ -361,7 +367,8 @@ class TDSql: caller = inspect.getframeinfo(inspect.stack()[1][0]) args = (caller.filename, caller.lineno, self.sql, row, col, self.queryResult[row][col], data) tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args) - tdLog.info("check successfully") + if(show): + tdLog.info("check successfully") # return true or false replace exit, no print out def checkRowColNoExit(self, row, col): diff --git a/tests/system-test/2-query/diff.py b/tests/system-test/2-query/diff.py index 0a2f750f93..213fdcc32a 100644 --- a/tests/system-test/2-query/diff.py +++ b/tests/system-test/2-query/diff.py @@ -19,6 +19,30 @@ class TDTestCase: def check_result(self): for i in range(self.rowNum): tdSql.checkData(i, 0, 1); + + def full_datatype_test(self): + sql = "create table st(ts timestamp, c1 bool, c2 float, c3 double,c4 tinyint, c5 smallint, c6 int, c7 bigint, c8 tinyint unsigned, c9 smallint unsigned, c10 int unsigned, c11 bigint unsigned) tags( area int);" + tdSql.execute(sql) + + sql = "create table t1 using st tags(1);" + tdSql.execute(sql) + + ts = 1694000000000 + rows = 126 + for i in range(rows): + ts += 1 + sql = f"insert into t1 values({ts},true,{i},{i},{i%127},{i%32767},{i},{i},{i%127},{i%32767},{i},{i});" + tdSql.execute(sql) + + sql = "select diff(ts),diff(c1),diff(c3),diff(c4),diff(c5),diff(c6),diff(c7),diff(c8),diff(c9),diff(c10),diff(c11) from t1" + tdSql.query(sql) + tdSql.checkRows(rows - 1) + for i in range(rows - 1): + for j in range(10): + if j == 1: # bool + tdSql.checkData(i, j, 0) + else: + tdSql.checkData(i, j, 1) def run(self): tdSql.prepare() @@ -281,6 +305,9 @@ class TDTestCase: tdSql.checkData(0, 1, 11) tdSql.checkData(1, 1, -9) + # full type test + self.full_datatype_test() + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) From bc1371a4b7a36ed07134a9ade4061fe706d884c4 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 15 Sep 2023 15:49:35 +0800 Subject: [PATCH 02/20] case: add use db --- tests/system-test/2-query/diff.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/system-test/2-query/diff.py b/tests/system-test/2-query/diff.py index 213fdcc32a..80e0d3b10d 100644 --- a/tests/system-test/2-query/diff.py +++ b/tests/system-test/2-query/diff.py @@ -47,6 +47,10 @@ class TDTestCase: def run(self): tdSql.prepare() dbname = "db" + + # full type test + self.full_datatype_test() + tdSql.execute( f"create table {dbname}.ntb(ts timestamp,c1 int,c2 double,c3 float)") tdSql.execute( @@ -305,9 +309,6 @@ class TDTestCase: tdSql.checkData(0, 1, 11) tdSql.checkData(1, 1, -9) - # full type test - self.full_datatype_test() - def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) From 46a9a906e05e859bedabf336127a4eea07a8b99f Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 15 Sep 2023 17:57:54 +0800 Subject: [PATCH 03/20] case: add use db sql --- tests/system-test/2-query/diff.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/system-test/2-query/diff.py b/tests/system-test/2-query/diff.py index 80e0d3b10d..8f3fb2c3d9 100644 --- a/tests/system-test/2-query/diff.py +++ b/tests/system-test/2-query/diff.py @@ -21,6 +21,7 @@ class TDTestCase: tdSql.checkData(i, 0, 1); def full_datatype_test(self): + tdSql.execute("use db;") sql = "create table st(ts timestamp, c1 bool, c2 float, c3 double,c4 tinyint, c5 smallint, c6 int, c7 bigint, c8 tinyint unsigned, c9 smallint unsigned, c10 int unsigned, c11 bigint unsigned) tags( area int);" tdSql.execute(sql) From 2c9d54af375a40748d1bb7335d98a3a97eb5163f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Sep 2023 19:58:46 +0800 Subject: [PATCH 04/20] fix(stream): fix memory leak. --- source/common/src/tglobal.c | 2 +- source/libs/stream/src/streamQueue.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 1578a9ea4c..1928b3950d 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -240,7 +240,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointTickInterval = 300; +int32_t tsStreamCheckpointTickInterval = 10; int32_t tsStreamNodeCheckInterval = 10; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 6aaea2ce24..d3d114d4aa 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -387,7 +387,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc qError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost", pTask->id.idStr, total + 1, size, tstrerror(code)); } else { - qInfo("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size); + qDebug("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size); } return TSDB_CODE_SUCCESS; From 1ca06e960f3c2ca7e4a43a5a86cfbe88d8033361 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Sep 2023 20:25:24 +0800 Subject: [PATCH 05/20] fix(stream): remove stream in buf. --- source/dnode/mnode/impl/src/mndStream.c | 30 ++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f4110562a6..d2f4657aa8 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -83,6 +83,9 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); +static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode); +static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode); + int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_STREAM, @@ -1280,7 +1283,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); return -1; } - // mndTransSetSerial(pTrans); // drop all tasks if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { @@ -1304,13 +1306,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } + removeStreamTasksInBuf(pStream, &execNodeList); + char detail[100] = {0}; sprintf(detail, "igNotExists:%d", dropReq.igNotExists); SName name = {0}; tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB); - //reuse this function for stream - auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, "", detail); sdbRelease(pMnode->pSdb, pStream); @@ -2238,7 +2240,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { return 0; } -static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) { +void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) { int32_t level = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < level; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); @@ -2261,6 +2263,25 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p } } +void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) { + int32_t level = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < level; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < numOfTasks; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); + if (p != NULL) { + taosArrayRemove(pExecNode->pTaskList, *(int32_t*)p); + taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id)); + } + } + } +} + // todo: this process should be executed by the write queue worker of the mnode int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; @@ -2277,7 +2298,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } tDecoderClear(&decoder); - // int64_t now = taosGetTimestampSec(); mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); taosThreadMutexLock(&execNodeList.lock); From 377fdfacf6f8f4ab27b24febddfdabd6acc94074 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Sep 2023 23:51:08 +0800 Subject: [PATCH 06/20] fix(stream): add some logs. --- source/libs/stream/src/streamMeta.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 31bf5a482b..def1253502 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -644,10 +644,12 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; - qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId); + int32_t vgId = pMeta->vgId; + + qInfo("vgId:%d load stream tasks from meta files", vgId); if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { - qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno)); + qError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno)); return -1; } @@ -662,6 +664,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + qError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno)); doClear(pKey, pVal, pCur, pRecycleList); return -1; } @@ -672,9 +676,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); qError( - "stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " - "manually", - tsDataDir); + "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " + "manually", vgId, tsDataDir); return -1; } tDecoderClear(&decoder); @@ -731,6 +734,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { tdbFree(pKey); tdbFree(pVal); if (tdbTbcClose(pCur) < 0) { + qError("vgId:%d failed to close meta-file cursor", vgId); taosArrayDestroy(pRecycleList); return -1; } From 9a9fffa577afd32d97069abef82104495b78d7d9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 00:52:35 +0800 Subject: [PATCH 07/20] fix(stream): reset task counter. --- source/libs/stream/src/streamMeta.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index def1253502..47146f6c53 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -294,6 +294,8 @@ void streamMetaClear(SStreamMeta* pMeta) { taosArrayClear(pMeta->pTaskList); taosArrayClear(pMeta->chkpSaved); taosArrayClear(pMeta->chkpInUse); + pMeta->numOfStreamTasks = 0; + pMeta->numOfPausedTasks = 0; } void streamMetaClose(SStreamMeta* pMeta) { @@ -747,6 +749,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { } int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + ASSERT(pMeta->numOfStreamTasks <= numOfTasks); qDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks, pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); taosArrayDestroy(pRecycleList); From e76bab7122ed7d329ca6ce395a5d58106b6e0f73 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 09:43:14 +0800 Subject: [PATCH 08/20] fix(stream): not handle the check msg for follower tasks. --- include/libs/stream/tstream.h | 1 + source/dnode/vnode/src/tq/tq.c | 14 +++++++++++--- source/dnode/vnode/src/tq/tqStreamTask.c | 8 +------- source/libs/stream/src/streamMeta.c | 2 +- source/libs/stream/src/streamRecover.c | 19 ++++++++++++++++--- 5 files changed, 30 insertions(+), 14 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a19ebd67b0..a55d188978 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -661,6 +661,7 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask); +int32_t streamSetStatusUnint(SStreamTask* pTask); const char* streamGetTaskStatusStr(int32_t status); void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f5909eb0fe..f797feee34 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -864,6 +864,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SStreamMeta* pMeta = pTq->pStreamMeta; SStreamTaskCheckReq req; SDecoder decoder; @@ -884,10 +885,17 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { .upstreamTaskId = req.upstreamTaskId, }; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId); + // only the leader node handle the check request + if (!pMeta->leader) { + tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check msg", + taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId); + return -1; + } + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); if (pTask != NULL) { rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); + streamMetaReleaseTask(pMeta, pTask); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", @@ -899,7 +907,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } - return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId); + return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); } int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 3cba4567fe..3a5eeae561 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -95,7 +95,7 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) { } pTask->taskExecInfo.init = taosGetTimestampMs(); - tqDebug("s-task:%s set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init); + tqDebug("s-task:%s start check downstream tasks, set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init); streamSetStatusNormal(pTask); streamTaskCheckDownstream(pTask); @@ -111,12 +111,9 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; -// taosWLockLatch(&pMeta->lock); - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { tqDebug("vgId:%d no stream tasks existed to run", vgId); -// taosWUnLockLatch(&pMeta->lock); return 0; } @@ -124,7 +121,6 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); -// taosWUnLockLatch(&pMeta->lock); return -1; } @@ -135,8 +131,6 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); -// taosWUnLockLatch(&pMeta->lock); - return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 47146f6c53..93f4b7c4dd 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -749,7 +749,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { } int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - ASSERT(pMeta->numOfStreamTasks <= numOfTasks); + ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks); qDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks, pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); taosArrayDestroy(pRecycleList); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index d28ec85dd5..2689e9ee70 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -205,21 +205,22 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); ASSERT(pInfo != NULL); + const char* id = pTask->id.idStr; if (stage == -1) { - qDebug("s-task:%s receive check msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", pTask->id.idStr, + qDebug("s-task:%s receive check msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", id, upstreamTaskId, stage); return 0; } if (pInfo->stage == -1) { pInfo->stage = stage; - qDebug("s-task:%s receive check msg from upstream task:0x%x, init stage value:%" PRId64, pTask->id.idStr, + qDebug("s-task:%s receive check msg from upstream task:0x%x for the time, init stage value:%" PRId64, id, upstreamTaskId, stage); } if (pInfo->stage < stage) { qError("s-task:%s receive msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64, - pTask->id.idStr, upstreamTaskId, vgId, stage, pInfo->stage); + id, upstreamTaskId, vgId, stage, pInfo->stage); } return ((pTask->status.downstreamReady == 1) && (pInfo->stage == stage))? 1:0; @@ -355,6 +356,18 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) { } } +int32_t streamSetStatusUnint(SStreamTask* pTask) { + int32_t status = atomic_load_8(&pTask->status.taskStatus); + if (status == TASK_STATUS__DROPPING) { + qError("s-task:%s cannot be set uninit, since in dropping state", pTask->id.idStr); + return -1; + } else { + qDebug("s-task:%s set task status to be uninit, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__UNINIT); + return 0; + } +} + // source int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) { return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow); From b7efedf4aae44605d67a986d93ac0ee40c6f406c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 09:53:33 +0800 Subject: [PATCH 09/20] enh(stream): log the checkpoint time. --- include/libs/stream/tstream.h | 1 + source/libs/stream/src/streamCheckpoint.c | 13 +++++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a55d188978..5329da2f17 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -247,6 +247,7 @@ typedef struct SStreamTaskId { } SStreamTaskId; typedef struct SCheckpointInfo { + int64_t startTs; int64_t checkpointId; int64_t checkpointVer; // latest checkpointId version int64_t nextProcessVer; // current offset in WAL, not serialize it diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a48f74ce86..3f8b69785d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -141,6 +141,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo pTask->status.taskStatus = TASK_STATUS__CK; pTask->checkpointingId = pReq->checkpointId; pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); + pTask->chkInfo.startTs = taosGetTimestampMs(); // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into // inputQ, to make sure all blocks with less version have been handled by this task already. @@ -312,15 +313,19 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); ASSERT(remain >= 0); + double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0; + if (remain == 0) { // all tasks are ready qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); - qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId, - pTask->checkpointingId); + qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, elapsed time:%.2f Sec checkpointId:%" PRId64, pMeta->vgId, + el, pTask->checkpointingId); } else { - qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", pMeta->vgId, - pTask->id.idStr, remain, pMeta->numOfStreamTasks); + qDebug( + "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, elapsed time:%.2f Sec not " + "ready:%d/%d", + pMeta->vgId, pTask->id.idStr, el, remain, pMeta->numOfStreamTasks); } // send check point response to upstream task From 5a6e50d523e83811d32154a3cbe7237fe7e0acec Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 10:33:58 +0800 Subject: [PATCH 10/20] fix(stream): keep the status entry in hash table, instead entry index. --- source/dnode/mnode/impl/src/mndStream.c | 46 ++++++++++++++++--------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d2f4657aa8..3018326c74 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -125,7 +125,7 @@ int32_t mndInitStream(SMnode *pMnode) { taosThreadMutexInit(&execNodeList.lock, NULL); execNodeList.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); - execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskStatusEntry)); + execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskId)); return sdbSetTable(pMnode->pSdb, table); } @@ -1183,10 +1183,15 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { taosThreadMutexLock(&execNodeList.lock); for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) { - STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i); - if (p->status != TASK_STATUS__NORMAL) { + STaskId *p = taosArrayGet(execNodeList.pTaskList, i); + STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, p, sizeof(*p)); + if (pEntry == NULL) { + continue; + } + + if (pEntry->status != TASK_STATUS__NORMAL) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued", - p->id.streamId, (int32_t)p->id.taskId, 0, streamGetTaskStatusStr(p->status)); + pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status)); ready = false; break; } @@ -1557,13 +1562,12 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock char status[20 + VARSTR_HEADER_SIZE] = {0}; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - int32_t *index = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id)); - if (index == NULL) { + STaskStatusEntry* pe = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id)); + if (pe == NULL) { continue; } - STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index); - const char* pStatus = streamGetTaskStatusStr(pStatusEntry->status); + const char* pStatus = streamGetTaskStatusStr(pe->status); STR_TO_VARSTR(status, pStatus); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -2254,10 +2258,8 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNod if (p == NULL) { STaskStatusEntry entry = { .id.streamId = pTask->id.streamId, .id.taskId = pTask->id.taskId, .status = TASK_STATUS__STOP}; - taosArrayPush(pExecNode->pTaskList, &entry); - - int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1; - taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &ordinal, sizeof(ordinal)); + taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); + taosArrayPush(pExecNode->pTaskList, &id); } } } @@ -2275,11 +2277,21 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecN STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); if (p != NULL) { - taosArrayRemove(pExecNode->pTaskList, *(int32_t*)p); taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id)); + + for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { + STaskId* pId = taosArrayGet(pExecNode->pTaskList, k); + if (pId->taskId == id.taskId && pId->streamId == id.streamId) { + taosArrayRemove(pExecNode->pTaskList, k); + break; + } + } + } } } + + ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } // todo: this process should be executed by the write queue worker of the mnode @@ -2308,13 +2320,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); - int32_t *index = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id)); - if (index == NULL) { + STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id)); + if (pEntry == NULL) { + mError("s-task:0x%"PRIx64" not found in mnode task list", p->id.taskId); continue; } - STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, *index); - pStatusEntry->status = p->status; + pEntry->status = p->status; if (p->status != TASK_STATUS__NORMAL) { mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status)); } From 38b3a7c1bd451c52cc20bf70bf6f004ea35edbfc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 11:05:49 +0800 Subject: [PATCH 11/20] fix(stream): update logs. --- source/libs/stream/src/streamExec.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 969b547d71..74b24fb4c3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -525,6 +525,9 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock int32_t streamExecForAll(SStreamTask* pTask) { const char* id = pTask->id.idStr; + // merge multiple input data if possible in the input queue. + qDebug("s-task:%s start to extract data block from inputQ", id); + while (1) { int32_t numOfBlocks = 0; SStreamQueueItem* pInput = NULL; @@ -533,9 +536,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { break; } - // merge multiple input data if possible in the input queue. - qDebug("s-task:%s start to extract data block from inputQ", id); - /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks); if (pInput == NULL) { ASSERT(numOfBlocks == 0); From b7725ac2fe09c53ec0c0a5352bc9a2915cfcb853 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Tue, 19 Sep 2023 11:48:27 +0800 Subject: [PATCH 12/20] fix: query tbname from systables memory corruption --- source/common/src/tdatablock.c | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 53646b84b3..aa819ae788 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2353,27 +2353,26 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList int32_t maxRows = 0; size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); - // it is a reserved column for scalar function, and no data in this column yet. - if (pDst->pData == NULL) { - continue; - } + if (!pBoolList) { + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); + // it is a reserved column for scalar function, and no data in this column yet. + if (pDst->pData == NULL) { + continue; + } - int32_t numOfRows = 0; - if (IS_VAR_DATA_TYPE(pDst->info.type)) { - pDst->varmeta.length = 0; + int32_t numOfRows = 0; + if (IS_VAR_DATA_TYPE(pDst->info.type)) { + pDst->varmeta.length = 0; + } } - } - - if (NULL == pBoolList) { return; } - + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); // it is a reserved column for scalar function, and no data in this column yet. - if (pDst->pData == NULL) { + if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) { continue; } From 309630eb11d38b367b8c7b7644a545ca4e203cca Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 14:22:05 +0800 Subject: [PATCH 13/20] fix(stream): add timestamp. --- source/libs/stream/src/streamCheckpoint.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 3f8b69785d..f367ba932f 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -202,6 +202,8 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0); + pTask->chkInfo.startTs = taosGetTimestampMs(); + // update the child Id for downstream tasks streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); From dfff811ca9729df079959a47853f4aa427da45f6 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Tue, 19 Sep 2023 14:47:08 +0800 Subject: [PATCH 14/20] fix: add db database on sql --- tests/system-test/2-query/diff.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/system-test/2-query/diff.py b/tests/system-test/2-query/diff.py index 8f3fb2c3d9..10e16a690f 100644 --- a/tests/system-test/2-query/diff.py +++ b/tests/system-test/2-query/diff.py @@ -22,20 +22,20 @@ class TDTestCase: def full_datatype_test(self): tdSql.execute("use db;") - sql = "create table st(ts timestamp, c1 bool, c2 float, c3 double,c4 tinyint, c5 smallint, c6 int, c7 bigint, c8 tinyint unsigned, c9 smallint unsigned, c10 int unsigned, c11 bigint unsigned) tags( area int);" + sql = "create table db.st(ts timestamp, c1 bool, c2 float, c3 double,c4 tinyint, c5 smallint, c6 int, c7 bigint, c8 tinyint unsigned, c9 smallint unsigned, c10 int unsigned, c11 bigint unsigned) tags( area int);" tdSql.execute(sql) - sql = "create table t1 using st tags(1);" + sql = "create table db.t1 using db.st tags(1);" tdSql.execute(sql) ts = 1694000000000 rows = 126 for i in range(rows): ts += 1 - sql = f"insert into t1 values({ts},true,{i},{i},{i%127},{i%32767},{i},{i},{i%127},{i%32767},{i},{i});" + sql = f"insert into db.t1 values({ts},true,{i},{i},{i%127},{i%32767},{i},{i},{i%127},{i%32767},{i},{i});" tdSql.execute(sql) - sql = "select diff(ts),diff(c1),diff(c3),diff(c4),diff(c5),diff(c6),diff(c7),diff(c8),diff(c9),diff(c10),diff(c11) from t1" + sql = "select diff(ts),diff(c1),diff(c3),diff(c4),diff(c5),diff(c6),diff(c7),diff(c8),diff(c9),diff(c10),diff(c11) from db.t1" tdSql.query(sql) tdSql.checkRows(rows - 1) for i in range(rows - 1): From 1decaaee1e8bd3ff0ff584f68844b3c1b1ef3d81 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 16:49:04 +0800 Subject: [PATCH 15/20] fix(stream): set correct size for results generated by scan history stream tasks. --- source/libs/stream/src/streamData.c | 4 +++ source/libs/stream/src/streamExec.c | 38 +++++++++++++---------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 00bf631d74..a108667f5d 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -65,6 +65,10 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK; pStreamBlocks->blocks = pRes; + if (pItem == NULL) { + return pStreamBlocks; + } + if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem; pStreamBlocks->sourceVer = pSubmit->ver; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 74b24fb4c3..d89817d236 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -16,9 +16,10 @@ #include "streamInt.h" // maximum allowed processed block batches. One block may include several submit blocks -#define MAX_STREAM_EXEC_BATCH_NUM 32 -#define MIN_STREAM_EXEC_BATCH_NUM 4 -#define STREAM_RESULT_DUMP_THRESHOLD 100 +#define MAX_STREAM_EXEC_BATCH_NUM 32 +#define MIN_STREAM_EXEC_BATCH_NUM 4 +#define STREAM_RESULT_DUMP_THRESHOLD 100 +#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); @@ -75,7 +76,6 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks); if (code != TSDB_CODE_SUCCESS) { // back pressure and record position - //code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY destroyStreamDataBlock(pStreamBlocks); return code; } @@ -166,7 +166,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i pTask->info.selfChildId, numOfBlocks, SIZE_IN_MB(size)); // current output should be dispatched to down stream nodes - if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD) { + if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { ASSERT(numOfBlocks == taosArrayGetSize(pRes)); code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); if (code != TSDB_CODE_SUCCESS) { @@ -192,6 +192,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i int32_t streamScanHistoryData(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); + int32_t size = 0; int32_t code = TSDB_CODE_SUCCESS; void* exec = pTask->exec.pExecutor; bool finished = false; @@ -244,29 +245,24 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { block.info.childId = pTask->info.selfChildId; taosArrayPush(pRes, &block); - if ((++numOfBlocks) >= outputBatchSize) { - qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, outputBatchSize); + size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); + + if ((++numOfBlocks) >= outputBatchSize || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { + qDebug("s-task:%s scan exec numOfBlocks:%d, output num-limit:%d, size-limit:%d reached", pTask->id.idStr, numOfBlocks, + outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD); break; } } if (taosArrayGetSize(pRes) > 0) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - if (qRes == NULL) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - - code = doOutputResultBlockImpl(pTask, qRes); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(qRes); + SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes); + code = doOutputResultBlockImpl(pTask, pStreamBlocks); + if (code != TSDB_CODE_SUCCESS) { + destroyStreamDataBlock(pStreamBlocks); return code; } + + size = 0; } else { taosArrayDestroy(pRes); } From 271e492188008a682a4c1cf546a99558ad14c4c2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 17:40:26 +0800 Subject: [PATCH 16/20] refactor: update the log level. --- source/dnode/vnode/src/tq/tqSink.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index e0bae18545..da7ac20600 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -537,7 +537,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock if (k == 0) { SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); void* colData = colDataGetData(pColData, j); - tqDebug("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData); + tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData); } if (IS_SET_NULL(pCol)) { From b99232fd7a9f904c4012f8ee7792ca2c4828fd9d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 17:45:28 +0800 Subject: [PATCH 17/20] fix(stream): limit the max scan times. --- source/dnode/vnode/src/tq/tqStreamTask.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 3a5eeae561..854478f41e 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -16,6 +16,8 @@ #include "tq.h" #include "vnd.h" +#define MAX_REPEAT_SCAN_THRESHOLD 3 + static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); @@ -153,6 +155,9 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { } pMeta->walScanCounter += 1; + if (pMeta->walScanCounter > MAX_REPEAT_SCAN_THRESHOLD) { + pMeta->walScanCounter = MAX_REPEAT_SCAN_THRESHOLD; + } if (pMeta->walScanCounter > 1) { tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter); From 30c24eb8c62134d060b95f8dba3b891689add199 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Tue, 19 Sep 2023 18:53:12 +0800 Subject: [PATCH 18/20] fix: sometime meta-ver file lost --- tests/system-test/0-others/walRetention.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/system-test/0-others/walRetention.py b/tests/system-test/0-others/walRetention.py index 0fdeb84a5b..53316fc88b 100644 --- a/tests/system-test/0-others/walRetention.py +++ b/tests/system-test/0-others/walRetention.py @@ -109,11 +109,14 @@ class VNode : # load config tdLog.info(f' meta-ver file={metaFile}') if metaFile != "": - jsonVer = jsonFromFile(metaFile) - metaNode = jsonVer["meta"] - self.snapVer = int(metaNode["snapshotVer"]) - self.firstVer = int(metaNode["firstVer"]) - self.lastVer = int(metaNode["lastVer"]) + try: + jsonVer = jsonFromFile(metaFile) + metaNode = jsonVer["meta"] + self.snapVer = int(metaNode["snapshotVer"]) + self.firstVer = int(metaNode["firstVer"]) + self.lastVer = int(metaNode["lastVer"]) + except Exception as e: + tdLog.info(f' read json file except.') # sort with startVer self.walFiles = sorted(self.walFiles, key=lambda x : x.startVer, reverse=True) From 2d5a4efc2e2f1f20a9d684d3c0a977529a8d98c4 Mon Sep 17 00:00:00 2001 From: wade zhang <95411902+gccgdb1234@users.noreply.github.com> Date: Wed, 20 Sep 2023 08:01:27 +0800 Subject: [PATCH 19/20] Update 20-keywords.md --- docs/zh/12-taos-sql/20-keywords.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/zh/12-taos-sql/20-keywords.md b/docs/zh/12-taos-sql/20-keywords.md index e7e926d0b7..f59eda1689 100644 --- a/docs/zh/12-taos-sql/20-keywords.md +++ b/docs/zh/12-taos-sql/20-keywords.md @@ -180,6 +180,7 @@ description: TDengine 保留关键字的详细列表 - MAX_DELAY - BWLIMIT - MAXROWS +- MAX_SPEED - MERGE - META - MINROWS From 6e4622496654ff75a1149e3f4bbd4c40c2fbade7 Mon Sep 17 00:00:00 2001 From: wade zhang <95411902+gccgdb1234@users.noreply.github.com> Date: Wed, 20 Sep 2023 08:01:58 +0800 Subject: [PATCH 20/20] Update 20-keywords.md --- docs/en/12-taos-sql/20-keywords.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/en/12-taos-sql/20-keywords.md b/docs/en/12-taos-sql/20-keywords.md index 983d4f63c9..36cbc0948f 100644 --- a/docs/en/12-taos-sql/20-keywords.md +++ b/docs/en/12-taos-sql/20-keywords.md @@ -180,6 +180,7 @@ The following list shows all reserved keywords: - MAX_DELAY - BWLIMIT - MAXROWS +- MAX_SPEED - MERGE - META - MINROWS