Merge pull request #27695 from taosdata/fix/create_tb
refactor: add random error generator for stream trans.
This commit is contained in:
commit
0728c93cd0
|
@ -1859,3 +1859,73 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo
|
|||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
uint32_t seed = 0;
|
||||
static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) {
|
||||
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
|
||||
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
|
||||
if (rpcMsg.pCont == NULL) {
|
||||
return rpcMsg;
|
||||
}
|
||||
|
||||
rpcMsg.info.traceId.rootId = traceId;
|
||||
rpcMsg.info.notFreeAhandle = 1;
|
||||
|
||||
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
|
||||
return rpcMsg;
|
||||
}
|
||||
|
||||
void streamTransRandomErrorGen(STransAction *pAction, STrans *pTrans, int64_t signature) {
|
||||
if ((pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT && pAction->id > 2) ||
|
||||
(pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) ||
|
||||
(pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE && pAction->id > 2)) {
|
||||
if (seed == 0) {
|
||||
seed = taosGetTimestampSec();
|
||||
}
|
||||
|
||||
uint32_t v = taosRandR(&seed);
|
||||
int32_t choseItem = v % 5;
|
||||
|
||||
if (choseItem == 0) {
|
||||
// 1. one of update-checkpoint not send, restart and send it again
|
||||
taosMsleep(5000);
|
||||
if (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT) {
|
||||
mError(
|
||||
"***sleep 5s and core dump, following tasks will not recv update-checkpoint info, so the checkpoint will "
|
||||
"rollback***");
|
||||
exit(-1);
|
||||
} else if (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) { // pAction->msgType == TDMT_STREAM_CONSEN_CHKPT
|
||||
mError(
|
||||
"***sleep 5s and core dump, following tasks will not recv consen-checkpoint info, so the tasks will "
|
||||
"not started***");
|
||||
} else { // pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE
|
||||
mError(
|
||||
"***sleep 5s and core dump, following tasks will not recv checkpoint-source info, so the tasks will "
|
||||
"started after restart***");
|
||||
exit(-1);
|
||||
}
|
||||
} else if (choseItem == 1) {
|
||||
// 2. repeat send update chkpt msg
|
||||
mError("***repeat send update-checkpoint/consensus/checkpoint trans msg 3times to vnode***");
|
||||
|
||||
mError("***repeat 1***");
|
||||
SRpcMsg rpcMsg1 = createRpcMsg(pAction, pTrans->mTraceId, signature);
|
||||
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg1);
|
||||
|
||||
mError("***repeat 2***");
|
||||
SRpcMsg rpcMsg2 = createRpcMsg(pAction, pTrans->mTraceId, signature);
|
||||
code = tmsgSendReq(&pAction->epSet, &rpcMsg2);
|
||||
|
||||
mError("***repeat 3***");
|
||||
SRpcMsg rpcMsg3 = createRpcMsg(pAction, pTrans->mTraceId, signature);
|
||||
code = tmsgSendReq(&pAction->epSet, &rpcMsg3);
|
||||
} else if (choseItem == 2) {
|
||||
// 3. sleep 40s and then send msg
|
||||
mError("***idle for 30s, and then send msg***");
|
||||
taosMsleep(30000);
|
||||
} else {
|
||||
// do nothing
|
||||
// mInfo("no error triggered");
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue