fix(stream): set correct name.

This commit is contained in:
Haojun Liao 2024-07-28 12:14:39 +08:00
parent bf3b2e8dde
commit 5096e25032
2 changed files with 16 additions and 6 deletions

View File

@ -806,14 +806,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}
// schedule stream task for stream obj
if ((code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList)) < 0) {
code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to schedule since %s", createReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
}
// add stream to trans
if ((code = mndPersistStream(pTrans, &streamObj)) < 0) {
code = mndPersistStream(pTrans, &streamObj);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to persist since %s", createReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
@ -837,7 +839,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
streamMutexUnlock(&execInfo.lock);
// execute creation
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
goto _OVER;
@ -848,12 +851,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SName dbname = {0};
code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (code) {
mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(code));
goto _OVER;
}
SName name = {0};
code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE);
if (code) {
mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(code));
goto _OVER;
}
@ -868,7 +873,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
_OVER:
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to create since %s", createReq.name, terrstr());
mError("stream:%s, failed to create since %s", createReq.name, terrstr(code));
} else {
mDebug("stream:%s create stream completed", createReq.name);
}
mndReleaseStream(pMnode, pStream);

View File

@ -1001,7 +1001,9 @@ static int32_t mndTransCheckCommitActions(SMnode *pMnode, STrans *pTrans) {
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (pTrans == NULL) return -1;
if (pTrans == NULL) {
return TSDB_CODE_INVALID_PARA;
}
TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));
@ -1583,6 +1585,7 @@ _OVER:
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true;
int32_t code = 0;
terrno = 0;
if (pTrans->exec == TRN_EXEC_SERIAL) {
code = mndTransExecuteRedoActionsSerial(pMnode, pTrans, topHalf);