fix(stream): adjust conflict level when creating stream.

This commit is contained in:
Haojun Liao 2024-03-20 18:31:57 +08:00
parent 65e4048f5d
commit a9ba534f48
4 changed files with 26 additions and 19 deletions

View File

@ -107,7 +107,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode); int32_t retryCode);
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg); STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg);
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);

View File

@ -513,6 +513,7 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
createReq.numOfColumns = pStream->outputSchema.nCols; createReq.numOfColumns = pStream->outputSchema.nCols;
createReq.numOfTags = 1; // group id createReq.numOfTags = 1; // group id
createReq.pColumns = taosArrayInit_s(sizeof(SField), createReq.numOfColumns); createReq.pColumns = taosArrayInit_s(sizeof(SField), createReq.numOfColumns);
// build fields // build fields
for (int32_t i = 0; i < createReq.numOfColumns; i++) { for (int32_t i = 0; i < createReq.numOfColumns; i++) {
SField *pField = taosArrayGet(createReq.pColumns, i); SField *pField = taosArrayGet(createReq.pColumns, i);
@ -586,12 +587,15 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
mndFreeStb(&stbObj); mndFreeStb(&stbObj);
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols);
return 0; return 0;
_OVER: _OVER:
tFreeSMCreateStbReq(&createReq); tFreeSMCreateStbReq(&createReq);
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mDebug("stream:%s failed to create dst stable:%s, code:%s", pStream->name, pStream->targetSTbName, tstrerror(terrno));
return -1; return -1;
} }
@ -647,7 +651,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
terrno = TSDB_CODE_MND_INVALID_PLATFORM; terrno = TSDB_CODE_MND_INVALID_PLATFORM;
goto _OVER; goto _OVER;
#endif #endif
mInfo("stream:%s, start to create, sql:%s", createReq.name, createReq.sql); mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
if (mndCheckCreateStreamReq(&createReq) != 0) { if (mndCheckCreateStreamReq(&createReq) != 0) {
mError("stream:%s, failed to create since %s", createReq.name, terrstr()); mError("stream:%s, failed to create since %s", createReq.name, terrstr());
@ -684,17 +688,20 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes"); STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes");
if (pTrans == NULL) { if (pTrans == NULL) {
goto _OVER; goto _OVER;
} }
// create stb for stream // create stb for stream
if (createReq.createStb == STREAM_CREATE_STABLE_TRUE && if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) {
mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) { if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, terrstr()); mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
goto _OVER; goto _OVER;
}
} else {
mDebug("stream:%s no need create stable", createReq.name);
} }
// schedule stream task for stream obj // schedule stream task for stream obj
@ -724,7 +731,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
// add into buffer firstly // add into buffer firstly
// to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already. // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
mDebug("stream stream:%s tasks register into node list", createReq.name); mDebug("stream stream:%s start to register tasks into task_node_list", createReq.name);
saveStreamTasksInfo(&streamObj, &execInfo); saveStreamTasksInfo(&streamObj, &execInfo);
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
@ -890,7 +897,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
return -1; return -1;
} }
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_CHECKPOINT_NAME, "gen checkpoint for stream"); STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME, "gen checkpoint for stream");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId, mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
@ -1143,7 +1150,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1; return -1;
} }
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_DROP_NAME, "drop stream"); STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
@ -1573,7 +1580,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1; return -1;
} }
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_PAUSE_NAME, "pause the stream"); STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
@ -1662,7 +1669,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return -1; return -1;
} }
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream"); STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr()); mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
@ -1794,7 +1801,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
// here create only one trans // here create only one trans
if (pTrans == NULL) { if (pTrans == NULL) {
pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_UPDATE_NAME, "update task epsets"); pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME, "update task epsets");
if (pTrans == NULL) { if (pTrans == NULL) {
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);

View File

@ -66,7 +66,7 @@ static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pI
} }
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint"); STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
if (pTrans == NULL) { if (pTrans == NULL) {
return terrno; return terrno;
} }
@ -156,7 +156,7 @@ static int32_t mndDropOrphanTasks(SMnode* pMnode, SArray* pList) {
} }
SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""}; SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};
STrans* pTrans = doCreateTrans(pMnode, &dummyObj, NULL, MND_STREAM_DROP_NAME, "drop stream"); STrans* pTrans = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("failed to create trans to drop orphan tasks since %s", terrstr()); mError("failed to create trans to drop orphan tasks since %s", terrstr());
return -1; return -1;

View File

@ -161,8 +161,8 @@ int32_t mndAddtoCheckpointWaitingList(SStreamObj* pStream, int64_t checkpointId)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg) { STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, name); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;