diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 0ad3023cbe..da9dcf1c04 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -33,5 +33,6 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t startStreamTasks(SStreamMeta* pMeta); int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); +int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); #endif // TDENGINE_TQ_COMMON_H diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 6de29f8513..444739e461 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -90,7 +90,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 62840f7e1f..b8ecf66686 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -685,6 +685,40 @@ _OVER: return -1; } +static int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool* hasEpset, int32_t taskId, int32_t nodeId) { + *hasEpset = false; + + if (nodeId == SNODE_HANDLE) { + SSnodeObj *pObj = NULL; + void *pIter = NULL; + + pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj); + if (pIter != NULL) { + addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); + sdbRelease(pMnode->pSdb, pObj); + sdbCancelFetch(pMnode->pSdb, pIter); + *hasEpset = true; + return TSDB_CODE_SUCCESS; + } else { + mError("failed to acquire snode epset"); + return TSDB_CODE_INVALID_PARA; + } + } else { + SVgObj *pVgObj = mndAcquireVgroup(pMnode, nodeId); + if (pVgObj != NULL) { + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + + epsetAssign(pEpSet, &epset); + *hasEpset = true; + return TSDB_CODE_SUCCESS; + } else { + mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId); + return TSDB_CODE_SUCCESS; + } + } +} + static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { @@ -698,28 +732,17 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask STransAction action = {0}; SEpSet epset = {0}; - if (pTask->info.nodeId == SNODE_HANDLE) { - SSnodeObj *pObj = NULL; - void *pIter = NULL; - while (1) { - pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj); - if (pIter == NULL) { - break; - } + bool hasEpset = false; - addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port); - sdbRelease(pMnode->pSdb, pObj); - } - } else { - SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); - if (pVgObj != NULL) { - epset = mndGetVgroupEpset(pMnode, pVgObj); - mndReleaseVgroup(pMnode, pVgObj); - } else { - mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", pTask->id.taskId, pTask->info.nodeId); - taosMemoryFree(pReq); - return 0; - } + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + + // no valid epset, return directly without redoAction + if (!hasEpset) { + return TSDB_CODE_SUCCESS; } // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. @@ -1776,9 +1799,20 @@ static int32_t mndPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *p pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; - SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); - SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); - mndReleaseVgroup(pMnode, pVgObj); + SEpSet epset; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + taosMemoryFree(pReq); + return -1; + } + + // no valid epset, return directly without redoAction + if (!hasEpset) { + taosMemoryFree(pReq); + return TSDB_CODE_SUCCESS; + } STransAction action = {0}; initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); @@ -1920,17 +1954,25 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { static int32_t mndResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) { SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); if (pReq == NULL) { + mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq), + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + pReq->head.vgId = htonl(pTask->info.nodeId); pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; pReq->igUntreated = igUntreated; - SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); - SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); - mndReleaseVgroup(pMnode, pVgObj); + SEpSet epset; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + taosMemoryFree(pReq); + return -1; + } STransAction action = {0}; initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0); @@ -2208,6 +2250,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP epsetAssign(&updateInfo.newEp, &pCurrent->epset); taosArrayPush(info.pUpdateNodeList, &updateInfo); } + + // todo handle the snode info if (pCurrent->nodeId != SNODE_HANDLE) { SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId); taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0); @@ -2704,9 +2748,18 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; - SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); - SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); - mndReleaseVgroup(pMnode, pVgObj); + SEpSet epset; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pReq); + continue; + } + + if (!hasEpset) { + taosMemoryFree(pReq); + continue; + } STransAction action = {0}; initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); @@ -2878,11 +2931,17 @@ static int32_t mndDropRelatedFillhistoryTask(SMnode *pMnode, STaskStatusEntry *p mDebug("build and send drop related fill-history task for task:0x%x", pTask->id.taskId); - SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); - SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); - mndReleaseVgroup(pMnode, pVgObj); + SEpSet epset; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (hasEpset) { + tmsgSendReq(&epset, &msg); + } - tmsgSendReq(&epset, &msg); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 31d923169a..f0fe662025 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -205,6 +205,8 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); case TDMT_VND_STREAM_TASK_UPDATE: return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); + case TDMT_VND_STREAM_TASK_RESET: + return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg); default: ASSERT(0); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 76049d21b1..c33becf5b9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1336,26 +1336,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { - SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg->pCont; - - SStreamMeta* pMeta = pTq->pStreamMeta; - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); - if (pTask == NULL) { - tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already", - pMeta->vgId, pReq->taskId); - return TSDB_CODE_SUCCESS; - } - - tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr); - - // clear flag set during do checkpoint, and open inputQ for all upstream tasks - if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) { - streamTaskClearCheckInfo(pTask, true); - streamTaskSetStatusReady(pTask); - } - - streamMetaReleaseTask(pMeta, pTask); - return TSDB_CODE_SUCCESS; + return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 011ce674ec..0d932e6e72 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -832,5 +832,27 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { tqDebug("vgId:%d start all tasks completed", pMeta->vgId); } + return TSDB_CODE_SUCCESS; +} + +int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { + SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg->pCont; + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); + if (pTask == NULL) { + tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already", + pMeta->vgId, pReq->taskId); + return TSDB_CODE_SUCCESS; + } + + tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr); + + // clear flag set during do checkpoint, and open inputQ for all upstream tasks + if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) { + streamTaskClearCheckInfo(pTask, true); + streamTaskSetStatusReady(pTask); + } + + streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } \ No newline at end of file