fix recursive tsma

This commit is contained in:
wangjiaming0909 2024-02-22 18:24:35 +08:00
parent d8200b2f63
commit c3e73d9168
15 changed files with 251 additions and 86 deletions

View File

@ -447,9 +447,9 @@ typedef enum ENodeType {
} ENodeType; } ENodeType;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
char* dbFName; const char* dbFName;
char* tbName; const char* tbName;
} SBuildTableInput; } SBuildTableInput;
typedef struct { typedef struct {
@ -3570,6 +3570,8 @@ typedef struct {
int64_t lastTs; int64_t lastTs;
int64_t normSourceTbUid; // the Uid of source tb if its a normal table, otherwise 0 int64_t normSourceTbUid; // the Uid of source tb if its a normal table, otherwise 0
SArray* pVgroupVerList; SArray* pVgroupVerList;
int8_t recursiveTsma;
char baseTsmaName[TSDB_TABLE_FNAME_LEN]; // base tsma name for recursively created tsma
} SMCreateSmaReq; } SMCreateSmaReq;
int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq); int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq);

View File

@ -423,6 +423,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)
#define TSDB_CODE_MND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0481) #define TSDB_CODE_MND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0481)
#define TSDB_CODE_MND_INVALID_SMA_OPTION TAOS_DEF_ERROR_CODE(0, 0x0482) #define TSDB_CODE_MND_INVALID_SMA_OPTION TAOS_DEF_ERROR_CODE(0, 0x0482)
#define TSDB_CODE_MND_INVALID_DROP_TSMA TAOS_DEF_ERROR_CODE(0, 0x0485)
// mnode-tag-indxe // mnode-tag-indxe

View File

@ -895,6 +895,8 @@ int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq
if (tEncodeI32(&encoder, p->vgId) < 0) return -1; if (tEncodeI32(&encoder, p->vgId) < 0) return -1;
if (tEncodeI64(&encoder, p->ver) < 0) return -1; if (tEncodeI64(&encoder, p->ver) < 0) return -1;
} }
if (tEncodeI8(&encoder, pReq->recursiveTsma) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->baseTsmaName) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -967,6 +969,8 @@ int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pR
} }
} }
} }
if (tDecodeI8(&decoder, &pReq->recursiveTsma) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->baseTsmaName) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;

View File

