fix(stream):set the code to be in_progress and do some internal refactor.

This commit is contained in:
Haojun Liao 2024-09-09 11:12:30 +08:00
parent 958b443fc2
commit 979c6e20e6
5 changed files with 107 additions and 107 deletions

View File

@ -114,7 +114,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream); int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId); int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId);
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt); int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt);
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock); int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock);
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId); int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);

View File

@ -220,7 +220,7 @@ STREAM_DECODE_OVER:
taosMemoryFreeClear(buf); taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) { if (terrno != TSDB_CODE_SUCCESS) {
char *p = (pStream == NULL) ? "null" : pStream->name; char *p = (pStream == NULL) ? "null" : pStream->name;
mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, terrstr()); mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, tstrerror(terrno));
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
return NULL; return NULL;
} }
@ -282,9 +282,9 @@ int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 || if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 ||
pCreate->targetStbFullName[0] == 0) { pCreate->targetStbFullName[0] == 0) {
return terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION; return TSDB_CODE_MND_INVALID_STREAM_OPTION;
} }
return 0; return TSDB_CODE_SUCCESS;
} }
static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) { static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrapper) {
@ -366,8 +366,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN); memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
if (pSourceDb == NULL) { if (pSourceDb == NULL) {
mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr());
code = terrno; code = terrno;
mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, tstrerror(code));
goto FAIL; goto FAIL;
} }
@ -378,8 +378,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName); SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
if (pTargetDb == NULL) { if (pTargetDb == NULL) {
mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr());
code = terrno; code = terrno;
mError("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, tstrerror(code));
goto FAIL; goto FAIL;
} }
@ -543,8 +543,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
code = tEncodeStreamTask(&encoder, pTask); code = tEncodeStreamTask(&encoder, pTask);
tEncoderClear(&encoder); tEncoderClear(&encoder);
if (code == -1) { if (code != 0) {
code = TSDB_CODE_INVALID_PARA;
mError("failed to encode stream task, code:%s", tstrerror(code)); mError("failed to encode stream task, code:%s", tstrerror(code));
taosMemoryFree(buf); taosMemoryFree(buf);
return code; return code;
@ -616,8 +615,8 @@ int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SMCreateStbReq createReq = {0}; SMCreateStbReq createReq = {0};
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
@ -669,19 +668,19 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
} }
} }
if (mndCheckCreateStbReq(&createReq) != 0) { if ((code = mndCheckCreateStbReq(&createReq)) != 0) {
goto _OVER; goto _OVER;
} }
pStb = mndAcquireStb(pMnode, createReq.name); pStb = mndAcquireStb(pMnode, createReq.name);
if (pStb != NULL) { if (pStb != NULL) {
terrno = TSDB_CODE_MND_STB_ALREADY_EXIST; code = TSDB_CODE_MND_STB_ALREADY_EXIST;
goto _OVER; goto _OVER;
} }
pDb = mndAcquireDbByStb(pMnode, createReq.name); pDb = mndAcquireDbByStb(pMnode, createReq.name);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED; code = TSDB_CODE_MND_DB_NOT_SELECTED;
goto _OVER; goto _OVER;
} }
@ -691,7 +690,7 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
} }
if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) { if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB; code = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
goto _OVER; goto _OVER;
} }
@ -720,7 +719,8 @@ _OVER:
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mDebug("stream:%s failed to create dst stable:%s, code:%s", pStream->name, pStream->targetSTbName, tstrerror(terrno)); mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->targetSTbName, lino,
tstrerror(code));
return code; return code;
} }
@ -742,16 +742,14 @@ static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM, mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
pStreamObj->name); pStreamObj->name);
sdbCancelFetch(pMnode->pSdb, pIter); sdbCancelFetch(pMnode->pSdb, pIter);
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; return TSDB_CODE_MND_TOO_MANY_STREAMS;
return terrno;
} }
if (pStream->targetStbUid == pStreamObj->targetStbUid) { if (pStream->targetStbUid == pStreamObj->targetStbUid) {
mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name, mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
pStreamObj->name); pStreamObj->name);
sdbCancelFetch(pMnode->pSdb, pIter); sdbCancelFetch(pMnode->pSdb, pIter);
terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE; return TSDB_CODE_MND_INVALID_TARGET_TABLE;
return terrno;
} }
} }
@ -767,13 +765,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
const char *pMsg = "create stream tasks on dnodes"; const char *pMsg = "create stream tasks on dnodes";
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
STrans *pTrans = NULL;
terrno = TSDB_CODE_SUCCESS;
SCMCreateStreamReq createReq = {0}; SCMCreateStreamReq createReq = {0};
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq) != 0) { code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createReq);
code = TSDB_CODE_INVALID_MSG; TSDB_CHECK_CODE(code, lino, _OVER);
goto _OVER;
}
#ifdef WINDOWS #ifdef WINDOWS
code = TSDB_CODE_MND_INVALID_PLATFORM; code = TSDB_CODE_MND_INVALID_PLATFORM;
@ -782,7 +778,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql); mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
if ((code = mndCheckCreateStreamReq(&createReq)) != 0) { if ((code = mndCheckCreateStreamReq(&createReq)) != 0) {
mError("stream:%s, failed to create since %s", createReq.name, terrstr()); mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
goto _OVER; goto _OVER;
} }
@ -807,21 +803,20 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
sqlLen = strlen(createReq.sql); sqlLen = strlen(createReq.sql);
sql = taosMemoryMalloc(sqlLen + 1); sql = taosMemoryMalloc(sqlLen + 1);
TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno); TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
memset(sql, 0, sqlLen + 1); memset(sql, 0, sqlLen + 1);
memcpy(sql, createReq.sql, sqlLen); memcpy(sql, createReq.sql, sqlLen);
} }
// build stream obj from request // build stream obj from request
if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) { if ((code = mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createReq)) < 0) {
mError("stream:%s, failed to create since %s", createReq.name, terrstr()); mError("stream:%s, failed to create since %s", createReq.name, tstrerror(code));
goto _OVER; goto _OVER;
} }
if ((code = doStreamCheck(pMnode, &streamObj)) < 0) { code = doStreamCheck(pMnode, &streamObj);
goto _OVER; TSDB_CHECK_CODE(code, lino, _OVER);
}
STrans *pTrans = NULL;
code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans); code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
if (pTrans == NULL || code) { if (pTrans == NULL || code) {
goto _OVER; goto _OVER;
@ -830,7 +825,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
// create stb for stream // create stb for stream
if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) { if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) {
if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) { if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, terrstr()); mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
mndTransDrop(pTrans); mndTransDrop(pTrans);
goto _OVER; goto _OVER;
} }
@ -841,7 +836,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
// schedule stream task for stream obj // schedule stream task for stream obj
code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList); code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to schedule since %s", createReq.name, terrstr()); mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
mndTransDrop(pTrans); mndTransDrop(pTrans);
goto _OVER; goto _OVER;
} }
@ -849,7 +844,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
// add stream to trans // add stream to trans
code = mndPersistStream(pTrans, &streamObj); code = mndPersistStream(pTrans, &streamObj);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to persist since %s", createReq.name, terrstr()); mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
mndTransDrop(pTrans); mndTransDrop(pTrans);
goto _OVER; goto _OVER;
} }
@ -874,17 +869,13 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
// execute creation // execute creation
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
mndTransDrop(pTrans); mndTransDrop(pTrans);
goto _OVER; goto _OVER;
} }
mndTransDrop(pTrans); mndTransDrop(pTrans);
if (code == 0) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
SName dbname = {0}; SName dbname = {0};
code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (code) { if (code) {
@ -910,9 +901,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
_OVER: _OVER:
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to create since %s", createReq.name, terrstr()); mError("stream:%s, failed to create at line:%d since %s", createReq.name, lino, terrstr(code));
} else { } else {
mDebug("stream:%s create stream completed", createReq.name); mDebug("stream:%s create stream completed", createReq.name);
code = TSDB_CODE_ACTION_IN_PROGRESS;
} }
mndReleaseStream(pMnode, pStream); mndReleaseStream(pMnode, pStream);
@ -1058,12 +1050,18 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId, static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
int8_t mndTrigger, bool lock) { int8_t mndTrigger, bool lock) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
bool conflict = false;
int64_t ts = taosGetTimestampMs(); int64_t ts = taosGetTimestampMs();
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) { if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
return code; return code;
} }
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock); code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
if (code) {
goto _ERR;
}
if (conflict) { if (conflict) {
mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb, mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb,
pStream->name, pStream->uid); pStream->name, pStream->uid);
@ -1126,7 +1124,9 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to prepare checkpoint trans since %s", terrstr()); mError("failed to prepare checkpoint trans since %s", terrstr(code));
} else {
code = TSDB_CODE_ACTION_IN_PROGRESS;
} }
_ERR: _ERR:
@ -1400,7 +1400,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true); code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
sdbRelease(pSdb, p); sdbRelease(pSdb, p);
if (code != -1) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
started += 1; started += 1;
if (started >= capacity) { if (started >= capacity) {
@ -1438,10 +1438,9 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
tFreeMDropStreamReq(&dropReq); tFreeMDropStreamReq(&dropReq);
return 0; return 0;
} else { } else {
code = TSDB_CODE_MND_STREAM_NOT_EXIST;
mError("stream:%s not exist failed to drop it", dropReq.name); mError("stream:%s not exist failed to drop it", dropReq.name);
tFreeMDropStreamReq(&dropReq); tFreeMDropStreamReq(&dropReq);
TAOS_RETURN(code); TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
} }
} }
@ -1480,17 +1479,17 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
} }
// check if it is conflict with other trans in both sourceDb and targetDb. // check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true); code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
if (conflict) { if (code) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
tFreeMDropStreamReq(&dropReq); tFreeMDropStreamReq(&dropReq);
return terrno; return code;
} }
STrans *pTrans = NULL; STrans *pTrans = NULL;
code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans); code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
if (pTrans == NULL || code) { if (pTrans == NULL || code) {
mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, terrstr()); mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
tFreeMDropStreamReq(&dropReq); tFreeMDropStreamReq(&dropReq);
TAOS_RETURN(code); TAOS_RETURN(code);
@ -1526,7 +1525,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
tFreeMDropStreamReq(&dropReq); tFreeMDropStreamReq(&dropReq);
@ -1756,16 +1755,16 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return 0; return 0;
} }
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return -1; return code;
} }
// check if it is conflict with other trans in both sourceDb and targetDb. // check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true); code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
if (conflict) { if (code) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
TAOS_RETURN(TSDB_CODE_MND_TRANS_CONFLICT); TAOS_RETURN(code);
} }
bool updated = taskNodeIsUpdated(pMnode); bool updated = taskNodeIsUpdated(pMnode);
@ -1821,7 +1820,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
STrans *pTrans = NULL; STrans *pTrans = NULL;
code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans); code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
if (pTrans == NULL || code) { if (pTrans == NULL || code) {
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return code; return code;
} }
@ -1836,7 +1835,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
// if nodeUpdate happened, not send pause trans // if nodeUpdate happened, not send pause trans
code = mndStreamSetPauseAction(pMnode, pTrans, pStream); code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
if (code) { if (code) {
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); mError("stream:%s, failed to pause task since %s", pauseReq.name, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
@ -1857,7 +1856,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
@ -1874,8 +1873,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
int32_t code = 0; int32_t code = 0;
if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
return terrno; return code;
} }
SMResumeStreamReq resumeReq = {0}; SMResumeStreamReq resumeReq = {0};
@ -1906,17 +1905,17 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
} }
// check if it is conflict with other trans in both sourceDb and targetDb. // check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true); code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
if (conflict) { if (code) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return terrno; return code;
} }
STrans *pTrans = NULL; STrans *pTrans = NULL;
code = code =
doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans); doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
if (pTrans == NULL || code) { if (pTrans == NULL || code) {
mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr()); mError("stream:%s, failed to resume stream since %s", resumeReq.name, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return code; return code;
} }
@ -1929,8 +1928,9 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
} }
// set the resume action // set the resume action
if (mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) { code = mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated);
mError("stream:%s, failed to drop task since %s", resumeReq.name, terrstr()); if (code) {
mError("stream:%s, failed to drop task since %s", resumeReq.name, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
@ -1950,7 +1950,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
@ -2049,13 +2049,13 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
break; break;
} }
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false); code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
if (conflict) { if (code) {
mError("nodeUpdate conflict with other trans, current nodeUpdate ignored"); mError("nodeUpdate conflict with other trans, current nodeUpdate ignored, code:%s", tstrerror(code));
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
return terrno; return code;
} }
} }
@ -2119,7 +2119,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
@ -2640,9 +2640,8 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (p == NULL) { if (p == NULL) {
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId); mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
streamMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
return terrno; return TSDB_CODE_MND_STREAM_NOT_EXIST;
} else { } else {
mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
req.streamId, req.taskId); req.streamId, req.taskId);
@ -2993,7 +2992,7 @@ int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream,
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
@ -3038,9 +3037,8 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
} }
// check if it is conflict with other trans in both sourceDb and targetDb. // check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false); code = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
if (conflict) { if (code) {
code = TSDB_CODE_MND_TRANS_CONFLICT;
goto _err; goto _err;
} }
@ -3048,7 +3046,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans); code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
if (pTrans == NULL || code != 0) { if (pTrans == NULL || code != 0) {
mError("failed to create trans to drop orphan tasks since %s", terrstr()); mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
goto _err; goto _err;
} }
@ -3059,7 +3057,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
// drop all tasks // drop all tasks
if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) { if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
mError("failed to create trans to drop orphan tasks since %s", terrstr()); mError("failed to create trans to drop orphan tasks since %s", tstrerror(code));
goto _err; goto _err;
} }
@ -3070,7 +3068,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code));
goto _err; goto _err;
} }

