From 03bc2d7c6ddb1272c8067747c542bec547b77847 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 20 Feb 2024 10:27:02 +0800 Subject: [PATCH 01/14] test(stream): add unit test cases. --- source/dnode/mnode/impl/inc/mndStream.h | 1 + source/dnode/mnode/impl/src/mndStreamHb.c | 4 +- source/dnode/mnode/impl/src/mndStreamTrans.c | 34 ++++-- source/dnode/mnode/impl/src/mndStreamUtil.c | 2 + .../dnode/mnode/impl/test/stream/stream.cpp | 114 +++++++++++++++--- 5 files changed, 121 insertions(+), 34 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 1084340dc2..aed49809dd 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -119,6 +119,7 @@ int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj * int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream); SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); void destroyStreamTaskIter(SStreamTaskIter *pIter); diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 5de442951c..3f3a6edf35 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -65,7 +65,7 @@ static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pI taosArrayPush(pList, pInfo); } -int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { +int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint"); if (pTrans == NULL) { return terrno; @@ -115,7 +115,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, in } else { mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name, pStream->uid, transId); - code = createStreamResetStatusTrans(pMnode, pStream); + code = mndCreateStreamResetStatusTrans(pMnode, pStream); } } diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 5bfd3933b5..2241d93465 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -261,22 +261,30 @@ int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msg return mndTransAppendRedoAction(pTrans, &action); } +static bool identicalName(const char* pDb, const char* pParam, int32_t len) { + return (strlen(pDb) == len) && (strncmp(pDb, pParam, len) == 0); +} + int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { - // data in the hash table will be removed automatically, no need to remove it here. - SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len); - if (pTransInfo == NULL) { - return TSDB_CODE_SUCCESS; - } + void *pIter = NULL; - // not checkpoint trans, ignore - if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) { - mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId); - return TSDB_CODE_SUCCESS; - } + while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) { + SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter; + if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) { + continue; + } - char *pDupDBName = strndup(pDBName, len); - mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName); - taosMemoryFree(pDupDBName); + SStreamObj *pStream = mndGetStreamObj(pMnode, pTransInfo->streamId); + if (pStream != NULL) { + if (identicalName(pStream->sourceDb, pDBName, len)) { + mndKillTransImpl(pMnode, pTransInfo->transId, pStream->sourceDb); + } else if (identicalName(pStream->targetDb, pDBName, len)) { + mndKillTransImpl(pMnode, pTransInfo->transId, pStream->targetDb); + } + + mndReleaseStream(pMnode, pStream); + } + } return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 3cabce2201..1ae85a2cc6 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -231,6 +231,8 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT taosMemoryFree(pReq); return -1; } + + mDebug("set the resume action for trans:%d", pTrans->id); return 0; } diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index e3bfdb5d6c..369053c7bb 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -29,6 +29,9 @@ #include "../../inc/mndStream.h" namespace { + +static int64_t defStreamId = 999; + SRpcMsg buildHbReq() { SStreamHbMsg msg = {0}; msg.vgId = 1; @@ -40,7 +43,7 @@ SRpcMsg buildHbReq() { entry.nodeId = i + 1; entry.stage = 1; entry.id.taskId = i + 1; - entry.id.streamId = 999; + entry.id.streamId = defStreamId; if (i == 0) { entry.stage = 4; @@ -57,7 +60,7 @@ SRpcMsg buildHbReq() { entry.stage = 1; entry.id.taskId = 5; - entry.id.streamId = 999; + entry.id.streamId = defStreamId; entry.checkpointId = 1; entry.checkpointFailed = true; @@ -118,15 +121,16 @@ void setTask(SStreamTask* pTask, int32_t nodeId, int64_t streamId, int32_t taskI taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); taosArrayPush(pExecNode->pTaskList, &id); } + void initStreamExecInfo() { SStreamExecInfo* pExecNode = &execInfo; SStreamTask task = {0}; - setTask(&task, 1, 999, 1); - setTask(&task, 1, 999, 2); - setTask(&task, 1, 999, 3); - setTask(&task, 1, 999, 4); - setTask(&task, 2, 999, 5); + setTask(&task, 1, defStreamId, 1); + setTask(&task, 1, defStreamId, 2); + setTask(&task, 1, defStreamId, 3); + setTask(&task, 1, defStreamId, 4); + setTask(&task, 2, defStreamId, 5); } void initNodeInfo() { @@ -138,38 +142,109 @@ void initNodeInfo() { } } // namespace +class StreamTest : public testing::Test { // 继承了 testing::Test + protected: + + static void SetUpTestSuite() { + mndInitExecInfo(); + initStreamExecInfo(); + initNodeInfo(); + + std::cout<<"setup env for streamTest suite"<(taosMemoryCalloc(1, sizeof(SMnode))); + {// init sdb + SSdbOpt opt = {0}; + opt.path = pMnode->path; + opt.pMnode = pMnode; + opt.pWal = pMnode->pWal; + + pMnode->pSdb = sdbInit(&opt); + } + + SVgroupChangeInfo info; + info.pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)); + info.pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); + + const char* pDbName = "test_db_name"; + int32_t len = strlen(pDbName); + + taosHashPut(info.pDBMap, pDbName, len, NULL, 0); + + killAllCheckpointTrans(pMnode, &info); + + SStreamObj stream; + memset(&stream, 0, sizeof(SStreamObj)); + + stream.uid = defStreamId; + stream.lock = 0; + stream.tasks = taosArrayInit(1, POINTER_BYTES); + stream.pHTasksList = taosArrayInit(1, POINTER_BYTES); + + SArray* pLevel = taosArrayInit(1, POINTER_BYTES); + SStreamTask* pTask = static_cast(taosMemoryCalloc(1, sizeof(SStreamTask))); + pTask->id.streamId = defStreamId; + pTask->id.taskId = 1; + pTask->exec.qmsg = (char*)taosMemoryMalloc(1); + + taosArrayPush(pLevel, &pTask); + + taosArrayPush(stream.tasks, &pLevel); + mndCreateStreamResetStatusTrans(pMnode, &stream); + + tFreeStreamObj(&stream); + sdbCleanup(pMnode->pSdb); + taosMemoryFree(pMnode); + + taosArrayDestroy(info.pUpdateNodeList); + taosHashCleanup(info.pDBMap); +} + +TEST_F(StreamTest, plan_Test) { char* ast = "{\"NodeType\":\"101\",\"Name\":\"SelectStmt\",\"SelectStmt\":{\"Distinct\":false,\"Projections\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"9\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_1\",\"UserAlias\":\"_wstart\",\"Name\":\"_wstart\",\"Id\":\"89\",\"Type\":\"3505\",\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_2\",\"UserAlias\":\"sum(voltage)\",\"Name\":\"sum\",\"Id\":\"1\",\"Type\":\"14\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"voltage\",\"UserAlias\":\"voltage\",\"TableId\":\"6555383776122680534\",\"TableType\":\"1\",\"ColId\":\"3\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"meters\",\"TableAlias\":\"meters\",\"ColName\":\"voltage\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"#expr_3\",\"UserAlias\":\"groupid\",\"Name\":\"_group_key\",\"Id\":\"96\",\"Type\":\"3754\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"#expr_3\",\"UserAlias\":\"groupid\",\"TableId\":\"6555383776122680534\",\"TableType\":\"1\",\"ColId\":\"5\",\"ProjId\":\"0\",\"ColType\":\"2\",\"DbName\":\"test\",\"TableName\":\"meters\",\"TableAlias\":\"meters\",\"ColName\":\"groupid\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}}],\"From\":{\"NodeType\":\"6\",\"Name\":\"RealTable\",\"RealTable\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"0\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"DbName\":\"test\",\"tableName\":\"meters\",\"tableAlias\":\"meters\",\"MetaSize\":\"475\",\"Meta\":{\"VgId\":\"0\",\"TableType\":\"1\",\"Uid\":\"6555383776122680534\",\"Suid\":\"6555383776122680534\",\"Sversion\":\"1\",\"Tversion\":\"1\",\"ComInfo\":{\"NumOfTags\":\"2\",\"Precision\":\"0\",\"NumOfColumns\":\"4\",\"RowSize\":\"20\"},\"ColSchemas\":[{\"Type\":\"9\",\"ColId\":\"1\",\"bytes\":\"8\",\"Name\":\"ts\"},{\"Type\":\"6\",\"ColId\":\"2\",\"bytes\":\"4\",\"Name\":\"current\"},{\"Type\":\"4\",\"ColId\":\"3\",\"bytes\":\"4\",\"Name\":\"voltage\"},{\"Type\":\"6\",\"ColId\":\"4\",\"bytes\":\"4\",\"Name\":\"phase\"},{\"Type\":\"4\",\"ColId\":\"5\",\"bytes\":\"4\",\"Name\":\"groupid\"},{\"Type\":\"8\",\"ColId\":\"6\",\"bytes\":\"26\",\"Name\":\"location\"}]},\"VgroupsInfoSize\":\"1340\",\"VgroupsInfo\":{\"Num\":\"2\",\"Vgroups\":[{\"VgId\":\"2\",\"HashBegin\":\"0\",\"HashEnd\":\"2147483646\",\"EpSet\":{\"InUse\":\"0\",\"NumOfEps\":\"1\",\"Eps\":[{\"Fqdn\":\"localhost\",\"Port\":\"6030\"}]},\"NumOfTable\":\"0\"},{\"VgId\":\"3\",\"HashBegin\":\"2147483647\",\"HashEnd\":\"4294967295\",\"EpSet\":{\"InUse\":\"0\",\"NumOfEps\":\"1\",\"Eps\":[{\"Fqdn\":\"localhost\",\"Port\":\"6030\"}]},\"NumOfTable\":\"0\"}]}}},\"PartitionBy\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"groupid\",\"UserAlias\":\"groupid\",\"TableId\":\"6555383776122680534\",\"TableType\":\"1\",\"ColId\":\"5\",\"ProjId\":\"0\",\"ColType\":\"2\",\"DbName\":\"test\",\"TableName\":\"meters\",\"TableAlias\":\"meters\",\"ColName\":\"groupid\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"Window\":{\"NodeType\":\"14\",\"Name\":\"IntervalWindow\",\"IntervalWindow\":{\"Interval\":{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"115\",\"Bytes\":\"8\"},\"AliasName\":\"c804c3a15ebe05b5baf40ad5ee12be1f\",\"UserAlias\":\"2s\",\"LiteralSize\":\"2\",\"Literal\":\"2s\",\"Duration\":true,\"Translate\":true,\"NotReserved\":false,\"IsNull\":false,\"Unit\":\"115\",\"Datum\":\"2000\"}},\"TsPk\":{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"9\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"ts\",\"UserAlias\":\"ts\",\"TableId\":\"6555383776122680534\",\"TableType\":\"1\",\"ColId\":\"1\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"meters\",\"TableAlias\":\"meters\",\"ColName\":\"ts\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}}},\"StmtName\":\"0x1580095ba\",\"HasAggFuncs\":true}}"; // char* ast = "{\"NodeType\":\"101\",\"Name\":\"SelectStmt\",\"SelectStmt\":{\"Distinct\":false,\"Projections\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"9\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_1\",\"UserAlias\":\"wstart\",\"Name\":\"_wstart\",\"Id\":\"89\",\"Type\":\"3505\",\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"#expr_2\",\"UserAlias\":\"min(c1)\",\"Name\":\"min\",\"Id\":\"2\",\"Type\":\"8\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"c1\",\"UserAlias\":\"c1\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"2\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c1\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"3\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"2\"},\"AliasName\":\"#expr_3\",\"UserAlias\":\"max(c2)\",\"Name\":\"max\",\"Id\":\"3\",\"Type\":\"7\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"3\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"2\"},\"AliasName\":\"c2\",\"UserAlias\":\"c2\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"3\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c2\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_4\",\"UserAlias\":\"sum(c3)\",\"Name\":\"sum\",\"Id\":\"1\",\"Type\":\"14\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"c3\",\"UserAlias\":\"c3\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"4\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c3\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_5\",\"UserAlias\":\"first(c4)\",\"Name\":\"first\",\"Id\":\"33\",\"Type\":\"504\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"c4\",\"UserAlias\":\"c4\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"5\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c4\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}},{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"9\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"ts\",\"UserAlias\":\"ts\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"1\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"ts\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"11\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"#expr_6\",\"UserAlias\":\"last(c5)\",\"Name\":\"last\",\"Id\":\"36\",\"Type\":\"506\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"11\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"c5\",\"UserAlias\":\"c5\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"6\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c5\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}},{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"9\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"ts\",\"UserAlias\":\"ts\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"1\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"ts\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"12\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"2\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"7\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_7\",\"UserAlias\":\"apercentile(c6, 50)\",\"Name\":\"apercentile\",\"Id\":\"12\",\"Type\":\"1\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"12\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"2\"},\"AliasName\":\"c6\",\"UserAlias\":\"c6\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"7\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c6\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"c0c7c76d30bd3dcaefc96f40275bdc0a\",\"UserAlias\":\"50\",\"LiteralSize\":\"2\",\"Literal\":\"50\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"50\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"13\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"7\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_8\",\"UserAlias\":\"avg(c7)\",\"Name\":\"avg\",\"Id\":\"8\",\"Type\":\"2\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"13\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"c7\",\"UserAlias\":\"c7\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"8\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c7\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"14\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_9\",\"UserAlias\":\"count(c8)\",\"Name\":\"count\",\"Id\":\"0\",\"Type\":\"3\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"14\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"c8\",\"UserAlias\":\"c8\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"9\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c8\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"6\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"7\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_10\",\"UserAlias\":\"spread(c1)\",\"Name\":\"spread\",\"Id\":\"17\",\"Type\":\"11\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"c1\",\"UserAlias\":\"c1\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"2\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c1\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"7\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_11\",\"UserAlias\":\"stddev(c2)\",\"Name\":\"stddev\",\"Id\":\"4\",\"Type\":\"12\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"3\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"2\"},\"AliasName\":\"c2\",\"UserAlias\":\"c2\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"3\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c2\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"8\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_12\",\"UserAlias\":\"hyperloglog(c11)\",\"Name\":\"hyperloglog\",\"Id\":\"43\",\"Type\":\"17\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"8\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"c11\",\"UserAlias\":\"c11\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"12\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c11\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"10\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"26\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_13\",\"UserAlias\":\"timediff(1, 0, 1h)\",\"Name\":\"timediff\",\"Id\":\"81\",\"Type\":\"2501\",\"Parameters\":[{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"c4ca4238a0b923820dcc509a6f75849b\",\"UserAlias\":\"1\",\"LiteralSize\":\"1\",\"Literal\":\"1\",\"Duration\":false,\"Translate\":true,\"NotReserved\":false,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"1\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"cfcd208495d565ef66e7dff9f98764da\",\"UserAlias\":\"0\",\"LiteralSize\":\"1\",\"Literal\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":false,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"104\",\"Bytes\":\"8\"},\"AliasName\":\"7c68645d71b803bf0ba2f22519f73e08\",\"UserAlias\":\"1h\",\"LiteralSize\":\"2\",\"Literal\":\"1h\",\"Duration\":true,\"Translate\":true,\"NotReserved\":false,\"IsNull\":false,\"Unit\":\"104\",\"Datum\":\"3600000\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"1\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"8\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"96\"},\"AliasName\":\"#expr_14\",\"UserAlias\":\"timezone()\",\"Name\":\"timezone\",\"Id\":\"84\",\"Type\":\"2503\",\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}}],\"From\":{\"NodeType\":\"6\",\"Name\":\"RealTable\",\"RealTable\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"0\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"DbName\":\"test\",\"tableName\":\"at_once_interval_ext_stb\",\"tableAlias\":\"at_once_interval_ext_stb\",\"MetaSize\":\"2008\",\"Meta\":{\"VgId\":\"0\",\"TableType\":\"1\",\"Uid\":\"5129202035162885657\",\"Suid\":\"5129202035162885657\",\"Sversion\":\"1\",\"Tversion\":\"1\",\"ComInfo\":{\"NumOfTags\":\"13\",\"Precision\":\"0\",\"NumOfColumns\":\"14\",\"RowSize\":\"85\"},\"ColSchemas\":[{\"Type\":\"9\",\"ColId\":\"1\",\"bytes\":\"8\",\"Name\":\"ts\"},{\"Type\":\"2\",\"ColId\":\"2\",\"bytes\":\"1\",\"Name\":\"c1\"},{\"Type\":\"3\",\"ColId\":\"3\",\"bytes\":\"2\",\"Name\":\"c2\"},{\"Type\":\"4\",\"ColId\":\"4\",\"bytes\":\"4\",\"Name\":\"c3\"},{\"Type\":\"5\",\"ColId\":\"5\",\"bytes\":\"8\",\"Name\":\"c4\"},{\"Type\":\"11\",\"ColId\":\"6\",\"bytes\":\"1\",\"Name\":\"c5\"},{\"Type\":\"12\",\"ColId\":\"7\",\"bytes\":\"2\",\"Name\":\"c6\"},{\"Type\":\"13\",\"ColId\":\"8\",\"bytes\":\"4\",\"Name\":\"c7\"},{\"Type\":\"14\",\"ColId\":\"9\",\"bytes\":\"8\",\"Name\":\"c8\"},{\"Type\":\"6\",\"ColId\":\"10\",\"bytes\":\"4\",\"Name\":\"c9\"},{\"Type\":\"7\",\"ColId\":\"11\",\"bytes\":\"8\",\"Name\":\"c10\"},{\"Type\":\"8\",\"ColId\":\"12\",\"bytes\":\"8\",\"Name\":\"c11\"},{\"Type\":\"10\",\"ColId\":\"13\",\"bytes\":\"26\",\"Name\":\"c12\"},{\"Type\":\"1\",\"ColId\":\"14\",\"bytes\":\"1\",\"Name\":\"c13\"},{\"Type\":\"2\",\"ColId\":\"15\",\"bytes\":\"1\",\"Name\":\"t1\"},{\"Type\":\"3\",\"ColId\":\"16\",\"bytes\":\"2\",\"Name\":\"t2\"},{\"Type\":\"4\",\"ColId\":\"17\",\"bytes\":\"4\",\"Name\":\"t3\"},{\"Type\":\"5\",\"ColId\":\"18\",\"bytes\":\"8\",\"Name\":\"t4\"},{\"Type\":\"11\",\"ColId\":\"19\",\"bytes\":\"1\",\"Name\":\"t5\"},{\"Type\":\"12\",\"ColId\":\"20\",\"bytes\":\"2\",\"Name\":\"t6\"},{\"Type\":\"13\",\"ColId\":\"21\",\"bytes\":\"4\",\"Name\":\"t7\"},{\"Type\":\"14\",\"ColId\":\"22\",\"bytes\":\"8\",\"Name\":\"t8\"},{\"Type\":\"6\",\"ColId\":\"23\",\"bytes\":\"4\",\"Name\":\"t9\"},{\"Type\":\"7\",\"ColId\":\"24\",\"bytes\":\"8\",\"Name\":\"t10\"},{\"Type\":\"8\",\"ColId\":\"25\",\"bytes\":\"8\",\"Name\":\"t11\"},{\"Type\":\"10\",\"ColId\":\"26\",\"bytes\":\"26\",\"Name\":\"t12\"},{\"Type\":\"1\",\"ColId\":\"27\",\"bytes\":\"1\",\"Name\":\"t13\"}]},\"VgroupsInfoSize\":\"1340\",\"VgroupsInfo\":{\"Num\":\"2\",\"Vgroups\":[{\"VgId\":\"14\",\"HashBegin\":\"0\",\"HashEnd\":\"2147483646\",\"EpSet\":{\"InUse\":\"0\",\"NumOfEps\":\"1\",\"Eps\":[{\"Fqdn\":\"localhost\",\"Port\":\"6030\"}]},\"NumOfTable\":\"0\"},{\"VgId\":\"15\",\"HashBegin\":\"2147483647\",\"HashEnd\":\"4294967295\",\"EpSet\":{\"InUse\":\"0\",\"NumOfEps\":\"1\",\"Eps\":[{\"Fqdn\":\"localhost\",\"Port\":\"6030\"}]},\"NumOfTable\":\"0\"}]}}},\"Tags\":[{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}}],\"Window\":{\"NodeType\":\"14\",\"Name\":\"IntervalWindow\",\"IntervalWindow\":{\"Interval\":{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"115\",\"Bytes\":\"8\"},\"AliasName\":\"1fd7635317edfeca9054894ac9ef9b5e\",\"UserAlias\":\"14s\",\"LiteralSize\":\"3\",\"Literal\":\"14s\",\"Duration\":true,\"Translate\":true,\"NotReserved\":false,\"IsNull\":false,\"Unit\":\"115\",\"Datum\":\"14000\"}},\"TsPk\":{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"9\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"ts\",\"UserAlias\":\"ts\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"1\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"ts\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}}},\"StmtName\":\"0x150146d14\",\"HasAggFuncs\":true}}"; SNode * pAst = NULL; SQueryPlan *pPlan = NULL; if (taosCreateLog("taoslog", 10, "/etc/taos", NULL, NULL, NULL, NULL, 1) != 0) { - // ignore create log failed, only print - printf(" WARING: Create failed:%s. configDir\n", strerror(errno)); + // ignore create log failed, only print + printf(" WARING: Create failed:%s. configDir\n", strerror(errno)); } if (nodesStringToNode(ast, &pAst) < 0) { - ASSERT(0); + ASSERT(0); } - SPlanContext cxt = { 0 }; + SPlanContext cxt = {0}; cxt.pAstRoot = pAst; cxt.topicQuery = false; cxt.streamQuery = true; @@ -181,10 +256,11 @@ TEST(testCase, plan_Test) { // using ast and param to build physical plan if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { - ASSERT(0); + ASSERT(0); } + if (pAst != NULL) nodesDestroyNode(pAst); - nodesDestroyNode((SNode *)pPlan); + nodesDestroyNode((SNode*)pPlan); } #pragma GCC diagnostic pop \ No newline at end of file From b3c4fe489ff398ab188241c7356b44ac120234bb Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 20 Feb 2024 13:22:35 +0800 Subject: [PATCH 02/14] feat: remove todo --- source/libs/executor/src/scanoperator.c | 6 ------ 1 file changed, 6 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index dc9fceee84..cb83cb7599 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3317,12 +3317,6 @@ _error: // table merge scan operator // table merge scan operator -// TODO: limit / duration optimization -// TODO: get block from tsdReader function, with task killed, func_data all filter out, skip, finish -// TODO: error processing, memory freeing -// TODO: add log for error and perf -// TODO: tsdb reader open/close dynamically -// TODO: blockdata deep cleanup static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) { int32_t left = *(int32_t*)pLeft; From e91586c484a305992ab1c8dcead539ce2fa3a485 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 21 Feb 2024 09:47:42 +0800 Subject: [PATCH 03/14] fix: free fd to reuse the space --- source/os/src/osFile.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index bab9ba0cea..e6491639dc 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -1329,7 +1329,6 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { char *data = taosMemoryMalloc(compressSize); gzFile dstFp = NULL; - TdFilePtr pFile = NULL; TdFilePtr pSrcFile = NULL; pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM); @@ -1369,8 +1368,8 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { } cmp_end: - if (pFile) { - taosCloseFile(&pFile); + if (fd >= 0) { + close(fd); } if (pSrcFile) { taosCloseFile(&pSrcFile); From 6b331594d9ff7151b73b8d9cdb6153765f120f5f Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 21 Feb 2024 13:38:49 +0800 Subject: [PATCH 04/14] fix: change typo error --- source/libs/nodes/src/nodesCodeFuncs.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index a9907295b9..689886c366 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -698,7 +698,7 @@ static const char* jkScanLogicPlanTagCond = "TagCond"; static const char* jkScanLogicPlanGroupTags = "GroupTags"; static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx"; static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited"; -static const char* jkScanLogicPlanparaTablesSort = "paraTablesSort"; +static const char* jkScanLogicPlanParaTablesSort = "ParaTablesSort"; static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { const SScanLogicNode* pNode = (const SScanLogicNode*)pObj; @@ -747,7 +747,7 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->paraTablesSort); + code = tjsonAddBoolToObject(pJson, jkScanLogicPlanParaTablesSort, pNode->paraTablesSort); } return code; } @@ -800,7 +800,7 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) { code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->paraTablesSort); + code = tjsonGetBoolValue(pJson, jkScanLogicPlanParaTablesSort, &pNode->paraTablesSort); } return code; } @@ -1895,7 +1895,7 @@ static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid"; static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate"; static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited"; static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable"; -static const char* jkTableScanPhysiPlanparaTablesSort = "paraTablesSort"; +static const char* jkTableScanPhysiPlanParaTablesSort = "ParaTablesSort"; static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; @@ -1971,7 +1971,7 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, pNode->needCountEmptyTable); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanparaTablesSort, pNode->paraTablesSort); + code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanParaTablesSort, pNode->paraTablesSort); } return code; } @@ -2050,7 +2050,7 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, &pNode->needCountEmptyTable); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanparaTablesSort, &pNode->paraTablesSort); + code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanParaTablesSort, &pNode->paraTablesSort); } return code; } From 35aed7653955340e9457b47e09e60753a944d5dd Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 21 Feb 2024 13:39:15 +0800 Subject: [PATCH 05/14] docs: add para_tables_sort() hint --- docs/en/12-taos-sql/06-select.md | 6 ++++-- docs/zh/12-taos-sql/06-select.md | 5 +++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/docs/en/12-taos-sql/06-select.md b/docs/en/12-taos-sql/06-select.md index 1fc6fb7e67..0cd2d06989 100755 --- a/docs/en/12-taos-sql/06-select.md +++ b/docs/en/12-taos-sql/06-select.md @@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */ hint: - BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP + BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT select_list: select_expr [, select_expr] ... @@ -87,12 +87,13 @@ Hints are a means of user control over query optimization for individual stateme The list of currently supported Hints is as follows: -| **Hint** | **Params** | **Comment** | **Scopt** | +| **Hint** | **Params** | **Comment** | **Scope** | | :-----------: | -------------- | -------------------------- | -----------------------------------| | BATCH_SCAN | None | Batch table scan | JOIN statment for stable | | NO_BATCH_SCAN | None | Sequential table scan | JOIN statment for stable | | SORT_FOR_GROUP| None | Use sort for partition, conflict with PARTITION_FIRST | With normal column in partition by list | | PARTITION_FIRST| None | Use Partition before aggregate, conflict with SORT_FOR_GROUP | With normal column in partition by list | +| PARA_TABLES_SORT| None | When sorting the supertable rows by timestamp, No temporary disk space is used | Sorting the supertable rows by timestamp | For example: @@ -100,6 +101,7 @@ For example: SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts; SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1; SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1; +SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts; ``` ## Lists diff --git a/docs/zh/12-taos-sql/06-select.md b/docs/zh/12-taos-sql/06-select.md index 2a7dff6f7d..43b32efe32 100755 --- a/docs/zh/12-taos-sql/06-select.md +++ b/docs/zh/12-taos-sql/06-select.md @@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */ hint: - BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP + BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT select_list: select_expr [, select_expr] ... @@ -93,13 +93,14 @@ Hints 是用户控制单个语句查询优化的一种手段,当 Hint 不适 | NO_BATCH_SCAN | 无 | 采用顺序读表的方式 | 超级表 JOIN 语句 | | SORT_FOR_GROUP| 无 | 采用sort方式进行分组, 与PARTITION_FIRST冲突 | partition by 列表有普通列时 | | PARTITION_FIRST| 无 | 在聚合之前使用PARTITION计算分组, 与SORT_FOR_GROUP冲突 | partition by 列表有普通列时 | - +| PARA_TABLES_SORT| 无 | 超级表的数据按时间戳排序时, 不使用临时磁盘空间, 只使用内存 | 超级表的数据按时间戳排序时 | 举例: ```sql SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts; SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1; SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1; +SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts; ``` ## 列表 From 705ae7d8274be5e74314ca4ad8ef70a4a54473e7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Feb 2024 09:04:57 +0800 Subject: [PATCH 06/14] test(stream): init mnode lock. --- source/dnode/mnode/impl/test/stream/stream.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index 369053c7bb..1e163d1c8e 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -189,6 +189,7 @@ TEST_F(StreamTest, kill_checkpoint_trans) { opt.pWal = pMnode->pWal; pMnode->pSdb = sdbInit(&opt); + taosThreadMutexInit(&pMnode->syncMgmt.lock, NULL); } SVgroupChangeInfo info; From 4c258d7187c9e54b23ccd4a221f3e122f8d59d5b Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 22 Feb 2024 09:27:53 +0800 Subject: [PATCH 07/14] docs: add implication of para_tables_sort --- docs/en/12-taos-sql/06-select.md | 2 +- docs/zh/12-taos-sql/06-select.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/12-taos-sql/06-select.md b/docs/en/12-taos-sql/06-select.md index 0cd2d06989..a2e6bca46c 100755 --- a/docs/en/12-taos-sql/06-select.md +++ b/docs/en/12-taos-sql/06-select.md @@ -93,7 +93,7 @@ The list of currently supported Hints is as follows: | NO_BATCH_SCAN | None | Sequential table scan | JOIN statment for stable | | SORT_FOR_GROUP| None | Use sort for partition, conflict with PARTITION_FIRST | With normal column in partition by list | | PARTITION_FIRST| None | Use Partition before aggregate, conflict with SORT_FOR_GROUP | With normal column in partition by list | -| PARA_TABLES_SORT| None | When sorting the supertable rows by timestamp, No temporary disk space is used | Sorting the supertable rows by timestamp | +| PARA_TABLES_SORT| None | When sorting the supertable rows by timestamp, No temporary disk space is used. When there are numerous tables, each with long rows, the corresponding algorithm associated with this prompt may consume a substantial amount of memory, potentially leading to an Out Of Memory (OOM) situation. | Sorting the supertable rows by timestamp | For example: diff --git a/docs/zh/12-taos-sql/06-select.md b/docs/zh/12-taos-sql/06-select.md index 43b32efe32..eec947ea23 100755 --- a/docs/zh/12-taos-sql/06-select.md +++ b/docs/zh/12-taos-sql/06-select.md @@ -93,7 +93,7 @@ Hints 是用户控制单个语句查询优化的一种手段,当 Hint 不适 | NO_BATCH_SCAN | 无 | 采用顺序读表的方式 | 超级表 JOIN 语句 | | SORT_FOR_GROUP| 无 | 采用sort方式进行分组, 与PARTITION_FIRST冲突 | partition by 列表有普通列时 | | PARTITION_FIRST| 无 | 在聚合之前使用PARTITION计算分组, 与SORT_FOR_GROUP冲突 | partition by 列表有普通列时 | -| PARA_TABLES_SORT| 无 | 超级表的数据按时间戳排序时, 不使用临时磁盘空间, 只使用内存 | 超级表的数据按时间戳排序时 | +| PARA_TABLES_SORT| 无 | 超级表的数据按时间戳排序时, 不使用临时磁盘空间, 只使用内存。当子表数量多, 行长比较大时候, 会使用大量内存, 可能发生OOM | 超级表的数据按时间戳排序时 | 举例: ```sql From 5c817d101a6ef510788fac05d1a181da01e18d13 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Feb 2024 09:29:40 +0800 Subject: [PATCH 08/14] test(stream): add print info. --- source/dnode/mnode/impl/test/stream/stream.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index 1e163d1c8e..c4449f613b 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -222,6 +222,8 @@ TEST_F(StreamTest, kill_checkpoint_trans) { taosArrayPush(stream.tasks, &pLevel); mndCreateStreamResetStatusTrans(pMnode, &stream); + std::cout<<"finish create reset status trans"<< std::endl; + tFreeStreamObj(&stream); sdbCleanup(pMnode->pSdb); taosMemoryFree(pMnode); From 637e6043682a6797d2eb4ea1051df72a4c035043 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Feb 2024 09:38:44 +0800 Subject: [PATCH 09/14] test(stream): add print info. --- source/dnode/mnode/impl/test/stream/stream.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index c4449f613b..fdc108cbe3 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -222,14 +222,21 @@ TEST_F(StreamTest, kill_checkpoint_trans) { taosArrayPush(stream.tasks, &pLevel); mndCreateStreamResetStatusTrans(pMnode, &stream); - std::cout<<"finish create reset status trans"<< std::endl; + std::cout << "finish create reset status trans" << std::endl; tFreeStreamObj(&stream); + + std::cout << "end1" << std::endl; sdbCleanup(pMnode->pSdb); + + std::cout << "end2" << std::endl; + taosMemoryFree(pMnode); taosArrayDestroy(info.pUpdateNodeList); taosHashCleanup(info.pDBMap); + + std::cout << "end3" << std::endl; } TEST_F(StreamTest, plan_Test) { From 045a30035f28c9ed5a638e44ba4b22adcaf51638 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Feb 2024 09:52:47 +0800 Subject: [PATCH 10/14] test(stream): init lock. --- source/dnode/mnode/impl/test/stream/stream.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index fdc108cbe3..7dfb944d55 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -216,6 +216,7 @@ TEST_F(StreamTest, kill_checkpoint_trans) { pTask->id.streamId = defStreamId; pTask->id.taskId = 1; pTask->exec.qmsg = (char*)taosMemoryMalloc(1); + taosThreadMutexInit(&pTask->lock, NULL); taosArrayPush(pLevel, &pTask); From dbd3b4e2b92334bd86df363f7b3fd9c59b859340 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Feb 2024 10:02:25 +0800 Subject: [PATCH 11/14] test: remove print info. --- source/dnode/mnode/impl/test/stream/stream.cpp | 8 -------- 1 file changed, 8 deletions(-) diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index 7dfb944d55..8d106b1ede 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -223,21 +223,13 @@ TEST_F(StreamTest, kill_checkpoint_trans) { taosArrayPush(stream.tasks, &pLevel); mndCreateStreamResetStatusTrans(pMnode, &stream); - std::cout << "finish create reset status trans" << std::endl; - tFreeStreamObj(&stream); - - std::cout << "end1" << std::endl; sdbCleanup(pMnode->pSdb); - std::cout << "end2" << std::endl; - taosMemoryFree(pMnode); taosArrayDestroy(info.pUpdateNodeList); taosHashCleanup(info.pDBMap); - - std::cout << "end3" << std::endl; } TEST_F(StreamTest, plan_Test) { From 8c6ea4079c0c72918c8494d54f82f55a78454451 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 22 Feb 2024 14:15:35 +0800 Subject: [PATCH 12/14] delete invalid result --- .../executor/src/streamcountwindowoperator.c | 16 +++++++++++++--- source/libs/stream/src/streamSessionState.c | 3 +-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 080f9d4e2b..689ba54ca6 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -115,8 +115,16 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, } } -static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, int32_t start, int32_t rows, int32_t maxRows, - SSHashObj* pStDeleted, bool* pRebuild) { +static void removeCountResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) { + SSessionKey key = {0}; + getSessionHashKey(pKey, &key); + tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey)); + tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey)); +} + +static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, + int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated, + SSHashObj* pStDeleted, bool* pRebuild) { SSessionKey sWinKey = pWinInfo->winInfo.sessionWin; int32_t num = 0; for (int32_t i = start; i < rows; i++) { @@ -148,6 +156,7 @@ static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowI if (needDelState) { memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey)); + removeCountResult(pStUpdated, pAggSup->pResultRows, &sWinKey); if (pWinInfo->winInfo.pStatePos->needFree) { pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey); } @@ -242,7 +251,8 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl setSessionWinOutputInfo(pStUpdated, &curWin.winInfo); slidingRows = *curWin.pWindowCount; if (!buffInfo.rebuildWindow) { - winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStDeleted, &buffInfo.rebuildWindow); + winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated, + pStDeleted, &buffInfo.rebuildWindow); } if (buffInfo.rebuildWindow) { SSessionKey range = {0}; diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index bd28d2bca9..e2aae130e5 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -736,6 +736,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C void* pRockVal = NULL; SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen); + streamStateFreeCur(pCur); if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code); if (code == TSDB_CODE_SUCCESS) { @@ -743,7 +744,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) ); if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) { (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen); - streamStateFreeCur(pCur); goto _end; } } @@ -751,7 +751,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C pWinKey->win.ekey = endTs; (*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL); taosMemoryFree(pRockVal); - streamStateFreeCur(pCur); } else { (*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey); code = TSDB_CODE_FAILED; From 7c8eb21ee541cf73d632e2e97ecaf0b8e6122407 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Feb 2024 15:01:04 +0800 Subject: [PATCH 13/14] fix(stream): halt the correct task. --- source/dnode/mnode/impl/src/mndScheduler.c | 24 ++++++++++++---------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index cbc0ace75d..ef9a7205e1 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -44,9 +44,8 @@ static bool hasCountWindowNode(SPhysiNode* pNode) { } } -static bool countWindowStreamTask(SSubplan* pPlan) { - SPhysiNode* pNode = pPlan->pNode; - return hasCountWindowNode(pNode); +static bool isCountWindowStreamTask(SSubplan* pPlan) { + return hasCountWindowNode((SPhysiNode*)pPlan->pNode); } int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, @@ -342,13 +341,13 @@ static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVe } } -static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan) { - bool hasCountWindowNode = countWindowStreamTask(pPlan); - bool isRelStreamTask = (pTask->hTaskInfo.id.taskId != 0); - if (hasCountWindowNode && isRelStreamTask) { +static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) { + bool hasCountWindowNode = isCountWindowStreamTask(pPlan); + + if (hasCountWindowNode && (!isFillhistoryTask)) { SStreamStatus* pStatus = &pTask->status; - mDebug("s-task:0x%x status is set to %s from %s for count window agg task with fill-history option set", - pTask->id.taskId, streamTaskGetStatusStr(pStatus->taskStatus), streamTaskGetStatusStr(TASK_STATUS__HALT)); + mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set", + pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pStatus->taskStatus)); pStatus->taskStatus = TASK_STATUS__HALT; } } @@ -398,15 +397,17 @@ static void setHTasksId(SStreamObj* pStream) { static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey, SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) { - // new stream task SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } + mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory); - haltInitialTaskStatus(pTask, plan); + if (pStream->conf.fillHistory) { + haltInitialTaskStatus(pTask, plan, isFillhistory); + } streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId); @@ -415,6 +416,7 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre terrno = code; return terrno; } + return TDB_CODE_SUCCESS; } From da0265ddde86bdb0136ab5ba14ff1ee4a39f8e26 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Feb 2024 15:52:47 +0800 Subject: [PATCH 14/14] fix(tsdb): set correct error code info extract method. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 86f58717e2..90a26d17dc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2543,7 +2543,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr while (1) { // only check here, since the iterate data in memory is very fast. if (pReader->code != TSDB_CODE_SUCCESS) { - tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr); return pReader->code; } @@ -2694,7 +2694,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { while (1) { if (pReader->code != TSDB_CODE_SUCCESS) { - tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr); return pReader->code; } @@ -2909,7 +2909,7 @@ static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_ while (1) { if (pReader->code != TSDB_CODE_SUCCESS) { - tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr); return pReader->code; } @@ -2951,7 +2951,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t en while (1) { if (pReader->code != TSDB_CODE_SUCCESS) { - tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr); return pReader->code; }