refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-01-11 11:59:29 +08:00
parent 58694c67dd
commit 5f7ce21530
3 changed files with 13 additions and 14 deletions

View File

@ -72,7 +72,7 @@ int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid);
int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId);
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock);
int32_t mndStreamGetRelCheckpointTrans(SMnode *pMnode, int64_t streamUid);
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid);
// for sma
// TODO refactor

View File

@ -81,7 +81,7 @@ static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExe
static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static void killCheckpointTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
static void killTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static void freeCheckpointCandEntry(void *);
@ -95,9 +95,6 @@ static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream);
static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream);
static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream);
static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_STREAM,
@ -1455,9 +1452,10 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
}
// kill the related checkpoint trans
int32_t transId = mndStreamGetRelCheckpointTrans(pMnode, pStream->uid);
int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
if (transId != 0) {
killCheckpointTransImpl(pMnode, transId, pStream->sourceDb);
mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
killTransImpl(pMnode, transId, pStream->sourceDb);
}
removeStreamTasksInBuf(pStream, &execInfo);
@ -1502,9 +1500,10 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
#endif
// kill the related checkpoint trans
int32_t transId = mndStreamGetRelCheckpointTrans(pMnode, pStream->uid);
int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
if (transId != 0) {
killCheckpointTransImpl(pMnode, transId, pStream->sourceDb);
mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
killTransImpl(pMnode, transId, pStream->sourceDb);
}
// drop the stream obj in execInfo
@ -2836,10 +2835,10 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
void killCheckpointTransImpl(SMnode* pMnode, int32_t transId, const char* pDbName) {
void killTransImpl(SMnode* pMnode, int32_t transId, const char* pDbName) {
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d in Db:%s", transId, pDbName);
mInfo("kill active transId:%d in Db:%s", transId, pDbName);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
}
@ -2859,7 +2858,7 @@ int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
}
char* pDupDBName = strndup(pDBName, len);
killCheckpointTransImpl(pMnode, pTransInfo->transId, pDupDBName);
killTransImpl(pMnode, pTransInfo->transId, pDupDBName);
taosMemoryFree(pDupDBName);
return TSDB_CODE_SUCCESS;

View File

@ -113,7 +113,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
return false;
}
int32_t mndStreamGetRelCheckpointTrans(SMnode* pMnode, int64_t streamUid) {
int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) {
taosThreadMutexLock(&execInfo.lock);
int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
if (num <= 0) {
@ -127,7 +127,7 @@ int32_t mndStreamGetRelCheckpointTrans(SMnode* pMnode, int64_t streamUid) {
SStreamTransInfo tInfo = *pEntry;
taosThreadMutexUnlock(&execInfo.lock);
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) {
return tInfo.transId;
}
} else {