View File

@ -109,6 +109,9 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
if (code == 0) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
return code; return code;
} }
@ -221,8 +224,8 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) {
code = TSDB_CODE_STREAM_TASK_NOT_EXIST; code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid); mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid);
} else { } else {
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false); code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false);
if (conflict) { if (code) {
mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name, mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name,
pStream->sourceDb, pStream->targetSTbName); pStream->sourceDb, pStream->targetSTbName);
} else { } else {

View File

@ -91,7 +91,7 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
// For a given stream: // For a given stream:
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans. // 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
// 2. create/drop/reset/update trans are conflict with any other trans. // 2. create/drop/reset/update trans are conflict with any other trans.
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) { int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
if (lock) { if (lock) {
streamMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
} }
@ -101,7 +101,7 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p
if (lock) { if (lock) {
streamMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
} }
return false; return 0;
} }
int32_t code = mndStreamClearFinishedTrans(pMnode, NULL); int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
@ -121,8 +121,7 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p
if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0)) { if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0)) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
tInfo.name); tInfo.name);
terrno = TSDB_CODE_MND_TRANS_CONFLICT; return TSDB_CODE_MND_TRANS_CONFLICT;
return true;
} else { } else {
mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName); mDebug("not conflict with checkpoint trans, name:%s, continue creating trans", pTransName);
} }
@ -131,8 +130,7 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p
strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) { strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
tInfo.name); tInfo.name);
terrno = TSDB_CODE_MND_TRANS_CONFLICT; return TSDB_CODE_MND_TRANS_CONFLICT;
return true;
} }
} else { } else {
mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId); mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId);
@ -142,7 +140,7 @@ bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *p
streamMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
} }
return false; return 0;
} }
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) { int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
@ -202,47 +200,48 @@ int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnCo
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
terrno = TSDB_CODE_OUT_OF_MEMORY; void *buf = NULL;
void *buf = NULL;
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, NULL, 0); tEncoderInit(&encoder, NULL, 0);
if (tEncodeSStreamObj(&encoder, pStream) < 0) { if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
tEncoderClear(&encoder); tEncoderClear(&encoder);
goto STREAM_ENCODE_OVER; TSDB_CHECK_CODE(code, lino, _over);
} }
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
tEncoderClear(&encoder); tEncoderClear(&encoder);
int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE; int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
if (pRaw == NULL) goto STREAM_ENCODE_OVER; TSDB_CHECK_NULL(pRaw, code, lino, _over, terrno);
buf = taosMemoryMalloc(tlen); buf = taosMemoryMalloc(tlen);
if (buf == NULL) goto STREAM_ENCODE_OVER; TSDB_CHECK_NULL(buf, code, lino, _over, terrno);
tEncoderInit(&encoder, buf, tlen); tEncoderInit(&encoder, buf, tlen);
if (tEncodeSStreamObj(&encoder, pStream) < 0) { if ((code = tEncodeSStreamObj(&encoder, pStream)) < 0) {
tEncoderClear(&encoder); tEncoderClear(&encoder);
goto STREAM_ENCODE_OVER; TSDB_CHECK_CODE(code, lino, _over);
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, tlen, _over);
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, STREAM_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _over);
SDB_SET_DATALEN(pRaw, dataPos, STREAM_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, _over);
terrno = TSDB_CODE_SUCCESS; _over:
STREAM_ENCODE_OVER:
taosMemoryFreeClear(buf); taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mError("stream:%s, failed to encode to raw:%p since %s", pStream->name, pRaw, terrstr()); mError("stream:%s, failed to encode to raw:%p at line:%d since %s", pStream->name, pRaw, lino, tstrerror(code));
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
terrno = code;
return NULL; return NULL;
} }
terrno = 0;
mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream, mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream,
pStream->checkpointId); pStream->checkpointId);
return pRaw; return pRaw;

View File

@ -1105,8 +1105,8 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info",
pStream->uid, pStream->name, total); pStream->uid, pStream->name, total);
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
if (!conflict) { if (code == 0) {
code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList); code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry
taosArrayClear(px->pTaskList); taosArrayClear(px->pTaskList);