diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 75ba51e498..dceb86c963 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -114,7 +114,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream); int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId); int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt); -bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock); +int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock); int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId); int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f7b97a2c3c..11834e98ac 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -220,7 +220,7 @@ STREAM_DECODE_OVER: taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { char *p = (pStream == NULL) ? "null" : pStream->name; - mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, terrstr()); + mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, tstrerror(terrno)); taosMemoryFreeClear(pRow); return NULL; } @@ -282,9 +282,9 @@ int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 || pCreate->targetStbFullName[0] == 0) { - return terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION; + return TSDB_CODE_MND_INVALID_STREAM_OPTION; } - return 0; + return TSDB_CODE_SUCCESS; } static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) { @@ -366,8 +366,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN); SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); if (pSourceDb == NULL) { - mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr()); code = terrno; + mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, tstrerror(code)); goto FAIL; } @@ -378,8 +378,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName); if (pTargetDb == NULL) { - mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr()); code = terrno; + mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, tstrerror(code)); goto FAIL; } @@ -543,8 +543,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { code = tEncodeStreamTask(&encoder, pTask); tEncoderClear(&encoder); - if (code == -1) { - code = TSDB_CODE_INVALID_PARA; + if (code != 0) { mError("failed to encode stream task, code:%s", tstrerror(code)); taosMemoryFree(buf); return code; @@ -616,8 +615,8 @@ int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) { static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { SStbObj *pStb = NULL; SDbObj *pDb = NULL; - int32_t code = 0; - int32_t lino = 0; + int32_t code = 0; + int32_t lino = 0; SMCreateStbReq createReq = {0}; tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); @@ -669,19 +668,19 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre } } - if (mndCheckCreateStbReq(&createReq) != 0) { + if ((code = mndCheckCreateStbReq(&createReq)) != 0) { goto _OVER; } pStb = mndAcquireStb(pMnode, createReq.name); if (pStb != NULL) { - terrno = TSDB_CODE_MND_STB_ALREADY_EXIST; + code = TSDB_CODE_MND_STB_ALREADY_EXIST; goto _OVER; } pDb = mndAcquireDbByStb(pMnode, createReq.name); if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + code = TSDB_CODE_MND_DB_NOT_SELECTED; goto _OVER; } @@ -691,7 +690,7 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre } if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) { - terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB; + code = TSDB_CODE_MND_SINGLE_STB_MODE_DB; goto _OVER; } @@ -720,7 +719,8 @@ _OVER: mndReleaseStb(pMnode, pStb); mndReleaseDb(pMnode, pDb); - mDebug("stream:%s failed to create dst stable:%s, code:%s", pStream->name, pStream->targetSTbName, tstrerror(terrno)); + mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino, + tstrerror(code)); return code; } @@ -742,16 +742,14 @@ static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) { mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM, pStreamObj->name); sdbCancelFetch(pMnode->pSdb, pIter); - terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; - return terrno; + return TSDB_CODE_MND_TOO_MANY_STREAMS; } if (pStream->targetStbUid == pStreamObj->targetStbUid) { mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name, pStreamObj->name); sdbCancelFetch(pMnode->pSdb, pIter); - terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE; - return terrno; + return TSDB_CODE_MND_INVALID_TARGET_TABLE; } } @@ -767,13 +765,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { const char *pMsg = "create stream tasks on dnodes"; int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; + STrans *pTrans = NULL; - terrno = TSDB_CODE_SUCCESS; SCMCreateStreamReq createReq = {0}; - if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq) != 0) { - code = TSDB_CODE_INVALID_MSG; - goto _OVER; - } + code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq); + TSDB_CHECK_CODE(code, lino, _OVER); #ifdef WINDOWS code = TSDB_CODE_MND_INVALID_PLATFORM; @@ -782,7 +778,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql); if ((code = mndCheckCreateStreamReq(&createReq)) != 0) { - mError("stream:%s, failed to create since %s", createReq.name, terrstr()); + mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code)); goto _OVER; } @@ -807,21 +803,20 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { sqlLen = strlen(createReq.sql); sql = taosMemoryMalloc(sqlLen + 1); TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno); + memset(sql, 0, sqlLen + 1); memcpy(sql, createReq.sql, sqlLen); } // build stream obj from request if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) { - mError("stream:%s, failed to create since %s", createReq.name, terrstr()); + mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code)); goto _OVER; } - if ((code = doStreamCheck(pMnode, &streamObj)) < 0) { - goto _OVER; - } + code = doStreamCheck(pMnode, &streamObj); + TSDB_CHECK_CODE(code, lino, _OVER); - STrans *pTrans = NULL; code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans); if (pTrans == NULL || code) { goto _OVER; @@ -830,7 +825,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // create stb for stream if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) { if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) { - mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, terrstr()); + mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code)); mndTransDrop(pTrans); goto _OVER; } @@ -841,7 +836,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // schedule stream task for stream obj code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("stream:%s, failed to schedule since %s", createReq.name, terrstr()); + mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code)); mndTransDrop(pTrans); goto _OVER; } @@ -849,7 +844,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // add stream to trans code = mndPersistStream(pTrans, &streamObj); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("stream:%s, failed to persist since %s", createReq.name, terrstr()); + mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code)); mndTransDrop(pTrans); goto _OVER; } @@ -874,17 +869,13 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // execute creation code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code)); mndTransDrop(pTrans); goto _OVER; } mndTransDrop(pTrans); - if (code == 0) { - code = TSDB_CODE_ACTION_IN_PROGRESS; - } - SName dbname = {0}; code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); if (code) { @@ -910,9 +901,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { _OVER: if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("stream:%s, failed to create since %s", createReq.name, terrstr()); + mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, terrstr(code)); } else { mDebug("stream:%s create stream completed", createReq.name); + code = TSDB_CODE_ACTION_IN_PROGRESS; } mndReleaseStream(pMnode, pStream); @@ -1058,12 +1050,18 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId, int8_t mndTrigger, bool lock) { int32_t code = TSDB_CODE_SUCCESS; + bool conflict = false; int64_t ts = taosGetTimestampMs(); + if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) { return code; } - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock); + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock); + if (code) { + goto _ERR; + } + if (conflict) { mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb, pStream->name, pStream->uid); @@ -1126,7 +1124,9 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("failed to prepare checkpoint trans since %s", terrstr()); + mError("failed to prepare checkpoint trans since %s", terrstr(code)); + } else { + code = TSDB_CODE_ACTION_IN_PROGRESS; } _ERR: @@ -1400,7 +1400,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true); sdbRelease(pSdb, p); - if (code != -1) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { started += 1; if (started >= capacity) { @@ -1438,10 +1438,9 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { tFreeMDropStreamReq(&dropReq); return 0; } else { - code = TSDB_CODE_MND_STREAM_NOT_EXIST; mError("stream:%s not exist failed to drop it", dropReq.name); tFreeMDropStreamReq(&dropReq); - TAOS_RETURN(code); + TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST); } } @@ -1480,17 +1479,17 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true); - if (conflict) { + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true); + if (code) { sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); - return terrno; + return code; } STrans *pTrans = NULL; code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans); if (pTrans == NULL || code) { - mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, terrstr()); + mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); TAOS_RETURN(code); @@ -1526,7 +1525,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); tFreeMDropStreamReq(&dropReq); @@ -1756,16 +1755,16 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return 0; } - if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { + if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) { sdbRelease(pMnode->pSdb, pStream); - return -1; + return code; } // check if it is conflict with other trans in both sourceDb and targetDb. - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true); - if (conflict) { + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true); + if (code) { sdbRelease(pMnode->pSdb, pStream); - TAOS_RETURN(TSDB_CODE_MND_TRANS_CONFLICT); + TAOS_RETURN(code); } bool updated = taskNodeIsUpdated(pMnode); @@ -1821,7 +1820,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { STrans *pTrans = NULL; code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans); if (pTrans == NULL || code) { - mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); + mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); return code; } @@ -1836,7 +1835,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { // if nodeUpdate happened, not send pause trans code = mndStreamSetPauseAction(pMnode, pTrans, pStream); if (code) { - mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); + mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; @@ -1857,7 +1856,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; @@ -1874,8 +1873,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { SStreamObj *pStream = NULL; int32_t code = 0; - if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { - return terrno; + if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { + return code; } SMResumeStreamReq resumeReq = {0}; @@ -1906,17 +1905,17 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true); - if (conflict) { + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true); + if (code) { sdbRelease(pMnode->pSdb, pStream); - return terrno; + return code; } STrans *pTrans = NULL; code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans); if (pTrans == NULL || code) { - mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr()); + mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); return code; } @@ -1929,8 +1928,9 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } // set the resume action - if (mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) { - mError("stream:%s, failed to drop task since %s", resumeReq.name, terrstr()); + code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated); + if (code) { + mError("stream:%s, failed to drop task since %s", resumeReq.name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; @@ -1950,7 +1950,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { taosWUnLockLatch(&pStream->lock); code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; @@ -2049,13 +2049,13 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange break; } - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false); + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false); sdbRelease(pSdb, pStream); - if (conflict) { - mError("nodeUpdate conflict with other trans, current nodeUpdate ignored"); + if (code) { + mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code)); sdbCancelFetch(pSdb, pIter); - return terrno; + return code; } } @@ -2119,7 +2119,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; @@ -2640,9 +2640,8 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); if (p == NULL) { mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId); - terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; streamMutexUnlock(&execInfo.lock); - return terrno; + return TSDB_CODE_MND_STREAM_NOT_EXIST; } else { mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", req.streamId, req.taskId); @@ -2993,7 +2992,7 @@ int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; @@ -3038,9 +3037,8 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. - bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false); - if (conflict) { - code = TSDB_CODE_MND_TRANS_CONFLICT; + code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false); + if (code) { goto _err; } @@ -3048,7 +3046,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans); if (pTrans == NULL || code != 0) { - mError("failed to create trans to drop orphan tasks since %s", terrstr()); + mError("failed to create trans to drop orphan tasks since %s", tstrerror(code)); goto _err; } @@ -3059,7 +3057,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { // drop all tasks if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) { - mError("failed to create trans to drop orphan tasks since %s", terrstr()); + mError("failed to create trans to drop orphan tasks since %s", tstrerror(code)); goto _err; } @@ -3070,7 +3068,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code)); goto _err; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index f515e9565d..2a8b93e071 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -109,6 +109,9 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); + if (code == 0) { + code = TSDB_CODE_ACTION_IN_PROGRESS; + } return code; } @@ -221,8 +224,8 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) { code = TSDB_CODE_STREAM_TASK_NOT_EXIST; mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid); } else { - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false); - if (conflict) { + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false); + if (code) { mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name, pStream->sourceDb, pStream->targetSTbName); } else { diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 494771e65e..8a869a6b23 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -91,7 +91,7 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) // For a given stream: // 1. checkpoint trans is conflict with any other trans except for the drop and reset trans. // 2. create/drop/reset/update trans are conflict with any other trans. -bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) { +int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) { if (lock) { streamMutexLock(&execInfo.lock); } @@ -101,7 +101,7 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p if (lock) { streamMutexUnlock(&execInfo.lock); } - return false; + return 0; } int32_t code = mndStreamClearFinishedTrans(pMnode, NULL); @@ -121,8 +121,7 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0)) { mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, tInfo.name); - terrno = TSDB_CODE_MND_TRANS_CONFLICT; - return true; + return TSDB_CODE_MND_TRANS_CONFLICT; } else { mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName); } @@ -131,8 +130,7 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) { mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, tInfo.name); - terrno = TSDB_CODE_MND_TRANS_CONFLICT; - return true; + return TSDB_CODE_MND_TRANS_CONFLICT; } } else { mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId); @@ -142,7 +140,7 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p streamMutexUnlock(&execInfo.lock); } - return false; + return 0; } int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) { @@ -202,47 +200,48 @@ int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnCo SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { int32_t code = 0; int32_t lino = 0; - terrno = TSDB_CODE_OUT_OF_MEMORY; - void *buf = NULL; + void *buf = NULL; SEncoder encoder; tEncoderInit(&encoder, NULL, 0); - if (tEncodeSStreamObj(&encoder, pStream) < 0) { + if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) { tEncoderClear(&encoder); - goto STREAM_ENCODE_OVER; + TSDB_CHECK_CODE(code, lino, _over); } + int32_t tlen = encoder.pos; tEncoderClear(&encoder); int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size); - if (pRaw == NULL) goto STREAM_ENCODE_OVER; + TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno); buf = taosMemoryMalloc(tlen); - if (buf == NULL) goto STREAM_ENCODE_OVER; + TSDB_CHECK_NULL(buf, code, lino, _over, terrno); tEncoderInit(&encoder, buf, tlen); - if (tEncodeSStreamObj(&encoder, pStream) < 0) { + if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) { tEncoderClear(&encoder); - goto STREAM_ENCODE_OVER; + TSDB_CHECK_CODE(code, lino, _over); } + tEncoderClear(&encoder); int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER); - SDB_SET_BINARY(pRaw, dataPos, buf, tlen, STREAM_ENCODE_OVER); - SDB_SET_DATALEN(pRaw, dataPos, STREAM_ENCODE_OVER); + SDB_SET_INT32(pRaw, dataPos, tlen, _over); + SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over); + SDB_SET_DATALEN(pRaw, dataPos, _over); - terrno = TSDB_CODE_SUCCESS; - -STREAM_ENCODE_OVER: +_over: taosMemoryFreeClear(buf); - if (terrno != TSDB_CODE_SUCCESS) { - mError("stream:%s, failed to encode to raw:%p since %s", pStream->name, pRaw, terrstr()); + if (code != TSDB_CODE_SUCCESS) { + mError("stream:%s, failed to encode to raw:%p at line:%d since %s", pStream->name, pRaw, lino, tstrerror(code)); sdbFreeRaw(pRaw); + terrno = code; return NULL; } + terrno = 0; mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream, pStream->checkpointId); return pRaw; diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index abe87fed64..a405cdde0a 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1105,8 +1105,8 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", pStream->uid, pStream->name, total); - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); - if (!conflict) { + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); + if (code == 0) { code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList); if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry taosArrayClear(px->pTaskList);