From 3fea717373a0cf0ec228cf1abab9651922fad68f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 3 Jan 2024 19:28:49 +0800 Subject: [PATCH] fix(stream): discard the orphan tasks. --- source/client/test/clientTests.cpp | 24 +++++++++++----------- source/dnode/mnode/impl/src/mndStream.c | 27 +++++++++++++++---------- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index e78783cf3c..e6519a436e 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -939,8 +939,8 @@ TEST(clientCase, agg_query_tables) { } taos_free_result(pRes); - int64_t st = 1685959293000; - for (int32_t i = 0; i < 10000000; ++i) { + int64_t st = 1685959293299; + for (int32_t i = 0; i < 5; ++i) { char s[256] = {0}; while (1) { @@ -954,16 +954,16 @@ TEST(clientCase, agg_query_tables) { } } - while (1) { - sprintf(s, "insert into t2 values(%ld, %d)", st + i, i); - pRes = taos_query(pConn, s); - int32_t ret = taos_errno(pRes); - - taos_free_result(pRes); - if (ret == 0) { - break; - } - } +// while (1) { +// sprintf(s, "insert into t2 values(%ld, %d)", st + i, i); +// pRes = taos_query(pConn, s); +// int32_t ret = taos_errno(pRes); +// +// taos_free_result(pRes); +// if (ret == 0) { +// break; +// } +// } } // pRes = taos_query(pConn, "show table distributed tup"); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b57ca009d3..146f9f6fc4 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1642,10 +1642,18 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } -static void setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows) { +static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows) { SColumnInfoData *pColInfo; int32_t cols = 0; + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + + STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (pe == NULL) { + mError("task:0x%" PRIx64 " not exists in vnode, no valid status/stage info", id.taskId); + return -1; + } + // stream name char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); @@ -1696,14 +1704,7 @@ static void setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDat colDataSetVal(pColInfo, numOfRows, (const char *)level, false); // status - char status[20 + VARSTR_HEADER_SIZE] = {0}; - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - - STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); - if (pe == NULL) { - mError("task:0x%" PRIx64 " not exists in vnode, no valid status/stage info", id.taskId); - return; - } + char status[20 + VARSTR_HEADER_SIZE] = {0}; const char *pStatus = streamTaskGetStatusStr(pe->status); STR_TO_VARSTR(status, pStatus); @@ -1746,6 +1747,8 @@ static void setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDat pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + + return TSDB_CODE_SUCCESS; } static int32_t getNumOfTasks(SArray *pTaskList) { @@ -1787,8 +1790,10 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock int32_t numOfLevels = taosArrayGetSize(pLevel); for (int32_t j = 0; j < numOfLevels; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows); - numOfRows++; + int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows); + if (code == TSDB_CODE_SUCCESS) { + numOfRows++; + } } }