diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 9289909b19..dade7b4bdf 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -110,7 +110,7 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream); SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, - int32_t retryCode); + int32_t retryCode, int32_t acceptCode); STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg); int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 136839451f..c9598c4b38 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -515,7 +515,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { tEncodeStreamTask(&encoder, pTask); tEncoderClear(&encoder); - int32_t code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0); + int32_t code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0, 0); if (code != 0) { taosMemoryFree(buf); return -1; @@ -959,7 +959,7 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask return -1; } - code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY); + code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY, 0); if (code != 0) { taosMemoryFree(buf); } @@ -2554,27 +2554,28 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { int32_t total = taosArrayGetSize(*pReqTaskList); if (total == numOfTasks) { // all tasks has send the reqs - mInfo("stream:0x%" PRIx64 - " %s all %d tasks send checkpoint-report, start to update checkpoint meta-info for checkpointId:%" PRId64, + mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64 + " will be issued soon", req.streamId, pStream->name, total, req.checkpointId); - if (pStream != NULL) { - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); - if (conflict) { - mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId); - } else { - int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, *pReqTaskList); - if (code == TSDB_CODE_SUCCESS) { // remove this entry - taosHashRemove(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); - - int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); - mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", req.streamId, numOfStreams); - } else { - mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet", - req.streamId); - } - } - } + // if (pStream != NULL) { + // bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); + // if (conflict) { + // mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId); + // } else { + // int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, *pReqTaskList); + // if (code == TSDB_CODE_SUCCESS) { // remove this entry + // taosHashRemove(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); + // + // int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); + // mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", req.streamId, + // numOfStreams); + // } else { + // mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet", + // req.streamId); + // } + // } + // } } if (pStream != NULL) { diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 8137081631..0daa383d3e 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -127,7 +127,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p return false; } -int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) { +int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamId) { taosThreadMutexLock(&execInfo.lock); int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); if (num <= 0) { @@ -136,12 +136,13 @@ int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) { } mndStreamClearFinishedTrans(pMnode, NULL); - SStreamTransInfo* pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid)); + SStreamTransInfo* pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId)); if (pEntry != NULL) { SStreamTransInfo tInfo = *pEntry; taosThreadMutexUnlock(&execInfo.lock); - if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) { + if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0 || + strcmp(tInfo.name, MND_STREAM_CHKPT_UPDATE_NAME) == 0) { return tInfo.transId; } } else { @@ -246,8 +247,9 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) } int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, - int32_t retryCode) { - STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode}; + int32_t retryCode, int32_t acceptCode) { + STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode, + .acceptableCode = acceptCode}; return mndTransAppendRedoAction(pTrans, &action); } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 6745acac34..a0731833e6 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -230,7 +230,7 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT return -1; } - code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); return -1; @@ -308,7 +308,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa epsetToStr(&epset, buf, tListLen(buf)); mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf); - code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); return -1; @@ -356,7 +356,7 @@ static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTas } // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. - code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); return -1; @@ -400,7 +400,7 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask } // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. - code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0); if (code != 0) { taosMemoryFree(pReq); return -1; @@ -484,7 +484,7 @@ static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask return code; } - code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID); + code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID, 0); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pBuf); } @@ -534,7 +534,7 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa return code; } - code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0, 0); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pReq); } @@ -727,7 +727,7 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas return code; } - code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0); + code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pReq); } diff --git a/source/os/src/osTimer.c b/source/os/src/osTimer.c index b8eb54e10e..36d364382a 100644 --- a/source/os/src/osTimer.c +++ b/source/os/src/osTimer.c @@ -86,8 +86,9 @@ static void taosDeleteTimer(void *tharg) { static TdThread timerThread; static timer_t timerId; static volatile bool stopTimer = false; -static void *taosProcessAlarmSignal(void *tharg) { - // Block the signal + +static void *taosProcessAlarmSignal(void *tharg) { + // Block the signal sigset_t sigset; sigemptyset(&sigset); sigaddset(&sigset, SIGALRM);