Merge pull request #29959 from taosdata/fix/droptask

refactor(stream): create task epset update trans out of lock
This commit is contained in:
Simon Guan 2025-02-28 11:00:58 +08:00 committed by GitHub
commit aa1bf309ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 87 additions and 56 deletions

View File

@ -1901,15 +1901,16 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) { static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes, STrans** pUpdateTrans) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL; void *pIter = NULL;
STrans *pTrans = NULL; STrans *pTrans = NULL;
int32_t code = 0; int32_t code = 0;
*pUpdateTrans = NULL;
// conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
while (1) { while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) { if (pIter == NULL) {
break; break;
@ -1926,6 +1927,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
} }
while (1) { while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) { if (pIter == NULL) {
break; break;
@ -1946,7 +1948,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb)); void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
if (p1 == NULL && p2 == NULL) { if (p1 == NULL && p2 == NULL) {
mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name); mDebug("stream:0x%" PRIx64 " %s not involved in nodeUpdate, ignore", pStream->uid, pStream->name);
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
continue; continue;
} }
@ -1981,20 +1983,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
} }
// no need to build the trans to handle the vgroup update // no need to build the trans to handle the vgroup update
if (pTrans == NULL) { *pUpdateTrans = pTrans;
return 0;
}
code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return code;
}
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return code; return code;
} }
@ -2076,7 +2065,7 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi
taosHashCleanup(pHash); taosHashCleanup(pHash);
mDebug("numOfNodes:%d for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList)); mDebug("numOfvNodes:%d get after extracting nodeInfo from all streams", (int32_t)taosArrayGetSize(pNodeList));
return code; return code;
} }
@ -2100,14 +2089,49 @@ static void addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
} }
} }
static int32_t doProcessNodeCheckHelp(SArray *pNodeSnapshot, SMnode *pMnode, SVgroupChangeInfo *pChangeInfo,
bool *pUpdateAllVgroups) {
int32_t code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
if (code) {
mDebug("failed to remove expired node entry in buf, code:%s", tstrerror(code));
return code;
}
code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, pChangeInfo);
if (code) {
mDebug("failed to find changed vnode(s) during vnode(s) check, code:%s", tstrerror(code));
return code;
}
{
if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) {
mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
*pUpdateAllVgroups = true;
execInfo.switchFromFollower = false; // reset the flag
addAllDbsIntoHashmap(pChangeInfo->pDBMap, pMnode->pSdb);
}
}
if (taosArrayGetSize(pChangeInfo->pUpdateNodeList) > 0 || (*pUpdateAllVgroups)) {
// kill current active checkpoint transaction, since the transaction is vnode wide.
killAllCheckpointTrans(pMnode, pChangeInfo);
} else {
mDebug("no update found in vnode(s) list");
}
return code;
}
// this function runs by only one thread, so it is not multi-thread safe // this function runs by only one thread, so it is not multi-thread safe
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
bool allReady = true; bool allReady = true;
SArray *pNodeSnapshot = NULL; SArray *pNodeSnapshot = NULL;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
int64_t ts = taosGetTimestampSec(); int64_t tsms = taosGetTimestampMs();
bool updateAllVgroups = false; int64_t ts = tsms / 1000;
bool updateAllVgroups = false;
SVgroupChangeInfo changeInfo = {0};
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1); int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
if (old != 0) { if (old != 0) {
@ -2115,7 +2139,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
return 0; return 0;
} }
mDebug("start to do node changing check"); mDebug("start to do node changing check, ts:%" PRId64, tsms);
streamMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
int32_t numOfNodes = extractStreamNodeList(pMnode); int32_t numOfNodes = extractStreamNodeList(pMnode);
@ -2141,58 +2165,60 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
} }
streamMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
code = doProcessNodeCheckHelp(pNodeSnapshot, pMnode, &changeInfo, &updateAllVgroups);
streamMutexUnlock(&execInfo.lock);
code = removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
if (code) { if (code) {
goto _end; goto _end;
} }
SVgroupChangeInfo changeInfo = {0};
code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
if (code) {
goto _end;
}
{
if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) {
mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
updateAllVgroups = true;
execInfo.switchFromFollower = false; // reset the flag
addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb);
}
}
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) { if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
// kill current active checkpoint transaction, since the transaction is vnode wide. mDebug("vnode(s) change detected, build trans to update stream task epsets");
killAllCheckpointTrans(pMnode, &changeInfo);
code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups); STrans *pTrans = NULL;
streamMutexLock(&execInfo.lock);
code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups, &pTrans);
streamMutexUnlock(&execInfo.lock);
// NOTE: sync trans out of lock
if (code == 0 && pTrans != NULL) {
code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
}
mndTransDrop(pTrans);
}
// keep the new vnode snapshot if success // keep the new vnode snapshot if success
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
streamMutexLock(&execInfo.lock);
code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList); code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
int32_t num = (int)taosArrayGetSize(execInfo.pNodeList);
if (code == 0) {
execInfo.ts = ts;
mDebug("create trans successfully, update cached node list, numOfNodes:%d", num);
}
streamMutexUnlock(&execInfo.lock);
if (code) { if (code) {
mError("failed to extract node list from stream, code:%s", tstrerror(code)); mError("failed to extract node list from stream, code:%s", tstrerror(code));
goto _end; goto _end;
} }
execInfo.ts = ts;
mDebug("create trans successfully, update cached node list, numOfNodes:%d",
(int)taosArrayGetSize(execInfo.pNodeList));
} else {
mError("unexpected code during create nodeUpdate trans, code:%s", tstrerror(code));
} }
} else {
mDebug("no update found in nodeList");
} }
mndDestroyVgroupChangeInfo(&changeInfo); mndDestroyVgroupChangeInfo(&changeInfo);
_end: _end:
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pNodeSnapshot); taosArrayDestroy(pNodeSnapshot);
mDebug("end to do stream task node change checking"); mDebug("end to do stream task node change checking, elapsed time:%" PRId64 "ms", taosGetTimestampMs() - tsms);
atomic_store_32(&mndNodeCheckSentinel, 0); atomic_store_32(&mndNodeCheckSentinel, 0);
return 0; return 0;
} }

View File

@ -910,9 +910,12 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int64_t startTs = pTask->chkInfo.startTs; int64_t startTs = pTask->chkInfo.startTs;
int64_t ckId = pTask->chkInfo.pActiveInfo->activeId; int64_t ckId = pTask->chkInfo.pActiveInfo->activeId;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
streamMutexLock(&pTask->lock);
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
streamMutexUnlock(&pTask->lock);
// sink task does not need to save the status, and generated the checkpoint // sink task does not need to save the status, and generated the checkpoint
if (pTask->info.taskLevel != TASK_LEVEL__SINK) { if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId); stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);

View File

@ -698,6 +698,8 @@ static int32_t doHandleChkptBlock(SStreamTask* pTask) {
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
streamMutexUnlock(&pTask->lock);
if (pState.state == TASK_STATUS__CK) { // todo other thread may change the status if (pState.state == TASK_STATUS__CK) { // todo other thread may change the status
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name); stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
@ -715,7 +717,7 @@ static int32_t doHandleChkptBlock(SStreamTask* pTask) {
} }
} }
streamMutexUnlock(&pTask->lock); // streamMutexUnlock(&pTask->lock);
return code; return code;
} }