fix(stream): init the node list and task list when starting mnode.

This commit is contained in:
Haojun Liao 2024-06-07 17:53:53 +08:00
parent c4447d6873
commit d0675a660e
5 changed files with 75 additions and 108 deletions

View File

@ -120,8 +120,9 @@ void destroyStreamTaskIter(SStreamTaskIter *pIter);
bool streamTaskIterNextTask(SStreamTaskIter *pIter);
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
void mndInitExecInfo();
void removeExpiredNodeInfo(const SArray *pNodeSnapshot);
void removeTasksInBuf(SArray* pTaskIds, SStreamExecInfo* pExecInfo);
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo);
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
#ifdef __cplusplus
}

View File

@ -680,6 +680,8 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
return NULL;
}
mndInitStreamExecInfo(pMnode, &execInfo);
mInfo("mnode open successfully");
return pMnode;
}

View File

@ -56,13 +56,14 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList);
static int32_t extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList);
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
static void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
static void removeExpiredNodeInfo(const SArray *pNodeSnapshot);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
@ -131,9 +132,11 @@ int32_t mndInitStream(SMnode *pMnode) {
if (sdbSetTable(pMnode->pSdb, table) != 0) {
return -1;
}
if (sdbSetTable(pMnode->pSdb, tableSeq) != 0) {
return -1;
}
return 0;
}
@ -2169,7 +2172,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
return 0;
}
static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) {
static int32_t extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
@ -2215,48 +2218,6 @@ static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) {
return TSDB_CODE_SUCCESS;
}
static bool taskNodeExists(SArray *pList, int32_t nodeId) {
size_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {
SNodeEntry *pEntry = taosArrayGet(pList, i);
if (pEntry->nodeId == nodeId) {
return true;
}
}
return false;
}
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
for (int32_t i = 0; i < numOfTask; ++i) {
STaskId *pId = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
if (pEntry->nodeId == SNODE_HANDLE) {
continue;
}
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
if (!existed) {
taosArrayPush(pRemovedTasks, pId);
}
}
removeTasksInBuf(pRemovedTasks, &execInfo);
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
(int32_t)taosArrayGetSize(execInfo.pTaskList));
removeExpiredNodeInfo(pNodeSnapshot);
taosArrayDestroy(pRemovedTasks);
return 0;
}
// this function runs by only one thread, so it is not multi-thread safe
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
int32_t code = 0;
@ -2510,3 +2471,24 @@ static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
}
return code;
}
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
addAllStreamTasksIntoBuf(pMnode, pExecInfo);
extractNodeListFromStream(pMnode, pExecInfo->pNodeList);
}
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo);
sdbRelease(pSdb, pStream);
}
}

View File

@ -22,55 +22,6 @@ typedef struct SFailedCheckpointInfo {
int32_t transId;
} SFailedCheckpointInfo;
static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo);
sdbRelease(pSdb, pStream);
}
}
static void removeDroppedStreamTasksInBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
if (pMnode == NULL) {
return;
}
int32_t num = taosArrayGetSize(pExecInfo->pTaskList);
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
SArray *pIdList = taosArrayInit(4, sizeof(STaskId));
for (int32_t i = 0; i < num; ++i) {
STaskId* pId = taosArrayGet(pExecInfo->pTaskList, i);
void* p = taosHashGet(pHash, &pId->streamId, sizeof(int64_t));
if (p != NULL) {
continue;
}
void* pObj = mndGetStreamObj(pMnode, pId->streamId);
if (pObj != NULL) {
mndReleaseStream(pMnode, pObj);
taosHashPut(pHash, &pId->streamId, sizeof(int64_t), NULL, 0);
} else {
taosArrayPush(pIdList, pId);
}
}
removeTasksInBuf(pIdList, &execInfo);
taosArrayDestroy(pIdList);
taosHashCleanup(pHash);
}
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
for (int32_t j = 0; j < numOfNodes; ++j) {
@ -290,17 +241,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosThreadMutexLock(&execInfo.lock);
// extract stream task list
if (taosHashGetSize(execInfo.pTaskMap) == 0) {
addAllStreamTasksIntoBuf(pMnode, &execInfo);
} else {
// the already dropped tasks may be added by hb from vnode at the time when the pTaskMap happens to be empty.
// let's drop them here.
removeDroppedStreamTasksInBuf(pMnode, &execInfo);
}
extractStreamNodeList(pMnode);
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
if (numOfUpdated > 0) {
mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId);

View File

@ -645,4 +645,46 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
taosThreadMutexUnlock(&pExecNode->lock);
destroyStreamTaskIter(pIter);
}
static bool taskNodeExists(SArray *pList, int32_t nodeId) {
size_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {
SNodeEntry *pEntry = taosArrayGet(pList, i);
if (pEntry->nodeId == nodeId) {
return true;
}
}
return false;
}
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
for (int32_t i = 0; i < numOfTask; ++i) {
STaskId *pId = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
if (pEntry->nodeId == SNODE_HANDLE) {
continue;
}
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
if (!existed) {
taosArrayPush(pRemovedTasks, pId);
}
}
removeTasksInBuf(pRemovedTasks, &execInfo);
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
(int32_t)taosArrayGetSize(execInfo.pTaskList));
removeExpiredNodeInfo(pNodeSnapshot);
taosArrayDestroy(pRemovedTasks);
return 0;
}