diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 0a93848843..bd0d97e34d 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -113,38 +113,39 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList); void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, int32_t retryCode, int32_t acceptCode); -STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg); +int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, + const char *pMsg, STrans **pTrans1); int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); -SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); -int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); -int32_t mndProcessStreamHb(SRpcMsg *pReq); -void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -int32_t extractStreamNodeList(SMnode *pMnode); -int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated); -int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); -int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream); -int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); -int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq); -int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId, - int64_t ts); -void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); +int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj** pStream); +int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); +int32_t mndProcessStreamHb(SRpcMsg *pReq); +void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); +int32_t extractStreamNodeList(SMnode *pMnode); +int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated); +int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); +int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream); +int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); +int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq); +int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId, + int64_t ts); +void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); -SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); -void destroyStreamTaskIter(SStreamTaskIter *pIter); -bool streamTaskIterNextTask(SStreamTaskIter *pIter); -SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter); -void mndInitExecInfo(); -void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo); -int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); -void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); +int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter); +void destroyStreamTaskIter(SStreamTaskIter *pIter); +bool streamTaskIterNextTask(SStreamTaskIter *pIter); +int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask); +int32_t mndInitExecInfo(); +void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo); +int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); +void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo); void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index fa4b9b64f4..6362fbaa79 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -134,17 +134,18 @@ int32_t mndInitStream(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask); - mndInitExecInfo(); - - if (sdbSetTable(pMnode->pSdb, table) != 0) { - return -1; + int32_t code = mndInitExecInfo(); + if (code) { + return code; } - if (sdbSetTable(pMnode->pSdb, tableSeq) != 0) { - return -1; + code = sdbSetTable(pMnode->pSdb, table); + if (code) { + return terrno; } - return 0; + code = sdbSetTable(pMnode->pSdb, tableSeq); + return code; } void mndCleanupStream(SMnode *pMnode) { @@ -252,6 +253,8 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream } int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) { + terrno = 0; + SSdb *pSdb = pMnode->pSdb; (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName); if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { @@ -530,9 +533,21 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { } int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) { - SStreamTaskIter *pIter = createStreamTaskIter(pStream); + SStreamTaskIter *pIter = NULL; + int32_t code = createStreamTaskIter(pStream, &pIter); + if (code) { + mError("failed to create task iter for stream:%s", pStream->name); + return code; + } + while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = streamTaskIterGetCurrent(pIter); + SStreamTask *pTask = NULL; + code = streamTaskIterGetCurrent(pIter, &pTask); + if (code) { + destroyStreamTaskIter(pIter); + return code; + } + if (mndPersistTaskDeployReq(pTrans, pTask) < 0) { destroyStreamTaskIter(pIter); return -1; @@ -727,7 +742,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } code = mndAcquireStream(pMnode, createReq.name, &pStream); - if (pStream != NULL || code != 0) { + if (pStream != NULL || code == 0) { if (createReq.igExists) { mInfo("stream:%s, already exist, ignore exist is set", createReq.name); goto _OVER; @@ -760,8 +775,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg); - if (pTrans == NULL) { + STrans *pTrans = NULL; + code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans); + if (pTrans == NULL || code) { goto _OVER; } @@ -802,11 +818,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // add into buffer firstly // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already. - taosThreadMutexLock(&execInfo.lock); + streamMutexLock(&execInfo.lock); mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name); saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo); -// mndRegisterConsensusChkptId(execInfo.pStreamConsensus, streamObj.uid); - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); // execute creation if (mndTransPrepare(pMnode, pTrans) != 0) { @@ -867,7 +882,7 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) { { // check the max checkpoint id from all vnodes. int64_t maxCheckpointId = -1; if (lock) { - taosThreadMutexLock(&execInfo.lock); + streamMutexLock(&execInfo.lock); } for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { @@ -888,7 +903,7 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) { } if (lock) { - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); } if (maxCheckpointId > maxChkptId) { @@ -989,11 +1004,13 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre return -1; } - STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME, - "gen checkpoint for stream"); - if (pTrans == NULL) { + STrans *pTrans = NULL; + code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME, + "gen checkpoint for stream", &pTrans); + if (pTrans == NULL || code) { + code = TSDB_CODE_MND_TRANS_CONFLICT; mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId, - tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); + tstrerror(code)); goto _ERR; } @@ -1033,7 +1050,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre taosWUnLockLatch(&pStream->lock); if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) { - return code; + goto _ERR; } if ((code = mndTransPrepare(pMnode, pTrans)) != TSDB_CODE_SUCCESS) { @@ -1057,13 +1074,13 @@ int32_t extractStreamNodeList(SMnode *pMnode) { static bool taskNodeIsUpdated(SMnode *pMnode) { // check if the node update happens or not - taosThreadMutexLock(&execInfo.lock); + streamMutexLock(&execInfo.lock); int32_t numOfNodes = extractStreamNodeList(pMnode); if (numOfNodes == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); execInfo.ts = taosGetTimestampSec(); - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); return false; } @@ -1071,7 +1088,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i); if (pNodeEntry->stageUpdated) { mDebug("stream task not ready due to node update detected, checkpoint not issued"); - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); return true; } } @@ -1086,7 +1103,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { if (!allReady) { mWarn("not all vnodes ready, quit from vnodes status check"); taosArrayDestroy(pNodeSnapshot); - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); return true; } @@ -1102,7 +1119,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { mDebug("stream tasks not ready due to node update"); } - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); return nodeUpdated; } @@ -1112,7 +1129,7 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { return -1; } - taosThreadMutexLock(&execInfo.lock); + streamMutexLock(&execInfo.lock); if (taosArrayGetSize(execInfo.pNodeList) == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); ASSERT(taosArrayGetSize(execInfo.pTaskList) == 0); @@ -1157,7 +1174,7 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { removeTasksInBuf(pInvalidList, &execInfo); taosArrayDestroy(pInvalidList); - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); return ready ? 0 : -1; } @@ -1220,14 +1237,14 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { continue; } - taosThreadMutexLock(&execInfo.lock); + streamMutexLock(&execInfo.lock); int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid); if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) { - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); sdbRelease(pSdb, pStream); continue; } - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration}; taosArrayPush(pList, &in); @@ -1270,8 +1287,9 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { for (int32_t i = 0; i < numOfQual; ++i) { SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i); - SStreamObj *p = mndGetStreamObj(pMnode, pCheckpointInfo->streamId); - if (p != NULL) { + SStreamObj *p = NULL; + code = mndGetStreamObj(pMnode, pCheckpointInfo->streamId, &p); + if (p != NULL || code != 0) { code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true); sdbRelease(pSdb, p); @@ -1362,8 +1380,9 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream"); - if (pTrans == NULL) { + STrans *pTrans = NULL; + code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans); + if (pTrans == NULL || code) { mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, terrstr()); sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); @@ -1863,9 +1882,9 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock int32_t numOfRows = 0; SStreamObj *pStream = NULL; - taosThreadMutexLock(&execInfo.lock); + streamMutexLock(&execInfo.lock); mndInitStreamExecInfo(pMnode, &execInfo); - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); @@ -1882,11 +1901,24 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock } // add row for each task - SStreamTaskIter *pIter = createStreamTaskIter(pStream); - while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = streamTaskIterGetCurrent(pIter); + SStreamTaskIter *pIter = NULL; + int32_t code = createStreamTaskIter(pStream, &pIter); + if (code) { + taosRUnLockLatch(&pStream->lock); + sdbRelease(pSdb, pStream); + mError("failed to create task iter for stream:%s", pStream->name); + continue; + } - int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows); + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = NULL; + code = streamTaskIterGetCurrent(pIter, &pTask); + if (code) { + destroyStreamTaskIter(pIter); + break; + } + + code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows); if (code == TSDB_CODE_SUCCESS) { numOfRows++; } @@ -1961,7 +1993,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { { // check for tasks, if tasks are not ready, not allowed to pause bool found = false; bool readyToPause = true; - taosThreadMutexLock(&execInfo.lock); + streamMutexLock(&execInfo.lock); for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { STaskId *p = taosArrayGet(execInfo.pTaskList, i); @@ -1984,7 +2016,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { found = true; } - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); if (!found) { mError("stream:%s task not report status yet, not ready for pause", pauseReq.name); sdbRelease(pMnode->pSdb, pStream); @@ -1998,42 +2030,49 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { } } - STrans *pTrans = - doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream"); - if (pTrans == NULL) { + STrans *pTrans = NULL; + code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans); + if (pTrans == NULL || code) { mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); - return -1; + return code; } code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid); + if (code) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return code; + } // if nodeUpdate happened, not send pause trans - if (mndStreamSetPauseAction(pMnode, pTrans, pStream) < 0) { + code = mndStreamSetPauseAction(pMnode, pTrans, pStream); + if (code) { mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - return -1; + return code; } // pause stream taosWLockLatch(&pStream->lock); pStream->status = STREAM_STATUS__PAUSE; - if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY) < 0) { + code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); + if (code) { taosWUnLockLatch(&pStream->lock); - sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - return -1; + return code; } taosWUnLockLatch(&pStream->lock); - if (mndTransPrepare(pMnode, pTrans) != 0) { + code = mndTransPrepare(pMnode, pTrans); + if (code) { mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - return -1; + return code; } sdbRelease(pMnode->pSdb, pStream); @@ -2087,22 +2126,28 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = - doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream"); - if (pTrans == NULL) { + STrans *pTrans = NULL; + code = + doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans); + if (pTrans == NULL || code) { mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); - return -1; + return code; } code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid); + if (code) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return code; + } // set the resume action if (mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) { mError("stream:%s, failed to drop task since %s", resumeReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - return -1; + return code; } // resume stream @@ -2113,7 +2158,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - return -1; + return code; } taosWUnLockLatch(&pStream->lock); @@ -2121,7 +2166,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - return -1; + return code; } sdbRelease(pMnode->pSdb, pStream); @@ -2195,6 +2240,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange SStreamObj *pStream = NULL; void *pIter = NULL; STrans *pTrans = NULL; + int32_t code = 0; // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool while (1) { @@ -2221,12 +2267,11 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange // here create only one trans if (pTrans == NULL) { - pTrans = - doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME, "update task epsets"); - if (pTrans == NULL) { + code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME, "update task epsets", &pTrans); + if (pTrans == NULL || code) { sdbRelease(pSdb, pStream); sdbCancelFetch(pSdb, pIter); - return terrno; + return terrno = code; } mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid); @@ -2243,7 +2288,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid, pStream->name, pTrans->id); - int32_t code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans); + code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans); // todo: not continue, drop all and retry again if (code != TSDB_CODE_SUCCESS) { @@ -2258,7 +2303,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange if (code != TSDB_CODE_SUCCESS) { sdbCancelFetch(pSdb, pIter); - return -1; + return code; } } @@ -2267,16 +2312,17 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange return 0; } - if (mndTransPrepare(pMnode, pTrans) != 0) { + code = mndTransPrepare(pMnode, pTrans); + if (code) { mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - return -1; + return code; } sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - return 0; + return code; } static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList) { @@ -2293,9 +2339,21 @@ static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList) { taosWLockLatch(&pStream->lock); - SStreamTaskIter *pTaskIter = createStreamTaskIter(pStream); + SStreamTaskIter *pTaskIter = NULL; + int32_t code = createStreamTaskIter(pStream, &pTaskIter); + if (code) { + taosWUnLockLatch(&pStream->lock); + sdbRelease(pSdb, pStream); + mError("failed to create task iter for stream:%s", pStream->name); + continue; + } + while (streamTaskIterNextTask(pTaskIter)) { - SStreamTask *pTask = streamTaskIterGetCurrent(pTaskIter); + SStreamTask *pTask = NULL; + code = streamTaskIterGetCurrent(pTaskIter, &pTask); + if (code) { + break; + } SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId}; epsetAssign(&entry.epset, &pTask->info.epSet); @@ -2342,9 +2400,9 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; - taosThreadMutexLock(&execInfo.lock); + streamMutexLock(&execInfo.lock); int32_t numOfNodes = extractStreamNodeList(pMnode); - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); if (numOfNodes == 0) { mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing"); @@ -2368,7 +2426,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } - taosThreadMutexLock(&execInfo.lock); + streamMutexLock(&execInfo.lock); removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot); @@ -2392,7 +2450,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { } taosArrayDestroy(pNodeSnapshot); - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); taosArrayDestroy(changeInfo.pUpdateNodeList); taosHashCleanup(changeInfo.pDBMap); @@ -2418,9 +2476,19 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { } void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { - SStreamTaskIter *pIter = createStreamTaskIter(pStream); + SStreamTaskIter *pIter = NULL; + int32_t code = createStreamTaskIter(pStream, &pIter); + if (code) { + mError("failed to create task iter for stream:%s", pStream->name); + return; + } + while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = streamTaskIterGetCurrent(pIter); + SStreamTask *pTask = NULL; + code = streamTaskIterGetCurrent(pIter, &pTask); + if (code) { + break; + } STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); @@ -2490,10 +2558,11 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId); // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. - taosThreadMutexLock(&execInfo.lock); + streamMutexLock(&execInfo.lock); - SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); - if (pStream == NULL) { + SStreamObj *pStream = NULL; + int32_t code = mndGetStreamObj(pMnode, req.streamId, &pStream); + if (pStream == NULL || code != 0) { mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", req.streamId); @@ -2504,7 +2573,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { if (p == NULL) { mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId); terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); return -1; } else { mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", @@ -2549,7 +2618,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { mndReleaseStream(pMnode, pStream); } - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); { SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)}; @@ -2609,10 +2678,11 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId); // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. - taosThreadMutexLock(&execInfo.lock); + streamMutexLock(&execInfo.lock); - SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); - if (pStream == NULL) { + SStreamObj *pStream = NULL; + int32_t code = mndGetStreamObj(pMnode, req.streamId, &pStream); + if (pStream == NULL || code != 0) { mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId); // not in meta-store yet, try to acquire the task in exec buffer @@ -2622,7 +2692,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { if (p == NULL) { mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId); terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); return -1; } else { mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", @@ -2654,7 +2724,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { mndReleaseStream(pMnode, pStream); } - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS); return 0; @@ -2719,7 +2789,7 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, // req.nodeId, req.streamId, req.taskId, req.checkpointId); // // // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. -// taosThreadMutexLock(&execInfo.lock); +// streamMutexLock(&execInfo.lock); // // // mnode handle the create stream transaction too slow may cause this problem // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); @@ -2733,7 +2803,7 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, // if (p == NULL) { // mError("failed to find the stream:0x%" PRIx64 " in buf, not handle consensus-checkpointId", req.streamId); // terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; -// taosThreadMutexUnlock(&execInfo.lock); +// streamMutexUnlock(&execInfo.lock); // // doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); // return -1; @@ -2749,7 +2819,7 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, // // int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); // if ((pStream != NULL) && (pStream->checkpointId == 0)) { // not generated checkpoint yet, return 0 directly -// taosThreadMutexUnlock(&execInfo.lock); +// streamMutexUnlock(&execInfo.lock); // mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, 0, req.startTs); // // doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); @@ -2766,7 +2836,7 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, // SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks); // mndAddConsensusTasks(pInfo, &req); // -// taosThreadMutexUnlock(&execInfo.lock); +// streamMutexUnlock(&execInfo.lock); // doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); // return 0; // } @@ -2776,7 +2846,7 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, // req.nodeId, req.streamId, pStream->name, chkId, pStream->checkpointId); // mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, chkId, req.startTs); // -// taosThreadMutexUnlock(&execInfo.lock); +// streamMutexUnlock(&execInfo.lock); // doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); // return 0; // } @@ -2789,7 +2859,7 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, // mndReleaseStream(pMnode, pStream); // } // -// taosThreadMutexUnlock(&execInfo.lock); +// streamMutexUnlock(&execInfo.lock); // doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); // return 0; //} @@ -2816,7 +2886,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { return 0; } - taosThreadMutexLock(&execInfo.lock); + streamMutexLock(&execInfo.lock); void *pIter = NULL; while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) { @@ -2826,8 +2896,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { int32_t num = taosArrayGetSize(pInfo->pTaskList); SArray *pList = taosArrayInit(4, sizeof(int32_t)); - SStreamObj *pStream = mndGetStreamObj(pMnode, pInfo->streamId); - if (pStream == NULL) { // stream has been dropped already + SStreamObj *pStream = NULL; + code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream); + if (pStream == NULL || code != 0) { // stream has been dropped already mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId); taosArrayDestroy(pList); continue; @@ -2886,14 +2957,14 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) { int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i); - mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId); + code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId); } - taosThreadMutexUnlock(&execInfo.lock); + streamMutexUnlock(&execInfo.lock); taosArrayDestroy(pStreamList); mDebug("end to process consensus-checkpointId in tmr"); - return TSDB_CODE_SUCCESS; + return code; } static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) { @@ -2944,32 +3015,41 @@ void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) { } int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) { - STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME, - "update checkpoint-info"); - if (pTrans == NULL) { - return terrno; + STrans *pTrans = NULL; + int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME, + "update checkpoint-info", &pTrans); + if (pTrans == NULL || code) { + sdbRelease(pMnode->pSdb, pStream); + return code; } - /*int32_t code = */ mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid); - int32_t code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream); - if (code != 0) { + code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid); + if (code){ + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return code; + } + + code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream); + if (code) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; } code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); - if (code != TSDB_CODE_SUCCESS) { + if (code) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - return -1; + return code; } - if (mndTransPrepare(pMnode, pTrans) != 0) { + code = mndTransPrepare(pMnode, pTrans); + if (code) { mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - return -1; + return code; } sdbRelease(pMnode->pSdb, pStream); diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 5f0434e3d0..c5297b5ba8 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -61,15 +61,23 @@ void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) { } int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { - STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME, - " reset from failed checkpoint"); - if (pTrans == NULL) { + STrans *pTrans = NULL; + int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME, + " reset from failed checkpoint", &pTrans); + if (pTrans == NULL || code) { + sdbRelease(pMnode->pSdb, pStream); return terrno; } - /*int32_t code = */ mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid); - int32_t code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream); - if (code != 0) { + code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid); + if (code) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return code; + } + + code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream); + if (code) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; @@ -79,14 +87,15 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { if (code != TSDB_CODE_SUCCESS) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - return -1; + return code; } - if (mndTransPrepare(pMnode, pTrans) != 0) { + code = mndTransPrepare(pMnode, pTrans); + if (code != 0) { mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - return -1; + return code; } sdbRelease(pMnode->pSdb, pStream); @@ -99,8 +108,9 @@ int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t t int32_t code = TSDB_CODE_SUCCESS; mndKillTransImpl(pMnode, transId, ""); - SStreamObj *pStream = mndGetStreamObj(pMnode, streamId); - if (pStream == NULL) { + SStreamObj *pStream = NULL; + code = mndGetStreamObj(pMnode, streamId, &pStream); + if (pStream == NULL || code != 0) { code = TSDB_CODE_STREAM_TASK_NOT_EXIST; mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid); } else { @@ -159,34 +169,39 @@ int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList) { } SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""}; - STrans *pTrans = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream"); - if (pTrans == NULL) { + STrans *pTrans = NULL; + int32_t code = + doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans); + if (pTrans == NULL || code != 0) { mError("failed to create trans to drop orphan tasks since %s", terrstr()); - return -1; + return code; } - int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId); - + code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId); + if (code) { + return code; + } // drop all tasks - if (mndStreamSetDropActionFromList(pMnode, pTrans, pList) < 0) { + if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, pList)) < 0) { mError("failed to create trans to drop orphan tasks since %s", terrstr()); mndTransDrop(pTrans); - return -1; + return code; } // drop stream - if (mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED) < 0) { + if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) { mndTransDrop(pTrans); - return -1; + return code; } - if (mndTransPrepare(pMnode, pTrans) != 0) { + if ((code = mndTransPrepare(pMnode, pTrans)) != 0) { mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); - return -1; + return code; } + mndTransDrop(pTrans); - return 0; + return code; } int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info) { @@ -230,9 +245,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SArray *pOrphanTasks = NULL; int32_t code = 0; - if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { + if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { if (suspendAllStreams(pMnode, &pReq->info) < 0) { - return -1; + return code; } } @@ -242,8 +257,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (tDecodeStreamHbMsg(&decoder, &req) < 0) { tCleanupStreamHbMsg(&req); tDecoderClear(&decoder); - terrno = TSDB_CODE_INVALID_MSG; - return -1; + code = terrno = TSDB_CODE_INVALID_MSG; + return code; } tDecoderClear(&decoder); @@ -258,12 +273,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (!validateHbMsg(execInfo.pNodeList, req.vgId)) { mError("vgId:%d not exists in nodeList buf, discarded", req.vgId); - terrno = TSDB_CODE_INVALID_MSG; + code = terrno = TSDB_CODE_INVALID_MSG; doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); taosThreadMutexUnlock(&execInfo.lock); cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); - return -1; + return code; } int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); @@ -294,9 +309,14 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { .startTs = pChkInfo->consensusTs, }; - SStreamObj *pStream = mndGetStreamObj(pMnode, p->id.streamId); - int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); + SStreamObj *pStream = NULL; + code = mndGetStreamObj(pMnode, p->id.streamId, &pStream); + if (code) { + code = TSDB_CODE_STREAM_TASK_NOT_EXIST; + continue; + } + int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); SCheckpointConsensusInfo *pInfo = NULL; code = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks, &pInfo); @@ -350,7 +370,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (pMnode != NULL) { SArray *p = NULL; - int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &p); + code = mndTakeVgroupSnapshot(pMnode, &allReady, &p); taosArrayDestroy(p); if (code) { mError("failed to get the vgroup snapshot, ignore it and continue"); @@ -388,7 +408,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); - return TSDB_CODE_SUCCESS; + return terrno; } void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode) { // here reuse the doCheckpointmsg diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index f252791618..b189ddb3cb 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -153,27 +153,30 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) { return 0; } -STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, - const char *pMsg) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name); - if (pTrans == NULL) { +int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, + const char *pMsg, STrans ** pTrans1) { + *pTrans1 = NULL; + terrno = 0; + + STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name); + if (p == NULL) { mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return terrno; } - mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, pTrans->id); + mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, p->id); - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { + mndTransSetDbName(p, pStream->sourceDb, pStream->targetSTbName); + if (mndTransCheckConflict(pMnode, p) != 0) { terrno = TSDB_CODE_MND_TRANS_CONFLICT; mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno)); - mndTransDrop(pTrans); - return NULL; + mndTransDrop(p); + return terrno; } - terrno = 0; - return pTrans; + *pTrans1 = p; + return 0; } SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { @@ -272,8 +275,9 @@ int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { continue; } - SStreamObj *pStream = mndGetStreamObj(pMnode, pTransInfo->streamId); - if (pStream != NULL) { + SStreamObj *pStream = NULL; + int32_t code = mndGetStreamObj(pMnode, pTransInfo->streamId, &pStream); + if (pStream != NULL || code != 0) { if (identicalName(pStream->sourceDb, pDBName, len)) { mndKillTransImpl(pMnode, pTransInfo->transId, pStream->sourceDb); } else if (identicalName(pStream->targetDb, pDBName, len)) { diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 3a84856ae0..0b96626536 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -28,20 +28,20 @@ struct SStreamTaskIter { int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId); -SStreamTaskIter* createStreamTaskIter(SStreamObj* pStream) { - SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter)); - if (pIter == NULL) { +int32_t createStreamTaskIter(SStreamObj* pStream, SStreamTaskIter** pIter) { + *pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter)); + if (*pIter == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return terrno; } - pIter->level = -1; - pIter->ordinalIndex = 0; - pIter->pStream = pStream; - pIter->totalLevel = taosArrayGetSize(pStream->tasks); - pIter->pTask = NULL; + (*pIter)->level = -1; + (*pIter)->ordinalIndex = 0; + (*pIter)->pStream = pStream; + (*pIter)->totalLevel = taosArrayGetSize(pStream->tasks); + (*pIter)->pTask = NULL; - return pIter; + return 0; } bool streamTaskIterNextTask(SStreamTaskIter* pIter) { @@ -72,8 +72,15 @@ bool streamTaskIterNextTask(SStreamTaskIter* pIter) { return false; } -SStreamTask* streamTaskIterGetCurrent(SStreamTaskIter* pIter) { - return pIter->pTask; +int32_t streamTaskIterGetCurrent(SStreamTaskIter* pIter, SStreamTask** pTask) { + if (pTask) { + *pTask = pIter->pTask; + if (*pTask != NULL) { + return TSDB_CODE_SUCCESS; + } + } + + return TSDB_CODE_INVALID_PARA; } void destroyStreamTaskIter(SStreamTaskIter* pIter) { @@ -132,10 +139,15 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) { } char buf[256] = {0}; - epsetToStr(&entry.epset, buf, tListLen(buf)); + (void) epsetToStr(&entry.epset, buf, tListLen(buf)); + + void* p = taosArrayPush(pVgroupList, &entry); + if (p == NULL) { + mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId); + } else { + mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); + } - mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); - taosArrayPush(pVgroupList, &entry); sdbRelease(pSdb, pVgroup); } @@ -146,15 +158,23 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) { break; } - SNodeEntry entry = {0}; - addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port); - entry.nodeId = SNODE_HANDLE; + SNodeEntry entry = {.nodeId = SNODE_HANDLE}; + code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port); + if (code) { + sdbRelease(pSdb, pObj); + continue; + } char buf[256] = {0}; - epsetToStr(&entry.epset, buf, tListLen(buf)); - mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); + (void) epsetToStr(&entry.epset, buf, tListLen(buf)); + + void* p = taosArrayPush(pVgroupList, &entry); + if (p == NULL) { + mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId); + } else { + mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); + } - taosArrayPush(pVgroupList, &entry); sdbRelease(pSdb, pObj); } @@ -162,28 +182,33 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) { return code; } -SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { - void *pIter = NULL; - SSdb *pSdb = pMnode->pSdb; - SStreamObj *pStream = NULL; +int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) { + void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + *pStream = NULL; - while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { - if (pStream->uid == streamId) { + SStreamObj *p = NULL; + while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&p)) != NULL) { + if (p->uid == streamId) { sdbCancelFetch(pSdb, pIter); - return pStream; + *pStream = p; + return TSDB_CODE_SUCCESS; } - sdbRelease(pSdb, pStream); + sdbRelease(pSdb, p); } - return NULL; + return TSDB_CODE_STREAM_TASK_NOT_EXIST; } void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) { STrans *pTrans = mndAcquireTrans(pMnode, transId); if (pTrans != NULL) { mInfo("kill active transId:%d in Db:%s", transId, pDbName); - mndKillTrans(pMnode, pTrans); + int32_t code = mndKillTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans); + if (code) { + mError("failed to kill trans:%d", pTrans->id); + } } else { mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId); } @@ -199,11 +224,16 @@ int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj); if (pIter != NULL) { - addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); + int32_t code = addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); sdbRelease(pMnode->pSdb, pObj); sdbCancelFetch(pMnode->pSdb, pIter); - *hasEpset = true; - return TSDB_CODE_SUCCESS; + if (code) { + *hasEpset = false; + mError("failed to set epset"); + } else { + *hasEpset = true; + } + return code; } else { mError("failed to acquire snode epset"); return TSDB_CODE_INVALID_PARA; @@ -225,12 +255,14 @@ int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t } static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) { + terrno = 0; + SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); if (pReq == NULL) { mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq), tstrerror(TSDB_CODE_OUT_OF_MEMORY)); terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno; } pReq->head.vgId = htonl(pTask->info.nodeId); @@ -244,31 +276,45 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { terrno = code; taosMemoryFree(pReq); - return -1; + return terrno; } code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); - return -1; + return terrno; } mDebug("set the resume action for trans:%d", pTrans->id); return 0; } -SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) { - SStreamTaskIter *pIter = createStreamTaskIter(pStream); +int32_t mndGetStreamTask(STaskId *pId, SStreamObj *pStream, SStreamTask **pTask) { + *pTask = NULL; + + SStreamTask *p = NULL; + SStreamTaskIter *pIter = NULL; + int32_t code = createStreamTaskIter(pStream, &pIter); + if (code) { + mError("failed to create stream task iter:%s", pStream->name); + return code; + } + while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = streamTaskIterGetCurrent(pIter); - if (pTask->id.taskId == pId->taskId) { + code = streamTaskIterGetCurrent(pIter, &p); + if (code) { + continue; + } + + if (p->id.taskId == pId->taskId) { destroyStreamTaskIter(pIter); - return pTask; + *pTask = p; + return 0; } } destroyStreamTaskIter(pIter); - return NULL; + return TSDB_CODE_FAILED; } int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) { @@ -282,13 +328,25 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) { } int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) { - SStreamTaskIter *pIter = createStreamTaskIter(pStream); + SStreamTaskIter *pIter = NULL; + int32_t code = createStreamTaskIter(pStream, &pIter); + if (code) { + mError("failed to create stream task iter:%s", pStream->name); + return code; + } while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = streamTaskIterGetCurrent(pIter); - if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) { + SStreamTask *pTask = NULL; + code = streamTaskIterGetCurrent(pIter, &pTask); + if (code || pTask == NULL) { destroyStreamTaskIter(pIter); - return -1; + return code; + } + + code = doSetResumeAction(pTrans, pMnode, pTask, igUntreated); + if (code) { + destroyStreamTaskIter(pIter); + return code; } if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) { @@ -305,7 +363,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq), tstrerror(TSDB_CODE_OUT_OF_MEMORY)); terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno; } pReq->head.vgId = htonl(pTask->info.nodeId); @@ -322,25 +380,38 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa } char buf[256] = {0}; - epsetToStr(&epset, buf, tListLen(buf)); + (void) epsetToStr(&epset, buf, tListLen(buf)); mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf); code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); - return -1; + return code; } return 0; } int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - SStreamTaskIter *pIter = createStreamTaskIter(pStream); + SStreamTaskIter *pIter = NULL; + + int32_t code = createStreamTaskIter(pStream, &pIter); + if (code) { + mError("failed to create stream task iter:%s", pStream->name); + return code; + } while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = streamTaskIterGetCurrent(pIter); - if (doSetPauseAction(pMnode, pTrans, pTask) < 0) { + SStreamTask *pTask = NULL; + code = streamTaskIterGetCurrent(pIter, &pTask); + if (code) { destroyStreamTaskIter(pIter); - return -1; + return code; + } + + code = doSetPauseAction(pMnode, pTrans, pTask); + if (code) { + destroyStreamTaskIter(pIter); + return code; } if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) { @@ -350,14 +421,14 @@ int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr } destroyStreamTaskIter(pIter); - return 0; + return code; } static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno; } pReq->head.vgId = htonl(pTask->info.nodeId); @@ -368,28 +439,40 @@ static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTas bool hasEpset = false; int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction - terrno = code; - return -1; + return code; } // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); - return -1; + return code; } return 0; } int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - SStreamTaskIter *pIter = createStreamTaskIter(pStream); + SStreamTaskIter *pIter = NULL; + + int32_t code = createStreamTaskIter(pStream, &pIter); + if (code) { + mError("failed to create stream task iter:%s", pStream->name); + return code; + } while(streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = streamTaskIterGetCurrent(pIter); - if (doSetDropAction(pMnode, pTrans, pTask) < 0) { + SStreamTask *pTask = NULL; + code = streamTaskIterGetCurrent(pIter, &pTask); + if (code) { destroyStreamTaskIter(pIter); - return -1; + return code; + } + + code = doSetDropAction(pMnode, pTrans, pTask); + if (code) { + destroyStreamTaskIter(pIter); + return code; } } destroyStreamTaskIter(pIter); @@ -400,7 +483,7 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno; } pReq->head.vgId = htonl(pTask->nodeId); @@ -411,16 +494,15 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask bool hasEpset = false; int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->taskId, pTask->nodeId); if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { // no valid epset, return directly without redoAction - terrno = code; taosMemoryFree(pReq); - return -1; + return code; } // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); - return -1; + return code; } return 0; @@ -429,19 +511,35 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) { for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { SOrphanTask* pTask = taosArrayGet(pList, i); - mDebug("add drop task:0x%x action to drop orphan task", pTask->taskId); - doSetDropActionFromId(pMnode, pTrans, pTask); + int32_t code = doSetDropActionFromId(pMnode, pTrans, pTask); + if (code != 0) { + return code; + } else { + mDebug("add drop task:0x%x action to drop orphan task", pTask->taskId); + } } return 0; } static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId, int32_t transId) { + int32_t code = 0; + pMsg->streamId = pId->streamId; pMsg->taskId = pId->taskId; pMsg->transId = transId; pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); - taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList); + if (pMsg->pNodeList == NULL) { + mError("failed to prepare node list, code:out of memory"); + code = TSDB_CODE_OUT_OF_MEMORY; + } + + if (code == 0) { + void *p = taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList); + if (p == NULL) { + mError("failed to add update node list into nodeList"); + } + } } static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId, @@ -456,7 +554,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha if (code < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosArrayDestroy(req.pNodeList); - return -1; + return terrno; } int32_t tlen = sizeof(SMsgHead) + blen; @@ -465,13 +563,18 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosArrayDestroy(req.pNodeList); - return -1; + return terrno; } void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); - tEncodeStreamTaskUpdateMsg(&encoder, &req); + code = tEncodeStreamTaskUpdateMsg(&encoder, &req); + if (code == -1) { + tEncoderClear(&encoder); + taosArrayDestroy(req.pNodeList); + return code; + } SMsgHead *pMsgHead = (SMsgHead *)buf; pMsgHead->contLen = htonl(tlen); @@ -489,15 +592,20 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) { void *pBuf = NULL; int32_t len = 0; - streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); + int32_t code = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); + if (code) { + return code; + } - doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); + code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); + if (code) { + return code; + } SEpSet epset = {0}; bool hasEpset = false; - int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); if (code != TSDB_CODE_SUCCESS || !hasEpset) { - terrno = code; return code; } @@ -512,16 +620,30 @@ static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask // build trans to update the epset int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid); - taosWLockLatch(&pStream->lock); + SStreamTaskIter *pIter = NULL; + + taosWLockLatch(&pStream->lock); + int32_t code = createStreamTaskIter(pStream, &pIter); + if (code) { + taosWUnLockLatch(&pStream->lock); + mError("failed to create stream task iter:%s", pStream->name); + return code; + } - SStreamTaskIter *pIter = createStreamTaskIter(pStream); while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = streamTaskIterGetCurrent(pIter); - int32_t code = doSetUpdateTaskAction(pMnode, pTrans, pTask, pInfo); + SStreamTask *pTask = NULL; + code = streamTaskIterGetCurrent(pIter, &pTask); + if (code) { + destroyStreamTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return code; + } + + code = doSetUpdateTaskAction(pMnode, pTrans, pTask, pInfo); if (code != TSDB_CODE_SUCCESS) { destroyStreamTaskIter(pIter); taosWUnLockLatch(&pStream->lock); - return -1; + return code; } } @@ -560,16 +682,30 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa } int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - taosWLockLatch(&pStream->lock); + SStreamTaskIter *pIter = NULL; + + taosWLockLatch(&pStream->lock); + int32_t code = createStreamTaskIter(pStream, &pIter); + if (code) { + taosWUnLockLatch(&pStream->lock); + mError("failed to create stream task iter:%s", pStream->name); + return code; + } - SStreamTaskIter *pIter = createStreamTaskIter(pStream); while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = streamTaskIterGetCurrent(pIter); - int32_t code = doSetResetAction(pMnode, pTrans, pTask); + SStreamTask *pTask = NULL; + code = streamTaskIterGetCurrent(pIter, &pTask); + if (code) { + destroyStreamTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return code; + } + + code = doSetResetAction(pMnode, pTrans, pTask); if (code != TSDB_CODE_SUCCESS) { destroyStreamTaskIter(pIter); taosWUnLockLatch(&pStream->lock); - return -1; + return code; } } @@ -583,8 +719,12 @@ static void freeTaskList(void* param) { taosArrayDestroy(*pList); } -void mndInitExecInfo() { - taosThreadMutexInit(&execInfo.lock, NULL); +int32_t mndInitExecInfo() { + int32_t code = taosThreadMutexInit(&execInfo.lock, NULL); + if (code) { + return code; + } + _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId)); @@ -598,6 +738,7 @@ void mndInitExecInfo() { taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList); taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList); + return 0; } void removeExpiredNodeInfo(const SArray *pNodeSnapshot) { @@ -610,7 +751,10 @@ void removeExpiredNodeInfo(const SArray *pNodeSnapshot) { for (int32_t j = 0; j < size; ++j) { SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j); if (pEntry->nodeId == p->nodeId) { - taosArrayPush(pValidList, p); + void* px = taosArrayPush(pValidList, p); + if (px == NULL) { + mError("failed to put node into list, nodeId:%d", p->nodeId); + } break; } } @@ -628,7 +772,10 @@ int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) { return TSDB_CODE_SUCCESS; } - taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); + int32_t code = taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); + if (code) { + return code; + } for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); @@ -647,28 +794,45 @@ int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) { void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) { for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) { STaskId *pId = taosArrayGet(pTaskIds, i); - doRemoveTasks(pExecInfo, pId); + int32_t code = doRemoveTasks(pExecInfo, pId); + if (code) { + mError("failed to remove task in buffer list, 0x%"PRIx64, pId->taskId); + } } } void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { - taosThreadMutexLock(&pExecNode->lock); + SStreamTaskIter *pIter = NULL; + streamMutexLock(&pExecNode->lock); // 1. remove task entries - SStreamTaskIter *pIter = createStreamTaskIter(pStream); + int32_t code = createStreamTaskIter(pStream, &pIter); + if (code) { + streamMutexUnlock(&pExecNode->lock); + mError("failed to create stream task iter:%s", pStream->name); + return; + } + while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = streamTaskIterGetCurrent(pIter); + SStreamTask *pTask = NULL; + code = streamTaskIterGetCurrent(pIter, &pTask); + if (code) { + continue; + } STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - doRemoveTasks(pExecNode, &id); + code = doRemoveTasks(pExecNode, &id); + if (code) { + mError("failed to remove task in buffer list, 0x%"PRIx64, id.taskId); + } } ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); // 2. remove stream entry in consensus hash table - mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid); + (void) mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid); - taosThreadMutexUnlock(&pExecNode->lock); + streamMutexUnlock(&pExecNode->lock); destroyStreamTaskIter(pIter); } @@ -699,7 +863,10 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); if (!existed) { - taosArrayPush(pRemovedTasks, pId); + void* p = taosArrayPush(pRemovedTasks, pId); + if (p == NULL) { + mError("failed to put task entry into remove list, taskId:0x%" PRIx64, pId->taskId); + } } } @@ -761,45 +928,64 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas } int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { + SStreamTaskIter *pIter = NULL; + taosWLockLatch(&pStream->lock); + int32_t code = createStreamTaskIter(pStream, &pIter); + if (code) { + taosWUnLockLatch(&pStream->lock); + mError("failed to create stream task iter:%s", pStream->name); + return code; + } - SStreamTaskIter *pIter = createStreamTaskIter(pStream); while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = streamTaskIterGetCurrent(pIter); + SStreamTask *pTask = NULL; + code = streamTaskIterGetCurrent(pIter, &pTask); + if (code) { + destroyStreamTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return code; + } - int32_t code = doSetUpdateChkptAction(pMnode, pTrans, pTask); + code = doSetUpdateChkptAction(pMnode, pTrans, pTask); if (code != TSDB_CODE_SUCCESS) { destroyStreamTaskIter(pIter); taosWUnLockLatch(&pStream->lock); - return -1; + return code; } } destroyStreamTaskIter(pIter); taosWUnLockLatch(&pStream->lock); - return 0; + return code; } int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; void *pIter = NULL; SArray *pDropped = taosArrayInit(4, sizeof(int64_t)); + int32_t code = 0; mDebug("start to scan checkpoint report info"); while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { SArray *pList = *(SArray **)pIter; - STaskChkptInfo* pInfo = taosArrayGet(pList, 0); - SStreamObj* pStream = mndGetStreamObj(pMnode, pInfo->streamId); - if (pStream == NULL) { + STaskChkptInfo *pInfo = taosArrayGet(pList, 0); + SStreamObj *pStream = NULL; + code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream); + if (pStream == NULL || code != 0) { mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", pInfo->streamId); - taosArrayPush(pDropped, &pInfo->streamId); + void* p = taosArrayPush(pDropped, &pInfo->streamId); + if (p == NULL) { + mError("failed to put stream into drop list:0x%" PRIx64, pInfo->streamId); + } + continue; } int32_t total = mndGetNumOfStreamTasks(pStream); - int32_t existed = (int32_t) taosArrayGetSize(pList); + int32_t existed = (int32_t)taosArrayGetSize(pList); if (total == existed) { mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", @@ -807,17 +993,21 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); if (!conflict) { - int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, pList); - if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry - taosArrayPush(pDropped, &pInfo->streamId); - mDebug("stream:0x%" PRIx64 " removed", pInfo->streamId); + code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, pList); + if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry + void* p = taosArrayPush(pDropped, &pInfo->streamId); + if (p == NULL) { + mError("failed to remove stream:0x%" PRIx64, pInfo->streamId); + } else { + mDebug("stream:0x%" PRIx64 " removed", pInfo->streamId); + } } else { mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet", pInfo->streamId); } break; } else { - mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", pInfo->streamId); + mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", pInfo->streamId); } } else { mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pInfo->streamId, pStream->name, @@ -831,7 +1021,10 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { if (size > 0) { for (int32_t i = 0; i < size; ++i) { int64_t streamId = *(int64_t *)taosArrayGet(pDropped, i); - taosHashRemove(execInfo.pChkptStreams, &streamId, sizeof(streamId)); + code = taosHashRemove(execInfo.pChkptStreams, &streamId, sizeof(streamId)); + if (code) { + mError("failed to remove stream in buf:0x%"PRIx64, streamId); + } } int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); @@ -856,29 +1049,30 @@ static int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStream int32_t blen; tEncodeSize(tEncodeRestoreCheckpointInfo, &req, blen, code); if (code < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } int32_t tlen = sizeof(SMsgHead) + blen; void *pBuf = taosMemoryMalloc(tlen); if (pBuf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } void *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); - tEncodeRestoreCheckpointInfo(&encoder, &req); + code = tEncodeRestoreCheckpointInfo(&encoder, &req); + tEncoderClear(&encoder); + if (code == -1) { + taosMemoryFree(pBuf); + return code; + } SMsgHead *pMsgHead = (SMsgHead *)pBuf; pMsgHead->contLen = htonl(tlen); pMsgHead->vgId = htonl(pTask->info.nodeId); - tEncoderClear(&encoder); - SEpSet epset = {0}; bool hasEpset = false; code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); @@ -900,17 +1094,28 @@ int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, i char msg[128] = {0}; snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId); - STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg); - if (pTrans == NULL) { + STrans *pTrans = NULL; + int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans); + if (pTrans == NULL || code != 0) { return terrno; } STaskId id = {.streamId = pStream->uid, .taskId = taskId}; - SStreamTask *pTask = mndGetStreamTask(&id, pStream); - ASSERT(pTask); + SStreamTask *pTask = NULL; + code = mndGetStreamTask(&id, pStream, &pTask); + if (code) { + mError("failed to get task:0x%x in stream:%s, failed to create consensus-checkpointId", taskId, pStream->name); + sdbRelease(pMnode->pSdb, pStream); + return code; + } - /*int32_t code = */ mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid); - int32_t code = mndStreamSetChkptIdAction(pMnode, pTrans, pTask, checkpointId, ts); + code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid); + if (code) { + sdbRelease(pMnode->pSdb, pStream); + return code; + } + + code = mndStreamSetChkptIdAction(pMnode, pTrans, pTask, checkpointId, ts); if (code != 0) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -918,17 +1123,18 @@ int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, i } code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); - if (code != TSDB_CODE_SUCCESS) { + if (code) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - return -1; + return code; } - if (mndTransPrepare(pMnode, pTrans) != 0) { + code = mndTransPrepare(pMnode, pTrans); + if (code) { mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - return -1; + return code; } sdbRelease(pMnode->pSdb, pStream); @@ -938,8 +1144,11 @@ int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, i } int32_t mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo) { - *pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); - if (pInfo != NULL) { + *pInfo = NULL; + + void* px = taosHashGet(pHash, &streamId, sizeof(streamId)); + if (px != NULL) { + *pInfo = px; return 0; } @@ -977,11 +1186,15 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo } } - taosArrayPush(pInfo->pTaskList, &info); - int32_t num = taosArrayGetSize(pInfo->pTaskList); - mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64 - " waiting tasks:%d", - pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num); + void *p = taosArrayPush(pInfo->pTaskList, &info); + if (p == NULL) { + mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId); + } else { + int32_t num = taosArrayGetSize(pInfo->pTaskList); + mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64 + " waiting tasks:%d", + pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num); + } } void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) { @@ -990,22 +1203,14 @@ void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) { } int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) { - taosHashRemove(pHash, &streamId, sizeof(streamId)); - int32_t numOfStreams = taosHashGetSize(pHash); - mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list after new checkpoint generated, remain:%d", streamId, - numOfStreams); - return TSDB_CODE_SUCCESS; -} + int32_t code = taosHashRemove(pHash, &streamId, sizeof(streamId)); + if (code == 0) { + int32_t numOfStreams = taosHashGetSize(pHash); + mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list after new checkpoint generated, remain:%d", + streamId, numOfStreams); + } else { + mError("failed to remove stream:0x%"PRIx64" in consensus-checkpointId list", streamId); + } -//int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId) { -// void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); -// ASSERT(pInfo == NULL); -// -// SCheckpointConsensusInfo p = {.genTs = taosGetTimestampMs(), .checkpointId = 0, .pTaskList = NULL}; -// taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p)); -// -// SCheckpointConsensusInfo* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId)); -// ASSERT(pChkptInfo->genTs > 0 && pChkptInfo->checkpointId == 0); -// mDebug("s-task:0x%" PRIx64 " set the initial consensus-checkpointId:0", streamId); -// return TSDB_CODE_SUCCESS; -//} \ No newline at end of file + return code; +} \ No newline at end of file diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 30ffcd71ac..1283f8e20b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -639,7 +639,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { code = tEncodeStreamTask(&encoder, pTask); tEncoderClear(&encoder); - if (code) { + if (code == -1) { stError("s-task:%s vgId:%d task meta encode failed, code:%s", pTask->id.idStr, vgId, tstrerror(code)); return TSDB_CODE_INVALID_MSG; } diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 884d7ea1b6..d8656c0f60 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -252,7 +252,7 @@ int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPreci } int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, const char type, int8_t lvl) { - size_t len = FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl); + size_t len = 0;//FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl); if (len > inputSize) { output[0] = 0; memcpy(output + 1, input, inputSize); @@ -264,7 +264,7 @@ int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSize, char *const output, int32_t outputSize, const char type) { if (input[0] == 1) { - return FL2_decompress(output, outputSize, input + 1, compressedSize - 1); + return 0;//FL2_decompress(output, outputSize, input + 1, compressedSize - 1); } else if (input[0] == 0) { memcpy(output, input + 1, compressedSize - 1); return compressedSize - 1;