diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 31b9f62346..e71a6c4dce 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -636,7 +636,7 @@ typedef struct SCheckpointConsensusInfo { int64_t streamId; } SCheckpointConsensusInfo; -int32_t streamSetupScheduleTrigger(SStreamTask* pTask); +void streamSetupScheduleTrigger(SStreamTask* pTask); // dispatch related int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg); @@ -793,6 +793,7 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts); int32_t streamTimerGetInstance(tmr_h* pTmr); void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId, const char* pMsg); +void streamTmrStop(tmr_h tmrId); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/common/src/tmisce.c b/source/common/src/tmisce.c index dd371c6a2a..f3f4b29617 100644 --- a/source/common/src/tmisce.c +++ b/source/common/src/tmisce.c @@ -171,8 +171,9 @@ int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t cap) { ret = snprintf(pBuf + nwrite, cap, "}, inUse:%d", pEpSet->inUse); if (ret <= 0 || ret >= cap) { return TSDB_CODE_OUT_OF_BUFFER; + } else { + return TSDB_CODE_SUCCESS; } - return TSDB_CODE_SUCCESS; } int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) { diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 75ba51e498..dceb86c963 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -114,7 +114,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream); int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId); int32_t mndStreamClearFinishedTrans(SMnode *pMnode, 
int32_t *pNumOfActiveChkpt); -bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock); +int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock); int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId); int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 86802914b8..b8d44da8c4 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -19,8 +19,8 @@ #include "systable.h" #include "mndUser.h" -#define SHOW_STEP_SIZE 100 -#define SHOW_COLS_STEP_SIZE 4096 +#define SHOW_STEP_SIZE 100 +#define SHOW_COLS_STEP_SIZE 4096 #define SHOW_PRIVILEGES_STEP_SIZE 2048 static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f7b97a2c3c..ade8d541de 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -63,8 +63,8 @@ static int32_t mndProcessCheckpointReport(SRpcMsg *pReq); static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg); static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code); static int32_t mndProcessDropOrphanTaskReq(SRpcMsg* pReq); - -static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); +static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, SVgroupChangeInfo* pInfo); +static void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo); static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo); static void removeExpiredNodeInfo(const SArray *pNodeSnapshot); @@ -220,7 +220,7 @@ STREAM_DECODE_OVER: taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { char *p = 
(pStream == NULL) ? "null" : pStream->name; - mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, terrstr()); + mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, tstrerror(terrno)); taosMemoryFreeClear(pRow); return NULL; } @@ -282,9 +282,9 @@ int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 || pCreate->targetStbFullName[0] == 0) { - return terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION; + return TSDB_CODE_MND_INVALID_STREAM_OPTION; } - return 0; + return TSDB_CODE_SUCCESS; } static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) { @@ -366,8 +366,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN); SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); if (pSourceDb == NULL) { - mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr()); code = terrno; + mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, tstrerror(code)); goto FAIL; } @@ -378,8 +378,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName); if (pTargetDb == NULL) { - mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr()); code = terrno; + mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, tstrerror(code)); goto FAIL; } @@ -543,8 +543,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { code = tEncodeStreamTask(&encoder, pTask); tEncoderClear(&encoder); - if (code == -1) { - code = TSDB_CODE_INVALID_PARA; + if (code != 0) 
{ mError("failed to encode stream task, code:%s", tstrerror(code)); taosMemoryFree(buf); return code; @@ -616,8 +615,8 @@ int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) { static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { SStbObj *pStb = NULL; SDbObj *pDb = NULL; - int32_t code = 0; - int32_t lino = 0; + int32_t code = 0; + int32_t lino = 0; SMCreateStbReq createReq = {0}; tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); @@ -669,19 +668,19 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre } } - if (mndCheckCreateStbReq(&createReq) != 0) { + if ((code = mndCheckCreateStbReq(&createReq)) != 0) { goto _OVER; } pStb = mndAcquireStb(pMnode, createReq.name); if (pStb != NULL) { - terrno = TSDB_CODE_MND_STB_ALREADY_EXIST; + code = TSDB_CODE_MND_STB_ALREADY_EXIST; goto _OVER; } pDb = mndAcquireDbByStb(pMnode, createReq.name); if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + code = TSDB_CODE_MND_DB_NOT_SELECTED; goto _OVER; } @@ -691,7 +690,7 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre } if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) { - terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB; + code = TSDB_CODE_MND_SINGLE_STB_MODE_DB; goto _OVER; } @@ -720,7 +719,8 @@ _OVER: mndReleaseStb(pMnode, pStb); mndReleaseDb(pMnode, pDb); - mDebug("stream:%s failed to create dst stable:%s, code:%s", pStream->name, pStream->targetSTbName, tstrerror(terrno)); + mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino, + tstrerror(code)); return code; } @@ -742,16 +742,14 @@ static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) { mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM, pStreamObj->name); sdbCancelFetch(pMnode->pSdb, pIter); - terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; - 
return terrno; + return TSDB_CODE_MND_TOO_MANY_STREAMS; } if (pStream->targetStbUid == pStreamObj->targetStbUid) { mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name, pStreamObj->name); sdbCancelFetch(pMnode->pSdb, pIter); - terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE; - return terrno; + return TSDB_CODE_MND_INVALID_TARGET_TABLE; } } @@ -767,13 +765,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { const char *pMsg = "create stream tasks on dnodes"; int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; + STrans *pTrans = NULL; - terrno = TSDB_CODE_SUCCESS; SCMCreateStreamReq createReq = {0}; - if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq) != 0) { - code = TSDB_CODE_INVALID_MSG; - goto _OVER; - } + code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq); + TSDB_CHECK_CODE(code, lino, _OVER); #ifdef WINDOWS code = TSDB_CODE_MND_INVALID_PLATFORM; @@ -782,7 +778,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql); if ((code = mndCheckCreateStreamReq(&createReq)) != 0) { - mError("stream:%s, failed to create since %s", createReq.name, terrstr()); + mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code)); goto _OVER; } @@ -790,7 +786,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { if (pStream != NULL && code == 0) { if (createReq.igExists) { mInfo("stream:%s, already exist, ignore exist is set", createReq.name); - goto _OVER; + mndReleaseStream(pMnode, pStream); + tFreeSCMCreateStreamReq(&createReq); + return code; } else { code = TSDB_CODE_MND_STREAM_ALREADY_EXIST; goto _OVER; @@ -807,21 +805,20 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { sqlLen = strlen(createReq.sql); sql = taosMemoryMalloc(sqlLen + 1); TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno); + memset(sql, 0, sqlLen + 1); memcpy(sql, createReq.sql, 
sqlLen); } // build stream obj from request if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) { - mError("stream:%s, failed to create since %s", createReq.name, terrstr()); + mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code)); goto _OVER; } - if ((code = doStreamCheck(pMnode, &streamObj)) < 0) { - goto _OVER; - } + code = doStreamCheck(pMnode, &streamObj); + TSDB_CHECK_CODE(code, lino, _OVER); - STrans *pTrans = NULL; code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans); if (pTrans == NULL || code) { goto _OVER; @@ -830,7 +827,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // create stb for stream if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) { if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) { - mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, terrstr()); + mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code)); mndTransDrop(pTrans); goto _OVER; } @@ -841,7 +838,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // schedule stream task for stream obj code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("stream:%s, failed to schedule since %s", createReq.name, terrstr()); + mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code)); mndTransDrop(pTrans); goto _OVER; } @@ -849,7 +846,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // add stream to trans code = mndPersistStream(pTrans, &streamObj); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("stream:%s, failed to persist since %s", createReq.name, terrstr()); + mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code)); 
mndTransDrop(pTrans); goto _OVER; } @@ -874,17 +871,13 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // execute creation code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code)); mndTransDrop(pTrans); goto _OVER; } mndTransDrop(pTrans); - if (code == 0) { - code = TSDB_CODE_ACTION_IN_PROGRESS; - } - SName dbname = {0}; code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); if (code) { @@ -910,9 +903,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { _OVER: if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("stream:%s, failed to create since %s", createReq.name, terrstr()); + mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, tstrerror(code)); } else { mDebug("stream:%s create stream completed", createReq.name); + code = TSDB_CODE_ACTION_IN_PROGRESS; } mndReleaseStream(pMnode, pStream); @@ -1058,23 +1052,24 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId, int8_t mndTrigger, bool lock) { int32_t code = TSDB_CODE_SUCCESS; + bool conflict = false; int64_t ts = taosGetTimestampMs(); + STrans *pTrans = NULL; + if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) { return code; } - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock); - if (conflict) { - mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb, - pStream->name, pStream->uid); - return TSDB_CODE_MND_TRANS_CONFLICT; + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, 
lock); + if (code) { + mWarn("checkpoint conflict with other trans in %s, code:%s ignore the checkpoint for stream:%s %" PRIx64, + pStream->sourceDb, tstrerror(code), pStream->name, pStream->uid); + goto _ERR; } - STrans *pTrans = NULL; code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME, - "gen checkpoint for stream", &pTrans); - if (pTrans == NULL || code) { - code = TSDB_CODE_MND_TRANS_CONFLICT; + "gen checkpoint for stream", &pTrans); + if (code) { mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId, tstrerror(code)); goto _ERR; @@ -1126,7 +1121,9 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("failed to prepare checkpoint trans since %s", terrstr()); + mError("failed to prepare checkpoint trans since %s", tstrerror(code)); + } else { + code = TSDB_CODE_ACTION_IN_PROGRESS; } _ERR: @@ -1147,6 +1144,9 @@ int32_t extractStreamNodeList(SMnode *pMnode) { } static bool taskNodeIsUpdated(SMnode *pMnode) { + bool allReady = true; + SArray *pNodeSnapshot = NULL; + // check if the node update happens or not streamMutexLock(&execInfo.lock); @@ -1171,13 +1171,11 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { } } - bool allReady = true; - SArray *pNodeSnapshot = NULL; - int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot); if (code) { mError("failed to get the vgroup snapshot, ignore it and continue"); } + if (!allReady) { mWarn("not all vnodes ready, quit from vnodes status check"); taosArrayDestroy(pNodeSnapshot); @@ -1185,12 +1183,16 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { return true; } - SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); + SVgroupChangeInfo changeInfo = {0}; + code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, 
pNodeSnapshot, &changeInfo); + if (code) { + taosArrayDestroy(pNodeSnapshot); streamMutexUnlock(&execInfo.lock); + return false; + } bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); - taosArrayDestroy(changeInfo.pUpdateNodeList); - taosHashCleanup(changeInfo.pDBMap); + mndDestroyVgroupChangeInfo(&changeInfo); taosArrayDestroy(pNodeSnapshot); if (nodeUpdated) { @@ -1400,7 +1402,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true); sdbRelease(pSdb, p); - if (code != -1) { + if (code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) { /* count only checkpoints that did not fail */ started += 1; if (started >= capacity) { @@ -1438,10 +1440,9 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { tFreeMDropStreamReq(&dropReq); return 0; } else { - code = TSDB_CODE_MND_STREAM_NOT_EXIST; mError("stream:%s not exist failed to drop it", dropReq.name); tFreeMDropStreamReq(&dropReq); - TAOS_RETURN(code); + TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST); } } @@ -1480,17 +1481,17 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. 
- bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true); - if (conflict) { + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true); + if (code) { sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); - return terrno; + return code; } STrans *pTrans = NULL; code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans); if (pTrans == NULL || code) { - mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, terrstr()); + mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); TAOS_RETURN(code); @@ -1526,7 +1527,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); tFreeMDropStreamReq(&dropReq); @@ -1756,16 +1757,16 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return 0; } - if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { + if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) { sdbRelease(pMnode->pSdb, pStream); - return -1; + return code; } // check if it is conflict with other trans in both sourceDb and targetDb. 
- bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true); - if (conflict) { + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true); + if (code) { sdbRelease(pMnode->pSdb, pStream); - TAOS_RETURN(TSDB_CODE_MND_TRANS_CONFLICT); + TAOS_RETURN(code); } bool updated = taskNodeIsUpdated(pMnode); @@ -1821,7 +1822,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { STrans *pTrans = NULL; code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans); if (pTrans == NULL || code) { - mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); + mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); return code; } @@ -1836,7 +1837,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { // if nodeUpdate happened, not send pause trans code = mndStreamSetPauseAction(pMnode, pTrans, pStream); if (code) { - mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); + mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; @@ -1857,7 +1858,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; @@ -1874,8 +1875,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { SStreamObj *pStream = NULL; int32_t code = 0; - if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { - return terrno; + if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { + return code; } 
SMResumeStreamReq resumeReq = {0}; @@ -1906,17 +1907,17 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true); - if (conflict) { + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true); + if (code) { sdbRelease(pMnode->pSdb, pStream); - return terrno; + return code; } STrans *pTrans = NULL; code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans); if (pTrans == NULL || code) { - mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr()); + mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); return code; } @@ -1929,8 +1930,9 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } // set the resume action - if (mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) { - mError("stream:%s, failed to drop task since %s", resumeReq.name, terrstr()); + code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated); + if (code) { + mError("stream:%s, failed to drop task since %s", resumeReq.name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; @@ -1950,7 +1952,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { taosWUnLockLatch(&pStream->lock); code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; @@ -1977,11 +1979,22 @@ static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) 
// tasks on the will be removed replica. // 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we // will handle it as mentioned in 1 & 2 items. -static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList) { - SVgroupChangeInfo info = { - .pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)), - .pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK), - }; +static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, + SVgroupChangeInfo *pInfo) { + int32_t code = 0; + int32_t lino = 0; + + if (pInfo == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)); + pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); + + if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) { + mndDestroyVgroupChangeInfo(pInfo); + return TSDB_CODE_OUT_OF_MEMORY; + } int32_t numOfNodes = taosArrayGetSize(pPrevNodeList); for (int32_t i = 0; i < numOfNodes; ++i) { @@ -2002,7 +2015,11 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset); char buf[256] = {0}; - (void) epsetToStr(&pCurrent->epset, buf, tListLen(buf)); // ignore this error + code = epsetToStr(&pCurrent->epset, buf, tListLen(buf)); + if (code) { + mError("failed to convert epset string, code:%s", tstrerror(code)); + TSDB_CHECK_CODE(code, lino, _err); + } mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId, pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated); @@ -2011,20 +2028,16 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset); 
epsetAssign(&updateInfo.newEp, &pCurrent->epset); - void* p = taosArrayPush(info.pUpdateNodeList, &updateInfo); - if (p == NULL) { - mError("failed to put update entry into node list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - } + void* p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo); + TSDB_CHECK_NULL(p, code, lino, _err, terrno); } // todo handle the snode info if (pCurrent->nodeId != SNODE_HANDLE) { SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId); - int32_t code = taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0); + code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0); mndReleaseVgroup(pMnode, pVgroup); - if (code) { - mError("failed to put into dbmap, code:out of memory"); - } + TSDB_CHECK_CODE(code, lino, _err); } break; @@ -2032,7 +2045,18 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP } } - return info; + return code; + + _err: + mndDestroyVgroupChangeInfo(pInfo); + return code; +} + +static void mndDestroyVgroupChangeInfo(SVgroupChangeInfo* pInfo) { + if (pInfo != NULL) { + taosArrayDestroy(pInfo->pUpdateNodeList); + taosHashCleanup(pInfo->pDBMap); + } } static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) { @@ -2049,13 +2073,13 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange break; } - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false); + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false); sdbRelease(pSdb, pStream); - if (conflict) { - mError("nodeUpdate conflict with other trans, current nodeUpdate ignored"); + if (code) { + mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code)); sdbCancelFetch(pSdb, pIter); - return terrno; + return code; } } @@ -2119,7 +2143,7 @@ static int32_t mndProcessVgroupChange(SMnode 
*pMnode, SVgroupChangeInfo *pChange code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; @@ -2276,7 +2300,11 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { goto _end; } - SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); + SVgroupChangeInfo changeInfo = {0}; + code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo); + if (code) { + goto _end; + } { if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) { @@ -2310,8 +2338,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { mDebug("no update found in nodeList"); } - taosArrayDestroy(changeInfo.pUpdateNodeList); - taosHashCleanup(changeInfo.pDBMap); + mndDestroyVgroupChangeInfo(&changeInfo); _end: streamMutexUnlock(&execInfo.lock); @@ -2640,9 +2667,8 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); if (p == NULL) { mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId); - terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; streamMutexUnlock(&execInfo.lock); - return terrno; + return TSDB_CODE_MND_STREAM_NOT_EXIST; } else { mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", req.streamId, req.taskId); @@ -2993,7 +3019,7 @@ int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr()); + mError("trans:%d, 
failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; @@ -3038,9 +3064,8 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. - bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false); - if (conflict) { - code = TSDB_CODE_MND_TRANS_CONFLICT; + code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false); + if (code) { goto _err; } @@ -3048,7 +3073,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans); if (pTrans == NULL || code != 0) { - mError("failed to create trans to drop orphan tasks since %s", terrstr()); + mError("failed to create trans to drop orphan tasks since %s", tstrerror(code)); goto _err; } @@ -3059,7 +3084,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { // drop all tasks if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) { - mError("failed to create trans to drop orphan tasks since %s", terrstr()); + mError("failed to create trans to drop orphan tasks since %s", tstrerror(code)); goto _err; } @@ -3070,7 +3095,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code)); goto _err; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index f515e9565d..d31d0dad65 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -100,7 +100,7 
@@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; @@ -109,6 +109,9 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); + if (code == 0) { + code = TSDB_CODE_ACTION_IN_PROGRESS; + } return code; } @@ -221,8 +224,8 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) { code = TSDB_CODE_STREAM_TASK_NOT_EXIST; mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid); } else { - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false); - if (conflict) { + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false); + if (code) { mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name, pStream->sourceDb, pStream->targetSTbName); } else { diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 494771e65e..25a735e152 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -50,7 +50,7 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) pEntry->startTime); void* p = taosArrayPush(pList, &info); if (p == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } } else { if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) { @@ -77,7 +77,6 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) mDebug("clear %d finished stream-trans, 
remained:%d, active checkpoint trans:%d", size, taosHashGetSize(execInfo.transMgmt.pDBTrans), num); - terrno = TSDB_CODE_SUCCESS; taosArrayDestroy(pList); if (pNumOfActiveChkpt != NULL) { @@ -91,7 +90,7 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) // For a given stream: // 1. checkpoint trans is conflict with any other trans except for the drop and reset trans. // 2. create/drop/reset/update trans are conflict with any other trans. -bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) { +int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) { if (lock) { streamMutexLock(&execInfo.lock); } @@ -101,7 +100,7 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p if (lock) { streamMutexUnlock(&execInfo.lock); } - return false; + return 0; } int32_t code = mndStreamClearFinishedTrans(pMnode, NULL); @@ -121,8 +120,7 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0)) { mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, tInfo.name); - terrno = TSDB_CODE_MND_TRANS_CONFLICT; - return true; + return TSDB_CODE_MND_TRANS_CONFLICT; } else { mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName); } @@ -131,8 +129,7 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) { mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, tInfo.name); - terrno = TSDB_CODE_MND_TRANS_CONFLICT; - return true; + return TSDB_CODE_MND_TRANS_CONFLICT; } } else { mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId); @@ -142,7 +139,7 @@ bool 
mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p streamMutexUnlock(&execInfo.lock); } - return false; + return 0; } int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) { @@ -202,47 +199,48 @@ int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnCo SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { int32_t code = 0; int32_t lino = 0; - terrno = TSDB_CODE_OUT_OF_MEMORY; - void *buf = NULL; + void *buf = NULL; SEncoder encoder; tEncoderInit(&encoder, NULL, 0); - if (tEncodeSStreamObj(&encoder, pStream) < 0) { + if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) { tEncoderClear(&encoder); - goto STREAM_ENCODE_OVER; + TSDB_CHECK_CODE(code, lino, _over); } + int32_t tlen = encoder.pos; tEncoderClear(&encoder); int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size); - if (pRaw == NULL) goto STREAM_ENCODE_OVER; + TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno); buf = taosMemoryMalloc(tlen); - if (buf == NULL) goto STREAM_ENCODE_OVER; + TSDB_CHECK_NULL(buf, code, lino, _over, terrno); tEncoderInit(&encoder, buf, tlen); - if (tEncodeSStreamObj(&encoder, pStream) < 0) { + if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) { tEncoderClear(&encoder); - goto STREAM_ENCODE_OVER; + TSDB_CHECK_CODE(code, lino, _over); } + tEncoderClear(&encoder); int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER); - SDB_SET_BINARY(pRaw, dataPos, buf, tlen, STREAM_ENCODE_OVER); - SDB_SET_DATALEN(pRaw, dataPos, STREAM_ENCODE_OVER); + SDB_SET_INT32(pRaw, dataPos, tlen, _over); + SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over); + SDB_SET_DATALEN(pRaw, dataPos, _over); - terrno = TSDB_CODE_SUCCESS; - -STREAM_ENCODE_OVER: +_over: taosMemoryFreeClear(buf); - if (terrno != TSDB_CODE_SUCCESS) { - mError("stream:%s, failed to encode to raw:%p since %s", pStream->name, pRaw, terrstr()); + if (code != TSDB_CODE_SUCCESS) { + 
mError("stream:%s, failed to encode to raw:%p at line:%d since %s", pStream->name, pRaw, lino, tstrerror(code)); sdbFreeRaw(pRaw); + terrno = code; return NULL; } + terrno = 0; mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream, pStream->checkpointId); return pRaw; diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index abe87fed64..a405cdde0a 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1105,8 +1105,8 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", pStream->uid, pStream->name, total); - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); - if (!conflict) { + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); + if (code == 0) { code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList); if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry taosArrayClear(px->pTaskList); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index de295f2611..c865e3ec6e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -752,7 +752,7 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV } streamTaskResetUpstreamStageInfo(pTask); - (void)streamSetupScheduleTrigger(pTask); + streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo; tqSetRestoreVersionInfo(pTask); @@ -802,6 +802,7 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask const char* id = pTask->id.idStr; int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; SVersionRange* pStep2Range = &pTask->step2Range; + int32_t vgId = pTask->pMeta->vgId; // if it's an source 
task, extract the last version in wal. bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer); @@ -837,12 +838,15 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE); - (void)streamTaskSetSchedStatusInactive(pTask); + int8_t status = streamTaskSetSchedStatusInactive(pTask); // now the fill-history task starts to scan data from wal files. code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); if (code == TSDB_CODE_SUCCESS) { - (void)tqScanWalAsync(pTq, false); + code = tqScanWalAsync(pTq, false); + if (code) { + tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code)); + } } } } @@ -1001,7 +1005,10 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // let's continue scan data in the wal files if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) { - (void)tqScanWalAsync(pTq, false); // it's ok to failed + code = tqScanWalAsync(pTq, false); // it's ok to failed + if (code) { + tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); + } } return code; @@ -1103,7 +1110,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); SRpcMsg rsp = {0}; - (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + if (ret) { // suppress the error in build checkpointsource rsp + tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code)); + } + tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build 
and send msg to mnode } @@ -1112,7 +1123,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); SRpcMsg rsp = {0}; - (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + if (ret) { // suppress the error in build checkpointsource rsp + tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code)); + } + tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode } @@ -1122,7 +1137,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) ", transId:%d s-task:0x%x ignore it", vgId, req.checkpointId, req.transId, req.taskId); SRpcMsg rsp = {0}; - (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + if (ret) { // suppress the error in build checkpointsource rsp + tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code)); + } + tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode } @@ -1134,7 +1153,10 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) " transId:%d it may have been destroyed", vgId, req.taskId, req.checkpointId, req.transId); SRpcMsg rsp = {0}; - (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + if (ret) { // suppress the error in build checkpointsource rsp + tqError("s-task:%s failed to 
build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); + } tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1147,7 +1169,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + if (ret) { // suppress the error in build checkpointsource rsp + tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); + } + tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; // todo retry handle error } @@ -1165,7 +1191,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + if (ret) { // suppress the error in build checkpointsource rsp + tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); + } + tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } @@ -1199,7 +1229,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; - (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + if (ret) { // suppress the error in build checkpointsource rsp + tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); + } + tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; @@ -1228,7 +1262,10 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* 
pMsg, SRpcMsg* pRsp) code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); if (code != TSDB_CODE_SUCCESS) { SRpcMsg rsp = {0}; - (void)streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); + if (ret) { // suppress the error in build checkpointsource rsp + tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); + } tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 60da4e3799..cccc96d3f0 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -389,7 +389,10 @@ static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew) { END: tdbFree(pKey); tdbFree(pVal); - (void)tdbTbcClose(pCur); + int32_t ret = tdbTbcClose(pCur); + if (code == 0 && ret != 0) { + code = ret; + } return code; } @@ -461,7 +464,12 @@ static int32_t tqMetaRestoreCheckInfo(STQ* pTq) { END: tdbFree(pKey); tdbFree(pVal); - (void)tdbTbcClose(pCur); + + int32_t ret = tdbTbcClose(pCur); + if (code == 0) { + code = ret; + } + tDeleteSTqCheckInfo(&info); return code; } @@ -476,13 +484,13 @@ int32_t tqMetaOpen(STQ* pTq) { TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq)); } else { TQ_ERR_GO_TO_END(tqMetaTransform(pTq)); - (void)taosRemoveFile(maindb); + TQ_ERR_GO_TO_END(taosRemoveFile(maindb)); } TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME)); if(taosCheckExistFile(offsetNew)){ TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetNew)); - (void)taosRemoveFile(offsetNew); + TQ_ERR_GO_TO_END(taosRemoveFile(offsetNew)); } TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq)); @@ -518,7 +526,7 @@ int32_t tqMetaTransform(STQ* pTq) { if (taosCopyFile(offset, offsetNew) < 0) { tqError("copy offset file error"); } else { - (void)taosRemoveFile(offset); + 
TQ_ERR_GO_TO_END(taosRemoveFile(offset)); } } @@ -527,22 +535,47 @@ END: taosMemoryFree(offsetNew); // return 0 always, so ignore - (void)tdbTbClose(pExecStore); - (void)tdbTbClose(pCheckStore); - (void)tdbClose(pMetaDB); + int32_t ret = tdbTbClose(pExecStore); + if (ret != 0) { + tqError("vgId:%d failed to close stream exec store, code:%s", pTq->pStreamMeta->vgId, tstrerror(ret)); + } + + ret = tdbTbClose(pCheckStore); + if (ret != 0) { + tqError("vgId:%d failed to close stream check store, code:%s", pTq->pStreamMeta->vgId, tstrerror(ret)); + } + + ret = tdbClose(pMetaDB); + if (ret != 0) { + tqError("vgId:%d failed to close stream meta db store, code:%s", pTq->pStreamMeta->vgId, tstrerror(ret)); + } return code; } void tqMetaClose(STQ* pTq) { + int32_t code = 0; if (pTq->pExecStore) { - (void)tdbTbClose(pTq->pExecStore); + code = tdbTbClose(pTq->pExecStore); + if (code) { + tqError("vgId:%d failed to close tq exec store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); + } } if (pTq->pCheckStore) { - (void)tdbTbClose(pTq->pCheckStore); + code = tdbTbClose(pTq->pCheckStore); + if (code) { + tqError("vgId:%d failed to close tq check store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); + } } if (pTq->pOffsetStore) { - (void)tdbTbClose(pTq->pOffsetStore); + code = tdbTbClose(pTq->pOffsetStore); + if (code) { + tqError("vgId:%d failed to close tq offset store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); + } + } + + code = tdbClose(pTq->pMetaDB); + if (code) { + tqError("vgId:%d failed to close tq meta db store, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); } - (void)tdbClose(pTq->pMetaDB); } diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index a279b58d32..420fbf9c56 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -83,18 +83,27 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa _err: 
tqError("vgId:%d, vnode stream-task snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); - (void)streamTaskSnapReaderClose(pReader); + int32_t ret = streamTaskSnapReaderClose(pReader); *ppReader = NULL; return code; } int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) { - if (pReader == NULL) return 0; + if (pReader == NULL) { + return 0; + } int32_t code = 0; - tqInfo("vgId:%d, vnode stream-task snapshot reader closed", TD_VID(pReader->pTq->pVnode)); + int32_t vgId = TD_VID(pReader->pTq->pVnode); + taosArrayDestroy(pReader->tdbTbList); - (void)tdbTbcClose(pReader->pCur); + code = tdbTbcClose(pReader->pCur); + if (code) { + tqError("vgId:%d failed to close stream meta reader, code:%s", vgId, tstrerror(code)); + } else { + tqInfo("vgId:%d, vnode stream-task snapshot reader closed", vgId); + } + taosMemoryFree(pReader); return code; } @@ -113,6 +122,7 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { tqDebug("vgId:%d, vnode stream-task snapshot start read data", TD_VID(pReader->pTq->pVnode)); STablePair* pPair = taosArrayGet(pReader->tdbTbList, pReader->pos); + NextTbl: except = 0; for (;;) { @@ -127,6 +137,7 @@ NextTbl: code = terrno; goto _err; } + memcpy(pVal, tVal, tLen); vLen = tLen; } @@ -163,8 +174,8 @@ NextTbl: taosMemoryFree(pVal); tqDebug("vgId:%d, vnode stream-task snapshot read data vLen:%d", TD_VID(pReader->pTq->pVnode), vLen); - return code; + _err: tqError("vgId:%d, vnode stream-task snapshot read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 68f43d637b..c00d9a93bb 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -207,7 +207,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList); // send the checkpoint-source-rsp 
for source task to end the checkpoint trans in mnode - (void)streamTaskSendCheckpointsourceRsp(pTask); + code = streamTaskSendCheckpointsourceRsp(pTask); + if (code) { + tqError("%s failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); + } streamTaskResetStatus(pTask); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); @@ -806,25 +809,26 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t type = pReq->reqType; int32_t vgId = pMeta->vgId; + int32_t code = 0; if (type == STREAM_EXEC_T_START_ONE_TASK) { - (void)streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId); + code = streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId); return 0; } else if (type == STREAM_EXEC_T_START_ALL_TASKS) { - (void)streamMetaStartAllTasks(pMeta); + code = streamMetaStartAllTasks(pMeta); return 0; } else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) { - (void)restartStreamTasks(pMeta, isLeader); + code = restartStreamTasks(pMeta, isLeader); return 0; } else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) { - (void)streamMetaStopAllTasks(pMeta); + code = streamMetaStopAllTasks(pMeta); return 0; } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { - int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId); + code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId); return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); + code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); if (pTask != NULL && (code == 0)) { char* pStatus = NULL; @@ -846,7 +850,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); + code = streamMetaAcquireTask(pMeta, 
pReq->streamId, pReq->taskId, &pTask); if ((pTask != NULL) && (code == 0)) { // even in halt status, the data in inputQ must be processed char* p = NULL; if (streamTaskReadyToRun(pTask, &p)) { @@ -864,7 +868,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. // todo add one function to handle this tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, pReq->taskId); - return -1; + return code; } } @@ -1229,7 +1233,8 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tqError( "vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, req.taskId); - (void)streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); + // ignore this code to avoid error code over write + int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index ed67bfca3e..feceabe625 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -379,6 +379,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl int32_t lino = 0; void *px = NULL; int32_t startIndex = 0; + int32_t ret = 0; int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray); if (numOfBlocks <= 0) { @@ -487,7 +488,9 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl } else { STbStatisRecord record = {0}; while (i < rows) { - (void)tStatisBlockGet(&block, i, &record); + code = tStatisBlockGet(&block, i, &record); + TSDB_CHECK_CODE(code, lino, _end); + if (record.suid != suid) { break; } @@ -534,7 +537,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl } _end: - (void) tStatisBlockDestroy(&block); + ret 
= tStatisBlockDestroy(&block); if (code != 0) { tsdbError("%s error happens at:%s line number: %d, code:%s", id, __func__, lino, tstrerror(code)); } else { @@ -678,7 +681,11 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32 } void tLDataIterClose2(SLDataIter *pIter) { - (void)tsdbSttFileReaderClose(&pIter->pReader); // always return 0 + int32_t code = tsdbSttFileReaderClose(&pIter->pReader); // always return 0 + if (code != 0) { + tsdbError("%" PRId64 " failed to close tsdb file reader, code:%s", pIter->cid, tstrerror(code)); + } + pIter->pReader = NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 714096bb85..313205f2cf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -449,10 +449,14 @@ static int32_t tsdbUninitReaderLock(STsdbReader* pReader) { static int32_t tsdbAcquireReader(STsdbReader* pReader) { int32_t code = -1; - tsdbTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + tsdbTrace("tsdb/read: %s, pre-take read mutex: %p, code: %d", pReader->idStr, &pReader->readerMutex, code); code = taosThreadMutexLock(&pReader->readerMutex); - tsdbTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + if (code != 0) { + tsdbError("tsdb/read:%p, failed to lock reader mutex, code:%s", pReader->idStr, tstrerror(code)); + } else { + tsdbTrace("tsdb/read: %s, post-take read mutex: %p, code: %d", pReader->idStr, &pReader->readerMutex, code); + } return code; } @@ -4574,7 +4578,10 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t STableBlockScanInfo** p = NULL; int32_t iter = 0; - (void)tsdbAcquireReader(pReader); + code = tsdbAcquireReader(pReader); + if (code) { + return code; + } while ((p = tSimpleHashIterate(pReader->status.pTableMap, p, &iter)) != NULL) { clearBlockScanInfo(*p); @@ -4808,7 +4815,10 @@ void 
tsdbReaderClose2(STsdbReader* pReader) { return; } - (void)tsdbAcquireReader(pReader); + int32_t code = tsdbAcquireReader(pReader); + if (code) { + return; + } { if (pReader->innerReader[0] != NULL || pReader->innerReader[1] != NULL) { @@ -5856,6 +5866,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs int32_t code = 0; STsdb* pTsdb = pReader->pTsdb; SVersionRange* pRange = &pReader->info.verRange; + int32_t lino = 0; *ppSnap = NULL; // lock @@ -5869,8 +5880,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap)); if (pSnap == NULL) { (void) taosThreadMutexUnlock(&pTsdb->mutex); - code = terrno; - goto _exit; + TSDB_CHECK_NULL(pSnap, code, lino, _exit, terrno); } // take snapshot @@ -5879,14 +5889,14 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode)); if (pSnap->pNode == NULL) { (void) taosThreadMutexUnlock(&pTsdb->mutex); - code = terrno; - goto _exit; + TSDB_CHECK_NULL(pSnap->pNode, code, lino, _exit, terrno); } pSnap->pNode->pQHandle = pReader; pSnap->pNode->reseek = reseek; - (void)tsdbRefMemTable(pTsdb->mem, pSnap->pNode); + code = tsdbRefMemTable(pTsdb->mem, pSnap->pNode); + TSDB_CHECK_CODE(code, lino, _exit); } if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { @@ -5906,7 +5916,8 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pINode->pQHandle = pReader; pSnap->pINode->reseek = reseek; - (void)tsdbRefMemTable(pTsdb->imem, pSnap->pINode); + code = tsdbRefMemTable(pTsdb->imem, pSnap->pINode); + TSDB_CHECK_CODE(code, lino, _exit); } // fs @@ -5921,8 +5932,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs } (void) taosThreadMutexUnlock(&pTsdb->mutex); - goto _exit; - } + TSDB_CHECK_CODE(code, lino, 
_exit);} // unlock (void) taosThreadMutexUnlock(&pTsdb->mutex); @@ -5932,7 +5942,8 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs return code; _exit: - tsdbError("vgId:%d take read snapshot failed, code:%s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("%s vgId:%d take read snapshot failed, line:%d code:%s", pReader->idStr, TD_VID(pTsdb->pVnode), lino, + tstrerror(code)); if (pSnap) { if (pSnap->pNode) taosMemoryFree(pSnap->pNode); if (pSnap->pINode) taosMemoryFree(pSnap->pINode); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 9d58e2c7bd..249d0d50ef 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -779,8 +779,9 @@ typedef enum { BLK_CHECK_QUIT = 0x2, } ETombBlkCheckEnum; -static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock, - const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j); +static int32_t loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock, + const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, + int32_t* j); static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j, ETombBlkCheckEnum* pRet) { int32_t code = 0; @@ -912,7 +913,7 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs ETombBlkCheckEnum ret = 0; code = doCheckTombBlock(&block, pReader, numOfTables, &j, &ret); - (void)tTombBlockDestroy(&block); + tTombBlockDestroy(&block); if (code != TSDB_CODE_SUCCESS || ret == BLK_CHECK_QUIT) { return code; } @@ -994,11 +995,17 @@ int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList, - int32_t numOfTables) 
{ + int32_t numOfTables, int32_t* pNumOfRows) { int32_t num = 0; + int32_t code = 0; + int32_t lino = 0; + + if (pNumOfRows != 0) { + *pNumOfRows = 0; + } if (TARRAY2_SIZE(pStatisBlkArray) <= 0) { - return 0; + return code; } int32_t i = 0; @@ -1007,18 +1014,19 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo } if (i >= TARRAY2_SIZE(pStatisBlkArray)) { - return 0; + return code; } SStatisBlk* p = &pStatisBlkArray->data[i]; STbStatisBlock* pStatisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock)); - (void)tStatisBlockInit(pStatisBlock); + TSDB_CHECK_NULL(pStatisBlock, code, lino, _err, terrno); + + code = tStatisBlockInit(pStatisBlock); + TSDB_CHECK_CODE(code, lino, _err); int64_t st = taosGetTimestampMs(); - int32_t code = tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock); - if (code != TSDB_CODE_SUCCESS) { - return 0; - } + code = tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock); + TSDB_CHECK_CODE(code, lino, _err); double el = (taosGetTimestampMs() - st) / 1000.0; pBlockLoadInfo->cost.loadStatisBlocks += 1; @@ -1030,9 +1038,10 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo } if (index >= pStatisBlock->numOfRecords) { - (void)tStatisBlockDestroy(pStatisBlock); + code = tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); - return num; + *pNumOfRows = num; + return code; } int32_t j = index; @@ -1040,9 +1049,10 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) { p = &pStatisBlkArray->data[i]; if (p->minTbid.suid > suid) { - (void)tStatisBlockDestroy(pStatisBlock); + code = tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); - return num; + *pNumOfRows = num; + return code; } uint64_t uid = pUidList[uidIndex]; @@ -1051,30 +1061,44 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo num += 
((int64_t*)pStatisBlock->counts.data)[j]; uidIndex += 1; j += 1; - loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j); + code = loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j); + TSDB_CHECK_CODE(code, lino, _err); } else if (((int64_t*)pStatisBlock->uids.data)[j] < uid) { j += 1; - loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j); + code = loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j); + TSDB_CHECK_CODE(code, lino, _err); } else { uidIndex += 1; } } - (void)tStatisBlockDestroy(pStatisBlock); + int32_t ret = tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); - return num; + *pNumOfRows = num; + return code; + +_err: + tsdbError("%p failed to get number of rows in stt block, %s at line:%d code:%s", pSttFileReader, __func__, lino, + tstrerror(code)); + return code; } // load next stt statistics block -static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock, +static int32_t loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock, const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j) { if ((*j) >= numOfRows) { (*i) += 1; (*j) = 0; if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) { - (void)tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock); + int32_t code = tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock); + if (code != 0) { + tsdbError("%p failed to read statisBlock, code:%s", pSttFileReader, tstrerror(code)); + return code; + } } } + + return 0; } int32_t doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) { @@ -1191,8 +1215,13 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra STsdbReader* pReader = 
pConf->pReader; int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); uint64_t* pUidList = pReader->status.uidList.tableUidList; - numOfRows += getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, pConf->suid, pUidList, - numOfTables); + int32_t n = 0; + code = getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, pConf->suid, pUidList, + numOfTables, &n); + numOfRows += n; + if (code) { + tsdbError("%s failed to get rows in stt blocks, code:%s", pstr, tstrerror(code)); + } } } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index ed895b7d27..cc37f20cf6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -343,7 +343,7 @@ int32_t loadDataFileTombDataForAll(STsdbReader* pReader); int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo); int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList, - int32_t numOfTables); + int32_t numOfTables, int32_t* pNumOfRows); void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record); void destroyLDataIter(SLDataIter* pIter); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d8a29c8e73..f725fb3809 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -587,7 +587,10 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) streamMetaWUnLock(pMeta); tqInfo("vgId:%d stream task already loaded, start them", vgId); - (void)streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS); + int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS); + if (code != 0) { + tqError("vgId:%d failed 
to sched stream task, code:%s", vgId, tstrerror(code)); + } return; } } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 916aee4e6e..e091d0f34b 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -153,9 +153,9 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri streamMutexUnlock(&pInfo->lock); // NOTE: here we do not do the duplicated checkpoint-trigger msg check, since it will be done by following functions. - (void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId, - pRsp->upstreamTaskId); - return TSDB_CODE_SUCCESS; + int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId, + pRsp->upstreamTaskId); + return code; } int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, @@ -192,6 +192,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) { pBlock->srcTaskId = pTask->id.taskId; pBlock->srcVgId = pTask->pMeta->vgId; + if (pTask->chkInfo.pActiveInfo->dispatchTrigger == true) { stError("s-task:%s already dispatch checkpoint-trigger, not dispatch again", pTask->id.idStr); return 0; @@ -246,7 +247,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock code = initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg); if (code == TSDB_CODE_SUCCESS) { - (void)tmsgSendReq(&pInfo->epSet, &msg); + code = tmsgSendReq(&pInfo->epSet, &msg); } } @@ -358,9 +359,9 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add 
checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); - (void)continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure + code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); // todo handle this failure } else { // only one task exists, no need to dispatch downstream info - (void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, + code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, -1); streamFreeQitem((SStreamQueueItem*)pBlock); } @@ -372,7 +373,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock // todo: handle this // update the child Id for downstream tasks - (void) streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); + code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); // there are still some upstream tasks not send checkpoint request, do nothing and wait for then if (pActiveInfo->allUpstreamTriggerRecv != 1) { @@ -384,7 +385,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock if (taskLevel == TASK_LEVEL__SINK) { stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, send ready msg to upstream", id, num); streamFreeQitem((SStreamQueueItem*)pBlock); - (void)streamTaskBuildCheckpoint(pTask); // todo: not handle error yet + code = streamTaskBuildCheckpoint(pTask); // todo: not handle error yet } else { // source & agg tasks need to forward the checkpoint msg downwards stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num); flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); @@ -480,7 +481,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId } else { if ((notReady == 0) && (code == 0) && (!alreadyHandled)) { stDebug("s-task:%s all 
downstream tasks have completed build checkpoint, do checkpoint for current task", id); - (void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1); + code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1); } } @@ -562,7 +563,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV // drop task should not in the meta-lock, and drop the related fill-history task now streamMetaWUnLock(pMeta); if (pReq->dropRelHTask) { - (void) streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); + code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks); @@ -639,14 +640,14 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id, vgId, pReq->checkpointId, terrstr()); - return code; + return TSDB_CODE_SUCCESS; } streamMetaWUnLock(pMeta); // drop task should not in the meta-lock, and drop the related fill-history task now if (pReq->dropRelHTask) { - (void) streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); + code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, (int32_t)pReq->hTaskId, numOfTasks); @@ -991,7 +992,11 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // do send retrieve checkpoint trigger msg to upstream int32_t size = taosArrayGetSize(pNotSendList); - (void)doSendRetrieveTriggerMsg(pTask, pNotSendList); + int32_t code = doSendRetrieveTriggerMsg(pTask, pNotSendList); + if (code) { + 
stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code)); + } + streamMutexUnlock(&pActiveInfo->lock); // check every 100ms @@ -1262,7 +1267,11 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) { } } - (void) taosCloseDir(&pDir); + int32_t ret = taosCloseDir(&pDir); + if (code == 0 && ret != 0) { + code = ret; + } + return code; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index ad1866807a..2a32a1d522 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1255,7 +1255,11 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa .recvTs = taosGetTimestampMs(), .transId = pReq->transId, .checkpointId = pReq->checkpointId}; // todo retry until it success - (void)streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS); + int32_t code = streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS); + if (code) { + stError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; streamMutexLock(&pActiveInfo->lock); diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 585cf63cfc..2d54547aa2 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -19,7 +19,7 @@ static void streamTaskResumeHelper(void* param, void* tmrId); static void streamTaskSchedHelper(void* param, void* tmrId); -int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { +void streamSetupScheduleTrigger(SStreamTask* pTask) { if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref, @@ -29,8 +29,6 @@ 
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer); pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; } - - return 0; } int32_t streamTrySchedExec(SStreamTask* pTask) { @@ -47,7 +45,7 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType, - terrstr()); + tstrerror(terrno)); return terrno; } @@ -94,7 +92,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) { SStreamTaskState p = streamTaskGetStatus(pTask); if (p.state == TASK_STATUS__DROPPING || p.state == TASK_STATUS__STOP) { - (void) streamTaskSetSchedStatusInactive(pTask); + int8_t status = streamTaskSetSchedStatusInactive(pTask); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s not resume task, ref:%d", pId->idStr, p.name, ref); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 2acfd64b2a..6d8f90f7f6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -24,8 +24,8 @@ #include "wal.h" static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo); -static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated); -static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate); +static int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated); +static int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate); static void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo); static int32_t addToTaskset(SArray* pArray, 
SStreamTask* pTask) { @@ -36,15 +36,25 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { } static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) { - char buf[512] = {0}; + int32_t code = 0; + char buf[512] = {0}; + if (pTask->info.nodeId == nodeId) { // execution task should be moved away bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet); - (void)epsetToStr(pEpSet, buf, tListLen(buf)); + code = epsetToStr(pEpSet, buf, tListLen(buf)); + if (code) { // print error and continue + stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } if (!isEqual) { (*pUpdated) = true; char tmp[512] = {0}; - (void)epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); // only for log file, ignore errors + code = epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); // only for log file, ignore errors + if (code) { // print error and continue + stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } epsetAssign(&pTask->info.epSet, pEpSet); stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp); @@ -56,15 +66,15 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp // check for the dispatch info and the upstream task info int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__SOURCE) { - streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated); + code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated); } else if (level == TASK_LEVEL__AGG) { - streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated); - streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated); + code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated); + code = streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet, pUpdated); } else { // TASK_LEVEL__SINK - streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated); + 
code = streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet, pUpdated); } - return 0; + return code; } static void freeItem(void* p) { @@ -227,17 +237,17 @@ void tFreeStreamTask(SStreamTask* pTask) { } if (pTask->schedInfo.pDelayTimer != NULL) { - (void)taosTmrStop(pTask->schedInfo.pDelayTimer); + streamTmrStop(pTask->schedInfo.pDelayTimer); pTask->schedInfo.pDelayTimer = NULL; } if (pTask->hTaskInfo.pTimer != NULL) { - (void)taosTmrStop(pTask->hTaskInfo.pTimer); + streamTmrStop(pTask->hTaskInfo.pTimer); pTask->hTaskInfo.pTimer = NULL; } if (pTask->msgInfo.pRetryTmr != NULL) { - (void)taosTmrStop(pTask->msgInfo.pRetryTmr); + streamTmrStop(pTask->msgInfo.pRetryTmr); pTask->msgInfo.pRetryTmr = NULL; } @@ -402,8 +412,14 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) { return 0; } + int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) { - (void)createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr); + int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr); + if (code) { + stError("0x%x failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code)); + return code; + } + pTask->refCnt = 1; pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; @@ -419,7 +435,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.timerActive = 0; - int32_t code = streamCreateStateMachine(pTask); + code = streamCreateStateMachine(pTask); if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr, tstrerror(code)); @@ -439,6 +455,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i code = taosThreadMutexInit(&pTask->msgInfo.lock, NULL); if (code) { + stError("s-task:0x%x failed to init msgInfo mutex, code:%s", pTask->id.taskId, 
tstrerror(code)); return code; } @@ -484,16 +501,14 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset)); if (pOutputInfo->pNodeEpsetUpdateList == NULL) { - stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return TSDB_CODE_OUT_OF_MEMORY; + stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(terrno)); + return terrno; } pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo)); if (pTask->taskCheckInfo.pList == NULL) { - stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return TSDB_CODE_OUT_OF_MEMORY; + stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr, tstrerror(terrno)); + return terrno; } if (pTask->chkInfo.pActiveInfo == NULL) { @@ -539,9 +554,14 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre return (p == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS; } -void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) { - char buf[512] = {0}; - (void)epsetToStr(pEpSet, buf, tListLen(buf)); // ignore error since it is only for log file. +int32_t streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) { + int32_t code = 0; + char buf[512] = {0}; + code = epsetToStr(pEpSet, buf, tListLen(buf)); // ignore error since it is only for log file. 
+ if (code != 0) { // print error and continue + stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); for (int32_t i = 0; i < numOfUpstream; ++i) { @@ -552,7 +572,11 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS *pUpdated = true; char tmp[512] = {0}; - (void)epsetToStr(&pInfo->epSet, tmp, tListLen(tmp)); + code = epsetToStr(&pInfo->epSet, tmp, tListLen(tmp)); + if (code != 0) { // print error and continue + stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } epsetAssign(&pInfo->epSet, pEpSet); stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId, @@ -565,6 +589,8 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS break; } } + + return code; } void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo) { @@ -585,9 +611,13 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; } -void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) { - char buf[512] = {0}; - (void)epsetToStr(pEpSet, buf, tListLen(buf)); // ignore the error since only for log files. +int32_t streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated) { + char buf[512] = {0}; + int32_t code = epsetToStr(pEpSet, buf, tListLen(buf)); // ignore the error since only for log files. 
+ if (code != 0) { // print error and continue + stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } int32_t id = pTask->id.taskId; int8_t type = pTask->outputInfo.type; @@ -605,8 +635,13 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet); if (!isEqual) { *pUpdated = true; + char tmp[512] = {0}; - (void)epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp)); + code = epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp)); + if (code != 0) { // print error and continue + stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } epsetAssign(&pVgInfo->epSet, pEpSet); stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId, @@ -626,7 +661,11 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE *pUpdated = true; char tmp[512] = {0}; - (void)epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp)); + code = epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp)); + if (code != 0) { // print error and continue + stError("%s failed to convert epset to str, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } epsetAssign(&pDispatcher->epSet, pEpSet); stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId, @@ -637,6 +676,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE } } } + + return code; } int32_t streamTaskStop(SStreamTask* pTask) { @@ -977,7 +1018,10 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { } void streamTaskPause(SStreamTask* pTask) { - (void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL); + int32_t code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL); + if (code) { + stError("s-task:%s failed handle pause event 
async, code:%s", pTask->id.idStr, tstrerror(code)); + } } void streamTaskResume(SStreamTask* pTask) { @@ -1182,13 +1226,13 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr; if (pTriggerTmr->tmrHandle != NULL) { - (void)taosTmrStop(pTriggerTmr->tmrHandle); + streamTmrStop(pTriggerTmr->tmrHandle); pTriggerTmr->tmrHandle = NULL; } SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr; if (pReadyTmr->tmrHandle != NULL) { - (void)taosTmrStop(pReadyTmr->tmrHandle); + streamTmrStop(pReadyTmr->tmrHandle); pReadyTmr->tmrHandle = NULL; } diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c index b6275c0eb1..5e12f51c9d 100644 --- a/source/libs/stream/src/streamTimer.c +++ b/source/libs/stream/src/streamTimer.c @@ -42,13 +42,16 @@ int32_t streamTimerGetInstance(tmr_h* pTmr) { void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId, const char* pMsg) { -// while (1) { - bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId); - if (ret) { -// break; - } -// stError("vgId:%d failed to reset tmr: %s, try again", vgId, pMsg); -// } + bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId); + if (ret) { + } +} + +void streamTmrStop(tmr_h tmrId) { + bool stop = taosTmrStop(tmrId); + if (stop) { + // todo + } } int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask) {