fix(stream):handle the case of delete orphaned tasks.

This commit is contained in:
Haojun Liao 2023-12-13 17:44:00 +08:00
parent 88a8e4e9f0
commit 105542bb21
2 changed files with 14 additions and 8 deletions

View File

@ -704,10 +704,10 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask
pReq->streamId = pTask->id.streamId; pReq->streamId = pTask->id.streamId;
STransAction action = {0}; STransAction action = {0};
SEpSet epset = {0}; SEpSet epset = {0};
if(pTask->info.nodeId == SNODE_HANDLE){ if (pTask->info.nodeId == SNODE_HANDLE) {
SSnodeObj *pObj = NULL; SSnodeObj *pObj = NULL;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj); pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
if (pIter == NULL) { if (pIter == NULL) {
@ -717,10 +717,16 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask
addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port); addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pMnode->pSdb, pObj); sdbRelease(pMnode->pSdb, pObj);
} }
}else{ } else {
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
epset = mndGetVgroupEpset(pMnode, pVgObj); if (pVgObj != NULL) {
mndReleaseVgroup(pMnode, pVgObj); epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
} else {
mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", pTask->id.taskId, pTask->info.nodeId);
taosMemoryFree(pReq);
return 0;
}
} }
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
@ -1657,6 +1663,7 @@ static void setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDat
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (pe == NULL) { if (pe == NULL) {
mError("task:0x%" PRIx64 " not exists in vnode, no valid status/stage info", id.taskId);
return; return;
} }

View File

@ -48,8 +48,7 @@ int32_t clearFinishedTrans(SMnode* pMnode) {
void* pKey = taosHashGetKey(pEntry, &keyLen); void* pKey = taosHashGetKey(pEntry, &keyLen);
// key is the name of src/dst db name // key is the name of src/dst db name
SKeyInfo info = {.pKey = pKey, .keyLen = keyLen}; SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name,
mDebug("transId:%d %s startTs:%" PRId64 "cleared due to finished", pEntry->transId, pEntry->name,
pEntry->startTime); pEntry->startTime);
taosArrayPush(pList, &info); taosArrayPush(pList, &info);
} else { } else {