fix(stream): remove the related checkpoint trans when trying to drop stream.

This commit is contained in:
Haojun Liao 2024-01-03 14:44:04 +08:00
parent 453d3e8a30
commit 284ef7d085
3 changed files with 57 additions and 20 deletions

View File

@ -71,7 +71,8 @@ int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid);
int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId);
bool streamTransConflictOtherTrans(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock);
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock);
int32_t mndStreamGetRelCheckpointTrans(SMnode *pMnode, int64_t streamUid);
// for sma
// TODO refactor

View File

@ -80,7 +80,9 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static void killCheckpointTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static void freeCheckpointCandEntry(void *);
@ -1047,7 +1049,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
return -1;
}
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, true);
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, true);
if (conflict) {
mndAddtoCheckpointWaitingList(pStream, checkpointId);
mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb,
@ -1344,6 +1346,7 @@ static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq) {
if (ps == NULL) {
continue;
}
mDebug("start to launch checkpoint for stream:%s %" PRIx64 " in candidate list", pEntry->pName, pEntry->streamId);
code = mndProcessStreamCheckpointTrans(pMnode, ps, pEntry->checkpointId);
@ -1399,7 +1402,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
}
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
tFreeMDropStreamReq(&dropReq);
@ -1451,6 +1454,12 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1;
}
// kill the related checkpoint trans
int32_t transId = mndStreamGetRelCheckpointTrans(pMnode, pStream->uid);
if (transId != 0) {
killCheckpointTransImpl(pMnode, transId, pStream->sourceDb);
}
removeStreamTasksInBuf(pStream, &execInfo);
SName name = {0};
@ -1894,7 +1903,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
}
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
@ -2037,7 +2046,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
}
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
@ -2343,7 +2352,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
break;
}
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_UPDATE_NAME, false);
sdbRelease(pSdb, pStream);
if (conflict) {
@ -2567,7 +2576,7 @@ static void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInf
size_t len = 0;
void * pKey = taosHashGetKey(pDb, &len);
killActiveCheckpointTrans(pMnode, pKey, len);
doKillCheckpointTrans(pMnode, pKey, len);
}
}
@ -2812,7 +2821,16 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
int32_t killActiveCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
void killCheckpointTransImpl(SMnode* pMnode, int32_t transId, const char* pDbName) {
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d in Db:%s", transId, pDbName);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
}
}
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
// data in the hash table will be removed automatically, no need to remove it here.
SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
if (pTransInfo == NULL) {
@ -2825,16 +2843,10 @@ int32_t killActiveCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t le
return TSDB_CODE_SUCCESS;
}
STrans *pTrans = mndAcquireTrans(pMnode, pTransInfo->transId);
if (pTrans != NULL) {
char* pDupDBName = strndup(pDBName, len);
mInfo("kill checkpoint transId:%d in Db:%s", pTransInfo->transId, pDupDBName);
killCheckpointTransImpl(pMnode, pTransInfo->transId, pDupDBName);
taosMemoryFree(pDupDBName);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
}
return TSDB_CODE_SUCCESS;
}
@ -2855,7 +2867,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, in
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid);
} else {
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false);
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false);
if (conflict) {
mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name,
pStream->sourceDb, pStream->targetSTbName);

View File

@ -65,7 +65,7 @@ int32_t clearFinishedTrans(SMnode* pMnode) {
return 0;
}
bool streamTransConflictOtherTrans(SMnode* pMnode, int64_t streamUid, const char* pTransName, bool lock) {
bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char* pTransName, bool lock) {
if (lock) {
taosThreadMutexLock(&execInfo.lock);
}
@ -113,6 +113,30 @@ bool streamTransConflictOtherTrans(SMnode* pMnode, int64_t streamUid, const char
return false;
}
int32_t mndStreamGetRelCheckpointTrans(SMnode* pMnode, int64_t streamUid) {
taosThreadMutexLock(&execInfo.lock);
int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
if (num <= 0) {
taosThreadMutexUnlock(&execInfo.lock);
return 0;
}
clearFinishedTrans(pMnode);
SStreamTransInfo* pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid));
if (pEntry != NULL) {
SStreamTransInfo tInfo = *pEntry;
taosThreadMutexUnlock(&execInfo.lock);
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
return tInfo.transId;
}
} else {
taosThreadMutexUnlock(&execInfo.lock);
}
return 0;
}
int32_t mndAddtoCheckpointWaitingList(SStreamObj* pStream, int64_t checkpointId) {
SCheckpointCandEntry* pEntry = taosHashGet(execInfo.transMgmt.pWaitingList, &pStream->uid, sizeof(pStream->uid));
if (pEntry == NULL) {