diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index de10e991d3..25c94fda56 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -293,10 +293,13 @@ static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrap return terrno; } - SNode *pNode; int32_t index = 0; for (int32_t i = 0; i < pWrapper->nCols; i++) { SField *pField = (SField *)taosArrayGet(pFields, i); + if (pField == NULL) { + return terrno; + } + if (TSDB_DATA_TYPE_NULL == pField->type) { pWrapper->pSchema[index].type = TSDB_DATA_TYPE_VARCHAR; pWrapper->pSchema[index].bytes = VARSTR_HEADER_SIZE; @@ -609,6 +612,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre // build fields for (int32_t i = 0; i < createReq.numOfColumns; i++) { SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i); + TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno); + tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN); pField->flags = pStream->outputSchema.pSchema[i].flags; pField->type = pStream->outputSchema.pSchema[i].type; @@ -623,6 +628,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre // build tags SField *pField = taosArrayGet(createReq.pTags, 0); + TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno); + strcpy(pField->name, "group_id"); pField->type = TSDB_DATA_TYPE_UBIGINT; pField->flags = 0; @@ -634,6 +641,10 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre for (int32_t i = 0; i < createReq.numOfTags; i++) { SField *pField = taosArrayGet(createReq.pTags, i); + if (pField == NULL) { + continue; + } + pField->bytes = pStream->tagSchema.pSchema[i].bytes; pField->flags = pStream->tagSchema.pSchema[i].flags; pField->type = pStream->tagSchema.pSchema[i].type; @@ -920,10 +931,9 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) { } for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { - STaskId *p = taosArrayGet(execInfo.pTaskList, i); - + STaskId *p = taosArrayGet(execInfo.pTaskList, i); STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); - if (pEntry == NULL) { + if (p == NULL || pEntry == NULL) { continue; } @@ -967,8 +977,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code); if (code < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } int32_t tlen = sizeof(SMsgHead) + blen; @@ -1002,18 +1011,20 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask int8_t mndTrigger) { void *buf; int32_t tlen; - if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, - pTask->id.taskId, pTrans->id, mndTrigger) < 0) { - taosMemoryFree(buf); - return -1; - } - + int32_t code = 0; SEpSet epset = {0}; bool hasEpset = false; - int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + + if ((code = mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, + pTask->id.taskId, pTrans->id, mndTrigger)) < 0) { + taosMemoryFree(buf); + return code; + } + + code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); if (code != TSDB_CODE_SUCCESS || !hasEpset) { taosMemoryFree(buf); - return -1; + return code; } code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY, @@ -1130,6 +1141,10 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { for (int32_t i = 0; i < numOfNodes; ++i) { SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i); + if (pNodeEntry == NULL) { + continue; + } + if (pNodeEntry->stageUpdated) { mDebug("stream task not ready due to node update detected, checkpoint not issued"); streamMutexUnlock(&execInfo.lock); @@ -1170,7 +1185,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { bool ready = true; if (taskNodeIsUpdated(pMnode)) { - return -1; + TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS); } streamMutexLock(&execInfo.lock); @@ -1240,7 +1255,7 @@ int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) { for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) { STaskId *p = taosArrayGet(pTaskList, i); STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); - if (pEntry == NULL || pEntry->id.streamId != streamId) { + if (p == NULL || pEntry == NULL || pEntry->id.streamId != streamId) { continue; } @@ -1278,13 +1293,12 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { int32_t numOfCheckpointTrans = 0; if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) { - terrno = TSDB_CODE_STREAM_TASK_IVLD_STATUS; - return -1; + TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS); } SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval)); if (pList == NULL) { - return -1; + return terrno; } int64_t now = taosGetTimestampMs(); @@ -1353,6 +1367,9 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { for (int32_t i = 0; i < numOfQual; ++i) { SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i); + if (pCheckpointInfo == NULL) { + continue; + } SStreamObj *p = NULL; code = mndGetStreamObj(pMnode, pCheckpointInfo->streamId, &p); @@ -1521,8 +1538,9 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { } int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + int32_t code = 0; while (1) { SStreamObj *pStream = NULL; @@ -1535,18 +1553,8 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { sdbCancelFetch(pSdb, pIter); mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64, pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid); - terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED; - return -1; + TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED); } else { -#if 0 - if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) { - mError("stream:%s, failed to drop task since %s", pStream->name, terrstr()); - sdbRelease(pMnode->pSdb, pStream); - sdbCancelFetch(pSdb, pIter); - return -1; - } -#endif - // kill the related checkpoint trans int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid); if (transId != 0) { @@ -1557,10 +1565,11 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { // drop the stream obj in execInfo removeStreamTasksInBuf(pStream, &execInfo); - if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED) < 0) { + code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { sdbRelease(pSdb, pStream); sdbCancelFetch(pSdb, pIter); - return -1; + return code; } } } @@ -1575,8 +1584,7 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - return -1; + TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED); } int32_t numOfStreams = 0; @@ -1704,8 +1712,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { SMPauseStreamReq pauseReq = {0}; if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + TAOS_RETURN(TSDB_CODE_INVALID_MSG); } code = mndAcquireStream(pMnode, pauseReq.name, &pStream); @@ -1715,8 +1722,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return 0; } else { mError("stream:%s not exist, failed to pause stream", pauseReq.name); - terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - return -1; + TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST); } } @@ -1736,14 +1742,14 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true); if (conflict) { sdbRelease(pMnode->pSdb, pStream); - return -1; + TAOS_RETURN(TSDB_CODE_MND_TRANS_CONFLICT); } bool updated = taskNodeIsUpdated(pMnode); if (updated) { mError("tasks are not ready for pause, node update detected"); sdbRelease(pMnode->pSdb, pStream); - return -1; + TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS); } { // check for tasks, if tasks are not ready, not allowed to pause @@ -1753,6 +1759,9 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { STaskId *p = taosArrayGet(execInfo.pTaskList, i); + if (p == NULL) { + continue; + } STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); if (pEntry == NULL) { @@ -1776,13 +1785,13 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { if (!found) { mError("stream:%s task not report status yet, not ready for pause", pauseReq.name); sdbRelease(pMnode->pSdb, pStream); - return -1; + TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS); } if (!readyToPause) { mError("stream:%s task not ready for pause yet", pauseReq.name); sdbRelease(pMnode->pSdb, pStream); - return -1; + TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS); } } @@ -1843,13 +1852,12 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { int32_t code = 0; if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { - return -1; + return terrno; } SMResumeStreamReq resumeReq = {0}; if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + TAOS_RETURN(TSDB_CODE_INVALID_MSG); } code = mndAcquireStream(pMnode, resumeReq.name, &pStream); @@ -1860,8 +1868,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return 0; } else { mError("stream:%s not exist, failed to resume stream", resumeReq.name); - terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - return -1; + TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST); } } @@ -1956,10 +1963,16 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP int32_t numOfNodes = taosArrayGetSize(pPrevNodeList); for (int32_t i = 0; i < numOfNodes; ++i) { SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i); + if (pPrevEntry == NULL) { + continue; + } int32_t num = taosArrayGetSize(pNodeList); for (int32_t j = 0; j < num; ++j) { SNodeEntry *pCurrent = taosArrayGet(pNodeList, j); + if(pCurrent == NULL) { + continue; + } if (pCurrent->nodeId == pPrevEntry->nodeId) { if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) { @@ -2302,7 +2315,7 @@ void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) bool exist = false; for (int32_t j = 0; j < taosArrayGetSize(pExecNode->pNodeList); ++j) { SNodeEntry *pEntry = taosArrayGet(pExecNode->pNodeList, j); - if (pEntry->nodeId == pTask->info.nodeId) { + if ((pEntry != NULL) && (pEntry->nodeId == pTask->info.nodeId)) { exist = true; break; } @@ -2329,14 +2342,17 @@ static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numO int32_t num = taosArrayGetSize(pList); for (int32_t i = 0; i < num; ++i) { int32_t *pId = taosArrayGet(pList, i); + if (pId == NULL) { + continue; + } + if (taskId == *pId) { return; } } - void* p = taosArrayPush(pList, &taskId); int32_t numOfTasks = taosArrayGetSize(pList); - + void *p = taosArrayPush(pList, &taskId); if (p) { mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfTasks, numOfTotal - numOfTasks); } else { @@ -2445,6 +2461,10 @@ static void doAddReportStreamTask(SArray* pList, const SCheckpointReport* pRepor bool existed = false; for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { STaskChkptInfo *p = taosArrayGet(pList, i); + if (p == NULL) { + continue; + } + if (p->taskId == pReport->taskId) { existed = true; break; @@ -2554,6 +2574,10 @@ static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pEx for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { STaskId* p = taosArrayGet(execInfo.pTaskList, i); + if (p == NULL) { + continue; + } + if (p->streamId != streamId) { continue; } @@ -2634,6 +2658,10 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { for (int32_t j = 0; j < num; ++j) { SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j); + if (pe == NULL) { + continue; + } + streamId = pe->req.streamId; int32_t existed = 0; @@ -2670,9 +2698,13 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { if (taosArrayGetSize(pList) > 0) { for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { int32_t *taskId = taosArrayGet(pList, i); + if (taskId == NULL) { + continue; + } + for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) { SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k); - if (pe->req.taskId == *taskId) { + if ((pe != NULL) && (pe->req.taskId == *taskId)) { taosArrayRemove(pInfo->pTaskList, k); break; } @@ -2694,6 +2726,10 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) { int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i); + if (pStreamId == NULL) { + continue; + } + code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId); } @@ -2828,7 +2864,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { // check if it is conflict with other trans in both sourceDb and targetDb. bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false); if (conflict) { - return -1; + TAOS_RETURN(TSDB_CODE_MND_TRANS_CONFLICT); } SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""}; @@ -2841,6 +2877,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId); if (code) { + mndTransDrop(pTrans); return code; }