refactor: refactor the stream trans conflict check
This commit is contained in:
parent
2b099b11dd
commit
43e6722fd8
|
@ -649,8 +649,7 @@ typedef struct SStreamConf {
|
||||||
} SStreamConf;
|
} SStreamConf;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_STREAM_FNAME_LEN];
|
char name[TSDB_STREAM_FNAME_LEN];
|
||||||
// ctl
|
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
|
|
||||||
// create info
|
// create info
|
||||||
|
|
|
@ -22,17 +22,37 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t mndInitStream(SMnode *pMnode);
|
typedef struct SStreamTransInfo {
|
||||||
void mndCleanupStream(SMnode *pMnode);
|
int64_t startTime;
|
||||||
|
int32_t transId;
|
||||||
|
const char *name;
|
||||||
|
} SStreamTransInfo;
|
||||||
|
|
||||||
|
typedef struct SStreamTransMgmt {
|
||||||
|
SHashObj *pDBTrans;
|
||||||
|
} SStreamTransMgmt;
|
||||||
|
|
||||||
|
typedef struct SStreamExecInfo {
|
||||||
|
SArray *pNodeList;
|
||||||
|
int64_t ts; // snapshot ts
|
||||||
|
SStreamTransMgmt transMgmt;
|
||||||
|
int64_t activeCheckpoint; // active check point id
|
||||||
|
SHashObj * pTaskMap;
|
||||||
|
SArray * pTaskList;
|
||||||
|
TdThreadMutex lock;
|
||||||
|
} SStreamExecInfo;
|
||||||
|
|
||||||
|
extern SStreamExecInfo execInfo;
|
||||||
|
|
||||||
|
int32_t mndInitStream(SMnode *pMnode);
|
||||||
|
void mndCleanupStream(SMnode *pMnode);
|
||||||
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);
|
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);
|
||||||
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
|
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
|
||||||
|
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||||
|
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||||
|
|
||||||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb);
|
||||||
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb);
|
||||||
|
|
||||||
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
|
||||||
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
|
||||||
|
|
||||||
// for sma
|
// for sma
|
||||||
// TODO refactor
|
// TODO refactor
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
extern bool tsDeployOnSnode;
|
extern bool tsDeployOnSnode;
|
||||||
|
|
||||||
static int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
|
static int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
|
||||||
SEpSet* pEpset, bool isFillhistory);
|
SEpSet* pEpset, bool isFillhistory);
|
||||||
|
|
||||||
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
||||||
int64_t watermark, int64_t deleteMark) {
|
int64_t watermark, int64_t deleteMark) {
|
||||||
|
|
|
@ -34,7 +34,13 @@
|
||||||
#define MND_STREAM_VER_NUMBER 4
|
#define MND_STREAM_VER_NUMBER 4
|
||||||
#define MND_STREAM_RESERVE_SIZE 64
|
#define MND_STREAM_RESERVE_SIZE 64
|
||||||
#define MND_STREAM_MAX_NUM 60
|
#define MND_STREAM_MAX_NUM 60
|
||||||
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
|
|
||||||
|
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
|
||||||
|
#define MND_STREAM_PAUSE_NAME "stream-pause"
|
||||||
|
#define MND_STREAM_RESUME_NAME "stream-resume"
|
||||||
|
#define MND_STREAM_DROP_NAME "stream-drop"
|
||||||
|
#define MND_STREAM_TASK_RESET_NAME "stream-task-reset"
|
||||||
|
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
|
||||||
|
|
||||||
typedef struct SNodeEntry {
|
typedef struct SNodeEntry {
|
||||||
int32_t nodeId;
|
int32_t nodeId;
|
||||||
|
@ -43,22 +49,13 @@ typedef struct SNodeEntry {
|
||||||
int64_t hbTimestamp; // second
|
int64_t hbTimestamp; // second
|
||||||
} SNodeEntry;
|
} SNodeEntry;
|
||||||
|
|
||||||
typedef struct SStreamExecInfo {
|
|
||||||
SArray *pNodeList;
|
|
||||||
int64_t ts; // snapshot ts
|
|
||||||
int64_t activeCheckpoint; // active check point id
|
|
||||||
SHashObj * pTaskMap;
|
|
||||||
SArray * pTaskList;
|
|
||||||
TdThreadMutex lock;
|
|
||||||
} SStreamExecInfo;
|
|
||||||
|
|
||||||
typedef struct SVgroupChangeInfo {
|
typedef struct SVgroupChangeInfo {
|
||||||
SHashObj *pDBMap;
|
SHashObj *pDBMap;
|
||||||
SArray * pUpdateNodeList; // SArray<SNodeUpdateInfo>
|
SArray * pUpdateNodeList; // SArray<SNodeUpdateInfo>
|
||||||
} SVgroupChangeInfo;
|
} SVgroupChangeInfo;
|
||||||
|
|
||||||
static int32_t mndNodeCheckSentinel = 0;
|
static int32_t mndNodeCheckSentinel = 0;
|
||||||
static SStreamExecInfo execInfo;
|
SStreamExecInfo execInfo;
|
||||||
|
|
||||||
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
|
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
|
||||||
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
|
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
|
||||||
|
@ -83,17 +80,20 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
|
||||||
|
|
||||||
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
|
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
|
||||||
|
|
||||||
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name);
|
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg);
|
||||||
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
|
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
|
||||||
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
|
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
|
||||||
int32_t retryCode);
|
int32_t retryCode);
|
||||||
static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
|
static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
|
||||||
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||||
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||||
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
|
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
|
||||||
static int32_t doKillActiveCheckpointTrans(SMnode *pMnode);
|
static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName);
|
||||||
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
|
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
|
||||||
|
|
||||||
|
static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
||||||
|
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
||||||
|
|
||||||
int32_t mndInitStream(SMnode *pMnode) {
|
int32_t mndInitStream(SMnode *pMnode) {
|
||||||
SSdbTable table = {
|
SSdbTable table = {
|
||||||
.sdbType = SDB_STREAM,
|
.sdbType = SDB_STREAM,
|
||||||
|
@ -335,7 +335,7 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
|
||||||
.pAstRoot = pAst,
|
.pAstRoot = pAst,
|
||||||
.topicQuery = false,
|
.topicQuery = false,
|
||||||
.streamQuery = true,
|
.streamQuery = true,
|
||||||
.triggerType = triggerType == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType,
|
.triggerType = (triggerType == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType,
|
||||||
.watermark = watermark,
|
.watermark = watermark,
|
||||||
};
|
};
|
||||||
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
|
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
|
||||||
|
@ -720,6 +720,52 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool checkDbPrecision(SMnode* pMnode, SStreamObj* pStreamObj) {
|
||||||
|
if (pStreamObj->sourceDbUid != pStreamObj->targetDbUid) {
|
||||||
|
SDbObj *pSrcDb = mndAcquireDb(pMnode, pStreamObj->sourceDb);
|
||||||
|
SDbObj *pDstDb = mndAcquireDb(pMnode, pStreamObj->targetDb);
|
||||||
|
|
||||||
|
bool isIdentical = (pSrcDb->cfg.precision != pDstDb->cfg.precision);
|
||||||
|
mndReleaseDb(pMnode, pSrcDb);
|
||||||
|
mndReleaseDb(pMnode, pDstDb);
|
||||||
|
|
||||||
|
if (!isIdentical) {
|
||||||
|
mError("stream:%s failed to create since target/source db precision not identical", pStreamObj->name);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks
|
||||||
|
int32_t numOfStream = 0;
|
||||||
|
SStreamObj *pStream = NULL;
|
||||||
|
void *pIter = NULL;
|
||||||
|
|
||||||
|
while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
|
||||||
|
if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
|
||||||
|
++numOfStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
|
||||||
|
if (numOfStream > MND_STREAM_MAX_NUM) {
|
||||||
|
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
|
||||||
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
|
return TSDB_CODE_MND_TOO_MANY_STREAMS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStream->targetStbUid == pStreamObj->targetStbUid) {
|
||||||
|
mError("Cannot write the same stable as other stream:%s", pStream->name);
|
||||||
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
|
return TSDB_CODE_MND_INVALID_TARGET_TABLE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
SMnode * pMnode = pReq->info.node;
|
SMnode * pMnode = pReq->info.node;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
@ -732,6 +778,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
terrno = TSDB_CODE_MND_INVALID_PLATFORM;
|
terrno = TSDB_CODE_MND_INVALID_PLATFORM;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
@ -772,42 +819,15 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
// check if the time precision for source&target DB is identical.
|
||||||
int32_t numOfStream = 0;
|
bool isIdentical = checkDbPrecision(pMnode, &streamObj);
|
||||||
|
if (!isIdentical) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
SStreamObj *pStream = NULL;
|
code = checkForNumOfStreams(pMnode, &streamObj);
|
||||||
void * pIter = NULL;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _OVER;
|
||||||
while (1) {
|
|
||||||
pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
|
||||||
if (pIter == NULL) {
|
|
||||||
if (numOfStream > MND_STREAM_MAX_NUM) {
|
|
||||||
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
|
|
||||||
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pStream->sourceDbUid == streamObj.sourceDbUid) {
|
|
||||||
++numOfStream;
|
|
||||||
}
|
|
||||||
|
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
|
||||||
if (numOfStream > MND_STREAM_MAX_NUM) {
|
|
||||||
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
|
|
||||||
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
|
|
||||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pStream->targetStbUid == streamObj.targetStbUid) {
|
|
||||||
mError("Cannot write the same stable as other stream:%s", pStream->name);
|
|
||||||
terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE;
|
|
||||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
|
||||||
|
@ -866,7 +886,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
taosThreadMutexLock(&execInfo.lock);
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
mDebug("stream tasks register into node list");
|
mDebug("stream tasks register into node list");
|
||||||
keepStreamTasksInBuf(&streamObj, &execInfo);
|
saveStreamTasksInfo(&streamObj, &execInfo);
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
@ -1268,7 +1288,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
|
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
|
||||||
int64_t checkpointId = pMsg->checkpointId;
|
int64_t checkpointId = pMsg->checkpointId;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, NULL, MND_STREAM_CHECKPOINT_NAME);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, MND_STREAM_CHECKPOINT_NAME);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1277,7 +1297,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId);
|
mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId);
|
||||||
|
|
||||||
const char *pDb = mndGetStreamDB(pMnode);
|
const char *pDb = mndGetStreamDB(pMnode);
|
||||||
mndTransSetDbName(pTrans, pDb, "checkpoint");
|
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
|
||||||
taosMemoryFree((void *)pDb);
|
taosMemoryFree((void *)pDb);
|
||||||
|
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||||
|
@ -1345,7 +1365,14 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream");
|
// check if it is conflict with other trans in both sourceDb and targetDb.
|
||||||
|
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
|
||||||
|
if (!conflict) {
|
||||||
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, MND_STREAM_DROP_NAME);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
|
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
@ -1353,7 +1380,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
|
mInfo("trans:%d used to drop stream:%s", pTrans->id, dropReq.name);
|
||||||
|
|
||||||
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
|
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||||
|
@ -1363,6 +1390,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->sourceDb, pStream->targetDb);
|
||||||
|
|
||||||
// drop all tasks
|
// drop all tasks
|
||||||
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||||
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
|
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
|
||||||
|
@ -1392,7 +1421,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
// reuse this function for stream
|
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);
|
auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);
|
||||||
|
|
||||||
|
@ -1814,6 +1842,13 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check if it is conflict with other trans in both sourceDb and targetDb.
|
||||||
|
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
|
||||||
|
if (!conflict) {
|
||||||
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
bool updated = taskNodeIsUpdated(pMnode);
|
bool updated = taskNodeIsUpdated(pMnode);
|
||||||
if (updated) {
|
if (updated) {
|
||||||
mError("tasks are not ready for pause, node update detected");
|
mError("tasks are not ready for pause, node update detected");
|
||||||
|
@ -1822,7 +1857,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("stream:%s, failed to pause stream since %s", pauseReq.name, terrstr());
|
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr());
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1836,6 +1871,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->sourceDb, pStream->targetDb);
|
||||||
|
|
||||||
// pause all tasks
|
// pause all tasks
|
||||||
if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) {
|
if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||||
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
|
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
|
||||||
|
@ -1940,13 +1977,21 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream");
|
// check if it is conflict with other trans in both sourceDb and targetDb.
|
||||||
if (pTrans == NULL) {
|
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
|
||||||
mError("stream:%s, failed to pause stream since %s", pauseReq.name, terrstr());
|
if (!conflict) {
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name);
|
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, MND_STREAM_RESUME_NAME);
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
mError("stream:%s, failed to resume stream since %s", pauseReq.name, terrstr());
|
||||||
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mInfo("trans:%d used to resume stream:%s", pTrans->id, pauseReq.name);
|
||||||
|
|
||||||
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
|
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||||
|
@ -1955,6 +2000,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->sourceDb, pStream->targetDb);
|
||||||
|
|
||||||
// resume all tasks
|
// resume all tasks
|
||||||
if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
|
if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
|
||||||
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
|
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
|
||||||
|
@ -2219,7 +2266,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
||||||
|
|
||||||
// here create only one trans
|
// here create only one trans
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
pTrans = doCreateTrans(pMnode, pStream, "stream-task-update");
|
pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_UPDATE_NAME, "update task epsets");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
@ -2329,7 +2376,7 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
keepStreamTasksInBuf(pStream, &execInfo);
|
saveStreamTasksInfo(pStream, &execInfo);
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2413,6 +2460,14 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void killAllCheckpointTrans(SMnode* pMnode, SVgroupChangeInfo* pChangeInfo) {
|
||||||
|
void* pIter = NULL;
|
||||||
|
while((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
|
||||||
|
char* pDb = (char*) pIter;
|
||||||
|
killActiveCheckpointTrans(pMnode, pDb);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// this function runs by only one thread, so it is not multi-thread safe
|
// this function runs by only one thread, so it is not multi-thread safe
|
||||||
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -2454,7 +2509,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
|
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
|
||||||
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
|
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
|
||||||
// kill current active checkpoint transaction, since the transaction is vnode wide.
|
// kill current active checkpoint transaction, since the transaction is vnode wide.
|
||||||
doKillActiveCheckpointTrans(pMnode);
|
killAllCheckpointTrans(pMnode, &changeInfo);
|
||||||
|
|
||||||
code = mndProcessVgroupChange(pMnode, &changeInfo);
|
code = mndProcessVgroupChange(pMnode, &changeInfo);
|
||||||
|
|
||||||
// keep the new vnode snapshot
|
// keep the new vnode snapshot
|
||||||
|
@ -2500,7 +2556,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
|
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
|
||||||
int32_t level = taosArrayGetSize(pStream->tasks);
|
int32_t level = taosArrayGetSize(pStream->tasks);
|
||||||
|
|
||||||
for (int32_t i = 0; i < level; i++) {
|
for (int32_t i = 0; i < level; i++) {
|
||||||
|
@ -2543,8 +2599,9 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
|
||||||
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
|
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
|
||||||
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
|
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
|
||||||
taosArrayRemove(pExecNode->pTaskList, k);
|
taosArrayRemove(pExecNode->pTaskList, k);
|
||||||
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId,
|
|
||||||
(int32_t)taosArrayGetSize(pExecNode->pTaskList));
|
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
|
||||||
|
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2555,15 +2612,15 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
|
||||||
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
|
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
|
||||||
}
|
}
|
||||||
|
|
||||||
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name) {
|
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg) {
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, name);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, name);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid);
|
mDebug("s-task:0x%"PRIx64" start to build trans %s", pStream->uid, pMsg);
|
||||||
|
|
||||||
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
|
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||||
|
@ -2578,7 +2635,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
||||||
STrans *pTrans = doCreateTrans(pMnode, pStream, "stream-task-reset");
|
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -2642,43 +2699,36 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
||||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doKillActiveCheckpointTrans(SMnode *pMnode) {
|
int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName) {
|
||||||
int32_t transId = 0;
|
// data in the hash table will be removed automatically, no need to remove it here.
|
||||||
SSdb * pSdb = pMnode->pSdb;
|
SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, strlen(pDBName));
|
||||||
STrans *pTrans = NULL;
|
if (pTransInfo == NULL) {
|
||||||
void * pIter = NULL;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans);
|
|
||||||
if (pIter == NULL) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (strncmp(pTrans->opername, MND_STREAM_CHECKPOINT_NAME, tListLen(pTrans->opername) - 1) == 0) {
|
|
||||||
transId = pTrans->id;
|
|
||||||
sdbRelease(pSdb, pTrans);
|
|
||||||
sdbCancelFetch(pSdb, pIter);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
sdbRelease(pSdb, pTrans);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (transId == 0) {
|
|
||||||
mDebug("failed to find the checkpoint trans, reset not executed");
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTrans = mndAcquireTrans(pMnode, transId);
|
// not checkpoint trans, ignore
|
||||||
mInfo("kill checkpoint trans:%d", transId);
|
if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
|
||||||
|
mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
STrans* pTrans = mndAcquireTrans(pMnode, pTransInfo->transId);
|
||||||
|
if (pTrans != NULL) {
|
||||||
|
mInfo("kill checkpoint transId:%d in Db:%s", pTransInfo->transId, pDBName);
|
||||||
|
mndKillTrans(pMnode, pTrans);
|
||||||
|
mndReleaseTrans(pMnode, pTrans);
|
||||||
|
}
|
||||||
|
|
||||||
mndKillTrans(pMnode, pTrans);
|
|
||||||
mndReleaseTrans(pMnode, pTrans);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndResetFromCheckpoint(SMnode *pMnode) {
|
int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
|
||||||
doKillActiveCheckpointTrans(pMnode);
|
STrans* pTrans = mndAcquireTrans(pMnode, transId);
|
||||||
|
if (pTrans != NULL) {
|
||||||
|
mInfo("kill checkpoint transId:%d to reset task status", transId);
|
||||||
|
mndKillTrans(pMnode, pTrans);
|
||||||
|
mndReleaseTrans(pMnode, pTrans);
|
||||||
|
}
|
||||||
|
|
||||||
// set all tasks status to be normal, refactor later to be stream level, instead of vnode level.
|
// set all tasks status to be normal, refactor later to be stream level, instead of vnode level.
|
||||||
SSdb * pSdb = pMnode->pSdb;
|
SSdb * pSdb = pMnode->pSdb;
|
||||||
|
@ -2690,7 +2740,13 @@ int32_t mndResetFromCheckpoint(SMnode *pMnode) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo this transaction should exist be only one
|
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
|
||||||
|
if (conflict) {
|
||||||
|
mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans",
|
||||||
|
pStream->name, pStream->sourceDb, pStream->targetDb);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid);
|
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid);
|
||||||
int32_t code = createStreamResetStatusTrans(pMnode, pStream);
|
int32_t code = createStreamResetStatusTrans(pMnode, pStream);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2817,7 +2873,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
|
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
|
||||||
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status",
|
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status",
|
||||||
execInfo.activeCheckpoint);
|
execInfo.activeCheckpoint);
|
||||||
mndResetFromCheckpoint(pMnode);
|
mndResetStatusFromCheckpoint(pMnode, activeCheckpointId);
|
||||||
} else {
|
} else {
|
||||||
mInfo("not all vgroups are ready, wait for next HB from stream tasks");
|
mInfo("not all vgroups are ready, wait for next HB from stream tasks");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue