fix tsma test
This commit is contained in:
parent
bfeb90e19f
commit
b9bb3fd1de
|
@ -1892,13 +1892,21 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs,
|
|||
rsp.pTsmaRsp = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp));
|
||||
if (rsp.pTsmaRsp) rsp.pTsmaRsp->pTsmas = taosArrayInit(4, POINTER_BYTES);
|
||||
if (rsp.pTsmaRsp && rsp.pTsmaRsp->pTsmas) {
|
||||
rsp.dbTsmaVersion = pDb->tsmaVersion;
|
||||
bool exist = false;
|
||||
if (mndGetDbTsmas(pMnode, 0, pDb->uid, rsp.pTsmaRsp, &exist) != 0) {
|
||||
int32_t code = mndGetDbTsmas(pMnode, 0, pDb->uid, rsp.pTsmaRsp, &exist);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mError("db:%s, failed to get db tsmas", pDb->name);
|
||||
if (code != TSDB_CODE_NEED_RETRY) {
|
||||
mError("db:%s, failed to get db tsmas", pDb->name);
|
||||
} else {
|
||||
mWarn("db:%s, need retry to get db tsmas", pDb->name);
|
||||
}
|
||||
taosArrayDestroyP(rsp.pTsmaRsp->pTsmas, tFreeAndClearTableTSMAInfo);
|
||||
taosMemoryFreeClear(rsp.pTsmaRsp);
|
||||
continue;
|
||||
}
|
||||
rsp.dbTsmaVersion = pDb->tsmaVersion;
|
||||
mDebug("update tsma version to %d, got tsma num: %ld", pDb->tsmaVersion, rsp.pTsmaRsp->pTsmas->size);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1909,6 +1917,8 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs,
|
|||
if (rsp.useDbRsp->pVgroupInfos == NULL) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mError("db:%s, failed to malloc usedb response", pDb->name);
|
||||
taosArrayDestroyP(rsp.pTsmaRsp->pTsmas, tFreeAndClearTableTSMAInfo);
|
||||
taosMemoryFreeClear(rsp.pTsmaRsp);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -2030,12 +2030,12 @@ static int32_t mndDropTSMA(SCreateTSMACxt* pCxt) {
|
|||
SDbObj newDb = {0};
|
||||
memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
|
||||
newDb.tsmaVersion++;
|
||||
TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStreamRedoAction), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStbRedoAction), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
_OVER:
|
||||
|
@ -2450,13 +2450,14 @@ static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rs
|
|||
typedef bool (*tsmaFilter)(const SSmaObj* pSma, void* param);
|
||||
|
||||
static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilter filtered, void* param, bool* exist) {
|
||||
int32_t code = -1;
|
||||
int32_t code = 0;
|
||||
SSmaObj * pSma = NULL;
|
||||
SSmaObj * pBaseTsma = NULL;
|
||||
SSdb * pSdb = pMnode->pSdb;
|
||||
void * pIter = NULL;
|
||||
SStreamObj * pStream = NULL;
|
||||
SStbObj * pStb = NULL;
|
||||
bool shouldRetry = false;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||
|
@ -2470,6 +2471,7 @@ static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilt
|
|||
pStb = mndAcquireStb(pMnode, pSma->dstTbName);
|
||||
if (!pStb) {
|
||||
sdbRelease(pSdb, pSma);
|
||||
shouldRetry = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -2478,16 +2480,24 @@ static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilt
|
|||
code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
sdbRelease(pSdb, pSma);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname);
|
||||
pStream = NULL;
|
||||
|
||||
code = mndAcquireStream(pMnode, streamName, &pStream);
|
||||
if (!pStream || (code != 0)) {
|
||||
if (!pStream) {
|
||||
shouldRetry = true;
|
||||
sdbRelease(pSdb, pSma);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
continue;
|
||||
}
|
||||
if (code != 0) {
|
||||
sdbRelease(pSdb, pSma);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int64_t streamId = pStream->uid;
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
|
@ -2522,6 +2532,9 @@ static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilt
|
|||
}
|
||||
*exist = true;
|
||||
}
|
||||
if (shouldRetry) {
|
||||
return TSDB_CODE_NEED_RETRY;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -2562,6 +2575,9 @@ static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
|
|||
code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist);
|
||||
} else {
|
||||
code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist);
|
||||
if (TSDB_CODE_NEED_RETRY == code) {
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
if (code) {
|
||||
goto _OVER;
|
||||
|
|
|
@ -616,6 +616,7 @@ class TDTestCase:
|
|||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), False)
|
||||
tdSql.execute('alter local "debugFlag" "143"')
|
||||
tdSql.execute('alter dnode 1 "debugFlag" "143"')
|
||||
self.tsma_tester: TSMATester = TSMATester(tdSql)
|
||||
self.tsma_sql_generator: TSMATestSQLGenerator = TSMATestSQLGenerator()
|
||||
|
||||
|
|
Loading…
Reference in New Issue