@ -464,6 +464,7 @@ typedef struct {
char* ast; char* ast;
SSchemaWrapper schemaRow; // for dstVgroup SSchemaWrapper schemaRow; // for dstVgroup
SSchemaWrapper schemaTag; // for dstVgroup SSchemaWrapper schemaTag; // for dstVgroup
char baseSmaName[TSDB_TABLE_FNAME_LEN];
} SSmaObj; } SSmaObj;
typedef struct { typedef struct {

View File

@ -70,6 +70,7 @@ typedef struct SCreateTSMACxt {
const SDbObj * pDb; const SDbObj * pDb;
SStbObj * pSrcStb; SStbObj * pSrcStb;
SSmaObj * pSma; SSmaObj * pSma;
const SSmaObj * pRecursiveSma;
SCMCreateStreamReq *pCreateStreamReq; SCMCreateStreamReq *pCreateStreamReq;
SMDropStreamReq * pDropStreamReq; SMDropStreamReq * pDropStreamReq;
const char * streamName; const char * streamName;
@ -152,6 +153,7 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
if (pSma->astLen > 0) { if (pSma->astLen > 0) {
SDB_SET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER) SDB_SET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
} }
SDB_SET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER) SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER)
@ -235,6 +237,7 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
if (pSma->ast == NULL) goto _OVER; if (pSma->ast == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER) SDB_GET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER)
} }
SDB_GET_BINARY(pRaw, dataPos, pSma->baseSmaName, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER) SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER)
@ -1390,6 +1393,7 @@ static void initSMAObj(SCreateTSMACxt* pCxt) {
memcpy(pCxt->pSma->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN); memcpy(pCxt->pSma->name, pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
memcpy(pCxt->pSma->stb, pCxt->pCreateSmaReq->stb, TSDB_TABLE_FNAME_LEN); memcpy(pCxt->pSma->stb, pCxt->pCreateSmaReq->stb, TSDB_TABLE_FNAME_LEN);
memcpy(pCxt->pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN); memcpy(pCxt->pSma->db, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
if (pCxt->pRecursiveSma) memcpy(pCxt->pSma->baseSmaName, pCxt->pRecursiveSma->name, TSDB_TABLE_FNAME_LEN);
pCxt->pSma->createdTime = taosGetTimestampMs(); pCxt->pSma->createdTime = taosGetTimestampMs();
pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN); pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
@ -1447,9 +1451,9 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
pCxt->pCreateStreamReq->targetStbUid = 0; pCxt->pCreateStreamReq->targetStbUid = 0;
pCxt->pCreateStreamReq->fillNullCols = NULL; pCxt->pCreateStreamReq->fillNullCols = NULL;
pCxt->pCreateStreamReq->igUpdate = 0; pCxt->pCreateStreamReq->igUpdate = 0;
// TODO what's this timestamp pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs;
//pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs; mDebug("tsma create stream with last ts: %" PRId64 "vgversion size: %d", pCxt->pCreateSmaReq->lastTs,
pCxt->pCreateStreamReq->lastTs = 1758414148000; pCxt->pCreateStreamReq->pVgroupVerList ? pCxt->pCreateStreamReq->pVgroupVerList->size : 0);
pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast); pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast);
pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql); pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql);
@ -1601,6 +1605,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
SDbObj * pDb = NULL; SDbObj * pDb = NULL;
SStbObj * pStb = NULL; SStbObj * pStb = NULL;
SSmaObj * pSma = NULL; SSmaObj * pSma = NULL;
SSmaObj * pRecursiveTsma = NULL;
SStreamObj * pStream = NULL; SStreamObj * pStream = NULL;
int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId); int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId);
SMCreateSmaReq createReq = {0}; SMCreateSmaReq createReq = {0};
@ -1664,6 +1669,15 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
goto _OVER; goto _OVER;
} }
if (createReq.recursiveTsma) {
pRecursiveTsma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.baseTsmaName);
if (!pRecursiveTsma) {
mError("base tsma: %s not found when creating recursive tsma", createReq.baseTsmaName);
terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
goto _OVER;
}
}
SCreateTSMACxt cxt = { SCreateTSMACxt cxt = {
.pMnode = pMnode, .pMnode = pMnode,
.pCreateSmaReq = &createReq, .pCreateSmaReq = &createReq,
@ -1673,6 +1687,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
.pDb = pDb, .pDb = pDb,
.pRpcReq = pReq, .pRpcReq = pReq,
.pSma = NULL, .pSma = NULL,
.pRecursiveSma = pRecursiveTsma,
.pSrcStb = pStb, .pSrcStb = pStb,
}; };
@ -1685,6 +1700,7 @@ _OVER:
} }
if (pStb) mndReleaseStb(pMnode, pStb); if (pStb) mndReleaseStb(pMnode, pStb);
if (pRecursiveTsma) mndReleaseSma(pMnode, pRecursiveTsma);
mndReleaseSma(pMnode, pSma); mndReleaseSma(pMnode, pSma);
mndReleaseStream(pMnode, pStream); mndReleaseStream(pMnode, pStream);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
@ -1759,6 +1775,22 @@ _OVER:
return code; return code;
} }
static bool hasRecursiveTsmasBasedOnMe(SMnode* pMnode, const SSmaObj* pSma) {
SSmaObj *pSmaObj = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSmaObj);
if (pIter == NULL) break;
if (0 == strncmp(pSmaObj->baseSmaName, pSma->name, TSDB_TABLE_FNAME_LEN)) {
sdbRelease(pMnode->pSdb, pSmaObj);
sdbCancelFetch(pMnode->pSdb, pIter);
return true;
}
sdbRelease(pMnode->pSdb, pSmaObj);
}
return false;
}
static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) { static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) {
int32_t code = -1; int32_t code = -1;
SMDropSmaReq dropReq = {0}; SMDropSmaReq dropReq = {0};
@ -1795,6 +1827,11 @@ static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) {
goto _OVER; goto _OVER;
} }
if (hasRecursiveTsmasBasedOnMe(pMnode, pSma)) {
terrno = TSDB_CODE_MND_INVALID_DROP_TSMA;
goto _OVER;
}
SCreateTSMACxt cxt = { SCreateTSMACxt cxt = {
.pDb = pDb, .pDb = pDb,
.pMnode = pMnode, .pMnode = pMnode,
@ -1934,7 +1971,7 @@ static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) {
taosMemoryFree(p); taosMemoryFree(p);
} }
int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STableTSMAInfo* pInfo) { int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STableTSMAInfo* pInfo, const SSmaObj* pBaseTsma) {
int32_t code = 0; int32_t code = 0;
pInfo->interval = pSma->interval; pInfo->interval = pSma->interval;
pInfo->unit = pSma->intervalUnit; pInfo->unit = pSma->intervalUnit;
@ -1955,7 +1992,7 @@ int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STa
if (!pInfo->pFuncs) return TSDB_CODE_OUT_OF_MEMORY; if (!pInfo->pFuncs) return TSDB_CODE_OUT_OF_MEMORY;
SNode *pNode, *pFunc; SNode *pNode, *pFunc;
if (TSDB_CODE_SUCCESS != nodesStringToNode(pSma->ast, &pNode)) { if (TSDB_CODE_SUCCESS != nodesStringToNode(pBaseTsma ? pBaseTsma->ast : pSma->ast, &pNode)) {
taosArrayDestroy(pInfo->pFuncs); taosArrayDestroy(pInfo->pFuncs);
pInfo->pFuncs = NULL; pInfo->pFuncs = NULL;
return TSDB_CODE_TSMA_INVALID_STAT; return TSDB_CODE_TSMA_INVALID_STAT;
@ -2004,9 +2041,37 @@ int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STa
return code; return code;
} }
// @note remember to mndReleaseSma(*ppOut)
static int32_t mndGetDeepestBaseForTsma(SMnode* pMnode, SSmaObj* pSma, SSmaObj** ppOut) {
int32_t code = 0;
SSmaObj* pRecursiveTsma = NULL;
if (pSma->baseSmaName[0]) {
pRecursiveTsma = mndAcquireSma(pMnode, pSma->baseSmaName);
if (!pRecursiveTsma) {
mError("base tsma: %s for tsma: %s not found", pSma->baseSmaName, pSma->name);
return TSDB_CODE_MND_SMA_NOT_EXIST;
}
while (pRecursiveTsma->baseSmaName[0]) {
// TODO test 2 level recursive tsma
SSmaObj* pTmpSma = pRecursiveTsma;
pRecursiveTsma = mndAcquireSma(pMnode, pTmpSma->baseSmaName);
if (!pRecursiveTsma) {
mError("base tsma: %s for tsma: %s not found", pTmpSma->baseSmaName, pTmpSma->name);
mndReleaseSma(pMnode, pTmpSma);
return TSDB_CODE_MND_SMA_NOT_EXIST;
}
mndReleaseSma(pMnode, pTmpSma);
}
}
*ppOut = pRecursiveTsma;
return code;
}
static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) { static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rsp, bool *exist) {
int32_t code = -1; int32_t code = -1;
SSmaObj *pSma = NULL; SSmaObj *pSma = NULL;
SSmaObj *pBaseTsma = NULL;
SStbObj *pDstStb = NULL; SStbObj *pDstStb = NULL;
pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, tsmaFName); pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, tsmaFName);
@ -2025,10 +2090,17 @@ static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rs
return code; return code;
} }
terrno = dumpTSMAInfoFromSmaObj(pSma, pDstStb, pTsma); terrno = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
if (terrno == 0) {
terrno = dumpTSMAInfoFromSmaObj(pSma, pDstStb, pTsma, pBaseTsma);
}
mndReleaseStb(pMnode, pDstStb); mndReleaseStb(pMnode, pDstStb);
sdbRelease(pMnode->pSdb, pSma); sdbRelease(pMnode->pSdb, pSma);
if (terrno) return code; if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
if (terrno) {
tFreeTableTSMAInfo(pTsma);
return code;
}
if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) { if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tFreeTableTSMAInfo(pTsma); tFreeTableTSMAInfo(pTsma);
@ -2041,6 +2113,7 @@ static int32_t mndGetTSMA(SMnode *pMnode, char *tsmaFName, STableTSMAInfoRsp *rs
static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *rsp, bool *exist) { static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp *rsp, bool *exist) {
int32_t code = -1; int32_t code = -1;
SSmaObj * pSma = NULL; SSmaObj * pSma = NULL;
SSmaObj * pBaseTsma = NULL;
SSdb * pSdb = pMnode->pSdb; SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL; void * pIter = NULL;
SStreamObj * pStreamObj = NULL; SStreamObj * pStreamObj = NULL;
@ -2088,11 +2161,19 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp
sdbRelease(pSdb, pSma); sdbRelease(pSdb, pSma);
return code; return code;
} }
terrno = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma);
pTsma->streamUid = streamId; pTsma->streamUid = streamId;
terrno = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseTsma);
if (terrno == 0) {
terrno = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma, pBaseTsma);
}
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
sdbRelease(pSdb, pSma); sdbRelease(pSdb, pSma);
if (terrno) return code; if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
if (terrno) {
tFreeTableTSMAInfo(pTsma);
return code;
}
if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) { if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tFreeTableTSMAInfo(pTsma); tFreeTableTSMAInfo(pTsma);
@ -2153,7 +2234,8 @@ static int32_t mndProcessGetTbTSMAReq(SRpcMsg *pReq) {
_OVER: _OVER:
if (code != 0) { if (code != 0) {
mError("failed to get table tsma %s since %s", tsmaReq.name, terrstr()); mError("failed to get table tsma %s since %s fetching with tsma name %d", tsmaReq.name, terrstr(),
tsmaReq.fetchingWithTsmaName);
} }
tFreeTableTSMAInfoRsp(&rsp); tFreeTableTSMAInfoRsp(&rsp);
@ -2227,16 +2309,26 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t
// dump smaObj into rsp // dump smaObj into rsp
STableTSMAInfo * pInfo = NULL; STableTSMAInfo * pInfo = NULL;
pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo)); pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
if (!pInfo || (terrno = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo))) { if (!pInfo) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mndReleaseSma(pMnode, pSma); mndReleaseSma(pMnode, pSma);
mndReleaseStb(pMnode, pDestStb); mndReleaseStb(pMnode, pDestStb);
taosMemoryFreeClear(pInfo); goto _OVER;
}
SSmaObj* pBaseSma = NULL;
terrno = mndGetDeepestBaseForTsma(pMnode, pSma, &pBaseSma);
if (terrno == 0) terrno = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo, pBaseSma);
mndReleaseStb(pMnode, pDestStb);
mndReleaseSma(pMnode, pSma);
if (pBaseSma) mndReleaseSma(pMnode, pBaseSma);
if (terrno) {
tFreeTableTSMAInfo(pInfo);
goto _OVER; goto _OVER;
} }
taosArrayPush(hbRsp.pTsmas, pInfo); taosArrayPush(hbRsp.pTsmas, pInfo);
mndReleaseStb(pMnode, pDestStb);
mndReleaseSma(pMnode, pSma);
} }
rspLen = tSerializeTSMAHbRsp(NULL, 0, &hbRsp); rspLen = tSerializeTSMAHbRsp(NULL, 0, &hbRsp);

