fix(stream): record the failed checkpointId, after receive reset task info.
This commit is contained in:
parent
b2b5a14d9d
commit
187997d5f0
|
@ -1011,6 +1011,7 @@ int32_t taosGetErrSize();
|
||||||
#define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106)
|
#define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106)
|
||||||
#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107)
|
#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107)
|
||||||
#define TSDB_CODE_STREAM_INPUTQ_FULL TAOS_DEF_ERROR_CODE(0, 0x4108)
|
#define TSDB_CODE_STREAM_INPUTQ_FULL TAOS_DEF_ERROR_CODE(0, 0x4108)
|
||||||
|
#define TSDB_CODE_STREAM_INVLD_CHKPT TAOS_DEF_ERROR_CODE(0, 0x4109)
|
||||||
|
|
||||||
// TDLite
|
// TDLite
|
||||||
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
|
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
|
||||||
|
|
|
@ -0,0 +1,72 @@
|
||||||
|
#include "mndTrans.h"
|
||||||
|
|
||||||
|
uint32_t seed = 0;
|
||||||
|
|
||||||
|
static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) {
|
||||||
|
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
|
||||||
|
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
|
||||||
|
if (rpcMsg.pCont == NULL) {
|
||||||
|
return rpcMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
rpcMsg.info.traceId.rootId = traceId;
|
||||||
|
rpcMsg.info.notFreeAhandle = 1;
|
||||||
|
|
||||||
|
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
|
||||||
|
return rpcMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamTransRandomErrorGen(STransAction *pAction, STrans *pTrans, int64_t signature) {
|
||||||
|
if ((pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT && pAction->id > 2) ||
|
||||||
|
(pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) ||
|
||||||
|
(pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE && pAction->id > 2)) {
|
||||||
|
if (seed == 0) {
|
||||||
|
seed = taosGetTimestampSec();
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t v = taosRandR(&seed);
|
||||||
|
int32_t choseItem = v % 5;
|
||||||
|
|
||||||
|
if (choseItem == 0) {
|
||||||
|
// 1. one of update-checkpoint not send, restart and send it again
|
||||||
|
taosMsleep(5000);
|
||||||
|
if (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT) {
|
||||||
|
mError(
|
||||||
|
"***sleep 5s and core dump, following tasks will not recv update-checkpoint info, so the checkpoint will "
|
||||||
|
"rollback***");
|
||||||
|
exit(-1);
|
||||||
|
} else if (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) { // pAction->msgType == TDMT_STREAM_CONSEN_CHKPT
|
||||||
|
mError(
|
||||||
|
"***sleep 5s and core dump, following tasks will not recv consen-checkpoint info, so the tasks will "
|
||||||
|
"not started***");
|
||||||
|
} else { // pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE
|
||||||
|
mError(
|
||||||
|
"***sleep 5s and core dump, following tasks will not recv checkpoint-source info, so the tasks will "
|
||||||
|
"started after restart***");
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
} else if (choseItem == 1) {
|
||||||
|
// 2. repeat send update chkpt msg
|
||||||
|
mError("***repeat send update-checkpoint/consensus/checkpoint trans msg 3times to vnode***");
|
||||||
|
|
||||||
|
mError("***repeat 1***");
|
||||||
|
SRpcMsg rpcMsg1 = createRpcMsg(pAction, pTrans->mTraceId, signature);
|
||||||
|
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg1);
|
||||||
|
|
||||||
|
mError("***repeat 2***");
|
||||||
|
SRpcMsg rpcMsg2 = createRpcMsg(pAction, pTrans->mTraceId, signature);
|
||||||
|
code = tmsgSendReq(&pAction->epSet, &rpcMsg2);
|
||||||
|
|
||||||
|
mError("***repeat 3***");
|
||||||
|
SRpcMsg rpcMsg3 = createRpcMsg(pAction, pTrans->mTraceId, signature);
|
||||||
|
code = tmsgSendReq(&pAction->epSet, &rpcMsg3);
|
||||||
|
} else if (choseItem == 2) {
|
||||||
|
// 3. sleep 40s and then send msg
|
||||||
|
mError("***idle for 30s, and then send msg***");
|
||||||
|
taosMsleep(30000);
|
||||||
|
} else {
|
||||||
|
// do nothing
|
||||||
|
// mInfo("no error triggered");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -381,9 +381,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) {
|
if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) {
|
||||||
mError("vgId:%d HbMsgId:%d already handled, bh msg discard", pEntry->nodeId, req.msgId);
|
mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId);
|
||||||
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
// return directly and after the vnode to continue to send the next HbMsg.
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
|
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
|
||||||
|
|
||||||
streamMutexUnlock(&execInfo.lock);
|
streamMutexUnlock(&execInfo.lock);
|
||||||
|
|
|
@ -1522,73 +1522,3 @@ int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
|
||||||
return TSDB_CODE_SNODE_NOT_DEPLOYED;
|
return TSDB_CODE_SNODE_NOT_DEPLOYED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t seed = 0;
|
|
||||||
static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) {
|
|
||||||
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
|
|
||||||
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
|
|
||||||
if (rpcMsg.pCont == NULL) {
|
|
||||||
return rpcMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
rpcMsg.info.traceId.rootId = traceId;
|
|
||||||
rpcMsg.info.notFreeAhandle = 1;
|
|
||||||
|
|
||||||
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
|
|
||||||
return rpcMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
void streamTransRandomErrorGen(STransAction *pAction, STrans *pTrans, int64_t signature) {
|
|
||||||
if ((pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT && pAction->id > 2) ||
|
|
||||||
(pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) ||
|
|
||||||
(pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE && pAction->id > 2)) {
|
|
||||||
if (seed == 0) {
|
|
||||||
seed = taosGetTimestampSec();
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t v = taosRandR(&seed);
|
|
||||||
int32_t choseItem = v % 5;
|
|
||||||
|
|
||||||
if (choseItem == 0) {
|
|
||||||
// 1. one of update-checkpoint not send, restart and send it again
|
|
||||||
taosMsleep(5000);
|
|
||||||
if (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT) {
|
|
||||||
mError(
|
|
||||||
"***sleep 5s and core dump, following tasks will not recv update-checkpoint info, so the checkpoint will "
|
|
||||||
"rollback***");
|
|
||||||
exit(-1);
|
|
||||||
} else if (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) { // pAction->msgType == TDMT_STREAM_CONSEN_CHKPT
|
|
||||||
mError(
|
|
||||||
"***sleep 5s and core dump, following tasks will not recv consen-checkpoint info, so the tasks will "
|
|
||||||
"not started***");
|
|
||||||
} else { // pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE
|
|
||||||
mError(
|
|
||||||
"***sleep 5s and core dump, following tasks will not recv checkpoint-source info, so the tasks will "
|
|
||||||
"started after restart***");
|
|
||||||
exit(-1);
|
|
||||||
}
|
|
||||||
} else if (choseItem == 1) {
|
|
||||||
// 2. repeat send update chkpt msg
|
|
||||||
mError("***repeat send update-checkpoint/consensus/checkpoint trans msg 3times to vnode***");
|
|
||||||
|
|
||||||
mError("***repeat 1***");
|
|
||||||
SRpcMsg rpcMsg1 = createRpcMsg(pAction, pTrans->mTraceId, signature);
|
|
||||||
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg1);
|
|
||||||
|
|
||||||
mError("***repeat 2***");
|
|
||||||
SRpcMsg rpcMsg2 = createRpcMsg(pAction, pTrans->mTraceId, signature);
|
|
||||||
code = tmsgSendReq(&pAction->epSet, &rpcMsg2);
|
|
||||||
|
|
||||||
mError("***repeat 3***");
|
|
||||||
SRpcMsg rpcMsg3 = createRpcMsg(pAction, pTrans->mTraceId, signature);
|
|
||||||
code = tmsgSendReq(&pAction->epSet, &rpcMsg3);
|
|
||||||
} else if (choseItem == 2) {
|
|
||||||
// 3. sleep 40s and then send msg
|
|
||||||
mError("***idle for 30s, and then send msg***");
|
|
||||||
taosMsleep(30000);
|
|
||||||
} else {
|
|
||||||
// do nothing
|
|
||||||
// mInfo("no error triggered");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -244,6 +244,9 @@ int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger);
|
||||||
int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval,
|
int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval,
|
||||||
STimeWindow* pLatestWindow, const char* id);
|
STimeWindow* pLatestWindow, const char* id);
|
||||||
|
|
||||||
|
// inject stream errors
|
||||||
|
void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -222,14 +222,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
|
||||||
stError("s-task:%s vgId:%d current checkpointId:%" PRId64
|
stError("s-task:%s vgId:%d current checkpointId:%" PRId64
|
||||||
" recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
|
" recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
|
||||||
id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
|
id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
|
||||||
return code;
|
return TSDB_CODE_STREAM_INVLD_CHKPT;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pActiveInfo->failedId >= checkpointId) {
|
if (pActiveInfo->failedId >= checkpointId) {
|
||||||
stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
|
stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
|
||||||
" discard the checkpoint-trigger block",
|
" discard the checkpoint-trigger block",
|
||||||
id, vgId, checkpointId, transId, pActiveInfo->failedId);
|
id, vgId, checkpointId, transId, pActiveInfo->failedId);
|
||||||
return code;
|
return TSDB_CODE_STREAM_INVLD_CHKPT;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->chkInfo.checkpointId == checkpointId) {
|
if (pTask->chkInfo.checkpointId == checkpointId) {
|
||||||
|
@ -373,6 +373,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
chkptFailedByRetrieveReqToSource(pTask, checkpointId);
|
||||||
|
#endif
|
||||||
|
|
||||||
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
|
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
|
||||||
code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure
|
code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure
|
||||||
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
#include "streamInt.h"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* pre-request: checkpoint interval should be 60s
|
||||||
|
* @param pTask
|
||||||
|
* @param checkpointId
|
||||||
|
*/
|
||||||
|
void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId) {
|
||||||
|
streamMutexLock(&pTask->lock);
|
||||||
|
|
||||||
|
// set current checkpoint failed immediately, set failed checkpoint id before clear the checkpoint info
|
||||||
|
streamTaskSetFailedCheckpointId(pTask, checkpointId);
|
||||||
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
// the checkpoint interval should be 60s, and the next checkpoint req should be issued by mnode
|
||||||
|
taosMsleep(65*1000);
|
||||||
|
}
|
|
@ -854,6 +854,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_CONFLICT_EVENT, "Stream conflict eve
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVLD_CHKPT, "Invalid checkpoint trigger msg")
|
||||||
|
|
||||||
// TDLite
|
// TDLite
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
|
||||||
|
|
Loading…
Reference in New Issue