diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2c811495fd..e33af33d0e 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -1011,6 +1011,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106) #define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107) #define TSDB_CODE_STREAM_INPUTQ_FULL TAOS_DEF_ERROR_CODE(0, 0x4108) +#define TSDB_CODE_STREAM_INVLD_CHKPT TAOS_DEF_ERROR_CODE(0, 0x4109) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) diff --git a/source/dnode/mnode/impl/src/mndStreamErrorInjection.c b/source/dnode/mnode/impl/src/mndStreamErrorInjection.c new file mode 100644 index 0000000000..c68416369d --- /dev/null +++ b/source/dnode/mnode/impl/src/mndStreamErrorInjection.c @@ -0,0 +1,72 @@ +#include "mndTrans.h" + +uint32_t seed = 0; + +static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) { + SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature}; + rpcMsg.pCont = rpcMallocCont(pAction->contLen); + if (rpcMsg.pCont == NULL) { + return rpcMsg; + } + + rpcMsg.info.traceId.rootId = traceId; + rpcMsg.info.notFreeAhandle = 1; + + memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); + return rpcMsg; +} + +void streamTransRandomErrorGen(STransAction *pAction, STrans *pTrans, int64_t signature) { + if ((pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT && pAction->id > 2) || + (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) || + (pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE && pAction->id > 2)) { + if (seed == 0) { + seed = taosGetTimestampSec(); + } + + uint32_t v = taosRandR(&seed); + int32_t choseItem = v % 5; + + if (choseItem == 0) { + // 1. one of update-checkpoint not send, restart and send it again + taosMsleep(5000); + if (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT) { + mError( + "***sleep 5s and core dump, following tasks will not recv update-checkpoint info, so the checkpoint will " + "rollback***"); + exit(-1); + } else if (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) { // pAction->msgType == TDMT_STREAM_CONSEN_CHKPT + mError( + "***sleep 5s and core dump, following tasks will not recv consen-checkpoint info, so the tasks will " + "not started***"); + } else { // pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE + mError( + "***sleep 5s and core dump, following tasks will not recv checkpoint-source info, so the tasks will " + "started after restart***"); + exit(-1); + } + } else if (choseItem == 1) { + // 2. repeat send update chkpt msg + mError("***repeat send update-checkpoint/consensus/checkpoint trans msg 3times to vnode***"); + + mError("***repeat 1***"); + SRpcMsg rpcMsg1 = createRpcMsg(pAction, pTrans->mTraceId, signature); + int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg1); + + mError("***repeat 2***"); + SRpcMsg rpcMsg2 = createRpcMsg(pAction, pTrans->mTraceId, signature); + code = tmsgSendReq(&pAction->epSet, &rpcMsg2); + + mError("***repeat 3***"); + SRpcMsg rpcMsg3 = createRpcMsg(pAction, pTrans->mTraceId, signature); + code = tmsgSendReq(&pAction->epSet, &rpcMsg3); + } else if (choseItem == 2) { + // 3. sleep 40s and then send msg + mError("***idle for 30s, and then send msg***"); + taosMsleep(30000); + } else { + // do nothing + // mInfo("no error triggered"); + } + } +} diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 03b418f13f..0f903632ed 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -381,9 +381,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) { - mError("vgId:%d HbMsgId:%d already handled, bh msg discard", pEntry->nodeId, req.msgId); + mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId); - terrno = TSDB_CODE_INVALID_MSG; + // return directly and after the vnode to continue to send the next HbMsg. + terrno = TSDB_CODE_SUCCESS; doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); streamMutexUnlock(&execInfo.lock); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index f9b7644af4..615c383f07 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1521,74 +1521,4 @@ int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) { mError("snode not existed when trying to create stream in db with multiple replica"); return TSDB_CODE_SNODE_NOT_DEPLOYED; } -} - -uint32_t seed = 0; -static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) { - SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature}; - rpcMsg.pCont = rpcMallocCont(pAction->contLen); - if (rpcMsg.pCont == NULL) { - return rpcMsg; - } - - rpcMsg.info.traceId.rootId = traceId; - rpcMsg.info.notFreeAhandle = 1; - - memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); - return rpcMsg; -} - -void streamTransRandomErrorGen(STransAction *pAction, STrans *pTrans, int64_t signature) { - if ((pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT && pAction->id > 2) || - (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) || - (pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE && pAction->id > 2)) { - if (seed == 0) { - seed = taosGetTimestampSec(); - } - - uint32_t v = taosRandR(&seed); - int32_t choseItem = v % 5; - - if (choseItem == 0) { - // 1. one of update-checkpoint not send, restart and send it again - taosMsleep(5000); - if (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT) { - mError( - "***sleep 5s and core dump, following tasks will not recv update-checkpoint info, so the checkpoint will " - "rollback***"); - exit(-1); - } else if (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) { // pAction->msgType == TDMT_STREAM_CONSEN_CHKPT - mError( - "***sleep 5s and core dump, following tasks will not recv consen-checkpoint info, so the tasks will " - "not started***"); - } else { // pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE - mError( - "***sleep 5s and core dump, following tasks will not recv checkpoint-source info, so the tasks will " - "started after restart***"); - exit(-1); - } - } else if (choseItem == 1) { - // 2. repeat send update chkpt msg - mError("***repeat send update-checkpoint/consensus/checkpoint trans msg 3times to vnode***"); - - mError("***repeat 1***"); - SRpcMsg rpcMsg1 = createRpcMsg(pAction, pTrans->mTraceId, signature); - int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg1); - - mError("***repeat 2***"); - SRpcMsg rpcMsg2 = createRpcMsg(pAction, pTrans->mTraceId, signature); - code = tmsgSendReq(&pAction->epSet, &rpcMsg2); - - mError("***repeat 3***"); - SRpcMsg rpcMsg3 = createRpcMsg(pAction, pTrans->mTraceId, signature); - code = tmsgSendReq(&pAction->epSet, &rpcMsg3); - } else if (choseItem == 2) { - // 3. sleep 40s and then send msg - mError("***idle for 30s, and then send msg***"); - taosMsleep(30000); - } else { - // do nothing - // mInfo("no error triggered"); - } - } } \ No newline at end of file diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 8f741835e6..427733e9ec 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -244,6 +244,9 @@ int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger); int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, STimeWindow* pLatestWindow, const char* id); +// inject stream errors +void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 999c855f49..cc92df368c 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -222,14 +222,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check stError("s-task:%s vgId:%d current checkpointId:%" PRId64 " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard", id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } if (pActiveInfo->failedId >= checkpointId) { stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64 " discard the checkpoint-trigger block", id, vgId, checkpointId, transId, pActiveInfo->failedId); - return code; + return TSDB_CODE_STREAM_INVLD_CHKPT; } if (pTask->chkInfo.checkpointId == checkpointId) { @@ -373,6 +373,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock return code; } +#if 0 + chkptFailedByRetrieveReqToSource(pTask, checkpointId); +#endif + if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure diff --git a/source/libs/stream/src/streamErrorInjection.c b/source/libs/stream/src/streamErrorInjection.c new file mode 100644 index 0000000000..515845ba2b --- /dev/null +++ b/source/libs/stream/src/streamErrorInjection.c @@ -0,0 +1,17 @@ +#include "streamInt.h" + +/** + * pre-request: checkpoint interval should be 60s + * @param pTask + * @param checkpointId + */ +void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId) { + streamMutexLock(&pTask->lock); + + // set current checkpoint failed immediately, set failed checkpoint id before clear the checkpoint info + streamTaskSetFailedCheckpointId(pTask, checkpointId); + streamMutexUnlock(&pTask->lock); + + // the checkpoint interval should be 60s, and the next checkpoint req should be issued by mnode + taosMsleep(65*1000); +} \ No newline at end of file diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0d8a85155a..00f72123dc 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -853,7 +853,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_CONFLICT_EVENT, "Stream conflict event") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode") -TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVLD_CHKPT, "Invalid checkpoint trigger msg") // TDLite TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")