diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 5307ff4b05..5a4caf3348 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -107,7 +107,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, int32_t retryCode); -STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg); +STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg); int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 55317d03af..60b522f6fa 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -513,6 +513,7 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre createReq.numOfColumns = pStream->outputSchema.nCols; createReq.numOfTags = 1; // group id createReq.pColumns = taosArrayInit_s(sizeof(SField), createReq.numOfColumns); + // build fields for (int32_t i = 0; i < createReq.numOfColumns; i++) { SField *pField = taosArrayGet(createReq.pColumns, i); @@ -586,12 +587,15 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre mndFreeStb(&stbObj); mndReleaseStb(pMnode, pStb); mndReleaseDb(pMnode, pDb); - + mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->targetSTbName, pStream->outputSchema.nCols); return 0; + _OVER: tFreeSMCreateStbReq(&createReq); mndReleaseStb(pMnode, pStb); mndReleaseDb(pMnode, pDb); + + mDebug("stream:%s failed to create dst stable:%s, code:%s", pStream->name, pStream->targetSTbName, tstrerror(terrno)); return -1; } @@ -647,7 +651,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { terrno = TSDB_CODE_MND_INVALID_PLATFORM; goto _OVER; #endif - mInfo("stream:%s, start to create, sql:%s", createReq.name, createReq.sql); + mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql); if (mndCheckCreateStreamReq(&createReq) != 0) { mError("stream:%s, failed to create since %s", createReq.name, terrstr()); @@ -684,17 +688,20 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes"); + STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes"); if (pTrans == NULL) { goto _OVER; } // create stb for stream - if (createReq.createStb == STREAM_CREATE_STABLE_TRUE && - mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) { - mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, terrstr()); - mndTransDrop(pTrans); - goto _OVER; + if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) { + if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) { + mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, terrstr()); + mndTransDrop(pTrans); + goto _OVER; + } + } else { + mDebug("stream:%s no need create stable", createReq.name); } // schedule stream task for stream obj @@ -724,7 +731,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // add into buffer firstly // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already. taosThreadMutexLock(&execInfo.lock); - mDebug("stream stream:%s tasks register into node list", createReq.name); + mDebug("stream stream:%s start to register tasks into task_node_list", createReq.name); saveStreamTasksInfo(&streamObj, &execInfo); taosThreadMutexUnlock(&execInfo.lock); @@ -890,7 +897,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre return -1; } - STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_CHECKPOINT_NAME, "gen checkpoint for stream"); + STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME, "gen checkpoint for stream"); if (pTrans == NULL) { mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId, tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); @@ -1143,7 +1150,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_DROP_NAME, "drop stream"); + STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream"); if (pTrans == NULL) { mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1573,7 +1580,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_PAUSE_NAME, "pause the stream"); + STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream"); if (pTrans == NULL) { mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1662,7 +1669,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream"); + STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream"); if (pTrans == NULL) { mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1794,7 +1801,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange // here create only one trans if (pTrans == NULL) { - pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_UPDATE_NAME, "update task epsets"); + pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME, "update task epsets"); if (pTrans == NULL) { sdbRelease(pSdb, pStream); sdbCancelFetch(pSdb, pIter); diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 14f3c533e3..c8f943b931 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -66,7 +66,7 @@ static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pI } int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { - STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint"); + STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint"); if (pTrans == NULL) { return terrno; } @@ -156,7 +156,7 @@ static int32_t mndDropOrphanTasks(SMnode* pMnode, SArray* pList) { } SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""}; - STrans* pTrans = doCreateTrans(pMnode, &dummyObj, NULL, MND_STREAM_DROP_NAME, "drop stream"); + STrans* pTrans = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream"); if (pTrans == NULL) { mError("failed to create trans to drop orphan tasks since %s", terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 2241d93465..16b735fbc4 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -161,8 +161,8 @@ int32_t mndAddtoCheckpointWaitingList(SStreamObj* pStream, int64_t checkpointId) return TSDB_CODE_SUCCESS; } -STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, name); +STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg) { + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name); if (pTrans == NULL) { mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); terrno = TSDB_CODE_OUT_OF_MEMORY;