fix(stream): remove expired stream in buf.

This commit is contained in:
Haojun Liao 2024-06-06 23:54:48 +08:00
parent 6ef3c02717
commit 88e77d6bc6
5 changed files with 88 additions and 62 deletions

View File

@ -107,7 +107,7 @@ SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
int32_t mndProcessStreamHb(SRpcMsg *pReq); int32_t mndProcessStreamHb(SRpcMsg *pReq);
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t initStreamNodeList(SMnode *pMnode); int32_t extractStreamNodeList(SMnode *pMnode);
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated); int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated);
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
@ -122,6 +122,9 @@ SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
void mndInitExecInfo(); void mndInitExecInfo();
void removeExpiredNodeInfo(const SArray *pNodeSnapshot); void removeExpiredNodeInfo(const SArray *pNodeSnapshot);
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId);
void removeInvalidTasks(SArray* pTaskIds);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -62,7 +62,7 @@ static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot); static int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
@ -1031,7 +1031,7 @@ _ERR:
return code; return code;
} }
int32_t initStreamNodeList(SMnode *pMnode) { int32_t extractStreamNodeList(SMnode *pMnode) {
if (taosArrayGetSize(execInfo.pNodeList) == 0) { if (taosArrayGetSize(execInfo.pNodeList) == 0) {
execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList); execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList);
execInfo.pNodeList = extractNodeListFromStream(pMnode); execInfo.pNodeList = extractNodeListFromStream(pMnode);
@ -1044,7 +1044,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
// check if the node update happens or not // check if the node update happens or not
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
int32_t numOfNodes = initStreamNodeList(pMnode); int32_t numOfNodes = extractStreamNodeList(pMnode);
if (numOfNodes == 0) { if (numOfNodes == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing"); mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = taosGetTimestampSec(); execInfo.ts = taosGetTimestampSec();
@ -2212,27 +2212,6 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
return plist; return plist;
} }
/*
 * Remove a single task id from the buffered execution info.
 *
 * The id is first looked up in pExecNode->pTaskMap; when it is not present the
 * function returns immediately. Otherwise the hash entry is removed and the
 * first element of pExecNode->pTaskList whose taskId and streamId both match
 * is removed as well.
 *
 * pExecNode   buffer holding pTaskMap and pTaskList
 * pRemovedId  id (streamId + taskId) of the task to remove
 *
 * Returns TSDB_CODE_SUCCESS on every path.
 */
static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
if (p == NULL) {
// not tracked in the map: nothing to remove
return TSDB_CODE_SUCCESS;
}
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
// drop the matching entry from the list as well; only the first match is removed
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)pRemovedId->taskId, num);
break;
}
}
return TSDB_CODE_SUCCESS;
}
static bool taskNodeExists(SArray *pList, int32_t nodeId) { static bool taskNodeExists(SArray *pList, int32_t nodeId) {
size_t num = taosArrayGetSize(pList); size_t num = taosArrayGetSize(pList);
@ -2246,7 +2225,7 @@ static bool taskNodeExists(SArray *pList, int32_t nodeId) {
return false; return false;
} }
int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot) { int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
@ -2264,10 +2243,7 @@ int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot) {
} }
} }
for (int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) { removeInvalidTasks(pRemovedTasks);
STaskId *pId = taosArrayGet(pRemovedTasks, i);
doRemoveTasks(&execInfo, pId);
}
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks), mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
(int32_t)taosArrayGetSize(execInfo.pTaskList)); (int32_t)taosArrayGetSize(execInfo.pTaskList));
@ -2294,7 +2270,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
int32_t numOfNodes = initStreamNodeList(pMnode); int32_t numOfNodes = extractStreamNodeList(pMnode);
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
if (numOfNodes == 0) { if (numOfNodes == 0) {
@ -2314,7 +2290,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
} }
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
removeExpiredNodeEntryAndTask(pNodeSnapshot);
removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
@ -2410,21 +2387,7 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter); SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); doRemoveTasks(pExecNode, &id);
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num);
break;
}
}
}
} }
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));

View File

@ -22,7 +22,7 @@ typedef struct SFailedCheckpointInfo {
int32_t transId; int32_t transId;
} SFailedCheckpointInfo; } SFailedCheckpointInfo;
static void doExtractTasksFromStream(SMnode *pMnode) { static void extractStreamTasks(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
void *pIter = NULL; void *pIter = NULL;
@ -38,6 +38,33 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
} }
} }
/*
 * Purge tasks that belong to streams which no longer exist from the buffer.
 *
 * Walks pExecInfo->pTaskList; every task whose stream cannot be found through
 * mndGetStreamObj() is collected and then handed to removeInvalidTasks().
 * Stream ids that were found once are remembered in a temporary hash so the
 * lookup is not repeated for the remaining tasks of the same stream.
 *
 * pMnode     mnode used to look up stream objects by stream id
 * pExecInfo  buffer whose pTaskList is scanned
 *
 * NOTE(review): removeInvalidTasks() operates on the global execInfo rather
 * than on pExecInfo; the visible caller passes &execInfo, so both coincide.
 */
static void removeDroppedStreams(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
  int32_t   num = taosArrayGetSize(pExecInfo->pTaskList);
  SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  SArray   *pInvalid = taosArrayInit(4, sizeof(STaskId));

  // Allocation failed: skip the cleanup for this round instead of dereferencing NULL containers.
  if (pHash == NULL || pInvalid == NULL) {
    goto _end;
  }

  for (int32_t i = 0; i < num; ++i) {
    STaskId *pId = taosArrayGet(pExecInfo->pTaskList, i);

    // this stream has already been confirmed to exist, no need to query it again
    if (taosHashGet(pHash, &pId->streamId, sizeof(int64_t)) != NULL) {
      continue;
    }

    SStreamObj *pObj = mndGetStreamObj(pMnode, pId->streamId);
    if (pObj != NULL) {
      mndReleaseStream(pMnode, pObj);
      // only the key matters here, the hash is used as a set of existing stream ids
      taosHashPut(pHash, &pId->streamId, sizeof(int64_t), NULL, 0);
    } else {
      // the stream is gone, its task must be removed from the buffer
      taosArrayPush(pInvalid, pId);
    }
  }

  // The removal is deferred until the scan is finished, so pTaskList is not modified while iterating.
  removeInvalidTasks(pInvalid);

_end:
  // pInvalid was previously leaked on every invocation; release both temporaries here.
  taosArrayDestroy(pInvalid);
  taosHashCleanup(pHash);
}
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
for (int32_t j = 0; j < numOfNodes; ++j) { for (int32_t j = 0; j < numOfNodes; ++j) {
@ -230,7 +257,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0}; SStreamHbMsg req = {0};
SArray *pFailedTasks = NULL; SArray *pFailedChkpt = NULL;
SArray *pOrphanTasks = NULL; SArray *pOrphanTasks = NULL;
if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
@ -252,17 +279,21 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask)); pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask));
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
// extract stream task list // extract stream task list
if (taosHashGetSize(execInfo.pTaskMap) == 0) { if (taosHashGetSize(execInfo.pTaskMap) == 0) {
doExtractTasksFromStream(pMnode); extractStreamTasks(pMnode);
} else {
// the already dropped tasks may be added by hb from vnode at the time when the pTaskMap happens to be empty.
// let's drop them here.
removeDroppedStreams(pMnode, &execInfo);
} }
initStreamNodeList(pMnode); extractStreamNodeList(pMnode);
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
if (numOfUpdated > 0) { if (numOfUpdated > 0) {
@ -310,7 +341,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SFailedCheckpointInfo info = { SFailedCheckpointInfo info = {
.transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId}; .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId};
addIntoCheckpointList(pFailedTasks, &info); addIntoCheckpointList(pFailedChkpt, &info);
} }
} }
@ -328,7 +359,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans // current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal // kill the checkpoint trans and then set all tasks status to be normal
if (taosArrayGetSize(pFailedTasks) > 0) { if (taosArrayGetSize(pFailedChkpt) > 0) {
bool allReady = true; bool allReady = true;
if (pMnode != NULL) { if (pMnode != NULL) {
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
@ -339,8 +370,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (allReady || snodeChanged) { if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) { for(int32_t i = 0; i < taosArrayGetSize(pFailedChkpt); ++i) {
SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i); SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedChkpt, i);
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status", mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
pInfo->checkpointId, pInfo->transId); pInfo->checkpointId, pInfo->transId);
@ -359,7 +390,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
tCleanupStreamHbMsg(&req); tCleanupStreamHbMsg(&req);
taosArrayDestroy(pFailedTasks); taosArrayDestroy(pFailedChkpt);
taosArrayDestroy(pOrphanTasks); taosArrayDestroy(pOrphanTasks);
{ {

View File

@ -597,4 +597,33 @@ void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
execInfo.pNodeList = pValidList; execInfo.pNodeList = pValidList;
mDebug("remain %d valid node entries after clean expired nodes info", (int32_t)taosArrayGetSize(pValidList)); mDebug("remain %d valid node entries after clean expired nodes info", (int32_t)taosArrayGetSize(pValidList));
}
/*
 * Drop one task from the buffered execution info.
 *
 * When the id is not present in pExecNode->pTaskMap nothing is changed.
 * Otherwise the id is removed from the map, and the first entry of
 * pExecNode->pTaskList carrying the same streamId/taskId pair is removed too.
 *
 * pExecNode   buffer holding pTaskMap and pTaskList
 * pRemovedId  id (streamId + taskId) of the task to remove
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
  // unknown task: leave both containers untouched
  if (taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)) == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));

  // scan the list for the same id and remove the first occurrence only
  int32_t idx = 0;
  while (idx < taosArrayGetSize(pExecNode->pTaskList)) {
    STaskId *pCur = taosArrayGet(pExecNode->pTaskList, idx);
    if (pCur->taskId != pRemovedId->taskId || pCur->streamId != pRemovedId->streamId) {
      ++idx;
      continue;
    }

    taosArrayRemove(pExecNode->pTaskList, idx);

    int32_t remain = taosArrayGetSize(pExecNode->pTaskList);
    mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)pRemovedId->taskId, remain);
    break;
  }

  return TSDB_CODE_SUCCESS;
}
/*
 * Remove every task id contained in pTaskIds from the global execInfo buffer
 * by calling doRemoveTasks() for each element.
 *
 * pTaskIds  array of STaskId; it is only read here and is not destroyed, so
 *           the caller keeps ownership of it.
 *
 * NOTE(review): this function does not take execInfo.lock itself; the visible
 * callers appear to hold it around the call - confirm for any new caller.
 */
void removeInvalidTasks(SArray *pTaskIds) {
for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) {
STaskId *pId = taosArrayGet(pTaskIds, i);
doRemoveTasks(&execInfo, pId);
}
} }

View File

@ -279,10 +279,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
// we need to transfer state here. The transfer of state may generate new data that need to dispatch to downstream, // We need to transfer state here, before dispatching checkpoint-trigger to downstream tasks.
// to transfer the new data to downstream before checkpoint-trigger reaching the downstream tasks. // The transfer of state may generate new data that need to dispatch to downstream tasks,
// Otherwise, those new generated data may be lost, if crash before next checkpoint data generatd, which the // Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed
// the new generated data is kept in outputQ, and failed to dispatch to downstream tasks. // before the next checkpoint.
{ {
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
if (dropRelHTask) { if (dropRelHTask) {