fix(stream): discard the orphan tasks.

This commit is contained in:
Haojun Liao 2024-01-03 19:28:49 +08:00
parent b06ccc0ca7
commit 3fea717373
2 changed files with 28 additions and 23 deletions

View File

@ -939,8 +939,8 @@ TEST(clientCase, agg_query_tables) {
} }
taos_free_result(pRes); taos_free_result(pRes);
int64_t st = 1685959293000; int64_t st = 1685959293299;
for (int32_t i = 0; i < 10000000; ++i) { for (int32_t i = 0; i < 5; ++i) {
char s[256] = {0}; char s[256] = {0};
while (1) { while (1) {
@ -954,16 +954,16 @@ TEST(clientCase, agg_query_tables) {
} }
} }
while (1) { // while (1) {
sprintf(s, "insert into t2 values(%ld, %d)", st + i, i); // sprintf(s, "insert into t2 values(%ld, %d)", st + i, i);
pRes = taos_query(pConn, s); // pRes = taos_query(pConn, s);
int32_t ret = taos_errno(pRes); // int32_t ret = taos_errno(pRes);
//
taos_free_result(pRes); // taos_free_result(pRes);
if (ret == 0) { // if (ret == 0) {
break; // break;
} // }
} // }
} }
// pRes = taos_query(pConn, "show table distributed tup"); // pRes = taos_query(pConn, "show table distributed tup");

View File

@ -1642,10 +1642,18 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }
static void setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows) { static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows) {
SColumnInfoData *pColInfo; SColumnInfoData *pColInfo;
int32_t cols = 0; int32_t cols = 0;
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (pe == NULL) {
mError("task:0x%" PRIx64 " not exists in vnode, no valid status/stage info", id.taskId);
return -1;
}
// stream name // stream name
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
@ -1696,14 +1704,7 @@ static void setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDat
colDataSetVal(pColInfo, numOfRows, (const char *)level, false); colDataSetVal(pColInfo, numOfRows, (const char *)level, false);
// status // status
char status[20 + VARSTR_HEADER_SIZE] = {0}; char status[20 + VARSTR_HEADER_SIZE] = {0};
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (pe == NULL) {
mError("task:0x%" PRIx64 " not exists in vnode, no valid status/stage info", id.taskId);
return;
}
const char *pStatus = streamTaskGetStatusStr(pe->status); const char *pStatus = streamTaskGetStatusStr(pe->status);
STR_TO_VARSTR(status, pStatus); STR_TO_VARSTR(status, pStatus);
@ -1746,6 +1747,8 @@ static void setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDat
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
return TSDB_CODE_SUCCESS;
} }
static int32_t getNumOfTasks(SArray *pTaskList) { static int32_t getNumOfTasks(SArray *pTaskList) {
@ -1787,8 +1790,10 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
int32_t numOfLevels = taosArrayGetSize(pLevel); int32_t numOfLevels = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfLevels; j++) { for (int32_t j = 0; j < numOfLevels; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j); SStreamTask *pTask = taosArrayGetP(pLevel, j);
setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows); int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
numOfRows++; if (code == TSDB_CODE_SUCCESS) {
numOfRows++;
}
} }
} }