commit
f77223d2a4
|
@ -1892,13 +1892,21 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs,
|
||||||
rsp.pTsmaRsp = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp));
|
rsp.pTsmaRsp = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp));
|
||||||
if (rsp.pTsmaRsp) rsp.pTsmaRsp->pTsmas = taosArrayInit(4, POINTER_BYTES);
|
if (rsp.pTsmaRsp) rsp.pTsmaRsp->pTsmas = taosArrayInit(4, POINTER_BYTES);
|
||||||
if (rsp.pTsmaRsp && rsp.pTsmaRsp->pTsmas) {
|
if (rsp.pTsmaRsp && rsp.pTsmaRsp->pTsmas) {
|
||||||
rsp.dbTsmaVersion = pDb->tsmaVersion;
|
|
||||||
bool exist = false;
|
bool exist = false;
|
||||||
if (mndGetDbTsmas(pMnode, 0, pDb->uid, rsp.pTsmaRsp, &exist) != 0) {
|
int32_t code = mndGetDbTsmas(pMnode, 0, pDb->uid, rsp.pTsmaRsp, &exist);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
if (code != TSDB_CODE_NEED_RETRY) {
|
||||||
mError("db:%s, failed to get db tsmas", pDb->name);
|
mError("db:%s, failed to get db tsmas", pDb->name);
|
||||||
|
} else {
|
||||||
|
mWarn("db:%s, need retry to get db tsmas", pDb->name);
|
||||||
|
}
|
||||||
|
taosArrayDestroyP(rsp.pTsmaRsp->pTsmas, tFreeAndClearTableTSMAInfo);
|
||||||
|
taosMemoryFreeClear(rsp.pTsmaRsp);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
rsp.dbTsmaVersion = pDb->tsmaVersion;
|
||||||
|
mDebug("update tsma version to %d, got tsma num: %ld", pDb->tsmaVersion, rsp.pTsmaRsp->pTsmas->size);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1909,6 +1917,8 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs,
|
||||||
if (rsp.useDbRsp->pVgroupInfos == NULL) {
|
if (rsp.useDbRsp->pVgroupInfos == NULL) {
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
mError("db:%s, failed to malloc usedb response", pDb->name);
|
mError("db:%s, failed to malloc usedb response", pDb->name);
|
||||||
|
taosArrayDestroyP(rsp.pTsmaRsp->pTsmas, tFreeAndClearTableTSMAInfo);
|
||||||
|
taosMemoryFreeClear(rsp.pTsmaRsp);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1760,14 +1760,14 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
|
||||||
SDbObj newDb = {0};
|
SDbObj newDb = {0};
|
||||||
memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
|
memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
|
||||||
newDb.tsmaVersion++;
|
newDb.tsmaVersion++;
|
||||||
TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
|
|
||||||
TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
|
|
||||||
TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
|
||||||
TAOS_CHECK_GOTO(mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
|
||||||
TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
|
||||||
TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &createStreamRedoAction), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &createStreamRedoAction), NULL, _OVER);
|
||||||
TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &createStreamUndoAction), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &createStreamUndoAction), NULL, _OVER);
|
||||||
TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &dropStbUndoAction), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndTransAppendUndoAction(pTrans, &dropStbUndoAction), NULL, _OVER);
|
||||||
|
TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
|
||||||
|
TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
|
||||||
TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
|
||||||
|
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -2030,12 +2030,12 @@ static int32_t mndDropTSMA(SCreateTSMACxt* pCxt) {
|
||||||
SDbObj newDb = {0};
|
SDbObj newDb = {0};
|
||||||
memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
|
memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
|
||||||
newDb.tsmaVersion++;
|
newDb.tsmaVersion++;
|
||||||
TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
|
|
||||||
TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
|
|
||||||
TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
|
||||||
TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma), NULL, _OVER);
|
||||||
TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStreamRedoAction), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStreamRedoAction), NULL, _OVER);
|
||||||
TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStbRedoAction), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndTransAppendRedoAction(pTrans, &dropStbRedoAction), NULL, _OVER);
|
||||||
|
TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
|
||||||
|
TAOS_CHECK_GOTO(mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb), NULL, _OVER);
|
||||||
TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndTransPrepare(pCxt->pMnode, pTrans), NULL, _OVER);
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
_OVER:
|
_OVER:
|
||||||
|
@ -2450,13 +2450,14 @@ static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rs
|
||||||
typedef bool (*tsmaFilter)(const SSmaObj* pSma, void* param);
|
typedef bool (*tsmaFilter)(const SSmaObj* pSma, void* param);
|
||||||
|
|
||||||
static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilter filtered, void* param, bool* exist) {
|
static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilter filtered, void* param, bool* exist) {
|
||||||
int32_t code = -1;
|
int32_t code = 0;
|
||||||
SSmaObj * pSma = NULL;
|
SSmaObj * pSma = NULL;
|
||||||
SSmaObj * pBaseTsma = NULL;
|
SSmaObj * pBaseTsma = NULL;
|
||||||
SSdb * pSdb = pMnode->pSdb;
|
SSdb * pSdb = pMnode->pSdb;
|
||||||
void * pIter = NULL;
|
void * pIter = NULL;
|
||||||
SStreamObj * pStream = NULL;
|
SStreamObj * pStream = NULL;
|
||||||
SStbObj * pStb = NULL;
|
SStbObj * pStb = NULL;
|
||||||
|
bool shouldRetry = false;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||||
|
@ -2470,6 +2471,7 @@ static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilt
|
||||||
pStb = mndAcquireStb(pMnode, pSma->dstTbName);
|
pStb = mndAcquireStb(pMnode, pSma->dstTbName);
|
||||||
if (!pStb) {
|
if (!pStb) {
|
||||||
sdbRelease(pSdb, pSma);
|
sdbRelease(pSdb, pSma);
|
||||||
|
shouldRetry = true;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2478,16 +2480,24 @@ static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilt
|
||||||
code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
code = tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
sdbRelease(pSdb, pSma);
|
sdbRelease(pSdb, pSma);
|
||||||
|
mndReleaseStb(pMnode, pStb);
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname);
|
sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname);
|
||||||
pStream = NULL;
|
pStream = NULL;
|
||||||
|
|
||||||
code = mndAcquireStream(pMnode, streamName, &pStream);
|
code = mndAcquireStream(pMnode, streamName, &pStream);
|
||||||
if (!pStream || (code != 0)) {
|
if (!pStream) {
|
||||||
|
shouldRetry = true;
|
||||||
sdbRelease(pSdb, pSma);
|
sdbRelease(pSdb, pSma);
|
||||||
|
mndReleaseStb(pMnode, pStb);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (code != 0) {
|
||||||
|
sdbRelease(pSdb, pSma);
|
||||||
|
mndReleaseStb(pMnode, pStb);
|
||||||
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
|
|
||||||
int64_t streamId = pStream->uid;
|
int64_t streamId = pStream->uid;
|
||||||
mndReleaseStream(pMnode, pStream);
|
mndReleaseStream(pMnode, pStream);
|
||||||
|
@ -2522,6 +2532,9 @@ static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilt
|
||||||
}
|
}
|
||||||
*exist = true;
|
*exist = true;
|
||||||
}
|
}
|
||||||
|
if (shouldRetry) {
|
||||||
|
return TSDB_CODE_NEED_RETRY;
|
||||||
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2562,6 +2575,9 @@ static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
|
||||||
code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist);
|
code = mndGetTSMA(pMnode, tsmaReq.name, &rsp, &exist);
|
||||||
} else {
|
} else {
|
||||||
code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist);
|
code = mndGetTableTSMA(pMnode, tsmaReq.name, &rsp, &exist);
|
||||||
|
if (TSDB_CODE_NEED_RETRY == code) {
|
||||||
|
code = TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
|
|
@ -12977,7 +12977,11 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
|
||||||
}
|
}
|
||||||
SNode* pCol;
|
SNode* pCol;
|
||||||
col_id_t index = 0;
|
col_id_t index = 0;
|
||||||
tInitDefaultSColCmprWrapperByCols(&req.colCmpr, req.ntb.schemaRow.nCols);
|
int32_t code = tInitDefaultSColCmprWrapperByCols(&req.colCmpr, req.ntb.schemaRow.nCols);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
tdDestroySVCreateTbReq(&req);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
FOREACH(pCol, pStmt->pCols) {
|
FOREACH(pCol, pStmt->pCols) {
|
||||||
SColumnDefNode* pColDef = (SColumnDefNode*)pCol;
|
SColumnDefNode* pColDef = (SColumnDefNode*)pCol;
|
||||||
SSchema* pScheam = req.ntb.schemaRow.pSchema + index;
|
SSchema* pScheam = req.ntb.schemaRow.pSchema + index;
|
||||||
|
|
|
@ -615,6 +615,8 @@ class TDTestCase:
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
tdLog.debug(f"start to excute {__file__}")
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
tdSql.init(conn.cursor(), False)
|
tdSql.init(conn.cursor(), False)
|
||||||
|
tdSql.execute('alter local "debugFlag" "143"')
|
||||||
|
tdSql.execute('alter dnode 1 "debugFlag" "143"')
|
||||||
self.tsma_tester: TSMATester = TSMATester(tdSql)
|
self.tsma_tester: TSMATester = TSMATester(tdSql)
|
||||||
self.tsma_sql_generator: TSMATestSQLGenerator = TSMATestSQLGenerator()
|
self.tsma_sql_generator: TSMATestSQLGenerator = TSMATestSQLGenerator()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue