refactor(stream): do some internal refactor.

This commit is contained in:
Haojun Liao 2023-10-25 18:14:28 +08:00
parent ec5bdff5fa
commit 38eb5d2f89
1 changed files with 14 additions and 10 deletions

View File

@ -2038,8 +2038,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
char buf[256] = {0}; char buf[256] = {0};
EPSET_TO_STR(&pCurrent->epset, buf); EPSET_TO_STR(&pCurrent->epset, buf);
mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s", pCurrent->nodeId, pPrevEp->fqdn, mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
pPrevEp->port, buf); pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId}; SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset); epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
@ -2216,7 +2216,7 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
} }
} }
static int32_t doRemoveFromTask(SStreamExecInfo* pExecNode, STaskId* pRemovedId) { static int32_t doRemoveTasks(SStreamExecInfo* pExecNode, STaskId* pRemovedId) {
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
if (p != NULL) { if (p != NULL) {
@ -2249,7 +2249,7 @@ static bool taskNodeExists(SArray* pList, int32_t nodeId) {
} }
int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
SArray* pRemoveTaskList = taosArrayInit(4, sizeof(STaskId)); SArray* pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
for(int32_t i = 0; i < numOfTask; ++i) { for(int32_t i = 0; i < numOfTask; ++i) {
@ -2258,16 +2258,16 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
if (!existed) { if (!existed) {
taosArrayPush(pRemoveTaskList, pId); taosArrayPush(pRemovedTasks, pId);
} }
} }
for(int32_t i = 0; i < taosArrayGetSize(pRemoveTaskList); ++i) { for(int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) {
STaskId* pId = taosArrayGet(pRemoveTaskList, i); STaskId* pId = taosArrayGet(pRemovedTasks, i);
doRemoveFromTask(&execInfo, pId); doRemoveTasks(&execInfo, pId);
} }
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemoveTaskList), mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
(int32_t) taosArrayGetSize(execInfo.pTaskList)); (int32_t) taosArrayGetSize(execInfo.pTaskList));
int32_t size = taosArrayGetSize(pNodeSnapshot); int32_t size = taosArrayGetSize(pNodeSnapshot);
@ -2287,7 +2287,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList); execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList);
execInfo.pNodeEntryList = pValidNodeEntryList; execInfo.pNodeEntryList = pValidNodeEntryList;
taosArrayDestroy(pRemoveTaskList); mDebug("remain %d valid node entries", (int32_t) taosArrayGetSize(pValidNodeEntryList));
taosArrayDestroy(pRemovedTasks);
return 0; return 0;
} }
@ -2336,6 +2337,9 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
taosArrayDestroy(execInfo.pNodeEntryList); taosArrayDestroy(execInfo.pNodeEntryList);
execInfo.pNodeEntryList = pNodeSnapshot; execInfo.pNodeEntryList = pNodeSnapshot;
execInfo.ts = ts; execInfo.ts = ts;
} else {
mDebug("unexpect code during create nodeUpdate trans, code:%s", tstrerror(code));
taosArrayDestroy(pNodeSnapshot);
} }
} else { } else {
mDebug("no update found in nodeList"); mDebug("no update found in nodeList");