Merge pull request #27510 from taosdata/fix/TD-31542-remove-assert-mnode

fix/TD-31542-remove-assert-mnode
This commit is contained in:
Hongze Cheng 2024-08-29 11:13:23 +08:00 committed by GitHub
commit 22fa18842d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 38 additions and 9 deletions

View File

@ -634,7 +634,10 @@ static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, b
static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) { static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
int32_t code = 0; int32_t code = 0;
size_t size = taosArrayGetSize(tasks); size_t size = taosArrayGetSize(tasks);
ASSERT(size >= 2); if (size < 2) {
mError("task list size is less than 2");
return;
}
SArray* pDownTaskList = taosArrayGetP(tasks, size - 1); SArray* pDownTaskList = taosArrayGetP(tasks, size - 1);
SArray* pUpTaskList = taosArrayGetP(tasks, size - 2); SArray* pUpTaskList = taosArrayGetP(tasks, size - 2);

View File

@ -1208,7 +1208,11 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
streamMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
if (taosArrayGetSize(execInfo.pNodeList) == 0) { if (taosArrayGetSize(execInfo.pNodeList) == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing"); mDebug("stream task node change checking done, no vgroups exist, do nothing");
ASSERT(taosArrayGetSize(execInfo.pTaskList) == 0); if (taosArrayGetSize(execInfo.pTaskList) != 0) {
streamMutexUnlock(&execInfo.lock);
mError("stream task node change checking done, no vgroups exist, but task list is not empty");
return TSDB_CODE_FAILED;
}
} }
SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId)); SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId));
@ -2788,7 +2792,13 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
if (((now - pe->ts) >= 10 * 1000) || allSame) { if (((now - pe->ts) >= 10 * 1000) || allSame) {
mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId, mDebug("s-task:0x%x sendTs:%" PRId64 " wait %.2fs and all tasks have same checkpointId", pe->req.taskId,
pe->req.startTs, (now - pe->ts) / 1000.0); pe->req.startTs, (now - pe->ts) / 1000.0);
ASSERT(chkId <= pe->req.checkpointId); if (chkId > pe->req.checkpointId) {
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pStreamList);
mError("s-task:0x%x checkpointId:%" PRId64 " is updated to %" PRId64 ", update it", pe->req.taskId,
pe->req.checkpointId, chkId);
return TSDB_CODE_FAILED;
}
code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs); code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid); mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
@ -2828,7 +2838,12 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
if (taosArrayGetSize(pInfo->pTaskList) == 0) { if (taosArrayGetSize(pInfo->pTaskList) == 0) {
mndClearConsensusRspEntry(pInfo); mndClearConsensusRspEntry(pInfo);
ASSERT(streamId != -1); if (streamId == -1) {
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pStreamList);
mError("streamId is -1, streamId:%" PRIx64, pInfo->streamId);
return TSDB_CODE_FAILED;
}
void* p = taosArrayPush(pStreamList, &streamId); void* p = taosArrayPush(pStreamList, &streamId);
if (p == NULL) { if (p == NULL) {
mError("failed to put into stream list, stream:0x%" PRIx64, streamId); mError("failed to put into stream list, stream:0x%" PRIx64, streamId);

View File

@ -13,12 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "mndDb.h"
#include "mndStb.h"
#include "mndStream.h" #include "mndStream.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "tmisce.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "mndStb.h" #include "taoserror.h"
#include "mndDb.h" #include "tmisce.h"
struct SStreamTaskIter { struct SStreamTaskIter {
SStreamObj *pStream; SStreamObj *pStream;
@ -905,7 +906,12 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
} }
} }
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); if (taosHashGetSize(pExecNode->pTaskMap) != taosArrayGetSize(pExecNode->pTaskList)) {
streamMutexUnlock(&pExecNode->lock);
destroyStreamTaskIter(pIter);
mError("task map size, task list size, not equal");
return;
}
// 2. remove stream entry in consensus hash table and checkpoint-report hash table // 2. remove stream entry in consensus hash table and checkpoint-report hash table
(void) mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid); (void) mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid);

View File

@ -312,7 +312,12 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
} }
(void)mndRefreshUserIpWhiteList(pMnode); (void)mndRefreshUserIpWhiteList(pMnode);
ASSERT(commitIdx == mndSyncAppliedIndex(pFsm)); SyncIndex fsmIndex = mndSyncAppliedIndex(pFsm);
if (commitIdx != fsmIndex) {
mError("vgId:1, sync restore finished, but commitIdx:%" PRId64 " is not equal to appliedIdx:%" PRId64, commitIdx,
fsmIndex);
mndSetRestored(pMnode, false);
}
} }
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) { int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {