fix(stream): check return value.

This commit is contained in:
Haojun Liao 2024-07-22 10:07:18 +08:00
parent 06ca010059
commit ebe1d09d74
5 changed files with 102 additions and 59 deletions

View File

@ -98,7 +98,7 @@ typedef struct STaskChkptInfo {
int32_t mndInitStream(SMnode *pMnode);
void mndCleanupStream(SMnode *pMnode);
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream);
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream);
@ -109,7 +109,7 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList);
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode, int32_t acceptCode);
@ -146,7 +146,7 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInf
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks);
int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo);
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo);
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo);
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId);

View File

@ -768,8 +768,8 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
mndGetStreamNameFromSmaName(streamName, createReq.name);
pStream = mndAcquireStream(pMnode, streamName);
if (pStream != NULL) {
code = mndAcquireStream(pMnode, streamName, &pStream);
if (pStream != NULL || code != 0) {
mError("sma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
goto _OVER;
@ -905,8 +905,10 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
mndGetStreamNameFromSmaName(streamName, pSma->name);
SStreamObj *pStream = mndAcquireStream(pMnode, streamName);
if (pStream == NULL || pStream->smaId != pSma->uid) {
SStreamObj *pStream = NULL;
code = mndAcquireStream(pMnode, streamName, &pStream);
if (pStream == NULL || pStream->smaId != pSma->uid || code != 0) {
sdbRelease(pMnode->pSdb, pStream);
goto _OVER;
} else {
@ -960,8 +962,9 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
mndGetStreamNameFromSmaName(streamName, pSma->name);
SStreamObj *pStream = mndAcquireStream(pMnode, streamName);
if (pStream != NULL && pStream->smaId == pSma->uid) {
SStreamObj *pStream = NULL;
code = mndAcquireStream(pMnode, streamName, &pStream);
if ((pStream != NULL && pStream->smaId == pSma->uid) || code != 0) {
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
mndReleaseStream(pMnode, pStream);
@ -1734,8 +1737,8 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
goto _OVER;
}
pStream = mndAcquireStream(pMnode, streamName);
if (pStream != NULL) {
code = mndAcquireStream(pMnode, streamName, &pStream);
if (pStream != NULL || code != 0) {
mError("tsma:%s, failed to create since stream:%s already exist", createReq.name, streamName);
terrno = TSDB_CODE_MND_SMA_ALREADY_EXIST;
goto _OVER;
@ -2215,7 +2218,7 @@ static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilt
SSmaObj * pBaseTsma = NULL;
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SStreamObj * pStreamObj = NULL;
SStreamObj * pStream = NULL;
SStbObj * pStb = NULL;
while (1) {
@ -2237,14 +2240,16 @@ static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilt
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname);
pStreamObj = mndAcquireStream(pMnode, streamName);
if (!pStreamObj) {
pStream = NULL;
code = mndAcquireStream(pMnode, streamName, &pStream);
if (!pStream || (code != 0)) {
sdbRelease(pSdb, pSma);
continue;
}
int64_t streamId = pStreamObj->uid;
mndReleaseStream(pMnode, pStreamObj);
int64_t streamId = pStream->uid;
mndReleaseStream(pMnode, pStream);
STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
if (!pTsma) {

View File

@ -251,13 +251,13 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream
return 0;
}
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName) {
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = sdbAcquire(pSdb, SDB_STREAM, streamName);
if (pStream == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
(*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
}
return pStream;
return terrno;
}
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
@ -706,7 +706,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
char *sql = NULL;
int32_t sqlLen = 0;
const char *pMsg = "create stream tasks on dnodes";
int32_t code = 0;
terrno = TSDB_CODE_SUCCESS;
SCMCreateStreamReq createReq = {0};
@ -726,8 +726,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
pStream = mndAcquireStream(pMnode, createReq.name);
if (pStream != NULL) {
code = mndAcquireStream(pMnode, createReq.name, &pStream);
if (pStream != NULL || code != 0) {
if (createReq.igExists) {
mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
goto _OVER;
@ -1077,7 +1077,12 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
}
bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
SArray *pNodeSnapshot = NULL;
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
if (code) {
mError("failed to get the vgroup snapshot, ignore it and continue");
}
if (!allReady) {
mWarn("not all vnodes ready, quit from vnodes status check");
taosArrayDestroy(pNodeSnapshot);
@ -1289,6 +1294,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
int32_t code = 0;
SMDropStreamReq dropReq = {0};
if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
@ -1299,8 +1305,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
mDebug("recv drop stream:%s msg", dropReq.name);
pStream = mndAcquireStream(pMnode, dropReq.name);
if (pStream == NULL) {
code = mndAcquireStream(pMnode, dropReq.name, &pStream);
if (pStream == NULL || code != 0) {
if (dropReq.igNotExists) {
mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
sdbRelease(pMnode->pSdb, pStream);
@ -1364,7 +1370,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1;
}
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
// drop all tasks
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
@ -1906,6 +1912,7 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
int32_t code = 0;
SMPauseStreamReq pauseReq = {0};
if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
@ -1913,9 +1920,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1;
}
pStream = mndAcquireStream(pMnode, pauseReq.name);
if (pStream == NULL) {
code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
if (pStream == NULL || code != 0) {
if (pauseReq.igNotExists) {
mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
return 0;
@ -2000,7 +2006,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1;
}
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
// if nodeUpdate happened, not send pause trans
if (mndStreamSetPauseAction(pMnode, pTrans, pStream) < 0) {
@ -2039,6 +2045,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
int32_t code = 0;
if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
return -1;
@ -2050,9 +2057,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return -1;
}
pStream = mndAcquireStream(pMnode, resumeReq.name);
if (pStream == NULL) {
code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
if (pStream == NULL || code != 0) {
if (resumeReq.igNotExists) {
mInfo("stream:%s not exist, not resume stream", resumeReq.name);
sdbRelease(pMnode->pSdb, pStream);
@ -2089,7 +2095,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return -1;
}
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
// set the resume action
if (mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) {
@ -2348,7 +2354,13 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
}
bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
SArray *pNodeSnapshot = NULL;
code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
if (code) {
mError("failed to take the vgroup snapshot, ignore it and continue");
}
if (!allReady) {
taosArrayDestroy(pNodeSnapshot);
atomic_store_32(&mndNodeCheckSentinel, 0);
@ -2790,8 +2802,14 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
mDebug("start to process consensus-checkpointId in tmr");
bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
SArray *pNodeSnapshot = NULL;
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
taosArrayDestroy(pNodeSnapshot);
if (code) {
mError("failed to get the vgroup snapshot, ignore it and continue");
}
if (!allReady) {
mWarn("not all vnodes are ready, end to process the consensus-checkpointId in tmr process");
taosArrayDestroy(pStreamList);

View File

@ -228,6 +228,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SStreamHbMsg req = {0};
SArray *pFailedChkpt = NULL;
SArray *pOrphanTasks = NULL;
int32_t code = 0;
if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
if (suspendAllStreams(pMnode, &pReq->info) < 0) {
@ -296,8 +297,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SStreamObj *pStream = mndGetStreamObj(pMnode, p->id.streamId);
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks);
SCheckpointConsensusInfo *pInfo = NULL;
code = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks, &pInfo);
if (code == 0) {
mndAddConsensusTasks(pInfo, &cp);
} else {
mError("failed to get consensus checkpoint-info");
}
mndReleaseStream(pMnode, pStream);
}
@ -338,9 +346,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// kill the checkpoint trans and then set all tasks status to be normal
if (taosArrayGetSize(pFailedChkpt) > 0) {
bool allReady = true;
if (pMnode != NULL) {
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
SArray *p = NULL;
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &p);
taosArrayDestroy(p);
if (code) {
mError("failed to get the vgroup snapshot, ignore it and continue");
}
} else {
allReady = false;
}

View File

@ -80,11 +80,12 @@ void destroyStreamTaskIter(SStreamTaskIter* pIter) {
taosMemoryFree(pIter);
}
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SVgObj *pVgroup = NULL;
int32_t replica = -1; // do the replica check
int32_t code = 0;
*allReady = true;
SArray *pVgroupList = taosArrayInit(4, sizeof(SNodeEntry));
@ -157,7 +158,8 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
sdbRelease(pSdb, pObj);
}
return pVgroupList;
*pList = pVgroupList;
return code;
}
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
@ -935,10 +937,10 @@ int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, i
return TSDB_CODE_ACTION_IN_PROGRESS;
}
SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks) {
void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId));
int32_t mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo) {
*pInfo = taosHashGet(pHash, &streamId, sizeof(streamId));
if (pInfo != NULL) {
return (SCheckpointConsensusInfo*)pInfo;
return 0;
}
SCheckpointConsensusInfo p = {
@ -947,10 +949,14 @@ SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId,
.streamId = streamId,
};
taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
void* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId));
return pChkptInfo;
int32_t code = taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
if (code == 0) {
void *pChkptInfo = (SCheckpointConsensusInfo *)taosHashGet(pHash, &streamId, sizeof(streamId));
*pInfo = pChkptInfo;
} else {
*pInfo = NULL;
}
return code;
}
// no matter existed or not, add the request into info list anyway, since we need to send rsp mannually