tsma avoid fetching tsma for no tsma tables
This commit is contained in:
parent
f6200dd923
commit
de1c28d623
|
@ -1403,9 +1403,15 @@ int32_t tSerializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp);
|
|||
int32_t tDeserializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp);
|
||||
void tFreeSDnodeListRsp(SDnodeListRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
SArray* pTsmas; // SArray<STableTSMAInfo*>
|
||||
} STableTSMAInfoRsp;
|
||||
|
||||
typedef struct {
|
||||
SUseDbRsp* useDbRsp;
|
||||
SDbCfgRsp* cfgRsp;
|
||||
STableTSMAInfoRsp* pTsmaRsp;
|
||||
int32_t dbTsmaVersion;
|
||||
} SDbHbRsp;
|
||||
|
||||
typedef struct {
|
||||
|
@ -4234,10 +4240,6 @@ typedef struct {
|
|||
bool fillHistoryFinished;
|
||||
} STableTSMAInfo;
|
||||
|
||||
typedef struct {
|
||||
SArray* pTsmas; // SArray<STableTSMAInfo*>
|
||||
} STableTSMAInfoRsp;
|
||||
|
||||
int32_t tSerializeTableTSMAInfoRsp(void* buf, int32_t bufLen, const STableTSMAInfoRsp* pRsp);
|
||||
int32_t tDeserializeTableTSMAInfoRsp(void* buf, int32_t bufLen, STableTSMAInfoRsp* pRsp);
|
||||
int32_t tCloneTbTSMAInfo(STableTSMAInfo* pInfo, STableTSMAInfo** pRes);
|
||||
|
|
|
@ -158,6 +158,7 @@ typedef struct SDbCacheInfo {
|
|||
int32_t cfgVersion;
|
||||
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
|
||||
int64_t stateTs;
|
||||
int32_t tsmaVersion;
|
||||
} SDbCacheInfo;
|
||||
|
||||
typedef struct SDynViewVersion {
|
||||
|
@ -404,7 +405,7 @@ int32_t ctgdEnableDebug(char* option, bool enable);
|
|||
|
||||
int32_t ctgdHandleDbgCommand(char* command);
|
||||
|
||||
int32_t catalogAsyncUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** pTsma);
|
||||
int32_t catalogAsyncUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** pTsma, int32_t tsmaVersion);
|
||||
|
||||
int32_t catalogUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** ppTsma);
|
||||
|
||||
|
|
|
@ -234,6 +234,15 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
|
|||
catalogUpdateDbCfg(pCatalog, rsp->cfgRsp->db, rsp->cfgRsp->dbId, rsp->cfgRsp);
|
||||
rsp->cfgRsp = NULL;
|
||||
}
|
||||
if (rsp->pTsmaRsp) {
|
||||
if (rsp->pTsmaRsp->pTsmas) {
|
||||
for (int32_t i = 0; i < rsp->pTsmaRsp->pTsmas->size; ++i) {
|
||||
STableTSMAInfo* pTsma = taosArrayGetP(rsp->pTsmaRsp->pTsmas, i);
|
||||
catalogAsyncUpdateTSMA(pCatalog, &pTsma, rsp->dbTsmaVersion);
|
||||
}
|
||||
taosArrayClear(rsp->pTsmaRsp->pTsmas);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_return:
|
||||
|
@ -797,6 +806,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
|
|||
db->cfgVersion = htonl(db->cfgVersion);
|
||||
db->numOfTable = htonl(db->numOfTable);
|
||||
db->stateTs = htobe64(db->stateTs);
|
||||
db->tsmaVersion = htonl(db->tsmaVersion);
|
||||
}
|
||||
|
||||
SKv kv = {
|
||||
|
|
|
@ -67,6 +67,8 @@
|
|||
|
||||
static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq);
|
||||
static int32_t tDecodeSBatchDeleteReqCommon(SDecoder *pDecoder, SBatchDeleteReq *pReq);
|
||||
static int32_t tEncodeTableTSMAInfoRsp(SEncoder *pEncoder, const STableTSMAInfoRsp *pRsp);
|
||||
static int32_t tDecodeTableTSMAInfoRsp(SDecoder* pDecoder, STableTSMAInfoRsp* pRsp);
|
||||
|
||||
int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
|
||||
if (pMsg == NULL) {
|
||||
|
@ -3666,6 +3668,14 @@ int32_t tSerializeSDbHbRspImp(SEncoder *pEncoder, const SDbHbRsp *pRsp) {
|
|||
if (tEncodeI8(pEncoder, 0) < 0) return -1;
|
||||
}
|
||||
|
||||
if (pRsp->pTsmaRsp) {
|
||||
if (tEncodeI8(pEncoder, 1) < 0) return -1;
|
||||
if (tEncodeTableTSMAInfoRsp(pEncoder, pRsp->pTsmaRsp) < 0) return -1;
|
||||
} else {
|
||||
if (tEncodeI8(pEncoder, 0) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI32(pEncoder, pRsp->dbTsmaVersion) < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -3746,6 +3756,17 @@ int32_t tDeserializeSDbHbRspImp(SDecoder *decoder, SDbHbRsp *pRsp) {
|
|||
if (NULL == pRsp->cfgRsp) return -1;
|
||||
if (tDeserializeSDbCfgRspImpl(decoder, pRsp->cfgRsp) < 0) return -1;
|
||||
}
|
||||
if (!tDecodeIsEnd(decoder)) {
|
||||
if (tDecodeI8(decoder, &flag) < 0) return -1;
|
||||
if (flag) {
|
||||
pRsp->pTsmaRsp = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp));
|
||||
if (!pRsp->pTsmaRsp) return -1;
|
||||
if (tDecodeTableTSMAInfoRsp(decoder, pRsp->pTsmaRsp) < 0) return -1;
|
||||
}
|
||||
}
|
||||
if (!tDecodeIsEnd(decoder)) {
|
||||
if (tDecodeI32(decoder, &pRsp->dbTsmaVersion) < 0) return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -3795,6 +3816,10 @@ void tFreeSDbHbRsp(SDbHbRsp *pDbRsp) {
|
|||
tFreeSDbCfgRsp(pDbRsp->cfgRsp);
|
||||
taosMemoryFree(pDbRsp->cfgRsp);
|
||||
}
|
||||
if (pDbRsp->pTsmaRsp) {
|
||||
tFreeTableTSMAInfoRsp(pDbRsp->pTsmaRsp);
|
||||
taosMemoryFree(pDbRsp->pTsmaRsp);
|
||||
}
|
||||
}
|
||||
|
||||
void tFreeSDbHbBatchRsp(SDbHbBatchRsp *pRsp) {
|
||||
|
|
|
@ -398,6 +398,7 @@ typedef struct {
|
|||
SRWLatch lock;
|
||||
int64_t stateTs;
|
||||
int64_t compactStartTime;
|
||||
int32_t tsmaVersion;
|
||||
} SDbObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -31,6 +31,7 @@ int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
|||
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist);
|
||||
int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t numOfTsmas, void **ppRsp,
|
||||
int32_t *pRspLen);
|
||||
int32_t mndGetDbTsmas(SMnode *pMnode, const char *dbFName, uint64_t dbUid, STableTSMAInfoRsp *pRsp, bool *exist);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@
|
|||
#include "tjson.h"
|
||||
|
||||
#define DB_VER_NUMBER 1
|
||||
#define DB_RESERVE_SIZE 32
|
||||
#define DB_RESERVE_SIZE 28
|
||||
|
||||
static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
|
||||
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb);
|
||||
|
@ -146,6 +146,7 @@ SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
|
|||
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.s3KeepLocal, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.s3Compact, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.withArbitrator, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pDb->tsmaVersion, _OVER);
|
||||
|
||||
SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
|
||||
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
|
||||
|
@ -241,6 +242,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.s3KeepLocal, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.s3Compact, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.withArbitrator, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pDb->tsmaVersion, _OVER);
|
||||
|
||||
SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER)
|
||||
taosInitRWLatch(&pDb->lock);
|
||||
|
@ -347,6 +349,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
|
|||
pOld->cfg.s3Compact = pNew->cfg.s3Compact;
|
||||
pOld->cfg.withArbitrator = pNew->cfg.withArbitrator;
|
||||
pOld->compactStartTime = pNew->compactStartTime;
|
||||
pOld->tsmaVersion = pNew->tsmaVersion;
|
||||
taosWUnLockLatch(&pOld->lock);
|
||||
return 0;
|
||||
}
|
||||
|
@ -681,6 +684,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
|
|||
dbObj.uid = mndGenerateUid(dbObj.name, TSDB_DB_FNAME_LEN);
|
||||
dbObj.cfgVersion = 1;
|
||||
dbObj.vgVersion = 1;
|
||||
dbObj.tsmaVersion = 1;
|
||||
memcpy(dbObj.createUser, pUser->user, TSDB_USER_LEN);
|
||||
dbObj.cfg = (SDbCfg){
|
||||
.numOfVgroups = pCreate->numOfVgroups,
|
||||
|
@ -1682,6 +1686,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs,
|
|||
pDbCacheInfo->cfgVersion = htonl(pDbCacheInfo->cfgVersion);
|
||||
pDbCacheInfo->numOfTable = htonl(pDbCacheInfo->numOfTable);
|
||||
pDbCacheInfo->stateTs = be64toh(pDbCacheInfo->stateTs);
|
||||
pDbCacheInfo->tsmaVersion = htonl(pDbCacheInfo->tsmaVersion);
|
||||
|
||||
SDbHbRsp rsp = {0};
|
||||
|
||||
|
@ -1720,7 +1725,8 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs,
|
|||
int32_t numOfTable = mndGetDBTableNum(pDb, pMnode);
|
||||
|
||||
if (pDbCacheInfo->vgVersion >= pDb->vgVersion && pDbCacheInfo->cfgVersion >= pDb->cfgVersion &&
|
||||
numOfTable == pDbCacheInfo->numOfTable && pDbCacheInfo->stateTs == pDb->stateTs) {
|
||||
numOfTable == pDbCacheInfo->numOfTable && pDbCacheInfo->stateTs == pDb->stateTs &&
|
||||
pDbCacheInfo->tsmaVersion >= pDb->tsmaVersion) {
|
||||
mTrace("db:%s, valid dbinfo, vgVersion:%d cfgVersion:%d stateTs:%" PRId64
|
||||
" numOfTables:%d, not changed vgVersion:%d cfgVersion:%d stateTs:%" PRId64 " numOfTables:%d",
|
||||
pDbCacheInfo->dbFName, pDbCacheInfo->vgVersion, pDbCacheInfo->cfgVersion, pDbCacheInfo->stateTs,
|
||||
|
@ -1739,6 +1745,16 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs,
|
|||
mndDumpDbCfgInfo(rsp.cfgRsp, pDb);
|
||||
}
|
||||
|
||||
if (pDbCacheInfo->tsmaVersion != pDb->tsmaVersion) {
|
||||
rsp.pTsmaRsp = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp));
|
||||
if (rsp.pTsmaRsp) rsp.pTsmaRsp->pTsmas = taosArrayInit(4, POINTER_BYTES);
|
||||
if (rsp.pTsmaRsp && rsp.pTsmaRsp->pTsmas) {
|
||||
rsp.dbTsmaVersion = pDb->tsmaVersion;
|
||||
bool exist = false;
|
||||
mndGetDbTsmas(pMnode, 0, pDb->uid, rsp.pTsmaRsp, &exist);
|
||||
}
|
||||
}
|
||||
|
||||
if (pDbCacheInfo->vgVersion < pDb->vgVersion || numOfTable != pDbCacheInfo->numOfTable ||
|
||||
pDbCacheInfo->stateTs != pDb->stateTs) {
|
||||
rsp.useDbRsp = taosMemoryCalloc(1, sizeof(SUseDbRsp));
|
||||
|
|
|
@ -67,7 +67,7 @@ typedef struct SCreateTSMACxt {
|
|||
const SMCreateSmaReq *pCreateSmaReq;
|
||||
const SMDropSmaReq * pDropSmaReq;
|
||||
};
|
||||
const SDbObj * pDb;
|
||||
SDbObj *pDb;
|
||||
SStbObj * pSrcStb;
|
||||
SSmaObj * pSma;
|
||||
const SSmaObj * pBaseSma;
|
||||
|
@ -1463,7 +1463,7 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
|
|||
pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pCreateStreamReq->numOfTags, sizeof(SField));
|
||||
SField f = {0};
|
||||
if (pCxt->pSrcStb) {
|
||||
for (int32_t idx = 0; idx < pCxt->pSrcStb->numOfTags; ++idx) {
|
||||
for (int32_t idx = 0; idx < pCxt->pCreateStreamReq->numOfTags - 1; ++idx) {
|
||||
SSchema *pSchema = &pCxt->pSrcStb->pTags[idx];
|
||||
f.bytes = pSchema->bytes;
|
||||
f.type = pSchema->type;
|
||||
|
@ -1486,12 +1486,28 @@ static void mndCreateTSMABuildDropStreamReq(SCreateTSMACxt* pCxt) {
|
|||
pCxt->pDropStreamReq->sqlLen = strlen(pCxt->pDropStreamReq->sql);
|
||||
}
|
||||
|
||||
static int32_t mndCreateTSMASetCreateStreamRedoAction(SMnode* pMnode) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
static int32_t mndSetUpdateDbTsmaVersionPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
||||
SSdbRaw *pRedoRaw = mndDbActionEncode(pOld);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) {
|
||||
sdbFreeRaw(pRedoRaw);
|
||||
return -1;
|
||||
}
|
||||
|
||||
(void)sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateTSMASetCreateStreamUndoAction(SMnode* pMnode) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
static int32_t mndSetUpdateDbTsmaVersionCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
||||
SSdbRaw *pCommitRaw = mndDbActionEncode(pNew);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
sdbFreeRaw(pCommitRaw);
|
||||
return -1;
|
||||
}
|
||||
|
||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
|
||||
|
@ -1541,6 +1557,11 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
SDbObj newDb = {0};
|
||||
memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
|
||||
newDb.tsmaVersion++;
|
||||
if (mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb) != 0) goto _OVER;
|
||||
if (mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
|
||||
|
@ -1630,7 +1651,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
|
|||
|
||||
if (createReq.normSourceTbUid == 0) {
|
||||
pStb = mndAcquireStb(pMnode, createReq.stb);
|
||||
if (!pStb) {
|
||||
if (!pStb && !createReq.recursiveTsma) {
|
||||
mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
|
||||
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
||||
goto _OVER;
|
||||
|
@ -1684,6 +1705,9 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
|
|||
terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
|
||||
goto _OVER;
|
||||
}
|
||||
if (!pStb) {
|
||||
createReq.normSourceTbUid = pBaseTsma->stbUid;
|
||||
}
|
||||
}
|
||||
|
||||
SCreateTSMACxt cxt = {
|
||||
|
@ -1770,6 +1794,11 @@ static int32_t mndDropTSMA(SCreateTSMACxt* pCxt) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
SDbObj newDb = {0};
|
||||
memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
|
||||
newDb.tsmaVersion++;
|
||||
if (mndSetUpdateDbTsmaVersionPrepareLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb) != 0) goto _OVER;
|
||||
if (mndSetUpdateDbTsmaVersionCommitLogs(pCxt->pMnode, pTrans, pCxt->pDb, &newDb) != 0) goto _OVER;
|
||||
if (mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
|
||||
if (mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
|
||||
if (mndTransAppendRedoAction(pTrans, &dropStreamRedoAction) != 0) goto _OVER;
|
||||
|
@ -1880,6 +1909,7 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
|
|||
|
||||
if ((pDb && pSma->dbUid != pDb->uid) || !pSrcDb) {
|
||||
sdbRelease(pMnode->pSdb, pSma);
|
||||
if (pSrcDb) mndReleaseDb(pMnode, pSrcDb);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -2115,7 +2145,9 @@ static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rs
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *rsp, bool *exist) {
|
||||
typedef bool (*tsmaFilter)(const SSmaObj* pSma, void* param);
|
||||
|
||||
static int32_t mndGetSomeTsmas(SMnode* pMnode, STableTSMAInfoRsp* pRsp, tsmaFilter filtered, void* param, bool* exist) {
|
||||
int32_t code = -1;
|
||||
SSmaObj * pSma = NULL;
|
||||
SSmaObj * pBaseTsma = NULL;
|
||||
|
@ -2123,20 +2155,12 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp
|
|||
void * pIter = NULL;
|
||||
SStreamObj * pStreamObj = NULL;
|
||||
SStbObj * pStb = NULL;
|
||||
/*
|
||||
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
|
||||
if (NULL == pStb) {
|
||||
*exist = false;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
*/
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) {
|
||||
if (filtered(pSma, param)) {
|
||||
sdbRelease(pSdb, pSma);
|
||||
continue;
|
||||
}
|
||||
|
@ -2182,7 +2206,7 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp
|
|||
sdbCancelFetch(pSdb, pIter);
|
||||
return code;
|
||||
}
|
||||
if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
|
||||
if (NULL == taosArrayPush(pRsp->pTsmas, &pTsma)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tFreeTableTSMAInfo(pTsma);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
|
@ -2193,6 +2217,24 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool tsmaTbFilter(const SSmaObj* pSma, void* param) {
|
||||
const char* tbFName = param;
|
||||
return pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName) != 0;
|
||||
}
|
||||
|
||||
static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *pRsp, bool *exist) {
|
||||
return mndGetSomeTsmas(pMnode, pRsp, tsmaTbFilter, tbFName, exist);
|
||||
}
|
||||
|
||||
static bool tsmaDbFilter(const SSmaObj* pSma, void* param) {
|
||||
uint64_t *dbUid = param;
|
||||
return pSma->dbUid != *dbUid;
|
||||
}
|
||||
|
||||
int32_t mndGetDbTsmas(SMnode *pMnode, const char *dbFName, uint64_t dbUid, STableTSMAInfoRsp *pRsp, bool *exist) {
|
||||
return mndGetSomeTsmas(pMnode, pRsp, tsmaDbFilter, &dbUid, exist);
|
||||
}
|
||||
|
||||
static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
|
||||
STableTSMAInfoRsp rsp = {0};
|
||||
int32_t code = -1;
|
||||
|
@ -2312,7 +2354,10 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t
|
|||
SStbObj* pDestStb = mndAcquireStb(pMnode, pSma->dstTbName);
|
||||
if (!pDestStb) {
|
||||
mInfo("tsma: %s.%" PRIx64 " dest stb: %s not found, maybe dropped", tsmaFName, pTsmaVer->tsmaId, pSma->dstTbName);
|
||||
terrno = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
|
||||
mndReleaseSma(pMnode, pSma);
|
||||
if (terrno) goto _OVER;
|
||||
taosArrayPush(hbRsp.pTsmas, &pTsmaInfo);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -3854,6 +3854,7 @@ static int32_t mndSetDropTbsRedoActions(SMnode* pMnode, STrans* pTrans, const SV
|
|||
static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg* pRsp, SMndDropTbsWithTsmaCtx* pCtx) {
|
||||
SMnode *pMnode = pRsp->info.node;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pRsp, "drop-tbs");
|
||||
mndTransSetChangeless(pTrans);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
|
|
@ -335,6 +335,7 @@ typedef struct SCtgDBCache {
|
|||
SHashObj* tbCache; // key:tbname, value:SCtgTbCache
|
||||
SHashObj* stbCache; // key:suid, value:char*
|
||||
SHashObj* tsmaCache; // key:tbname, value: SCtgTSMACache
|
||||
int32_t tsmaVersion;
|
||||
uint64_t dbCacheNum[CTG_CI_MAX_VALUE];
|
||||
uint64_t dbCacheSize;
|
||||
} SCtgDBCache;
|
||||
|
@ -599,6 +600,8 @@ typedef struct SCtgDropViewMetaMsg {
|
|||
typedef struct SCtgUpdateTbTSMAMsg {
|
||||
SCatalog* pCtg;
|
||||
STableTSMAInfo* pTsma;
|
||||
int32_t dbTsmaVersion;
|
||||
uint64_t dbId;
|
||||
} SCtgUpdateTbTSMAMsg;
|
||||
|
||||
typedef struct SCtgDropTbTSMAMsg {
|
||||
|
@ -1148,7 +1151,7 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
|
|||
int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaName);
|
||||
int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* name, STableTSMAInfoRsp* out,
|
||||
SCtgTaskReq* tReq, int32_t reqType);
|
||||
int32_t ctgUpdateTbTSMAEnqueue(SCatalog* pCtg, STSMACache** pTsma, bool syncOp);
|
||||
int32_t ctgUpdateTbTSMAEnqueue(SCatalog* pCtg, STSMACache** pTsma, int32_t tsmaVersion, bool syncOp);
|
||||
int32_t ctgDropTSMAForTbEnqueue(SCatalog* pCtg, SName* pName, bool syncOp);
|
||||
int32_t ctgDropTbTSMAEnqueue(SCatalog* pCtg, const STSMACache* pTsma, bool syncOp);
|
||||
int32_t ctgOpDropTbTSMA(SCtgCacheOperation* operation);
|
||||
|
|
|
@ -1790,13 +1790,13 @@ int32_t catalogGetViewMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName*
|
|||
CTG_API_LEAVE(TSDB_CODE_OPS_NOT_SUPPORT);
|
||||
}
|
||||
|
||||
int32_t catalogAsyncUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** ppTsma) {
|
||||
int32_t catalogAsyncUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** ppTsma, int32_t tsmaVersion) {
|
||||
CTG_API_ENTER();
|
||||
if (!pCtg || !ppTsma) {
|
||||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
int32_t code = 0;
|
||||
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, ppTsma, false));
|
||||
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, ppTsma, tsmaVersion, false));
|
||||
|
||||
_return:
|
||||
CTG_API_LEAVE(code);
|
||||
|
@ -1808,7 +1808,7 @@ int32_t catalogUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** pTsma) {
|
|||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
int32_t code = 0;
|
||||
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, pTsma, true));
|
||||
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, pTsma, 0, true));
|
||||
|
||||
_return:
|
||||
CTG_API_LEAVE(code);
|
||||
|
@ -1844,7 +1844,7 @@ int32_t ctgGetTbTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName
|
|||
tsmasRsp.pTsmas = NULL;
|
||||
|
||||
for (int32_t i = 0; i < (*ppRes)->size; ++i) {
|
||||
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, taosArrayGet((*ppRes), i), false));
|
||||
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, taosArrayGet((*ppRes), i), 0, false));
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
|
|
|
@ -2896,7 +2896,7 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
|
|||
for (int32_t i = 0; i < taosArrayGetSize(pTsmas); ++i) {
|
||||
STableTSMAInfo* pInfo = taosArrayGetP(pTsmas, i);
|
||||
CTG_ERR_JRET(tCloneTbTSMAInfo(pInfo, &pTsma));
|
||||
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, &pTsma, false));
|
||||
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, &pTsma, 0, false));
|
||||
}
|
||||
|
||||
if (atomic_sub_fetch_32(&pCtx->fetchNum, 1) == 0) {
|
||||
|
|
|
@ -1415,7 +1415,7 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
|
|||
|
||||
CTG_CACHE_NUM_INC(CTG_CI_DB, 1);
|
||||
|
||||
SDbCacheInfo dbCacheInfo = {.dbId = newDBCache.dbId, .vgVersion = -1, .stateTs = 0, .cfgVersion = -1};
|
||||
SDbCacheInfo dbCacheInfo = {.dbId = newDBCache.dbId, .vgVersion = -1, .stateTs = 0, .cfgVersion = -1, .tsmaVersion = -1};
|
||||
tstrncpy(dbCacheInfo.dbFName, dbFName, sizeof(dbCacheInfo.dbFName));
|
||||
|
||||
ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
|
||||
|
@ -1802,7 +1802,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
|
|||
|
||||
bool newAdded = false;
|
||||
SDbCacheInfo dbCacheInfo = {
|
||||
.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .cfgVersion = -1, .numOfTable = dbInfo->numOfTable, .stateTs = dbInfo->stateTs};
|
||||
.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .cfgVersion = -1, .numOfTable = dbInfo->numOfTable, .stateTs = dbInfo->stateTs, .tsmaVersion = -1};
|
||||
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache));
|
||||
|
@ -1846,6 +1846,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
|
|||
|
||||
if (dbCache->cfgCache.cfgInfo) {
|
||||
dbCacheInfo.cfgVersion = dbCache->cfgCache.cfgInfo->cfgVersion;
|
||||
dbCacheInfo.tsmaVersion = dbCache->tsmaVersion;
|
||||
}
|
||||
|
||||
vgCache->vgInfo = dbInfo;
|
||||
|
@ -1913,6 +1914,7 @@ int32_t ctgOpUpdateDbCfg(SCtgCacheOperation *operation) {
|
|||
} else {
|
||||
cacheInfo.vgVersion = -1;
|
||||
}
|
||||
cacheInfo.tsmaVersion = dbCache->tsmaVersion;
|
||||
|
||||
ctgWLockDbCfgInfo(dbCache);
|
||||
|
||||
|
@ -3230,16 +3232,13 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
|
|||
|
||||
// get tsma cache
|
||||
pCache = taosHashAcquire(dbCache->tsmaCache, tsmaSourceTbName.tname, strlen(tsmaSourceTbName.tname));
|
||||
if (!pCache) {
|
||||
ctgDebug("tsma for tb: %s.%s not in cache", dbFName, tsmaSourceTbName.tname);
|
||||
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TB_TSMA, &tsmaSourceTbName);
|
||||
if (!pCache || !pCache->pTsmas || pCache->pTsmas->size == 0) {
|
||||
taosArrayPush(pCtx->pResList, &(SMetaRes){0});
|
||||
CTG_CACHE_NHIT_INC(CTG_CI_TBL_SMA, 1);
|
||||
continue;
|
||||
}
|
||||
|
||||
CTG_LOCK(CTG_READ, &pCache->tsmaLock);
|
||||
if (!pCache->pTsmas || pCache->pTsmas->size == 0 || hasOutOfDateTSMACache(pCache->pTsmas)) {
|
||||
if (hasOutOfDateTSMACache(pCache->pTsmas)) {
|
||||
CTG_UNLOCK(CTG_READ, &pCache->tsmaLock);
|
||||
taosHashRelease(dbCache->tsmaCache, pCache);
|
||||
ctgDebug("tsma for tb: %s.%s not in cache", tsmaSourceTbName.tname, dbFName);
|
||||
|
@ -3340,7 +3339,7 @@ int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaNam
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgUpdateTbTSMAEnqueue(SCatalog *pCtg, STSMACache **pTsma, bool syncOp) {
|
||||
int32_t ctgUpdateTbTSMAEnqueue(SCatalog *pCtg, STSMACache **pTsma, int32_t tsmaVersion, bool syncOp) {
|
||||
int32_t code = 0;
|
||||
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
|
||||
op->opId = CTG_OP_UPDATE_TB_TSMA;
|
||||
|
@ -3355,6 +3354,8 @@ int32_t ctgUpdateTbTSMAEnqueue(SCatalog *pCtg, STSMACache **pTsma, bool syncOp)
|
|||
|
||||
msg->pCtg = pCtg;
|
||||
msg->pTsma = *pTsma;
|
||||
msg->dbTsmaVersion = tsmaVersion;
|
||||
msg->dbId = (*pTsma)->dbId;
|
||||
|
||||
op->data = msg;
|
||||
|
||||
|
@ -3601,8 +3602,24 @@ int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation *operation) {
|
|||
}
|
||||
|
||||
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pTsmaInfo->dbFName, pTsmaInfo->dbId, &dbCache));
|
||||
|
||||
CTG_ERR_JRET(ctgWriteTbTSMAToCache(pCtg, dbCache, pTsmaInfo->dbFName, pTsmaInfo->tb, &pTsmaInfo));
|
||||
if (dbCache && msg->dbTsmaVersion > 0) {
|
||||
dbCache->tsmaVersion = msg->dbTsmaVersion;
|
||||
SDbCacheInfo cacheInfo = {0};
|
||||
cacheInfo.dbId = dbCache->dbId;
|
||||
if (dbCache->cfgCache.cfgInfo) {
|
||||
cacheInfo.cfgVersion = dbCache->cfgCache.cfgInfo->cfgVersion;
|
||||
tstrncpy(cacheInfo.dbFName, dbCache->cfgCache.cfgInfo->db, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
if (dbCache->vgCache.vgInfo) {
|
||||
cacheInfo.vgVersion = dbCache->vgCache.vgInfo->vgVersion;
|
||||
cacheInfo.numOfTable = dbCache->vgCache.vgInfo->numOfTable;
|
||||
cacheInfo.stateTs = dbCache->vgCache.vgInfo->stateTs;
|
||||
}
|
||||
cacheInfo.tsmaVersion = dbCache->tsmaVersion;
|
||||
CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &cacheInfo, cacheInfo.dbId, sizeof(SDbCacheInfo),
|
||||
ctgDbCacheInfoSortCompare, ctgDbCacheInfoSearchCompare));
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
|
|
|
@ -10523,13 +10523,13 @@ static int32_t translateShowCreateView(STranslateContext* pCxt, SShowCreateViewS
|
|||
#endif
|
||||
}
|
||||
|
||||
SNode* createColumnNodeWithName(const char* name) {
|
||||
static SColumnNode* createColumnNodeWithName(const char* name) {
|
||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (!pCol) return NULL;
|
||||
tstrncpy(pCol->colName, name, TSDB_COL_NAME_LEN);
|
||||
tstrncpy(pCol->node.aliasName, name, TSDB_COL_NAME_LEN);
|
||||
tstrncpy(pCol->node.userAlias, name, TSDB_COL_NAME_LEN);
|
||||
return (SNode*)pCol;
|
||||
return pCol;
|
||||
}
|
||||
|
||||
static bool sortFuncWithFuncId(SNode* pNode1, SNode* pNode2) {
|
||||
|
@ -10609,10 +10609,11 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC
|
|||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
int32_t partitionTagNum = pStmt->pOptions->recursiveTsma ? numOfTags - 1 : numOfTags;
|
||||
// append partition by tags
|
||||
SNode* pTagCol = NULL;
|
||||
for (int32_t idx = 0; idx < numOfTags; ++idx) {
|
||||
pTagCol = createColumnNodeWithName(pTags[idx].name);
|
||||
for (int32_t idx = 0; idx < partitionTagNum; ++idx) {
|
||||
pTagCol = (SNode*)createColumnNodeWithName(pTags[idx].name);
|
||||
if (!pTagCol) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
break;
|
||||
|
@ -10624,11 +10625,17 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC
|
|||
// sub table
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
SFunctionNode* pSubTable = NULL;
|
||||
code = buildTSMAAstStreamSubTable(pStmt, pReq, pStmt->pOptions->recursiveTsma ? pTagCol : (SNode*)pTbnameFunc, (SNode**)&pSubTable);
|
||||
pTagCol = NULL;
|
||||
if (pTags && numOfTags > 0) {
|
||||
pTagCol = (SNode*)createColumnNodeWithName(pTags[numOfTags - 1].name);
|
||||
if (!pTagCol) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = buildTSMAAstStreamSubTable(pStmt, pReq, pStmt->pOptions->recursiveTsma ? pTagCol : (SNode*)pTbnameFunc, (SNode**)&pSubTable);
|
||||
info.pSubTable = (SNode*)pSubTable;
|
||||
}
|
||||
code = nodesListMakeStrictAppend(&info.pTags, nodesCloneNode((SNode*)pTbnameFunc));
|
||||
if (code == TSDB_CODE_SUCCESS)
|
||||
code = nodesListMakeStrictAppend(&info.pTags, pStmt->pOptions->recursiveTsma ? pTagCol : nodesCloneNode((SNode*)pTbnameFunc));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -10769,9 +10776,9 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
|
|||
memcpy(pStmt->originalTbName, pRecursiveTsma->tb, TSDB_TABLE_NAME_LEN);
|
||||
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pRecursiveTsma->tb, useTbName), pReq->stb);
|
||||
numOfCols = pRecursiveTsma->pUsedCols->size;
|
||||
numOfTags = pRecursiveTsma->pTags->size;
|
||||
numOfTags = pRecursiveTsma->pTags ? pRecursiveTsma->pTags->size: 0;
|
||||
pCols = pRecursiveTsma->pUsedCols->pData;
|
||||
pTags = pRecursiveTsma->pTags->pData;
|
||||
pTags = pRecursiveTsma->pTags ? pRecursiveTsma->pTags->pData : NULL;
|
||||
code = getTableMeta(pCxt, pStmt->dbName, pRecursiveTsma->targetTb, &pTableMeta);
|
||||
}
|
||||
} else {
|
||||
|
@ -12424,10 +12431,13 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
|
|||
SVAlterTbReq* pReq) {
|
||||
SName tbName = {0};
|
||||
SArray* pTsmas = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (pCxt->pMetaCache) {
|
||||
toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tbName);
|
||||
int32_t code = getTableTsmasFromCache(pCxt->pMetaCache, &tbName, &pTsmas);
|
||||
code = getTableTsmasFromCache(pCxt->pMetaCache, &tbName, &pTsmas);
|
||||
if (code != TSDB_CODE_SUCCESS) return code;
|
||||
if (pTsmas && pTsmas->size > 0) return TSDB_CODE_TSMA_MUST_BE_DROPPED;
|
||||
}
|
||||
|
||||
SSchema* pSchema = getTagSchema(pTableMeta, pStmt->colName);
|
||||
if (NULL == pSchema) {
|
||||
|
|
|
@ -5969,6 +5969,10 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
|
|||
pTsmaScanCols = taosArrayInit(pTsmaOptCtx->pAggFuncs->length, sizeof(int32_t));
|
||||
if (!pTsmaScanCols) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (pTsmaOptCtx->pScan->tableType == TSDB_CHILD_TABLE || pTsmaOptCtx->pScan->tableType == TSDB_NORMAL_TABLE) {
|
||||
const STsmaTargetTbInfo* ptbInfo = taosArrayGet(pTsmaOptCtx->pScan->pTsmaTargetTbInfo, i);
|
||||
if (ptbInfo->uid == 0) continue; // tsma res table meta not found, skip this tsma, this is possible when there is no data in this ctb
|
||||
}
|
||||
|
||||
STableTSMAInfo* pTsma = taosArrayGetP(pTsmaOptCtx->pTsmas, i);
|
||||
if (!pTsma->fillHistoryFinished || tsMaxTsmaCalcDelay * 1000 < (pTsma->rspTs - pTsma->reqTs) + pTsma->delayDuration) {
|
||||
|
|
|
@ -498,6 +498,18 @@ class TSMATestSQLGenerator:
|
|||
ret = ret + f' LIMIT {random.randint(0, self.opts_.limit_max)}'
|
||||
return ret
|
||||
|
||||
## if offset is True, offset cannot be the same as interval
|
||||
def generate_random_offset_sliding(self, interval: str, offset: bool = False) -> str:
|
||||
unit = interval[-1]
|
||||
hasUnit = unit.isalpha()
|
||||
if not hasUnit:
|
||||
start = 1
|
||||
if offset:
|
||||
start = 2
|
||||
ret: int = int(int(interval) / random.randint(start, 5))
|
||||
return str(ret)
|
||||
return ''
|
||||
|
||||
# add sliding offset
|
||||
def generate_interval(self, intervals: List[str]) -> str:
|
||||
if not self.opts_.interval:
|
||||
|
@ -506,7 +518,17 @@ class TSMATestSQLGenerator:
|
|||
return ''
|
||||
value = random.choice(intervals)
|
||||
self.res_.has_interval = True
|
||||
return f'INTERVAL({value})'
|
||||
has_offset = False
|
||||
offset = ''
|
||||
has_sliding = False
|
||||
sliding = ''
|
||||
num: int = int(value[:-1])
|
||||
unit = value[-1]
|
||||
if has_offset and num > 1:
|
||||
offset = f', {self.generate_random_offset_sliding(value, True)}'
|
||||
if has_sliding:
|
||||
sliding = f'sliding({self.generate_random_offset_sliding(value)})'
|
||||
return f'INTERVAL({value} {offset}) {sliding}'
|
||||
|
||||
def generate_tag_list(self):
|
||||
used_tag_num = random.randrange(1, self.opts_.tag_num + 1)
|
||||
|
@ -714,12 +736,16 @@ class TDTestCase:
|
|||
if ctx.has_tsma():
|
||||
if ctx.used_tsmas[0].name == tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX:
|
||||
break
|
||||
elif len(ctx.used_tsmas[0].name) == 32 and 1: ## select md5
|
||||
elif len(ctx.used_tsmas[0].name) == 32:
|
||||
name = f'1.{db}.{tsma_name}_{tb}'
|
||||
if ctx.used_tsmas[0].name == TSMAQCBuilder().md5(name):
|
||||
break
|
||||
else:
|
||||
time.sleep(1)
|
||||
else:
|
||||
time.sleep(1)
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
||||
def create_tsma(self, tsma_name: str, db: str, tb: str, func_list: list, interval: str):
|
||||
tdSql.execute('use %s' % db)
|
||||
|
@ -757,16 +783,14 @@ class TDTestCase:
|
|||
self.tsma_tester.check_sql(ctx.sql, ctx)
|
||||
|
||||
def test_query_with_tsma(self):
|
||||
self.create_tsma('tsma1', 'test', 'meters', [
|
||||
'avg(c1)', 'avg(c2)'], '5m')
|
||||
self.create_tsma('tsma2', 'test', 'meters', [
|
||||
'avg(c1)', 'avg(c2)'], '30m')
|
||||
self.create_tsma('tsma5', 'test', 'norm_tb', [
|
||||
'avg(c1)', 'avg(c2)'], '10m')
|
||||
self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
|
||||
self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m')
|
||||
self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m')
|
||||
|
||||
self.test_query_with_tsma_interval()
|
||||
self.test_query_with_tsma_agg()
|
||||
self.test_recursive_tsma()
|
||||
self.test_query_interval_sliding()
|
||||
self.test_union()
|
||||
self.test_query_child_table()
|
||||
self.test_skip_tsma_hint()
|
||||
|
@ -774,6 +798,43 @@ class TDTestCase:
|
|||
self.test_long_ctb_name()
|
||||
self.test_add_tag_col()
|
||||
self.test_modify_col_name_value()
|
||||
self.test_alter_tag_val()
|
||||
self.test_ins_tsma()
|
||||
|
||||
def test_ins_tsma(self):
|
||||
tdSql.execute('use performance_schema')
|
||||
tdSql.query('show tsmas')
|
||||
tdSql.checkRows(0)
|
||||
tdSql.execute('use test')
|
||||
tdSql.query('show tsmas')
|
||||
tdSql.checkRows(3)
|
||||
tdSql.query('select * from information_schema.ins_tsmas')
|
||||
tdSql.checkRows(3)
|
||||
tdSql.execute('create database dd')
|
||||
tdSql.execute('use dd')
|
||||
tdSql.execute('create table norm_tb (ts timestamp, c1 int)')
|
||||
tdSql.execute('insert into norm_tb values(now, 1)')
|
||||
self.create_tsma('tsma_norm_tb_dd', 'dd', 'norm_tb', ['avg(c1)', 'sum(c1)', 'min(c1)'], '10m')
|
||||
tdSql.query('show tsmas')
|
||||
tdSql.checkRows(1)
|
||||
tdSql.query('select * from information_schema.ins_tsmas')
|
||||
tdSql.checkRows(4)
|
||||
tdSql.query('show test.tsmas')
|
||||
tdSql.checkRows(3)
|
||||
tdSql.execute('use test')
|
||||
tdSql.query('show dd.tsmas')
|
||||
tdSql.checkRows(1)
|
||||
tdSql.execute('drop database dd')
|
||||
tdSql.query('select * from information_schema.ins_tsmas')
|
||||
tdSql.checkRows(3)
|
||||
tdSql.execute('use test')
|
||||
|
||||
def test_alter_tag_val(self):
|
||||
sql = 'alter table t1 set tag t1 = 999'
|
||||
tdSql.error(sql, -2147471088)
|
||||
|
||||
def test_query_interval_sliding(self):
|
||||
pass
|
||||
|
||||
def test_union(self):
|
||||
ctxs = []
|
||||
|
@ -914,14 +975,21 @@ class TDTestCase:
|
|||
'tsma2', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc()
|
||||
self.check([ctx])
|
||||
|
||||
# test recrusive tsma on norm_tb
|
||||
tsma_name = 'tsma_recursive_on_norm_tb'
|
||||
self.create_recursive_tsma('tsma5', tsma_name, 'test', '20m', 'norm_tb', ['avg(c1)', 'avg(c2)'])
|
||||
sql = 'select avg(c1), avg(c2), tbname from norm_tb partition by tbname interval(20m)'
|
||||
self.check([TSMAQCBuilder().with_sql(sql).should_query_with_tsma_ctb('test', tsma_name, 'norm_tb').get_qc()])
|
||||
tdSql.execute(f'drop tsma {tsma_name}')
|
||||
|
||||
def test_query_with_tsma_interval(self):
|
||||
self.check(self.test_query_with_tsma_interval_possibly_partition())
|
||||
self.check(self.test_query_with_tsma_interval_partition_by_col())
|
||||
|
||||
def test_query_tsma_all(self, func_list: List = ['avg(c1)', 'avg(c2)']) -> List:
|
||||
ctxs = []
|
||||
interval_list = ['1s', '5s', '60s', '1m', '10m', '20m',
|
||||
'30m', '59s', '1h', '120s', '1200', '2h', '90m', '1d']
|
||||
interval_list = ['1s', '5s', '59s', '60s', '1m', '120s', '10m', '20m',
|
||||
'30m', '1h', '90m', '2h', '8h', '1d']
|
||||
opts: TSMATesterSQLGeneratorOptions = TSMATesterSQLGeneratorOptions()
|
||||
opts.interval = True
|
||||
opts.where_ts_range = True
|
||||
|
@ -962,6 +1030,12 @@ class TDTestCase:
|
|||
sql = 'select avg(c1), avg(c2) from meters interval(60m)'
|
||||
ctxs.append(TSMAQCBuilder().with_sql(sql)
|
||||
.should_query_with_tsma('tsma2', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc())
|
||||
sql = 'select avg(c1), avg(c2) from meters interval(60m, 30m) SLIDING(30m)'
|
||||
ctxs.append(TSMAQCBuilder().with_sql(sql)
|
||||
.should_query_with_tsma('tsma2', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc())
|
||||
sql = 'select avg(c1), avg(c2) from meters interval(60m, 25m) SLIDING(25m)'
|
||||
ctxs.append(TSMAQCBuilder().with_sql(sql)
|
||||
.should_query_with_tsma('tsma1', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc())
|
||||
|
||||
sql = "select avg(c1), avg(c2) from meters where ts >= '2018-09-17 09:00:00.009' and ts < '2018-09-17 10:23:19.665' interval(30m)"
|
||||
ctxs.append(TSMAQCBuilder().with_sql(sql)
|
||||
|
@ -969,6 +1043,12 @@ class TDTestCase:
|
|||
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00', '2018-09-17 09:59:59.999')
|
||||
.should_query_with_table('meters', '2018-09-17 10:00:00.000', '2018-09-17 10:23:19.664').get_qc())
|
||||
|
||||
sql = "select avg(c1), avg(c2) from meters where ts >= '2018-09-17 09:00:00.009' and ts < '2018-09-17 10:23:19.665' interval(30m, 25m) SLIDING(10m)"
|
||||
ctxs.append(TSMAQCBuilder().with_sql(sql)
|
||||
.should_query_with_table('meters', '2018-09-17 09:00:00.009', '2018-09-17 09:04:59.999')
|
||||
.should_query_with_tsma('tsma1', '2018-09-17 09:05:00', '2018-09-17 09:54:59.999')
|
||||
.should_query_with_table('meters', '2018-09-17 09:55:00.000', '2018-09-17 10:23:19.664').get_qc())
|
||||
|
||||
sql = "SELECT avg(c1), avg(c2),_wstart, _wend,t3,t4,t5,t2 FROM meters WHERE ts >= '2018-09-17 8:00:00' AND ts < '2018-09-17 09:03:18.334' PARTITION BY t3,t4,t5,t2 INTERVAL(1d);"
|
||||
ctxs.append(TSMAQCBuilder().with_sql(sql)
|
||||
.should_query_with_table('meters', '2018-09-17 8:00:00', '2018-09-17 09:03:18.333').get_qc())
|
||||
|
@ -1116,9 +1196,6 @@ class TDTestCase:
|
|||
self.test_ddl()
|
||||
self.test_query_with_tsma()
|
||||
|
||||
def test_ins_tsma(self):
|
||||
pass
|
||||
|
||||
def test_create_tsma(self):
|
||||
function_name = sys._getframe().f_code.co_name
|
||||
tdLog.debug(f'-----{function_name}------')
|
||||
|
|
Loading…
Reference in New Issue