View File

@ -563,8 +563,9 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
} }
} }
// TODO remove log // TODO remove log
tqInfo("------ver: %" PRId64 " fhFinished: %d max: %" PRId64 " cur: %" PRId64 " latest: %" PRId64, ver, *fhFinished, tqInfo("------ver: %" PRId64 " fhFinished: %d max: %" PRId64 " cur: %" PRId64 " latest: %" PRId64
verRange.maxVer, cur, latest); "lastst-cur %" PRId64,
ver, *fhFinished, verRange.maxVer, cur, latest, latest - cur);
if (pDelay != NULL) { // delay in ms if (pDelay != NULL) { // delay in ms
*pDelay = (latest - cur) / 1000; *pDelay = (latest - cur) / 1000;

View File

@ -972,7 +972,7 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation* operation);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char* dbFName, SCtgDBCache** pCache); int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char* dbFName, SCtgDBCache** pCache);
void ctgReleaseDBCache(SCatalog* pCtg, SCtgDBCache* dbCache); void ctgReleaseDBCache(SCatalog* pCtg, SCtgDBCache* dbCache);
void ctgRUnlockVgInfo(SCtgDBCache* dbCache); void ctgRUnlockVgInfo(SCtgDBCache* dbCache);
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char* dbFName, char* tbName, int32_t* exist); int32_t ctgTbMetaExistInCache(SCatalog* pCtg, const char* dbFName, const char* tbName, int32_t* exist);
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta); int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgReadTbVerFromCache(SCatalog* pCtg, SName* pTableName, int32_t* sver, int32_t* tver, int32_t* tbType, int32_t ctgReadTbVerFromCache(SCatalog* pCtg, SName* pTableName, int32_t* sver, int32_t* tver, int32_t* tbType,
uint64_t* suid, char* stbName); uint64_t* suid, char* stbName);
@ -1033,7 +1033,7 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ch
SCtgTask* pTask); SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, SGetUserAuthRsp* out, int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, SGetUserAuthRsp* out,
SCtgTask* pTask); SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName, int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, const char* tbName,
STableMetaOutput* out, SCtgTaskReq* tReq); STableMetaOutput* out, SCtgTaskReq* tReq);
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out, int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out,
SCtgTaskReq* tReq); SCtgTaskReq* tReq);

View File

@ -2707,8 +2707,22 @@ int32_t ctgLaunchGetTSMATask(SCtgTask* pTask) {
taosArrayPush(pCtx->pResList, &(SMetaRes){0}); taosArrayPush(pCtx->pResList, &(SMetaRes){0});
CTG_ERR_RET(ctgGetTbTSMAFromMnode(pCtg, pConn, pTsmaName, NULL, &tReq, TDMT_MND_GET_TSMA)); CTG_ERR_RET(ctgGetTbTSMAFromMnode(pCtg, pConn, pTsmaName, NULL, &tReq, TDMT_MND_GET_TSMA));
} else { } else {
SMetaRes* pRes = taosArrayGet(pCtx->pResList, 0);
STableTSMAInfoRsp* pRsp = (STableTSMAInfoRsp*)pRes->pRes;
ASSERT(pRsp->pTsmas->size == 1);
const STSMACache* pTsma = taosArrayGetP(pRsp->pTsmas, 0);
TSWAP(pTask->res, pCtx->pResList); TSWAP(pTask->res, pCtx->pResList);
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0)); // get tsma target stable meta if not existed in cache
int32_t exists = false;
CTG_ERR_RET(ctgTbMetaExistInCache(pCtg, pTsma->targetDbFName, pTsma->targetTb, &exists));
if (!exists) {
SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = 0};
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, 0);
if (!pMsgCtx->pBatchs) pMsgCtx->pBatchs = pJob->pBatchs;
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pTsma->targetDbFName, pTsma->targetTb, NULL, &tReq));
} else {
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -2742,23 +2756,13 @@ int32_t ctgHandleGetTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf*
pRes->pRes = pOut; pRes->pRes = pOut;
pMsgCtx->out = NULL; pMsgCtx->out = NULL;
TSWAP(pTask->res, pCtx->pResList); TSWAP(pTask->res, pCtx->pResList);
break;
STableTSMAInfo* pTsma = taosArrayGetP(pOut->pTsmas, 0); STableTSMAInfo* pTsma = taosArrayGetP(pOut->pTsmas, 0);
int32_t exists = false;
SName dstTbName = *pName; CTG_ERR_JRET(ctgTbMetaExistInCache(pCtg, pTsma->targetDbFName, pTsma->targetTb, &exists));
strcpy(dstTbName.tname, pTsma->targetTb); if (!exists) {
SCtgTbMetaCtx stbCtx = {0};
stbCtx.flag = CTG_FLAG_STB;
stbCtx.pName = &dstTbName;
STableMeta* pDstTbMeta = NULL;
(void)ctgReadTbMetaFromCache(pCtg, &stbCtx, &pDstTbMeta);
if (!pDstTbMeta) {
TSWAP(pMsgCtx->lastOut, pMsgCtx->out); TSWAP(pMsgCtx->lastOut, pMsgCtx->out);
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pTsma->targetDbFName, dstTbName.tname, NULL, tReq)); CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pTsma->targetDbFName, pTsma->targetTb, NULL, tReq));
} else {
taosMemoryFreeClear(pDstTbMeta);
} }
} }
} break; } break;
@ -2781,7 +2785,6 @@ _return:
int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
bool taskDone = false; bool taskDone = false;
bool hasSubFetch = false;
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
@ -2830,7 +2833,6 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
.vgId = pVgInfo->vgId}; .vgId = pVgInfo->vgId};
CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req)); CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req));
pFetch->subFetchNum++; pFetch->subFetchNum++;
hasSubFetch = true;
pVgInfo = taosHashIterate(pDbCache->vgCache.vgInfo->vgHash, pVgInfo); pVgInfo = taosHashIterate(pDbCache->vgCache.vgInfo->vgHash, pVgInfo);
} }
} }
@ -2893,7 +2895,6 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
.vgId = pVgInfo->vgId}; .vgId = pVgInfo->vgId};
CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req)); CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req));
pFetch->subFetchNum++; pFetch->subFetchNum++;
hasSubFetch = true;
pVgInfo = taosHashIterate(pVgHash, pVgInfo); pVgInfo = taosHashIterate(pVgHash, pVgInfo);
} }
} }
@ -2916,7 +2917,6 @@ _return:
if (code) { if (code) {
SMetaRes* pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx); SMetaRes* pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx);
pRes->code = code; pRes->code = code;
pRes->pRes = NULL;
if (TSDB_CODE_MND_SMA_NOT_EXIST == code) { if (TSDB_CODE_MND_SMA_NOT_EXIST == code) {
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
} else { } else {
@ -2926,10 +2926,10 @@ _return:
tstrerror(code)); tstrerror(code));
} }
bool allSubFetchFinished = false; bool allSubFetchFinished = false;
if (reqType == TDMT_VND_GET_STREAM_PROGRESS) { if (pMsgCtx->reqType == TDMT_VND_GET_STREAM_PROGRESS) {
allSubFetchFinished = atomic_add_fetch_32(&pFetch->finishedSubFetchNum, 1) >= pFetch->subFetchNum; allSubFetchFinished = atomic_add_fetch_32(&pFetch->finishedSubFetchNum, 1) >= pFetch->subFetchNum;
} }
if ((allSubFetchFinished || !hasSubFetch) && 0 == atomic_sub_fetch_32(&pCtx->fetchNum, 1)) { if ((allSubFetchFinished || pFetch->subFetchNum == 0) && 0 == atomic_sub_fetch_32(&pCtx->fetchNum, 1)) {
TSWAP(pTask->res, pCtx->pResList); TSWAP(pTask->res, pCtx->pResList);
taskDone = true; taskDone = true;
} }

View File

@ -259,7 +259,7 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) { int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTbCache *pCache = NULL; SCtgTbCache *pCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache); ctgAcquireDBCache(pCtg, dbFName, &dbCache);
@ -503,7 +503,7 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *exist) { int32_t ctgTbMetaExistInCache(SCatalog *pCtg, const char *dbFName, const char *tbName, int32_t *exist) {
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL; SCtgTbCache *tbCache = NULL;
ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache); ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache);

View File

@ -1143,7 +1143,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName, int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, const char* tbName,
STableMetaOutput* out, SCtgTaskReq* tReq) { STableMetaOutput* out, SCtgTaskReq* tReq) {
SCtgTask* pTask = tReq ? tReq->pTask : NULL; SCtgTask* pTask = tReq ? tReq->pTask : NULL;
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName}; SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};

View File

@ -321,7 +321,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal);
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode** pFunc); static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode** pFunc);
static int32_t createSimpleSelectStmtFromProjList(const char* pDb, const char* pTable, SNodeList* pProjectionList, static int32_t createSimpleSelectStmtFromProjList(const char* pDb, const char* pTable, SNodeList* pProjectionList,
SSelectStmt** pStmt); SSelectStmt** pStmt);
static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta, SNode** pQuery); static int32_t createLastTsSelectStmt(char* pDb, const char* pTable, const char* pkColName, SNode** pQuery);
static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery); static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery);
static int32_t setRefreshMeta(STranslateContext* pCxt, SQuery* pQuery); static int32_t setRefreshMeta(STranslateContext* pCxt, SQuery* pQuery);
@ -8326,7 +8326,7 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pMetaCache); code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pMetaCache);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pStmt->pOptions->tsPrecision = pMetaCache->tableInfo.precision; pStmt->pOptions->tsPrecision = pMetaCache->tableInfo.precision;
code = createLastTsSelectStmt(pStmt->dbName, pStmt->tableName, pMetaCache, &pStmt->pPrevQuery); code = createLastTsSelectStmt(pStmt->dbName, pStmt->tableName, pMetaCache->schema[0].name, &pStmt->pPrevQuery);
} }
taosMemoryFreeClear(pMetaCache); taosMemoryFreeClear(pMetaCache);
} }
@ -9622,14 +9622,14 @@ static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStream
return code; return code;
} }
static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta, SNode** pQuery) { static int32_t createLastTsSelectStmt(char* pDb, const char* pTable, const char* pkColName, SNode** pQuery) {
SColumnNode* col = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); SColumnNode* col = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == col) { if (NULL == col) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
tstrncpy(col->tableAlias, pTable, tListLen(col->tableAlias)); tstrncpy(col->tableAlias, pTable, tListLen(col->tableAlias));
tstrncpy(col->colName, pMeta->schema[0].name, tListLen(col->colName)); tstrncpy(col->colName, pkColName, tListLen(col->colName));
SNodeList* pParameterList = nodesMakeList(); SNodeList* pParameterList = nodesMakeList();
if (NULL == pParameterList) { if (NULL == pParameterList) {
nodesDestroyNode((SNode*)col); nodesDestroyNode((SNode*)col);
@ -9812,7 +9812,8 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
} }
if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->fillHistory) { if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->fillHistory) {
SRealTableNode* pTable = (SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable); SRealTableNode* pTable = (SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable);
code = createLastTsSelectStmt(pTable->table.dbName, pTable->table.tableName, pTable->pMeta, &pStmt->pPrevQuery); code = createLastTsSelectStmt(pTable->table.dbName, pTable->table.tableName, pTable->pMeta->schema[0].name,
&pStmt->pPrevQuery);
/* /*
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
STranslateContext cxt = {0}; STranslateContext cxt = {0};
@ -10624,6 +10625,8 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
// useTbName is base tsma name // useTbName is base tsma name
code = getTsma(pCxt, useTbName, &pRecursiveTsma); code = getTsma(pCxt, useTbName, &pRecursiveTsma);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pReq->recursiveTsma = true;
tNameExtractFullName(useTbName, pReq->baseTsmaName);
SValueNode* pInterval = (SValueNode*)pStmt->pOptions->pInterval; SValueNode* pInterval = (SValueNode*)pStmt->pOptions->pInterval;
if (pRecursiveTsma->interval < pInterval->datum.i && pInterval->datum.i % pRecursiveTsma->interval == 0) { if (pRecursiveTsma->interval < pInterval->datum.i && pInterval->datum.i % pRecursiveTsma->interval == 0) {
} else { } else {
@ -10649,6 +10652,7 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
numOfTags = pRecursiveTsma->pTags->size; numOfTags = pRecursiveTsma->pTags->size;
pCols = pRecursiveTsma->pUsedCols->pData; pCols = pRecursiveTsma->pUsedCols->pData;
pTags = pRecursiveTsma->pTags->pData; pTags = pRecursiveTsma->pTags->pData;
code = getTableMeta(pCxt, pStmt->dbName, pRecursiveTsma->targetTb, &pTableMeta);
} }
} else { } else {
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta); code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
@ -10679,11 +10683,10 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
code = buildTSMAAst(pCxt, pStmt, pReq, pStmt->pOptions->recursiveTsma ? pRecursiveTsma->targetTb : pStmt->tableName, code = buildTSMAAst(pCxt, pStmt, pReq, pStmt->pOptions->recursiveTsma ? pRecursiveTsma->targetTb : pStmt->tableName,
numOfTags, pTags); numOfTags, pTags);
} }
if (TSDB_CODE_SUCCESS == code && !pStmt->pOptions->recursiveTsma) { //TODO remvoe recursive tsma check if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code) { const char* pkColName = pTableMeta->schema[0].name;
pStmt->pOptions->tsPrecision = pTableMeta->tableInfo.precision; const char* tbName = pStmt->pOptions->recursiveTsma ? pRecursiveTsma->targetTb : pStmt->tableName;
code = createLastTsSelectStmt(pStmt->dbName, pStmt->tableName, pTableMeta, &pStmt->pPrevQuery); code = createLastTsSelectStmt(pStmt->dbName, tbName, pkColName, &pStmt->pPrevQuery);
}
} }
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);

View File

@ -1189,6 +1189,8 @@ int32_t getTsmaFromCache(SParseMetaCache* pMetaCache, const SName* pTsmaName, ST
if (TSDB_CODE_SUCCESS == code && pTsmaRsp) { if (TSDB_CODE_SUCCESS == code && pTsmaRsp) {
ASSERT(pTsmaRsp->pTsmas->size == 1); ASSERT(pTsmaRsp->pTsmas->size == 1);
*pTsma = taosArrayGetP(pTsmaRsp->pTsmas, 0); *pTsma = taosArrayGetP(pTsmaRsp->pTsmas, 0);
} else if (code == TSDB_CODE_PAR_INTERNAL_ERROR){
code = TSDB_CODE_MND_SMA_NOT_EXIST;
} }
return code; return code;
} }

View File

@ -5937,7 +5937,11 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
} }
STableTSMAInfo* pTsma = taosArrayGetP(pTsmaOptCtx->pTsmas, i); STableTSMAInfo* pTsma = taosArrayGetP(pTsmaOptCtx->pTsmas, i);
if (!pTsma->fillHistoryFinished || 30 * 1000 < (pTsma->rspTs - pTsma->reqTs) + pTsma->delayDuration) continue; if (!pTsma->fillHistoryFinished || 30 * 1000 < (pTsma->rspTs - pTsma->reqTs) + pTsma->delayDuration) {
qInfo("tsma %s filtered out history: %d rspTs: %ld reqTs: %ld delay: %ld, rspTs - reqTs: %ld", pTsma->name,
pTsma->fillHistoryFinished, pTsma->rspTs, pTsma->reqTs, pTsma->delayDuration, pTsma->rspTs - pTsma->reqTs);
continue;
}
// filter with interval // filter with interval
// TODO unit not right // TODO unit not right
if (!tsmaOptCheckValidInterval(pTsma->interval, pTsma->unit, pTsmaOptCtx)) { if (!tsmaOptCheckValidInterval(pTsma->interval, pTsma->unit, pTsmaOptCtx)) {

View File

@ -325,6 +325,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TARGET_TABLE, "Cannot write the same
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists in db") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists in db")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "sma not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "sma not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SMA_OPTION, "Invalid sma option") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SMA_OPTION, "Invalid sma option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DROP_TSMA, "Invalid drop base tsma, drop recursive tsma first")
// mnode-view // mnode-view
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VIEW_ALREADY_EXIST, "view already exists in db") TAOS_DEFINE_ERROR(TSDB_CODE_MND_VIEW_ALREADY_EXIST, "view already exists in db")

View File

@ -1,7 +1,4 @@
from os import name
from random import randrange from random import randrange
from socket import TIPC_ADDR_NAMESEQ
import taos
import time import time
import threading import threading
@ -50,7 +47,7 @@ class UsedTsma:
def __repr__(self) -> str: def __repr__(self) -> str:
return self.__str__() return self.__str__()
def setIsTsma(self): def setIsTsma(self):
self.is_tsma_ = self.name.endswith(self.TSMA_RES_STB_POSTFIX) self.is_tsma_ = self.name.endswith(self.TSMA_RES_STB_POSTFIX)
@ -92,7 +89,7 @@ class TSMAQCBuilder:
def with_sql(self, sql: str): def with_sql(self, sql: str):
self.qc_.sql = sql self.qc_.sql = sql
return self return self
def to_timestamp(self, ts: str) -> float: def to_timestamp(self, ts: str) -> float:
if ts == UsedTsma.TS_MAX or ts == UsedTsma.TS_MIN: if ts == UsedTsma.TS_MAX or ts == UsedTsma.TS_MIN:
return float(ts) return float(ts)
@ -220,11 +217,11 @@ class TSMATestSQLGenerator:
self.where_list_: List[str] = [] self.where_list_: List[str] = []
self.group_or_partition_by_list: List[str] = [] self.group_or_partition_by_list: List[str] = []
self.interval: str = '' self.interval: str = ''
def get_random_type(self, funcs): def get_random_type(self, funcs):
rand: int = randrange(1, len(funcs)) rand: int = randrange(1, len(funcs))
return funcs[rand-1]() return funcs[rand-1]()
def generate_one(self) -> str: def generate_one(self) -> str:
pass pass
@ -245,7 +242,7 @@ class TSMATestSQLGenerator:
sql = ' ts ' sql = ' ts '
if len(left) > 0: if len(left) > 0:
sql += '%s ' % (left) sql += '%s ' % (left)
if len(right) > 0: if len(right) > 0:
if len(sql) > 0: if len(sql) > 0:
sql += 'and ts ' sql += 'and ts '
@ -336,7 +333,7 @@ class TDTestCase:
tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName)) tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
return return
def init_normal_tb(self, tsql, db_name: str, tb_name: str, rows: int, start_ts: int, ts_step: int): def init_normal_tb(self, tsql, db_name: str, tb_name: str, rows: int, start_ts: int, ts_step: int):
sql = 'CREATE TABLE %s.%s (ts timestamp, c1 INT, c2 INT, c3 VARCHAR(255), c4 INT)' % (db_name, tb_name) sql = 'CREATE TABLE %s.%s (ts timestamp, c1 INT, c2 INT, c3 VARCHAR(255), c4 INT)' % (db_name, tb_name)
tsql.execute(sql) tsql.execute(sql)
@ -413,7 +410,7 @@ class TDTestCase:
def wait_for_tsma_calculation(self, func_list: list, db: str, tb: str, interval: str, tsma_name: str): def wait_for_tsma_calculation(self, func_list: list, db: str, tb: str, interval: str, tsma_name: str):
while True: while True:
sql = 'select %s from %s.%s interval(%s)' % (', '.join(func_list), db, tb, interval) sql = 'select %s from %s.%s interval(%s)' % (', '.join(func_list), db, tb, interval)
tdLog.debug('waiting for tsma %s to be useful with sql %s' % (tsma_name, sql)) tdLog.debug(f'waiting for tsma {db}.{tsma_name} to be useful with sql {sql}')
ctx: TSMAQueryContext = self.tsma_tester.get_tsma_query_ctx(sql) ctx: TSMAQueryContext = self.tsma_tester.get_tsma_query_ctx(sql)
if ctx.has_tsma(): if ctx.has_tsma():
if ctx.used_tsmas[0].name == tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX: if ctx.used_tsmas[0].name == tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX:
@ -430,11 +427,11 @@ class TDTestCase:
tdSql.execute(sql, queryTimes=1) tdSql.execute(sql, queryTimes=1)
self.wait_for_tsma_calculation(func_list, db, tb, interval, tsma_name) self.wait_for_tsma_calculation(func_list, db, tb, interval, tsma_name)
def create_recursive_tsma(self, base_tsma_name: str, new_tsma_name: str, db: str, interval: str, tb_name: str): def create_recursive_tsma(self, base_tsma_name: str, new_tsma_name: str, db: str, interval: str, tb_name: str, func_list: List[str] = ['avg(c1)']):
tdSql.execute('use %s' % db, queryTimes=1) tdSql.execute('use %s' % db, queryTimes=1)
sql = 'CREATE RECURSIVE TSMA %s ON %s.%s INTERVAL(%s)' % (new_tsma_name, db, base_tsma_name, interval) sql = 'CREATE RECURSIVE TSMA %s ON %s.%s INTERVAL(%s)' % (new_tsma_name, db, base_tsma_name, interval)
tdSql.execute(sql, queryTimes=1) tdSql.execute(sql, queryTimes=1)
self.wait_for_tsma_calculation(['avg(c1)'], db, tb_name, interval, new_tsma_name) self.wait_for_tsma_calculation(func_list, db, tb_name, interval, new_tsma_name)
def drop_tsma(self, tsma_name: str, db: str): def drop_tsma(self, tsma_name: str, db: str):
sql = 'DROP TSMA %s.%s' % (db, tsma_name) sql = 'DROP TSMA %s.%s' % (db, tsma_name)
@ -449,7 +446,7 @@ class TDTestCase:
break break
if not plan_found: if not plan_found:
tdLog.exit("plan: %s not found in res: [%s]" % (plan_str_expect, str(explain_output))) tdLog.exit("plan: %s not found in res: [%s]" % (plan_str_expect, str(explain_output)))
def check(self, func): def check(self, func):
for ctx in func(): for ctx in func():
self.tsma_tester.check_sql(ctx.sql, ctx) self.tsma_tester.check_sql(ctx.sql, ctx)
@ -457,20 +454,35 @@ class TDTestCase:
def test_query_with_tsma(self): def test_query_with_tsma(self):
self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m') self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m')
#self.create_recursive_tsma('tsma1', 'tsma3', 'test', '20m', 'meters')
#self.create_recursive_tsma('tsma2', 'tsma4', 'test', '1h', 'meters')
self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m') self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m')
## why need 10s, filling history not finished yet
#ctx = TSMAQCBuilder().with_sql('select avg(c1) from meters').should_query_with_table('meters', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc()
#self.tsma_tester.check_sql(ctx.sql, ctx)
#time.sleep(5)
#time.sleep(9999999)
self.test_query_with_tsma_interval() self.test_query_with_tsma_interval()
self.test_query_with_tsma_agg() self.test_query_with_tsma_agg()
self.test_recursive_tsma()
## self.test_query_with_drop_tsma() ## self.test_query_with_drop_tsma()
## self.test_query_with_add_tag() ## self.test_query_with_add_tag()
## self.test_union() ## self.test_union()
def test_recursive_tsma(self):
tdSql.execute('drop tsma tsma2')
func_list: List[str] = ['avg(c2)', 'avg(c3)']
self.create_tsma('tsma3', 'test', 'meters', func_list, '5m')
self.create_recursive_tsma('tsma3', 'tsma4', 'test', '20m', 'meters', func_list)
## now we have 5m, 10m, 30m, 1h 4 tsmas
sql = 'select avg(c2), "recursive tsma4" from meters'
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma4', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc()
self.tsma_tester.check_sql(sql, ctx)
self.create_recursive_tsma('tsma4', 'tsma6', 'test', '1h', 'meters', func_list)
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma6', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc()
self.tsma_tester.check_sql(sql, ctx)
tdSql.error('drop tsma tsma3', -2147482491)
tdSql.error('drop tsma tsma4', -2147482491)
tdSql.execute('drop tsma tsma6')
tdSql.execute('drop tsma tsma4')
tdSql.execute('drop tsma tsma3')
self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m')
def test_query_with_tsma_interval(self): def test_query_with_tsma_interval(self):
self.check(self.test_query_with_tsma_interval_no_partition) self.check(self.test_query_with_tsma_interval_no_partition)
self.check(self.test_query_with_tsma_interval_partition_by_col) self.check(self.test_query_with_tsma_interval_partition_by_col)
@ -493,7 +505,7 @@ class TDTestCase:
sql = 'select avg(c1), avg(c2) from meters interval(60m)' sql = 'select avg(c1), avg(c2) from meters interval(60m)'
ctxs.append(TSMAQCBuilder().with_sql(sql) \ ctxs.append(TSMAQCBuilder().with_sql(sql) \
.should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc()) .should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc())
sql = "select avg(c1), avg(c2) from meters where ts >= '2018-09-17 09:00:00.009' and ts < '2018-09-17 10:23:19.665' interval(30m)" sql = "select avg(c1), avg(c2) from meters where ts >= '2018-09-17 09:00:00.009' and ts < '2018-09-17 10:23:19.665' interval(30m)"
ctxs.append(TSMAQCBuilder().with_sql(sql) \ ctxs.append(TSMAQCBuilder().with_sql(sql) \
.should_query_with_table('meters', '2018-09-17 09:00:00.009','2018-09-17 09:29:59.999') \ .should_query_with_table('meters', '2018-09-17 09:00:00.009','2018-09-17 09:29:59.999') \
@ -546,7 +558,7 @@ class TDTestCase:
.should_query_with_table('meters', '2018-09-17 09:00:00.200','2018-09-17 09:29:59:999') \ .should_query_with_table('meters', '2018-09-17 09:00:00.200','2018-09-17 09:29:59:999') \
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \ .should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \
.should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.800').get_qc()) .should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.800').get_qc())
sql = 'select avg(c1) + avg(c2) from meters where tbname like "%t1%"' sql = 'select avg(c1) + avg(c2) from meters where tbname like "%t1%"'
ctxs.append(TSMAQCBuilder().with_sql(sql) \ ctxs.append(TSMAQCBuilder().with_sql(sql) \
.should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc()) .should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc())
@ -558,7 +570,7 @@ class TDTestCase:
sql = 'select avg(c1), avg(c2), spread(c4) from meters' sql = 'select avg(c1), avg(c2), spread(c4) from meters'
ctxs.append(TSMAQCBuilder().with_sql(sql) \ ctxs.append(TSMAQCBuilder().with_sql(sql) \
.should_query_with_table('meters', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc()) .should_query_with_table('meters', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_qc())
return ctxs return ctxs
def test_query_with_tsma_agg_group_by_tbname(self): def test_query_with_tsma_agg_group_by_tbname(self):
@ -574,21 +586,51 @@ class TDTestCase:
self.init_data() self.init_data()
#time.sleep(999999) #time.sleep(999999)
self.test_create_tsma() self.test_create_tsma()
#self.test_drop_tsma() self.test_drop_tsma()
#time.sleep(9999999)
self.test_tb_ddl_with_created_tsma() self.test_tb_ddl_with_created_tsma()
self.test_query_with_tsma() self.test_query_with_tsma()
#time.sleep(999999) #time.sleep(999999)
def test_create_tsma(self): def test_create_tsma(self):
function_name = sys._getframe().f_code.co_name
tdLog.debug(f'-----{function_name}------')
self.test_create_tsma_on_stable() self.test_create_tsma_on_stable()
self.test_create_tsma_on_norm_table() self.test_create_tsma_on_norm_table()
self.test_create_tsma_on_child_table() self.test_create_tsma_on_child_table()
self.test_create_recursive_tsma() self.test_create_recursive_tsma()
## self.test_drop_stable() ## self.test_drop_stable()
## self.test_drop_ctable() ## self.test_drop_ctable()
## self.test_drop_db() self.test_drop_db()
def test_drop_tsma(self):
function_name = sys._getframe().f_code.co_name
tdLog.debug(f'-----{function_name}------')
self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
self.create_recursive_tsma('tsma1', 'tsma2', 'test', '15m', 'meters')
tdSql.error('drop tsma tsma1', -2147482491) ## drop recursive tsma first
tdSql.execute('drop tsma tsma2', queryTimes=1)
tdSql.execute('drop tsma tsma1', queryTimes=1)
tdSql.execute('drop database test', queryTimes=1)
self.init_data()
def test_drop_db(self):
function_name = sys._getframe().f_code.co_name
tdLog.debug(f'-----{function_name}------')
tdSql.execute('create database nsdb precision "ns"', queryTimes=1)
tdSql.execute('use nsdb', queryTimes=1)
tdSql.execute('create table meters(ts timestamp, c1 int, c2 int) tags(t1 int, t2 int)', queryTimes=1)
## TODO insert data
self.create_tsma('tsma1', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
self.create_recursive_tsma('tsma1', 'tsma2', 'nsdb', '10m', 'meters')
tdSql.query('select avg(c1) from meters', queryTimes=1)
tdSql.execute('drop database nsdb', queryTimes=1)
def test_tb_ddl_with_created_tsma(self): def test_tb_ddl_with_created_tsma(self):
function_name = sys._getframe().f_code.co_name
tdLog.debug(f'-----{function_name}------')
tdSql.execute('create database nsdb precision "ns"', queryTimes=1) tdSql.execute('create database nsdb precision "ns"', queryTimes=1)
tdSql.execute('use nsdb', queryTimes=1) tdSql.execute('use nsdb', queryTimes=1)
tdSql.execute('create table meters(ts timestamp, c1 int, c2 int) tags(t1 int, t2 int)', queryTimes=1) tdSql.execute('create table meters(ts timestamp, c1 int, c2 int) tags(t1 int, t2 int)', queryTimes=1)
@ -604,9 +646,11 @@ class TDTestCase:
tdSql.execute('alter table meters drop tag t3', queryTimes=1) tdSql.execute('alter table meters drop tag t3', queryTimes=1)
tdSql.execute('drop database nsdb') tdSql.execute('drop database nsdb')
## test_drop stream ## TODO test drop stream
def test_create_tsma_on_stable(self): def test_create_tsma_on_stable(self):
function_name = sys._getframe().f_code.co_name
tdLog.debug(f'-----{function_name}------')
tdSql.execute('create database nsdb precision "ns"', queryTimes=1) tdSql.execute('create database nsdb precision "ns"', queryTimes=1)
tdSql.execute('use nsdb', queryTimes=1) tdSql.execute('use nsdb', queryTimes=1)
tdSql.execute('create table meters(ts timestamp, c1 int, c2 int) tags(t1 int, t2 int)', queryTimes=1) tdSql.execute('create table meters(ts timestamp, c1 int, c2 int) tags(t1 int, t2 int)', queryTimes=1)
@ -618,18 +662,27 @@ class TDTestCase:
tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(999999b)', -2147471097) tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(999999b)', -2147471097)
tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(999u)', -2147471097) tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(999u)', -2147471097)
tdSql.execute('drop tsma tsma1') tdSql.execute('drop tsma tsma1', queryTimes=1)
tdSql.execute('use test', queryTimes=1)
tdSql.execute('create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2)) interval(10m)', queryTimes=1)
self.wait_for_tsma_calculation(['avg(c1)', 'avg(c2)'], 'nsdb', 'meters', '10m', 'tsma1')
tdSql.execute('drop tsma nsdb.tsma1', queryTimes=1)
tdSql.error('create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097) tdSql.error('create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
tdSql.execute('drop database nsdb') tdSql.execute('drop database nsdb')
def test_create_tsma_on_norm_table(self): def test_create_tsma_on_norm_table(self):
pass function_name = sys._getframe().f_code.co_name
tdLog.debug(f'-----{function_name}------')
def test_create_tsma_on_child_table(self): def test_create_tsma_on_child_table(self):
function_name = sys._getframe().f_code.co_name
tdLog.debug(f'-----{function_name}------')
tdSql.error('create tsma tsma1 on test.t1 function(avg(c1), avg(c2)) interval(1m)', -2147471098) ## Invalid table to create tsma, only stable or normal table allowed tdSql.error('create tsma tsma1 on test.t1 function(avg(c1), avg(c2)) interval(1m)', -2147471098) ## Invalid table to create tsma, only stable or normal table allowed
def test_create_recursive_tsma(self): def test_create_recursive_tsma(self):
function_name = sys._getframe().f_code.co_name
tdLog.debug(f'-----{function_name}------')
tdSql.execute('use test') tdSql.execute('use test')
self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m')
sql = 'create recursive tsma tsma2 on tsma1 interval(1m)' sql = 'create recursive tsma tsma2 on tsma1 interval(1m)'