fix ci and TODOs

This commit is contained in:
wangjiaming0909 2024-03-22 15:25:14 +08:00
parent e43580f71b
commit 6ce0616ffd
28 changed files with 3261 additions and 3415 deletions

View File

@ -415,6 +415,7 @@ typedef struct STUidTagInfo {
int32_t taosGenCrashJsonMsg(int signum, char **pMsg, int64_t clusterId, int64_t startTime);
#define TSMA_RES_STB_POSTFIX "_tsma_res_stb_"
#define TSMA_RES_CTB_PREFIX_LEN 33 // md5 output(32) and _
static inline bool isTsmaResSTb(const char* stbName) {
const char* pos = strstr(stbName, TSMA_RES_STB_POSTFIX);

View File

@ -4279,6 +4279,7 @@ typedef struct SDropCtbWithTsmaSingleVgReq {
int32_t tEncodeSMDropTbReqOnSingleVg(SEncoder* pEncoder, const SMDropTbReqsOnSingleVg* pReq);
int32_t tDecodeSMDropTbReqOnSingleVg(SDecoder* pDecoder, SMDropTbReqsOnSingleVg* pReq);
void tFreeSMDropTbReqOnSingleVg(void* p);
typedef struct SDropTbsReq {
SArray* pVgReqs; // SMDropTbReqsOnSingleVg
@ -4286,6 +4287,7 @@ typedef struct SDropTbsReq {
int32_t tSerializeSMDropTbsReq(void* buf, int32_t bufLen, const SMDropTbsReq* pReq);
int32_t tDeserializeSMDropTbsReq(void* buf, int32_t bufLen, SMDropTbsReq* pReq);
void tFreeSMDropTbsReq(void*);
typedef struct SVFetchTtlExpiredTbsRsp {
SArray* pExpiredTbs; // SVDropTbReq

View File

@ -613,12 +613,6 @@ typedef struct SCreateTSMAStmt {
SMCreateSmaReq* pReq;
} SCreateTSMAStmt;
typedef struct SShowCreateTSMAStmt {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
char tsmaName[TSDB_TABLE_NAME_LEN];
}SShowCreateTSMAStmt;
typedef struct SDropTSMAStmt {
ENodeType type;
bool ignoreNotExists;

View File

@ -173,6 +173,7 @@ typedef struct SAggLogicNode {
bool isGroupTb;
bool isPartTb; // true if partition keys has tbname
bool hasGroup;
SNodeList *pTsmaSubplans;
} SAggLogicNode;
typedef struct SProjectLogicNode {
@ -311,6 +312,7 @@ typedef struct SWindowLogicNode {
bool isPartTb;
int64_t windowCount;
int64_t windowSliding;
SNodeList* pTsmaSubplans;
} SWindowLogicNode;
typedef struct SFillLogicNode {
@ -369,7 +371,6 @@ typedef struct SLogicSubplan {
int32_t level;
int32_t splitFlag;
int32_t numOfComputeNodes;
SNodeList* pTsmaChildren;
} SLogicSubplan;
typedef struct SQueryLogicPlan {

View File

@ -10121,7 +10121,6 @@ static int32_t tDecodeTableTSMAInfoRsp(SDecoder* pDecoder, STableTSMAInfoRsp* pR
pRsp->pTsmas = taosArrayInit(size, POINTER_BYTES);
if (!pRsp->pTsmas) return -1;
for (int32_t i = 0; i < size; ++i) {
// TODO tsma add a test case when decode failed, to see if the array is freed
STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
if (!pTsma) return -1;
taosArrayPush(pRsp->pTsmas, &pTsma);
@ -10338,6 +10337,11 @@ int32_t tDecodeSMDropTbReqOnSingleVg(SDecoder* pDecoder, SMDropTbReqsOnSingleVg*
return 0;
}
void tFreeSMDropTbReqOnSingleVg(void *p) {
SMDropTbReqsOnSingleVg* pReq = p;
taosArrayDestroy(pReq->pTbs);
}
int32_t tSerializeSMDropTbsReq(void* buf, int32_t bufLen, const SMDropTbsReq* pReq){
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -10375,6 +10379,11 @@ int32_t tDeserializeSMDropTbsReq(void* buf, int32_t bufLen, SMDropTbsReq* pReq)
return 0;
}
void tFreeSMDropTbsReq(void* p) {
SMDropTbsReq* pReq = p;
taosArrayDestroyEx(pReq->pVgReqs, tFreeSMDropTbReqOnSingleVg);
}
int32_t tEncodeVFetchTtlExpiredTbsRsp(SEncoder* pCoder, const SVFetchTtlExpiredTbsRsp* pRsp) {
if (tEncodeI32(pCoder, pRsp->vgId) < 0) return -1;
int32_t size = pRsp->pExpiredTbs ? pRsp->pExpiredTbs->size : 0;

View File

@ -1477,7 +1477,6 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
static void mndCreateTSMABuildDropStreamReq(SCreateTSMACxt* pCxt) {
tstrncpy(pCxt->pDropStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
pCxt->pDropStreamReq->igNotExists = false;
// TODO tsma fill sql
pCxt->pDropStreamReq->sql = strdup(pCxt->pDropSmaReq->name);
pCxt->pDropStreamReq->sqlLen = strlen(pCxt->pDropStreamReq->sql);
}
@ -1492,11 +1491,10 @@ static int32_t mndCreateTSMASetCreateStreamUndoAction(SMnode* pMnode) {
static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
int32_t code = -1;
// TODO tsma change the action name
STransAction redoAction = {0};
STransAction undoAction = {0};
// TODO tsma trans conflicting setting, maybe conflict with myself
STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "create-tsma");
STransAction createStreamRedoAction = {0};
STransAction createStreamUndoAction = {0};
STrans *pTrans =
mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "create-tsma");
if (!pTrans) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
@ -1508,31 +1506,31 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCxt->pCreateSmaReq->name,
pCxt->pCreateStreamReq->name);
mndGetMnodeEpSet(pCxt->pMnode, &redoAction.epSet);
redoAction.acceptableCode = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
redoAction.msgType = TDMT_STREAM_CREATE;
redoAction.contLen = tSerializeSCMCreateStreamReq(0, 0, pCxt->pCreateStreamReq);
redoAction.pCont = taosMemoryCalloc(1, redoAction.contLen);
if (!redoAction.pCont) {
mndGetMnodeEpSet(pCxt->pMnode, &createStreamRedoAction.epSet);
createStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
createStreamRedoAction.msgType = TDMT_STREAM_CREATE;
createStreamRedoAction.contLen = tSerializeSCMCreateStreamReq(0, 0, pCxt->pCreateStreamReq);
createStreamRedoAction.pCont = taosMemoryCalloc(1, createStreamRedoAction.contLen);
if (!createStreamRedoAction.pCont) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (redoAction.contLen != tSerializeSCMCreateStreamReq(redoAction.pCont, redoAction.contLen, pCxt->pCreateStreamReq)) {
if (createStreamRedoAction.contLen != tSerializeSCMCreateStreamReq(createStreamRedoAction.pCont, createStreamRedoAction.contLen, pCxt->pCreateStreamReq)) {
mError("sma: %s, failed to create due to create stream req encode failure", pCxt->pCreateSmaReq->name);
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
undoAction.epSet = redoAction.epSet;
undoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
undoAction.actionType = TDMT_STREAM_DROP;
undoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
undoAction.pCont = taosMemoryCalloc(1, undoAction.contLen);
if (!undoAction.pCont) {
createStreamUndoAction.epSet = createStreamRedoAction.epSet;
createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
createStreamUndoAction.actionType = TDMT_STREAM_DROP;
createStreamUndoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen);
if (!createStreamUndoAction.pCont) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (undoAction.contLen != tSerializeSMDropStreamReq(undoAction.pCont, undoAction.contLen, pCxt->pDropStreamReq)) {
if (createStreamUndoAction.contLen != tSerializeSMDropStreamReq(createStreamUndoAction.pCont, createStreamUndoAction.contLen, pCxt->pDropStreamReq)) {
mError("sma: %s, failed to create due to drop stream req encode failure", pCxt->pCreateSmaReq->name);
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
@ -1541,9 +1539,8 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
if (mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
if (mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
if (mndTransAppendRedoAction(pTrans, &redoAction) != 0) goto _OVER;
if (mndTransAppendUndoAction(pTrans, &undoAction) != 0) goto _OVER;
//TODO tsma add drop stable undo action
if (mndTransAppendRedoAction(pTrans, &createStreamRedoAction) != 0) goto _OVER;
if (mndTransAppendUndoAction(pTrans, &createStreamUndoAction) != 0) goto _OVER;
if (mndTransPrepare(pCxt->pMnode, pTrans) != 0) goto _OVER;
code = TSDB_CODE_SUCCESS;
@ -1745,11 +1742,10 @@ static int32_t mndDropTSMA(SCreateTSMACxt* pCxt) {
goto _OVER;
}
// output stable is not drop when dropping stream, dropping it when dropping tsma
// output stable is not dropped when dropping stream, dropping it when dropping tsma
SMDropStbReq dropStbReq = {0};
dropStbReq.igNotExists = false;
tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
// TODO tsma fill sql, sql may be freed
dropStbReq.sql = "drop";
dropStbReq.sqlLen = 5;
@ -1918,7 +1914,6 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
colDataSetVal(pColInfo, numOfRows, (const char*)(&pSma->createdTime), false);
// interval
// TODO tsma replace 64
char interval[64 + VARSTR_HEADER_SIZE] = {0};
SDbObj* pSrcDb = mndAcquireDb(pMnode, pSma->db);
int32_t len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
@ -2058,7 +2053,6 @@ static int32_t mndGetDeepestBaseForTsma(SMnode* pMnode, SSmaObj* pSma, SSmaObj**
return TSDB_CODE_MND_SMA_NOT_EXIST;
}
while (pRecursiveTsma->baseSmaName[0]) {
// TODO tsma test 2 level recursive tsma
SSmaObj* pTmpSma = pRecursiveTsma;
pRecursiveTsma = mndAcquireSma(pMnode, pTmpSma->baseSmaName);
if (!pRecursiveTsma) {

View File

@ -3899,6 +3899,7 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq) {
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0)
code = 0;
_OVER:
tFreeSMDropTbsReq(&dropReq);
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
return code;
}

View File

@ -1193,6 +1193,24 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
}
}
if (pStream->smaId != 0) {
void *pIter = NULL;
SSmaObj *pSma = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void**)&pSma);
while (pIter) {
if (pSma && pSma->uid == pStream->smaId) {
sdbRelease(pMnode->pSdb, pSma);
sdbRelease(pMnode->pSdb, pStream);
sdbCancelFetch(pMnode->pSdb, pIter);
tFreeMDropStreamReq(&dropReq);
terrno = TSDB_CODE_TSMA_MUST_BE_DROPPED;
return -1;
}
if (pSma) sdbRelease(pMnode->pSdb, pSma);
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void**)&pSma);
}
}
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
sdbRelease(pMnode->pSdb, pStream);
tFreeMDropStreamReq(&dropReq);

View File

@ -562,10 +562,6 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
latest = pReader->pHead->head.ingestTs;
}
}
// TODO tsma remove log
tqInfo("------ver: %" PRId64 " fhFinished: %d max: %" PRId64 " cur: %" PRId64 " latest: %" PRId64
"lastst-cur %" PRId64,
ver, *fhFinished, verRange.maxVer, cur, latest, latest - cur);
if (pDelay != NULL) { // delay in ms
*pDelay = (latest - cur) / 1000;

View File

@ -301,7 +301,6 @@ typedef STableTSMAInfo STSMACache;
typedef struct SCtgTbCache {
SRWLatch metaLock;
SRWLatch indexLock;
SRWLatch tsmaLock;// TODO tsma remove?
STableMeta* pMeta;
STableIndex* pIndex;
} SCtgTbCache;

View File

@ -1832,7 +1832,6 @@ _return:
int32_t ctgGetTbTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, SArray** ppRes) {
STableTSMAInfoRsp tsmasRsp = {0};
//TODO tsma get from cache first
int32_t code = ctgGetTbTSMAFromMnode(pCtg, pConn, pTableName, &tsmasRsp, NULL, TDMT_MND_GET_TABLE_TSMA);
if (code == TSDB_CODE_MND_SMA_NOT_EXIST) {
code = 0;

View File

@ -785,7 +785,6 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
}
if (tbTsmaNum > 0) {
// TODO tsma when create recursive tsma, avoid get tb tsma task
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_TSMA, pReq->pTableTSMAs, NULL));
}
if (tsmaNum > 0) {
@ -2660,7 +2659,6 @@ int32_t ctgLaunchGetTbTSMATask(SCtgTask* pTask) {
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
int32_t fetchIdx = 0, baseResIdx = 0;
// TODO tsma test multi tables from multi dbs
for (int32_t idx = 0; idx < dbNum; ++idx) {
STablesReq* pReq = taosArrayGet(pCtx->pNames, idx);
CTG_ERR_RET(ctgGetTbTSMAFromCache(pCtg, pCtx, idx, &fetchIdx, baseResIdx, pReq->pTables));
@ -2881,7 +2879,6 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
} break;
case TDMT_VND_GET_STREAM_PROGRESS: {
// update progress into res
// TODO tsma pack all streams into one req, and handle all stream rsps together
STableTSMAInfoRsp* pTsmasRsp = pRes->pRes;
SArray* pTsmas = pTsmasRsp->pTsmas;
SStreamProgressRsp* pRsp = pMsgCtx->out;
@ -2939,7 +2936,6 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
ctgRemoveTbMetaFromCache(pCtg, pTbName, false);
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
}
// TODO tsma add tb meta to cache
if (META_TYPE_BOTH_TABLE == pOut->metaType) {
// rewrite tsma fetch table with it's super table name
sprintf(pFetch->tsmaSourceTbName.tname, "%s", pOut->tbName);

View File

@ -3028,7 +3028,6 @@ int32_t ctgRemoveTbMetaFromCache(SCatalog *pCtg, SName *pTableName, bool syncReq
ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
}
// TODO tsma TEST normal table
CTG_ERR_JRET(ctgDropTSMAForTbEnqueue(pCtg, pTableName, syncReq));
_return:
@ -3171,59 +3170,6 @@ int32_t ctgGetViewsFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgViewsC
return TSDB_CODE_SUCCESS;
}
void ctgReleaseTbTSMAToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->tsmaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
if (dbCache) {
ctgReleaseDBCache(pCtg, dbCache);
}
}
#if 0
int32_t ctgAcquireTbTSMAFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
SCtgDBCache *dbCache = NULL;
SCtgTbCache *pCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName);
goto _return;
}
int32_t sz = 0;
pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
if (NULL == pCache) {
ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
goto _return;
}
CTG_LOCK(CTG_READ, &pCache->tsmaLock);
if (NULL == pCache->pTsmas) {
ctgDebug("tb %s tsma not in cache, dbFName:%s", tbName, dbFName);
goto _return;
}
*pDb = dbCache;
*pTb = pCache;
ctgDebug("tb %s tsma got in cache, dbFName:%s", tbName, dbFName);
CTG_CACHE_HIT_INC(CTG_CI_TBL_TSMA, 1);
return TSDB_CODE_SUCCESS;
_return:
ctgReleaseTbTSMAToCache(pCtg, dbCache, pCache);
CTG_CACHE_NHIT_INC(CTG_CI_TBL_TSMA, 1);
return TSDB_CODE_SUCCESS;
}
#endif
int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx, int32_t* fetchIdx, int32_t baseResIdx,
SArray* pList) {
int32_t code = 0;
@ -3237,7 +3183,6 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
int32_t tbNum = taosArrayGetSize(pList);
SCtgTbCache * pTbCache = NULL;
// TODO tsma test sys db
if (IS_SYS_DBNAME(pName->dbname)) {
return TSDB_CODE_SUCCESS;
}
@ -3247,7 +3192,6 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
CTG_ERR_RET(ctgAcquireDBCache(pCtg, dbFName, &dbCache));
if (!dbCache) {
ctgDebug("DB %s not in cache", dbFName);
// TODO tsma test no db cache, select from another db
for (int32_t i = 0; i < tbNum; ++i) {
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TSMA_SOURCE_TB_META, NULL);
taosArrayPush(pCtx->pResList, &(SMetaData){0});
@ -3278,7 +3222,6 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
taosHashRelease(dbCache->stbCache, stbName);
} else {
ctgDebug("stb in db: %s, uid: %" PRId64 " not in cache", dbFName, suid);
// TODO tsma remove flag
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TSMA_SOURCE_TB_META, NULL);
taosArrayPush(pCtx->pResList, &(SMetaRes){0});
continue;
@ -3308,7 +3251,6 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
CTG_CACHE_HIT_INC(CTG_CI_TBL_TSMA, 1);
// TODO tsma use construct and destructor pattern
STableTSMAInfoRsp *pRsp = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp));
if (!pRsp) {
ctgReleaseTSMAToCache(pCtg, dbCache, pCache);
@ -3321,7 +3263,6 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SMetaRes res = {0};
// TODO tsma if pCache->pTsmas is empty, maybe we should get tsmas from mnode
for (int32_t i = 0; i < pCache->pTsmas->size; ++i) {
STSMACache *pTsmaOut = NULL;
STSMACache *pTsmaCache = taosArrayGetP(pCache->pTsmas, i);
@ -3446,7 +3387,6 @@ int32_t ctgDropTbTSMAEnqueue(SCatalog* pCtg, const STSMACache* pTsma, bool sync
msg->tbId = pTsma->suid;
msg->tsmaId = pTsma->tsmaId;
tstrncpy(msg->dbFName, pTsma->dbFName, TSDB_DB_FNAME_LEN);
// TODO tsma use table name len, instead of TSDB_TABLE_FNAME_LEN
tstrncpy(msg->tbName, pTsma->tb, TSDB_TABLE_NAME_LEN);
tstrncpy(msg->tsmaName, pTsma->name, TSDB_TABLE_NAME_LEN);

View File

@ -1566,7 +1566,6 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
return TSDB_CODE_SUCCESS;
}
// TODO tsma test errors
int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTbName,
SVgroupInfo* vgroupInfo, SStreamProgressRsp* out, SCtgTaskReq* tReq,
void* bInput) {

View File

@ -2396,8 +2396,12 @@ int32_t dupViewMetaFromRsp(SViewMetaRsp* pRsp, SViewMeta* pViewMeta) {
}
uint64_t ctgGetTbTSMACacheSize(STableTSMAInfo* pTsmaInfo) {
//TODO tsma
return 0;
if (!pTsmaInfo) return 0;
uint64_t size = sizeof(STableTSMAInfo);
if (pTsmaInfo->pFuncs) size += sizeof(STableTSMAFuncInfo) * pTsmaInfo->pFuncs->size;
if (pTsmaInfo->pTags) size += sizeof(SSchema) * pTsmaInfo->pTags->size;
if (pTsmaInfo->pUsedCols) size += sizeof(SSchema) * pTsmaInfo->pUsedCols->size;
return size;
}
bool hasOutOfDateTSMACache(SArray* pTsmas) {

View File

@ -3981,17 +3981,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.finalizeFunc = stddevPartialFinalize,
},
{
//TODO tsma for outer use not only internal
.name = "_avg_state",
.type = FUNCTION_TYPE_AVG_STATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC,
.translateFunc = translateAvgState,
//.dataRequiredFunc = statisDataRequired,
.getEnvFunc = getAvgFuncEnv,
.initFunc = avgFunctionSetup,
.processFunc = avgFunction,
.finalizeFunc = avgPartialFinalize,
//.combineFunc = avgCombine,
.pPartialFunc = "_avg_partial",
.pMergeFunc = "_avg_state_merge"
},

View File

@ -7433,27 +7433,6 @@ static int32_t jsonToDropTSMAStmt(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkShowCreateTSMAStmtDbName = "DbName";
static const char* jkShowCreateTSMAStmtTsmaName = "TSMAName";
static int32_t showCreateTSMAStmtToJson(const void* pObj, SJson* pJson) {
const SShowCreateTSMAStmt* pNode = (const SShowCreateTSMAStmt*)pObj;
int32_t code = tjsonAddStringToObject(pJson, jkShowCreateTSMAStmtDbName, pNode->dbName);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkShowCreateTSMAStmtTsmaName, pNode->tsmaName);
}
return code;
}
static int32_t jsonToShowCreateTSMAStmt(const SJson* pJson, void* pObj) {
SShowCreateTSMAStmt* pNode = (SShowCreateTSMAStmt*)pObj;
int32_t code = tjsonGetStringValue(pJson, jkShowCreateTSMAStmtDbName, pNode->dbName);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkShowCreateTSMAStmtTsmaName, pNode->tsmaName);
}
return code;
}
static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
switch (nodeType(pObj)) {
case QUERY_NODE_COLUMN:

View File

@ -587,7 +587,6 @@ cmd ::= CREATE TSMA not_exists_opt(B) tsma_name(C)
cmd ::= CREATE RECURSIVE TSMA not_exists_opt(B) tsma_name(C)
ON full_table_name(D) INTERVAL NK_LP duration_literal(E) NK_RP. { pCxt->pRootNode = createCreateTSMAStmt(pCxt, B, &C, NULL, D, releaseRawExprNode(pCxt, E)); }
cmd ::= DROP TSMA exists_opt(B) full_tsma_name(C). { pCxt->pRootNode = createDropTSMAStmt(pCxt, B, C); }
cmd ::= SHOW CREATE TSMA full_tsma_name(B). { pCxt->pRootNode = createShowCreateTSMAStmt(pCxt, B); }
cmd ::= SHOW db_name_cond_opt(B) TSMAS. { pCxt->pRootNode = createShowTSMASStmt(pCxt, B); }
full_tsma_name(A) ::= tsma_name(B). { A = createRealTableNode(pCxt, NULL, &B, NULL); }

View File

@ -2928,20 +2928,6 @@ SNode* createDropTSMAStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode*
return (SNode*)pStmt;
}
SNode* createShowCreateTSMAStmt(SAstCreateContext* pCxt, SNode* pRealTable) {
CHECK_PARSER_STATUS(pCxt);
SShowCreateTSMAStmt* pStmt = (SShowCreateTSMAStmt*)nodesMakeNode(QUERY_NODE_SHOW_CREATE_TSMA_STMT);
CHECK_OUT_OF_MEM(pStmt);
SRealTableNode* pTableNode = (SRealTableNode*)pRealTable;
memcpy(pStmt->tsmaName, pTableNode->table.tableName, TSDB_TABLE_NAME_LEN);
memcpy(pStmt->dbName, pTableNode->table.dbName, TSDB_DB_NAME_LEN);
nodesDestroyNode(pRealTable);
return (SNode*)pStmt;
}
SNode* createShowTSMASStmt(SAstCreateContext* pCxt, SNode* dbName) {
CHECK_PARSER_STATUS(pCxt);

View File

@ -3674,7 +3674,7 @@ static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNo
STableTSMAInfo* pTsma = taosArrayGetP(pRealTable->pTsmas, i);
SName tsmaTargetTbName = {0};
toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, "", &tsmaTargetTbName);
int32_t len = snprintf(buf, TSDB_TABLE_FNAME_LEN, "%s.%s", pTsma->dbFName, pTsma->name); // TODO tsma what if tsma name is too long
int32_t len = snprintf(buf, TSDB_TABLE_FNAME_LEN, "%s.%s", pTsma->dbFName, pTsma->name);
len = taosCreateMD5Hash(buf, len);
len = sprintf(buf + len, "_%s", pRealTable->table.tableName);
strncpy(tsmaTargetTbName.tname, buf, TSDB_TABLE_NAME_LEN);
@ -5697,7 +5697,7 @@ static int32_t setEqualTbnameTableVgroups(STranslateContext* pCxt, SSelectStmt*
for (int32_t k = 0; k < pInfo->aTbnames->size; ++k) {
const char* pTbName = taosArrayGetP(pInfo->aTbnames, k);
char* pNewTbName = taosMemoryCalloc(1, 34 + strlen(pTbName) + 1);
char* pNewTbName = taosMemoryCalloc(1, TSMA_RES_CTB_PREFIX_LEN + strlen(pTbName) + 1);
if (!pNewTbName) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
@ -10579,6 +10579,7 @@ static int32_t buildTSMAAstMakeConcatFuncNode(SCreateTSMAStmt* pStmt, SMCreateSm
if (TSDB_CODE_SUCCESS == code) {
sprintf(pTsmaNameHashVNode->literal, "%s", pReq->name);
int32_t len = taosCreateMD5Hash(pTsmaNameHashVNode->literal, strlen(pTsmaNameHashVNode->literal));
ASSERT(len == TSMA_RES_CTB_PREFIX_LEN - 1);
sprintf(pTsmaNameHashVNode->literal + len, "_");
pTsmaNameHashVNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
pTsmaNameHashVNode->node.resType.bytes = strlen(pTsmaNameHashVNode->literal);
@ -10595,8 +10596,9 @@ static int32_t buildTSMAAstMakeConcatFuncNode(SCreateTSMAStmt* pStmt, SMCreateSm
code = nodesListMakeStrictAppend(&pSubstrFunc->pParameterList, nodesMakeNode(QUERY_NODE_VALUE));
if (TSDB_CODE_SUCCESS == code) {
SValueNode* pV = (SValueNode*)pSubstrFunc->pParameterList->pTail->pNode;
pV->literal = strdup("34"); // TODO tsma define this magic number
pV->literal = taosMemoryCalloc(1, 64);
if (!pV->literal) code = TSDB_CODE_OUT_OF_MEMORY;
sprintf(pV->literal, "%d", TSMA_RES_CTB_PREFIX_LEN + 1);
pV->isDuration = false;
pV->translate = false;
pV->node.resType.type = TSDB_DATA_TYPE_INT;
@ -10767,7 +10769,6 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
int32_t code = TSDB_CODE_SUCCESS;
STableMeta* pTableMeta = NULL;
// TODO tsma 在使用该tableName时, 如果确定其其实是tsma name, 那么避免将此作为tbname进行catalog 获取.
STableTSMAInfo* pRecursiveTsma = NULL;
int32_t numOfCols = 0, numOfTags = 0;
SSchema * pCols = NULL, *pTags = NULL;
@ -10798,7 +10799,7 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
memset(useTbName, 0, sizeof(SName));
memcpy(pStmt->originalTbName, pRecursiveTsma->tb, TSDB_TABLE_NAME_LEN);
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pRecursiveTsma->tb, useTbName), pReq->stb);
numOfCols = pRecursiveTsma->pUsedCols->size; // TODO tsma merge pUsedCols and pTags with one SSchema array
numOfCols = pRecursiveTsma->pUsedCols->size;
numOfTags = pRecursiveTsma->pTags->size;
pCols = pRecursiveTsma->pUsedCols->pData;
pTags = pRecursiveTsma->pTags->pData;

File diff suppressed because it is too large Load Diff

View File

@ -5787,13 +5787,10 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode) {
default:
return false;
}
// TODO tsma may need to replace func conds in having
assert(pFuncs);
FOREACH(pTmpNode, pFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pTmpNode;
// TODO tsma test other pseudo column funcs
// TODO tsma test funcs with multi params
if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId) &&
!fmIsGroupKeyFunc(pFunc->funcId)) {
return false;
@ -5827,6 +5824,7 @@ typedef struct STSMAOptCtx {
SArray* pUsefulTsmas; // SArray<STSMAOptUseFulTsma>, sorted by tsma interval from long to short
SArray* pUsedTsmas;
SLogicSubplan* generatedSubPlans[2];
SNodeList** ppParentTsmaSubplans;
} STSMAOptCtx;
static int32_t fillTSMAOptCtx(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan) {
@ -5851,10 +5849,12 @@ static int32_t fillTSMAOptCtx(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan) {
pTsmaOptCtx->queryInterval->precision = pWindow->node.precision;
pTsmaOptCtx->queryInterval->tz = tsTimezone;
pTsmaOptCtx->pAggFuncs = pWindow->pFuncs;
pTsmaOptCtx->ppParentTsmaSubplans = &pWindow->pTsmaSubplans;
} else {
ASSERT(nodeType(pTsmaOptCtx->pParent) == QUERY_NODE_LOGIC_PLAN_AGG);
SAggLogicNode* pAgg = (SAggLogicNode*)pTsmaOptCtx->pParent;
pTsmaOptCtx->pAggFuncs = pAgg->pAggFuncs;
pTsmaOptCtx->ppParentTsmaSubplans = &pAgg->pTsmaSubplans;
}
pTsmaOptCtx->pUsefulTsmas = taosArrayInit(pScan->pTsmas->size, sizeof(STSMAOptUsefulTsma));
pTsmaOptCtx->pUsedTsmas = taosArrayInit(3, sizeof(STSMAOptUsefulTsma));
@ -5878,8 +5878,6 @@ static void clearTSMAOptCtx(STSMAOptCtx* pTsmaOptCtx) {
static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, int8_t tsmaIntevalUnit, const STSMAOptCtx* pTsmaOptCtx) {
if (!pTsmaOptCtx->queryInterval) return true;
// TODO tsma save tsmaInterval in table precision to avoid convertions
// TODO tsma save the right unit
bool validInterval = pTsmaOptCtx->queryInterval->interval % tsmaInterval == 0;
bool validSliding = pTsmaOptCtx->queryInterval->sliding % tsmaInterval == 0;
bool validOffset = pTsmaOptCtx->queryInterval->offset % tsmaInterval == 0;
@ -5893,7 +5891,6 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ
taosArrayClear(pTsmaScanCols);
FOREACH(pNode, pQueryFuncs) {
SFunctionNode* pQueryFunc = (SFunctionNode*)pNode;
// TODO tsma handle _wstart
if (fmIsPseudoColumnFunc(pQueryFunc->funcId) || fmIsGroupKeyFunc(pQueryFunc->funcId)) continue;
if (nodeType(pQueryFunc->pParameterList->pHead->pNode) != QUERY_NODE_COLUMN) {
failed = true;
@ -5903,7 +5900,6 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ
found = false;
int32_t notMyStateFuncId = -1;
// iterate funcs
// TODO tsma if func is count, skip checking cols, test count(*)
for (int32_t i = 0; i < pTsmaFuncs->size; i++) {
STableTSMAFuncInfo* pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, i);
if (pTsmaFuncInfo->funcId == notMyStateFuncId) continue;
@ -5979,7 +5975,6 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
continue;
}
// filter with interval
// TODO tsma unit not right
if (!tsmaOptCheckValidInterval(pTsma->interval, pTsma->unit, pTsmaOptCtx)) {
continue;
}
@ -5995,7 +5990,6 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
taosArrayPush(pTsmaOptCtx->pUsefulTsmas, &usefulTsma);
}
if (pTsmaScanCols) taosArrayDestroy(pTsmaScanCols);
// TODO tsma filter smaller tsmas that not aligned with the biggest tsma
return TSDB_CODE_SUCCESS;
}
@ -6035,8 +6029,7 @@ static void tsmaOptInitIntervalFromTsma(SInterval* pInterval, const STableTSMAIn
pInterval->precision = precision;
}
// TODO tsma refactor, remove some params
static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pScanRange, uint32_t tsmaStartIdx) {
static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pScanRange) {
bool needTailWindow = false;
bool isSkeyAlignedWithTsma = true, isEkeyAlignedWithTsma = true;
int64_t winSkey = TSKEY_MIN, winEkey = TSKEY_MAX;
@ -6046,7 +6039,7 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc
SInterval interval;
STimeWindow scanRange = *pScanRange;
const SInterval* pInterval = pTsmaOptCtx->queryInterval;
const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pTsmaOptCtx->pUsefulTsmas, tsmaStartIdx);
const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pTsmaOptCtx->pUsefulTsmas, 0);
const STableTSMAInfo* pTsma = pUsefulTsma->pTsma;
if (pScanRange->ekey <= pScanRange->skey) return;
@ -6076,7 +6069,6 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc
isEkeyAlignedWithTsma = ((pScanRange->ekey + 1 - startOfEkeyFirstWin) % tsmaInterval == 0);
if (startOfEkeyFirstWin > startOfSkeyFirstWin) {
needTailWindow = true;
// TODO tsma add some notes
}
}
@ -6086,7 +6078,7 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc
scanRange.ekey,
taosTimeAdd(startOfSkeyFirstWin, pInterval->interval * 1, pInterval->intervalUnit, pTsmaOptCtx->precision) - 1);
const STSMAOptUsefulTsma* pTsmaFound =
tsmaOptFindUsefulTsma(pTsmaOptCtx->pUsefulTsmas, tsmaStartIdx + 1, scanRange.skey - startOfSkeyFirstWin,
tsmaOptFindUsefulTsma(pTsmaOptCtx->pUsefulTsmas, 1, scanRange.skey - startOfSkeyFirstWin,
(scanRange.ekey + 1 - startOfSkeyFirstWin), pTsmaOptCtx->precision);
STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsmaFound ? pTsmaFound->pTsma : NULL,
.scanRange = scanRange,
@ -6111,7 +6103,7 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc
scanRange.skey = startOfEkeyFirstWin;
scanRange.ekey = pScanRange->ekey;
const STSMAOptUsefulTsma* pTsmaFound =
tsmaOptFindUsefulTsma(pTsmaOptCtx->pUsefulTsmas, tsmaStartIdx + 1, scanRange.skey - startOfEkeyFirstWin,
tsmaOptFindUsefulTsma(pTsmaOptCtx->pUsefulTsmas, 1, scanRange.skey - startOfEkeyFirstWin,
scanRange.ekey + 1 - startOfEkeyFirstWin, pTsmaOptCtx->precision);
STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsmaFound ? pTsmaFound->pTsma : NULL,
.scanRange = scanRange,
@ -6137,7 +6129,6 @@ SNodeList* tsmaOptCreateTsmaScanCols(const STSMAOptUsefulTsma* pTsma, const SNod
const int32_t* idx = taosArrayGet(pTsma->pTsmaScanCols, i);
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (pCol) {
// TODO tsma why 2?
pCol->colId = *idx + 2;
pCol->tableType = TSDB_SUPER_TABLE;
pCol->tableId = pTsma->targetTbUid;
@ -6195,16 +6186,16 @@ static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbN
if (pTsma && code == TSDB_CODE_SUCCESS) {
// TODO tsma test child tbname too long
// if with tsma, we replace func tbname with substr(tbname, 34)
// if with tsma, we replace func tbname with substr(tbname, TSMA_RES_CTB_PREFIX_LEN)
pRewrittenFunc->funcId = fmGetFuncId("substr");
snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "substr");
pValue->node.resType.type = TSDB_DATA_TYPE_INT;
pValue->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
pValue->literal = taosMemoryCalloc(1, 16);
pValue->datum.i = 34;
pValue->datum.i = TSMA_RES_CTB_PREFIX_LEN + 1;
if (!pValue->literal) code = TSDB_CODE_OUT_OF_MEMORY;
if (code == TSDB_CODE_SUCCESS) {
sprintf(pValue->literal, "%d", 34);
sprintf(pValue->literal, "%d", TSMA_RES_CTB_PREFIX_LEN + 1);
code = nodesListMakeAppend(&pRewrittenFunc->pParameterList, *pTbNameNode);
}
if (code == TSDB_CODE_SUCCESS) {
@ -6321,7 +6312,6 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew
if (code == TSDB_CODE_SUCCESS) {
nodesDestroyList(pNewScan->pScanCols);
// normal cols
// TODO tsma last(ts), maybe i should put pk col after normal cols, if no pk col, then add it
pNewScan->pScanCols = tsmaOptCreateTsmaScanCols(pTsma, pTsmaOptCtx->pAggFuncs);
if (!pNewScan->pScanCols) code = TSDB_CODE_OUT_OF_MEMORY;
}
@ -6359,7 +6349,6 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew
}
}
} else {
// TODO tsma rewrite tagcond?
FOREACH(pNode, pNewScan->pGroupTags) {
// rewrite tbname recursively
struct TsmaOptRewriteCtx ctx = {
@ -6397,7 +6386,7 @@ static int32_t tsmaOptCreateWStart(int8_t precision, SFunctionNode** pWStartOut)
return code;
}
static int32_t tsmaOptRevisePlan2(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, SScanLogicNode* pScan,
static int32_t tsmaOptRewriteParent(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, SScanLogicNode* pScan,
const STSMAOptUsefulTsma* pTsma) {
int32_t code = 0;
SColumnNode* pColNode;
@ -6442,7 +6431,6 @@ static int32_t tsmaOptRevisePlan2(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent,
pColNode->node.resType = pPartial->node.resType;
// currently we assume that the first parameter must be the scan column
nodesListErase(pMerge->pParameterList, pMerge->pParameterList->pHead);
// TODO tsma STRICT
nodesListPushFront(pMerge->pParameterList, nodesCloneNode((SNode*)pColNode));
nodesDestroyNode((SNode*)pPartial);
@ -6467,105 +6455,12 @@ static int32_t tsmaOptRevisePlan2(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent,
return code;
}
static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, SScanLogicNode* pScan,
const STSMAOptUsefulTsma* pTsma) {
SNode * pStateFuncNode, *pAggFuncNode;
SColumnNode* pColNode;
SListCell* pScanListCell = NULL;
int32_t code = 0;
SNodeList* pAggStateFuncs = NULL;
SNodeList* pAggFuncs = NULL;
SWindowLogicNode* pWindow = NULL;
SAggLogicNode* pAgg = NULL;
bool isFirstMergeNode = pTsmaOptCtx->pScan == pScan;
bool hasWStart = false;
if (nodeType(pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) {
pWindow = (SWindowLogicNode*)pParent;
pAggFuncs = pWindow->pFuncs;
} else {
pAgg = (SAggLogicNode*)pParent;
pAggFuncs = pAgg->pAggFuncs;
}
if (isFirstMergeNode) {
pAggStateFuncs = nodesCloneList(pAggFuncs);
if (!pAggStateFuncs) return TSDB_CODE_OUT_OF_MEMORY;
} else {
pAggStateFuncs = pAggFuncs;
}
code = fmCreateStateFuncs(pAggStateFuncs);
if (code) return code;
pScanListCell = pScan->pScanCols->pHead;
FORBOTH(pStateFuncNode, pAggStateFuncs, pAggFuncNode, pAggFuncs) {
SFunctionNode* pStateFunc = (SFunctionNode*)pStateFuncNode;
SFunctionNode* pAggFunc = (SFunctionNode*)pAggFuncNode;
if (fmIsGroupKeyFunc(pAggFunc->funcId)) {
struct TsmaOptRewriteCtx ctx = {
.pTsmaOptCtx = pTsmaOptCtx, .pTsma = pTsma, .rewriteTag = true, .rewriteTbname = true, .code = 0};
nodesRewriteExpr(&pAggFuncNode, tsmaOptNodeRewriter, &ctx);
if (ctx.code) {
code = ctx.code;
} else {
REPLACE_LIST2_NODE(pAggFuncNode);
}
continue;
} else if (fmIsPseudoColumnFunc(pAggFunc->funcId)) {
if (pAggFunc->funcType == FUNCTION_TYPE_WSTART) hasWStart = true;
continue;
}
pColNode = (SColumnNode*)pScanListCell->pNode;
pScanListCell = pScanListCell->pNext;
pColNode->node.resType = pStateFunc->node.resType;
nodesDestroyList(pAggFunc->pParameterList);
code = nodesListMakeStrictAppend(&pAggFunc->pParameterList, nodesCloneNode((SNode*)pColNode));
if (code) break;
}
if (code == TSDB_CODE_SUCCESS) code = fmCreateStateMergeFuncs(pAggFuncs);
if (pAggFuncs != pAggStateFuncs) nodesDestroyList(pAggStateFuncs);
if (code == TSDB_CODE_SUCCESS && pWindow) {
SColumnNode* pCol = (SColumnNode*)pScan->pScanCols->pTail->pNode;
assert(pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
nodesDestroyNode(pWindow->pTspk);
pWindow->pTspk = nodesCloneNode((SNode*)pCol);
if (!hasWStart && !isFirstMergeNode) {
SFunctionNode* pWStart = NULL;
code = tsmaOptCreateWStart(pWindow->node.precision, &pWStart);
if (TSDB_CODE_SUCCESS == code) {
nodesListAppend(pAggFuncs, (SNode*)pWStart);
}
}
}
if (code == TSDB_CODE_SUCCESS && pWindow) {
nodesDestroyList(pWindow->node.pTargets);
code = createColumnByRewriteExprs(pAggFuncs, &pWindow->node.pTargets);
}
if (code == TSDB_CODE_SUCCESS && pAgg) {
nodesDestroyList(pAgg->node.pTargets);
code = createColumnByRewriteExprs(pAggFuncs, &pAgg->node.pTargets);
}
if (code == TSDB_CODE_SUCCESS) {
nodesDestroyList(pScan->node.pTargets);
code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets);
}
if (code == TSDB_CODE_SUCCESS) {
code = createColumnByRewriteExprs(pScan->pScanPseudoCols, &pScan->node.pTargets);
}
return code;
}
static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
int32_t code = 0;
const STSMAOptUsefulTsma* pTsma = NULL;
SNodeList* pAggFuncs = NULL;
bool hasSubPlan = false;
// TODO tsma if no used tsmas skip generating plans
for (int32_t i = 0; i < pTsmaOptCtx->pUsedTsmas->size; ++i) {
STSMAOptUsefulTsma* pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, i);
if (!pTsma->pTsma) continue;
@ -6605,7 +6500,7 @@ static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
SScanLogicNode* pScan = (SScanLogicNode*)pParent->pChildren->pHead->pNode;
code = tsmaOptRewriteScan(pTsmaOptCtx, pScan, pTsma);
if (code == TSDB_CODE_SUCCESS && pTsma->pTsma) {
code = tsmaOptRevisePlan2(pTsmaOptCtx, pParent, pScan, pTsma);
code = tsmaOptRewriteParent(pTsmaOptCtx, pParent, pScan, pTsma);
}
}
@ -6614,7 +6509,7 @@ static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
pTsmaOptCtx->pScan->needSplit = hasSubPlan;
code = tsmaOptRewriteScan(pTsmaOptCtx, pTsmaOptCtx->pScan, pTsma);
if (code == TSDB_CODE_SUCCESS && pTsma->pTsma) {
code = tsmaOptRevisePlan2(pTsmaOptCtx, pTsmaOptCtx->pParent, pTsmaOptCtx->pScan, pTsma);
code = tsmaOptRewriteParent(pTsmaOptCtx, pTsmaOptCtx->pParent, pTsmaOptCtx->pScan, pTsma);
}
}
@ -6650,7 +6545,7 @@ static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan
// 2. sort useful tsmas with interval
taosArraySort(tsmaOptCtx.pUsefulTsmas, tsmaInfoCompWithIntervalDesc);
// 3. split windows
tsmaOptSplitWindows(&tsmaOptCtx, tsmaOptCtx.pTimeRange, 0);
tsmaOptSplitWindows(&tsmaOptCtx, tsmaOptCtx.pTimeRange);
if (tsmaOptIsUsingTsmas(&tsmaOptCtx)) {
// 4. create logic plan
code = tsmaOptGeneratePlan(&tsmaOptCtx);
@ -6660,14 +6555,15 @@ static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan
SLogicSubplan* pSubplan = tsmaOptCtx.generatedSubPlans[i];
if (!pSubplan) continue;
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
nodesListMakeAppend(&pLogicSubplan->pTsmaChildren, (SNode*)pSubplan);
nodesListMakeAppend(tsmaOptCtx.ppParentTsmaSubplans, (SNode*)pSubplan);
}
pCxt->optimized = true;
}
}
}
}
pScan->pTsmas = NULL;
clearTSMAOptCtx(&tsmaOptCtx);
// TODO tsma if any error occured, we should eat the error, skip the optimization, query with original table
return code;
}

View File

@ -155,7 +155,8 @@ static bool splIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
}
if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pLogicNode)) {
return ((SMergeLogicNode*)pLogicNode)->srcGroupId == groupId;
return ((SMergeLogicNode*)pLogicNode)->srcGroupId <= groupId &&
((SMergeLogicNode*)pLogicNode)->srcEndGroupId >= groupId;
}
SNode* pChild;
@ -749,8 +750,9 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
if (code == TSDB_CODE_SUCCESS) {
SNode* pNode;
SMergeLogicNode* pMerge = (SMergeLogicNode*)pInfo->pSplitNode->pChildren->pHead->pNode;
if (pInfo->pSubplan->pTsmaChildren && LIST_LENGTH(pInfo->pSubplan->pTsmaChildren) > 0) {
FOREACH(pNode, pInfo->pSubplan->pTsmaChildren) {
SWindowLogicNode* pWindow = (SWindowLogicNode*)pInfo->pSplitNode;
if (LIST_LENGTH(pWindow->pTsmaSubplans) > 0) {
FOREACH(pNode, pWindow->pTsmaSubplans) {
++(pCxt->groupId);
SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
pSubplan->id.groupId = pCxt->groupId;
@ -763,7 +765,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
pSubplan->pNode = pPartWindow;
}
}
code = nodesListMakeStrictAppendList(&pInfo->pSubplan->pChildren, pInfo->pSubplan->pTsmaChildren);
code = nodesListMakeStrictAppendList(&pInfo->pSubplan->pChildren, pWindow->pTsmaSubplans);
pMerge->numOfSubplans = LIST_LENGTH(pInfo->pSubplan->pChildren) + 1;
}
pMerge->srcEndGroupId = pCxt->groupId;
@ -984,7 +986,8 @@ static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitIn
}
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (isPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode) && (LIST_LENGTH(pInfo->pSubplan->pTsmaChildren) == 0)) {
if (isPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode) &&
(LIST_LENGTH(((SWindowLogicNode*)pInfo->pSplitNode)->pTsmaSubplans) == 0)) {
return stbSplSplitWindowForPartTable(pCxt, pInfo);
} else {
return stbSplSplitWindowForCrossTable(pCxt, pInfo);
@ -1165,8 +1168,9 @@ static int32_t stbSplSplitAggNodeForCrossTableMulSubplan(SSplitContext* pCxt, SS
if (code == TSDB_CODE_SUCCESS) {
SNode* pNode;
if (pInfo->pSubplan->pTsmaChildren && LIST_LENGTH(pInfo->pSubplan->pTsmaChildren) >0) {
FOREACH(pNode, pInfo->pSubplan->pTsmaChildren) {
SAggLogicNode* pAgg = (SAggLogicNode*)pInfo->pSplitNode;
if (LIST_LENGTH(pAgg->pTsmaSubplans) > 0) {
FOREACH(pNode, pAgg->pTsmaSubplans) {
++(pCxt->groupId);
SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
pSubplan->id.groupId = pCxt->groupId;
@ -1178,7 +1182,7 @@ static int32_t stbSplSplitAggNodeForCrossTableMulSubplan(SSplitContext* pCxt, SS
nodesDestroyNode((SNode*)pSubplan->pNode);
pSubplan->pNode = pPartAgg;
}
code = nodesListMakeStrictAppendList(&pInfo->pSubplan->pChildren, pInfo->pSubplan->pTsmaChildren);
code = nodesListMakeStrictAppendList(&pInfo->pSubplan->pChildren, pAgg->pTsmaSubplans);
pMergeNode->numOfSubplans = LIST_LENGTH(pInfo->pSubplan->pChildren) + 1;
}
pMergeNode->srcEndGroupId = pCxt->groupId;
@ -1203,7 +1207,7 @@ static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplit
if (TSDB_CODE_SUCCESS == code) {
// if slimit was pushed down to agg, agg will be pipelined mode, add sort merge before parent agg
if (pInfo->pSplitNode->forceCreateNonBlockingOptr)
code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg); //TODO tsma test slimit
code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg);
else {
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
}
@ -1227,7 +1231,7 @@ static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplit
}
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (LIST_LENGTH(pInfo->pSubplan->pTsmaChildren) > 0) {
if (LIST_LENGTH(((SAggLogicNode*)pInfo->pSplitNode)->pTsmaSubplans) > 0) {
return stbSplSplitAggNodeForCrossTableMulSubplan(pCxt, pInfo);
}
if (isPartTableAgg((SAggLogicNode*)pInfo->pSplitNode)) {

View File

@ -65,7 +65,7 @@ class TDTestCase:
tdSql.query('select count(*),db_name, stable_name from information_schema.ins_tables group by db_name, stable_name;')
tdSql.checkRows(3)
tdSql.checkData(0, 0, 30)
tdSql.checkData(0, 0, 31)
tdSql.checkData(0, 1, 'information_schema')
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 0, 3)
@ -77,7 +77,7 @@ class TDTestCase:
tdSql.query('select count(1) v,db_name, stable_name from information_schema.ins_tables group by db_name, stable_name order by v desc;')
tdSql.checkRows(3)
tdSql.checkData(0, 0, 30)
tdSql.checkData(0, 0, 31)
tdSql.checkData(0, 1, 'information_schema')
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 0, 5)
@ -93,7 +93,7 @@ class TDTestCase:
tdSql.checkData(1, 1, 'performance_schema')
tdSql.checkData(0, 0, 3)
tdSql.checkData(0, 1, 'tbl_count')
tdSql.checkData(2, 0, 30)
tdSql.checkData(2, 0, 31)
tdSql.checkData(2, 1, 'information_schema')
tdSql.query("select count(*) from information_schema.ins_tables where db_name='tbl_count'")
@ -106,7 +106,7 @@ class TDTestCase:
tdSql.query('select count(*) from information_schema.ins_tables')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 38)
tdSql.checkData(0, 0, 39)
tdSql.execute('create table stba (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(10), c9 nchar(10), c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) TAGS(t1 int, t2 binary(10), t3 double);')
@ -189,7 +189,7 @@ class TDTestCase:
tdSql.checkData(2, 0, 5)
tdSql.checkData(2, 1, 'performance_schema')
tdSql.checkData(2, 2, None)
tdSql.checkData(3, 0, 30)
tdSql.checkData(3, 0, 31)
tdSql.checkData(3, 1, 'information_schema')
tdSql.checkData(3, 2, None)
@ -204,7 +204,7 @@ class TDTestCase:
tdSql.checkData(2, 0, 5)
tdSql.checkData(2, 1, 'performance_schema')
tdSql.checkData(2, 2, None)
tdSql.checkData(3, 0, 30)
tdSql.checkData(3, 0, 31)
tdSql.checkData(3, 1, 'information_schema')
tdSql.checkData(3, 2, None)
@ -215,7 +215,7 @@ class TDTestCase:
tdSql.checkData(0, 1, 'tbl_count')
tdSql.checkData(1, 0, 5)
tdSql.checkData(1, 1, 'performance_schema')
tdSql.checkData(2, 0, 30)
tdSql.checkData(2, 0, 31)
tdSql.checkData(2, 1, 'information_schema')
tdSql.query("select count(*) from information_schema.ins_tables where db_name='tbl_count'")
@ -228,7 +228,7 @@ class TDTestCase:
tdSql.query('select count(*) from information_schema.ins_tables')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 39)
tdSql.checkData(0, 0, 40)
tdSql.execute('drop database tbl_count')

View File

@ -58,7 +58,7 @@ endi
sql select tbname from information_schema.ins_tables;
print $rows $data00
if $rows != 39 then
if $rows != 40 then
return -1
endi
if $data00 != @ins_tables@ then

View File

@ -53,7 +53,7 @@ sql select stable_name,count(table_name) from information_schema.ins_tables grou
if $rows != 3 then
return -1
endi
if $data01 != 36 then
if $data01 != 37 then
return -1
endi
if $data11 != 10 then
@ -72,7 +72,7 @@ endi
if $data11 != 5 then
return -1
endi
if $data21 != 30 then
if $data21 != 31 then
return -1
endi
if $data31 != 5 then

View File

@ -47,7 +47,7 @@ class TDTestCase:
'col12': f'binary({self.binary_length})',
'col13': f'nchar({self.nchar_length})'
}
self.tbnum = 20
self.tbnum = 21
self.rowNum = 10
self.tag_dict = {
't0':'int',
@ -61,7 +61,7 @@ class TDTestCase:
self.ins_list = ['ins_dnodes','ins_mnodes','ins_qnodes','ins_snodes','ins_cluster','ins_databases','ins_functions',\
'ins_indexes','ins_stables','ins_tables','ins_tags','ins_columns','ins_users','ins_grants','ins_vgroups','ins_configs','ins_dnode_variables',\
'ins_topics','ins_subscriptions','ins_streams','ins_stream_tasks','ins_vnodes','ins_user_privileges','ins_views',
'ins_compacts', 'ins_compact_details', 'ins_grants_full','ins_grants_logs', 'ins_machines', 'ins_arbgroups']
'ins_compacts', 'ins_compact_details', 'ins_grants_full','ins_grants_logs', 'ins_machines', 'ins_arbgroups', 'ins_tsmas']
self.perf_list = ['perf_connections','perf_queries','perf_consumers','perf_trans','perf_apps']
def insert_data(self,column_dict,tbname,row_num):
insert_sql = self.setsql.set_insertsql(column_dict,tbname,self.binary_str,self.nchar_str)

View File

@ -118,6 +118,15 @@ class TSMAQCBuilder:
used_tsma.is_tsma_ = False
self.qc_.used_tsmas.append(used_tsma)
return self
def should_query_with_tsma_ctb(self, tb_name: str, ts_begin: str = UsedTsma.TS_MIN, ts_end: str = UsedTsma.TS_MAX) -> 'TSMAQCBuilder':
used_tsma: UsedTsma = UsedTsma()
used_tsma.name = tb_name
used_tsma.time_range_start = self.to_timestamp(ts_begin)
used_tsma.time_range_end = self.to_timestamp(ts_end)
used_tsma.is_tsma_ = True
self.qc_.used_tsmas.append(used_tsma)
return self
def ignore_query_table(self):
self.qc_.ignore_tsma_check_ = True
@ -567,7 +576,7 @@ class TSMATestSQLGenerator:
class TDTestCase:
updatecfgDict = {'debugFlag': 143, 'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 1}
updatecfgDict = {'debugFlag': 143, 'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 4}
def __init__(self):
self.vgroups = 4
@ -755,8 +764,7 @@ class TDTestCase:
self.test_query_with_tsma_interval()
self.test_query_with_tsma_agg()
self.test_recursive_tsma()
# self.test_query_with_drop_tsma()
# self.test_union()
self.test_union()
self.test_query_child_table()
self.test_skip_tsma_hint()
self.test_long_tsma_name()
@ -764,6 +772,31 @@ class TDTestCase:
self.test_add_tag_col()
self.test_modify_col_name_value()
def test_union(self):
ctxs = []
sql = 'select avg(c1) from meters union select avg(c1) from norm_tb'
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma2').should_query_with_tsma_ctb('d2f2c89f2b3378a2a48b4cadf9c3f927_norm_tb').get_qc()
ctxs.append(ctx)
sql = 'select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.000" and "2018-09-17 10:00:00.000" union select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
ctxs.append(TSMAQCBuilder().with_sql(sql)
.should_query_with_tsma('tsma2', '2018-09-17 09:00:00', '2018-09-17 09:59:59:999')
.should_query_with_table("meters", '2018-09-17 10:00:00', '2018-09-17 10:00:00')
.should_query_with_table('meters', '2018-09-17 09:00:00.200', '2018-09-17 09:29:59:999')
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00', '2018-09-17 09:59:59.999')
.should_query_with_table('meters', '2018-09-17 10:00:00.000', '2018-09-17 10:23:19.800').get_qc())
self.check(ctxs)
tdSql.execute('create database db2')
tdSql.execute('use db2')
tdSql.execute('create table norm_tb(ts timestamp, c2 int)')
tdSql.execute('insert into norm_tb values(now, 1)')
tdSql.execute('insert into norm_tb values(now, 2)')
self.create_tsma('tsma_db2_norm_t', 'db2', 'norm_tb', ['avg(c2)', 'last(ts)'], '10m')
sql = 'select avg(c1) from test.meters union select avg(c2) from norm_tb'
self.check([TSMAQCBuilder().with_sql(sql).should_query_with_tsma('tsma2').should_query_with_tsma_ctb('e2d730bfc1242321c58c9ab7590ac060_norm_tb').get_qc()])
tdSql.execute('drop database db2')
tdSql.execute('use test')
def test_modify_col_name_value(self):
tdSql.execute('alter table norm_tb rename column c1 c1_new')
sql = 'select avg(c1_new) from norm_tb'
@ -791,7 +824,7 @@ class TDTestCase:
def test_long_tsma_name(self):
name = self.generate_random_string(178)
tsma_func_list = ['avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)', 'stddev(c3)', 'hyperloglog(c5)']
tsma_func_list = ['avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)', 'stddev(c3)', 'hyperloglog(c5)', 'last(ts)']
self.create_tsma(name, 'test', 'meters', tsma_func_list, '55m')
sql = 'select last(c5), spread(c2) from meters interval(55m)'
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma(name).get_qc()
@ -835,7 +868,7 @@ class TDTestCase:
def test_recursive_tsma(self):
tdSql.execute('drop tsma tsma2')
tsma_func_list = ['avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)', 'stddev(c3)', 'hyperloglog(c5)']
tsma_func_list = ['last(ts)', 'avg(c2)', 'avg(c3)', 'min(c4)', 'max(c3)', 'sum(c2)', 'count(ts)', 'count(c2)', 'first(c5)', 'last(c5)', 'spread(c2)', 'stddev(c3)', 'hyperloglog(c5)']
select_func_list: List[str] = tsma_func_list.copy()
select_func_list.append('count(*)')
self.create_tsma('tsma3', 'test', 'meters', tsma_func_list, '5m')
@ -861,8 +894,13 @@ class TDTestCase:
tdSql.execute('drop tsma tsma6')
tdSql.execute('drop tsma tsma4')
tdSql.execute('drop tsma tsma3')
self.create_tsma('tsma2', 'test', 'meters', [
'avg(c1)', 'avg(c2)'], '30m')
self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m')
# test query with dropped tsma tsma4 and tsma6
sql = 'select avg(c2), "tsma2" from meters'
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma(
'tsma2', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc()
self.check([ctx])
def test_query_with_tsma_interval(self):
self.check(self.test_query_with_tsma_interval_possibly_partition())
@ -897,7 +935,7 @@ class TDTestCase:
).ignore_res_order(sql_generator.can_ignore_res_order()).get_qc())
return ctxs
def test_query_with_tsma_interval_possibly_partition(self) -> List[TSMAQueryContext]:
def test_query_with_tsma_interval_possibly_partition(self):
ctxs: List[TSMAQueryContext] = []
sql = 'select avg(c1), avg(c2) from meters interval(5m)'
ctxs.append(TSMAQCBuilder().with_sql(sql)
@ -1147,7 +1185,8 @@ class TDTestCase:
tdSql.query('show tables like "%tsma%"')
tdSql.checkRows(0)
# TODO test drop stream
# test drop stream
tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first
tdSql.execute('drop database test', queryTimes=1)
self.init_data()