refactor: 1) add update task nodeep trans for every involved streams. 2) do some internal refactor for conflict check.

This commit is contained in:
Haojun Liao 2024-12-06 18:56:49 +08:00
parent 05bfb61ec0
commit 1d700079d4
2 changed files with 36 additions and 30 deletions

View File

@ -1931,11 +1931,6 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
sdbCancelFetch(pSdb, pIter);
return terrno = code;
}
code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
if (code) {
mError("failed to register trans, transId:%d, and continue", pTrans->id);
}
}
if (!includeAllNodes) {
@ -1951,6 +1946,12 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
pStream->name, pTrans->id);
// NOTE: for each stream, we register one trans entry for task update
code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
if (code) {
mError("failed to register trans, transId:%d, and continue", pTrans->id);
}
code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);
// todo: not continue, drop all and retry again

View File

@ -35,7 +35,12 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
size_t keyLen = 0;
void *pIter = NULL;
SArray *pList = taosArrayInit(4, sizeof(SKeyInfo));
int32_t num = 0;
int32_t numOfChkpt = 0;
int32_t numOfTaskUpdate = 0;
if (pNumOfActiveChkpt != NULL) {
*pNumOfActiveChkpt = 0;
}
if (pList == NULL) {
return terrno;
@ -50,15 +55,15 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
void *pKey = taosHashGetKey(pEntry, &keyLen);
// key is the name of src/dst db name
SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name,
pEntry->startTime);
mDebug("transId:%d stream:0x%" PRIx64 " %s startTs:%" PRId64 " cleared since finished", pEntry->transId,
pEntry->streamId, pEntry->name, pEntry->startTime);
void* p = taosArrayPush(pList, &info);
if (p == NULL) {
return terrno;
}
} else {
if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) {
num++;
numOfChkpt++;
}
mndReleaseTrans(pMnode, pTrans);
}
@ -78,48 +83,34 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt)
}
}
mDebug("clear %d finished stream-trans, remained:%d, active checkpoint trans:%d", size,
taosHashGetSize(execInfo.transMgmt.pDBTrans), num);
mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d, update trans:%d", size,
taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt, numOfTaskUpdate);
taosArrayDestroy(pList);
if (pNumOfActiveChkpt != NULL) {
*pNumOfActiveChkpt = num;
*pNumOfActiveChkpt = numOfChkpt;
}
return 0;
}
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream.
// For a given stream:
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
// 2. create/drop/reset/update trans are conflict with any other trans.
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
if (lock) {
streamMutexLock(&execInfo.lock);
}
static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName) {
int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
if (num <= 0) {
if (lock) {
streamMutexUnlock(&execInfo.lock);
}
return 0;
}
// if any task updates exist, any other stream trans are not allowed to be created
int32_t code = mndStreamClearFinishedTrans(pMnode, NULL);
if (code) {
mError("failed to clear finish trans, code:%s", tstrerror(code));
mError("failed to clear finish trans, code:%s, and continue", tstrerror(code));
}
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
if (pEntry != NULL) {
SStreamTransInfo tInfo = *pEntry;
if (lock) {
streamMutexUnlock(&execInfo.lock);
}
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0) &&
(strcmp(pTransName, MND_STREAM_RESTART_NAME) != 0)) {
@ -141,11 +132,25 @@ int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char
mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId);
}
return TSDB_CODE_SUCCESS;
}
// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream.
// For a given stream:
// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans.
// 2. create/drop/reset/update trans are conflict with any other trans.
int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) {
if (lock) {
streamMutexLock(&execInfo.lock);
}
int32_t code = doStreamTransConflictCheck(pMnode, streamId, pTransName);
if (lock) {
streamMutexUnlock(&execInfo.lock);
}
return 0;
return code;
}
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {