fix(stream): accept invalid vgroup id error code when handling the update checkpoint info.

This commit is contained in:
Haojun Liao 2024-06-12 10:11:55 +08:00
parent 80eb88d6ac
commit dc2497791a
5 changed files with 40 additions and 36 deletions

View File

@ -110,7 +110,7 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode); int32_t retryCode, int32_t acceptCode);
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg); STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg);
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);

View File

@ -515,7 +515,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
tEncodeStreamTask(&encoder, pTask); tEncodeStreamTask(&encoder, pTask);
tEncoderClear(&encoder); tEncoderClear(&encoder);
int32_t code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0); int32_t code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0, 0);
if (code != 0) { if (code != 0) {
taosMemoryFree(buf); taosMemoryFree(buf);
return -1; return -1;
@ -959,7 +959,7 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask
return -1; return -1;
} }
code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY); code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY, 0);
if (code != 0) { if (code != 0) {
taosMemoryFree(buf); taosMemoryFree(buf);
} }
@ -2554,27 +2554,28 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
int32_t total = taosArrayGetSize(*pReqTaskList); int32_t total = taosArrayGetSize(*pReqTaskList);
if (total == numOfTasks) { // all tasks has send the reqs if (total == numOfTasks) { // all tasks has send the reqs
mInfo("stream:0x%" PRIx64 mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
" %s all %d tasks send checkpoint-report, start to update checkpoint meta-info for checkpointId:%" PRId64, " will be issued soon",
req.streamId, pStream->name, total, req.checkpointId); req.streamId, pStream->name, total, req.checkpointId);
if (pStream != NULL) { // if (pStream != NULL) {
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); // bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
if (conflict) { // if (conflict) {
mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId); // mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId);
} else { // } else {
int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, *pReqTaskList); // int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, *pReqTaskList);
if (code == TSDB_CODE_SUCCESS) { // remove this entry // if (code == TSDB_CODE_SUCCESS) { // remove this entry
taosHashRemove(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); // taosHashRemove(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
//
int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); // int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", req.streamId, numOfStreams); // mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", req.streamId,
} else { // numOfStreams);
mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet", // } else {
req.streamId); // mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet",
} // req.streamId);
} // }
} // }
// }
} }
if (pStream != NULL) { if (pStream != NULL) {

View File

@ -127,7 +127,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p
return false; return false;
} }
int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) { int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamId) {
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
if (num <= 0) { if (num <= 0) {
@ -136,12 +136,13 @@ int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) {
} }
mndStreamClearFinishedTrans(pMnode, NULL); mndStreamClearFinishedTrans(pMnode, NULL);
SStreamTransInfo* pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid)); SStreamTransInfo* pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
if (pEntry != NULL) { if (pEntry != NULL) {
SStreamTransInfo tInfo = *pEntry; SStreamTransInfo tInfo = *pEntry;
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) { if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0 ||
strcmp(tInfo.name, MND_STREAM_CHKPT_UPDATE_NAME) == 0) {
return tInfo.transId; return tInfo.transId;
} }
} else { } else {
@ -246,8 +247,9 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status)
} }
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode) { int32_t retryCode, int32_t acceptCode) {
STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode}; STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode,
.acceptableCode = acceptCode};
return mndTransAppendRedoAction(pTrans, &action); return mndTransAppendRedoAction(pTrans, &action);
} }

View File

@ -230,7 +230,7 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
return -1; return -1;
} }
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0); code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, 0);
if (code != 0) { if (code != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return -1;
@ -308,7 +308,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
epsetToStr(&epset, buf, tListLen(buf)); epsetToStr(&epset, buf, tListLen(buf));
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf); mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, 0);
if (code != 0) { if (code != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return -1;
@ -356,7 +356,7 @@ static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTas
} }
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0);
if (code != 0) { if (code != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return -1;
@ -400,7 +400,7 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask
} }
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0);
if (code != 0) { if (code != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return -1;
@ -484,7 +484,7 @@ static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask
return code; return code;
} }
code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID); code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
} }
@ -534,7 +534,7 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
return code; return code;
} }
code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
} }
@ -727,7 +727,7 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas
return code; return code;
} }
code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0); code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
} }

View File

@ -86,6 +86,7 @@ static void taosDeleteTimer(void *tharg) {
static TdThread timerThread; static TdThread timerThread;
static timer_t timerId; static timer_t timerId;
static volatile bool stopTimer = false; static volatile bool stopTimer = false;
static void *taosProcessAlarmSignal(void *tharg) { static void *taosProcessAlarmSignal(void *tharg) {
// Block the signal // Block the signal
sigset_t sigset; sigset_t sigset;