refactor(stream): do some internal refactor.

This commit is contained in:
Haojun Liao 2023-10-25 18:14:28 +08:00
parent 9baff82f85
commit c95cd8da8b
1 changed files with 14 additions and 10 deletions

View File

@ -2038,8 +2038,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
char buf[256] = {0};
EPSET_TO_STR(&pCurrent->epset, buf);
mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s", pCurrent->nodeId, pPrevEp->fqdn,
pPrevEp->port, buf);
mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
@ -2216,7 +2216,7 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
}
}
static int32_t doRemoveFromTask(SStreamExecInfo* pExecNode, STaskId* pRemovedId) {
static int32_t doRemoveTasks(SStreamExecInfo* pExecNode, STaskId* pRemovedId) {
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
if (p != NULL) {
@ -2249,7 +2249,7 @@ static bool taskNodeExists(SArray* pList, int32_t nodeId) {
}
int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
SArray* pRemoveTaskList = taosArrayInit(4, sizeof(STaskId));
SArray* pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
for(int32_t i = 0; i < numOfTask; ++i) {
@ -2258,16 +2258,16 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
if (!existed) {
taosArrayPush(pRemoveTaskList, pId);
taosArrayPush(pRemovedTasks, pId);
}
}
for(int32_t i = 0; i < taosArrayGetSize(pRemoveTaskList); ++i) {
STaskId* pId = taosArrayGet(pRemoveTaskList, i);
doRemoveFromTask(&execInfo, pId);
for(int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) {
STaskId* pId = taosArrayGet(pRemovedTasks, i);
doRemoveTasks(&execInfo, pId);
}
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemoveTaskList),
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
(int32_t) taosArrayGetSize(execInfo.pTaskList));
int32_t size = taosArrayGetSize(pNodeSnapshot);
@ -2287,7 +2287,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList);
execInfo.pNodeEntryList = pValidNodeEntryList;
taosArrayDestroy(pRemoveTaskList);
mDebug("remain %d valid node entries", (int32_t) taosArrayGetSize(pValidNodeEntryList));
taosArrayDestroy(pRemovedTasks);
return 0;
}
@ -2336,6 +2337,9 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
taosArrayDestroy(execInfo.pNodeEntryList);
execInfo.pNodeEntryList = pNodeSnapshot;
execInfo.ts = ts;
} else {
mDebug("unexpect code during create nodeUpdate trans, code:%s", tstrerror(code));
taosArrayDestroy(pNodeSnapshot);
}
} else {
mDebug("no update found in nodeList");