From 498519c94dfa9a1c08fae38a6d8ead7a72684ace Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Sep 2023 11:45:46 +0800 Subject: [PATCH] fix(stream): remove invalid node. --- source/common/src/systable.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 4118dfd484..7107f0e058 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -159,7 +159,7 @@ static const SSysDbTableSchema streamSchema[] = { static const SSysDbTableSchema streamTaskSchema[] = { {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "node_type", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "node_type", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7d7987b72b..30e0791294 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2229,6 +2229,23 @@ static int32_t removeInvalidStreamTask(SArray *pNodeSnapshot) { doRemoveFromTask(&execNodeList, pId); } + int32_t size = taosArrayGetSize(pNodeSnapshot); + SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry)); + for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { + SNodeEntry* pExisted = taosArrayGet(execNodeList.pNodeEntryList, i); + + for(int32_t j = 0; j < size; ++j) { + SNodeEntry* pEntry = taosArrayGet(pNodeSnapshot, j); + if (pEntry->nodeId == pExisted->nodeId) { + taosArrayPush(pValidNodeEntryList, pExisted); + break; + } + } + } + + execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); + execNodeList.pNodeEntryList = pValidNodeEntryList; + return 0; }