fix(stream): remove invalid node.

This commit is contained in:
Haojun Liao 2023-09-22 11:45:46 +08:00
parent 27e00b1f19
commit 498519c94d
2 changed files with 18 additions and 1 deletions

View File

@ -159,7 +159,7 @@ static const SSysDbTableSchema streamSchema[] = {
static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_type", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_type", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},

View File

@ -2229,6 +2229,23 @@ static int32_t removeInvalidStreamTask(SArray *pNodeSnapshot) {
doRemoveFromTask(&execNodeList, pId);
}
int32_t size = taosArrayGetSize(pNodeSnapshot);
SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry));
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
SNodeEntry* pExisted = taosArrayGet(execNodeList.pNodeEntryList, i);
for(int32_t j = 0; j < size; ++j) {
SNodeEntry* pEntry = taosArrayGet(pNodeSnapshot, j);
if (pEntry->nodeId == pExisted->nodeId) {
taosArrayPush(pValidNodeEntryList, pExisted);
break;
}
}
}
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
execNodeList.pNodeEntryList = pValidNodeEntryList;
return 0;
}