Merge pull request #14441 from taosdata/fix/tsim

fix: drop tsma
This commit is contained in:
Shengliang Guan 2022-07-01 20:26:13 +08:00 committed by GitHub
commit 7189be44eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 30 additions and 33 deletions

View File

@ -47,6 +47,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, S
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *cntlen, bool standby);
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildAlterVnodeReq(SMnode *, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid);
#ifdef __cplusplus
}

View File

@ -645,7 +645,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid == pNew->uid) {
if (mndVgroupInDb(pVgroup, pNew->uid)) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pNew, pVgroup, pArray) != 0) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
@ -1006,7 +1006,7 @@ static int32_t mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid == pDb->uid) {
if (mndVgroupInDb(pVgroup, pDb->uid)) {
numOfTables += pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT;
vindex++;
}

View File

@ -225,10 +225,11 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (strcmp(pVgroup->dbName, pStream->targetDb) != 0) {
if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
if (pTask == NULL) {
sdbRelease(pSdb, pVgroup);
@ -420,10 +421,11 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->sourceDbUid) {
if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
if (pInnerTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -483,10 +485,11 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->sourceDbUid) {
if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
if (pTask == NULL) {
sdbRelease(pSdb, pVgroup);
@ -559,7 +562,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pTopic->dbUid) {
if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
}

View File

@ -822,6 +822,17 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
if (pSma->stbUid == pStb->uid) {
pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
if (pVgroup == NULL) goto _OVER;
SStreamObj *pStream = mndAcquireStream(pMnode, pSma->name);
if (pStream != NULL && pStream->smaId == pSma->uid) {
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
goto _OVER;
}
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
goto _OVER;
}
}
if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;

View File

@ -621,12 +621,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
if (pVgroup->isTsma) {
if (!mndVgroupInDb(pVgroup, pDb->uid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
@ -664,12 +659,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
if (pVgroup->isTsma) {
if (!mndVgroupInDb(pVgroup, pDb->uid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
@ -1297,12 +1287,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
if (pVgroup->isTsma) {
if (!mndVgroupInDb(pVgroup, pDb->uid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
@ -1688,12 +1673,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) {
sdbRelease(pSdb, pVgroup);
continue;
}
if (pVgroup->isTsma) {
if (!mndVgroupInDb(pVgroup, pDb->uid)) {
sdbRelease(pSdb, pVgroup);
continue;
}

View File

@ -1146,13 +1146,13 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
} else {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
}
if (pAction->rawWritten) {
} else if (pAction->rawWritten) {
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
code = pAction->errCode;
} else {
mDebug("trans:%d, %s:%d write successfully", pTrans->id, mndTransStr(pAction->stage), action);
}
} else {
}
}

View File

@ -1759,4 +1759,6 @@ _OVER:
taosArrayDestroy(pArray);
return code;
}
}
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }