find TODO
This commit is contained in:
parent
d1101a0325
commit
a2cbbba24d
|
@ -950,7 +950,6 @@ int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pR
|
||||||
if (tDecodeI64(&decoder, &pReq->normSourceTbUid) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->normSourceTbUid) < 0) return -1;
|
||||||
|
|
||||||
int32_t numOfVgVer;
|
int32_t numOfVgVer;
|
||||||
// TODO extract method decode vgVerList
|
|
||||||
if (tDecodeI32(&decoder, &numOfVgVer) < 0) return -1;
|
if (tDecodeI32(&decoder, &numOfVgVer) < 0) return -1;
|
||||||
if (numOfVgVer > 0) {
|
if (numOfVgVer > 0) {
|
||||||
pReq->pVgroupVerList = taosArrayInit(numOfVgVer, sizeof(SVgroupVer));
|
pReq->pVgroupVerList = taosArrayInit(numOfVgVer, sizeof(SVgroupVer));
|
||||||
|
@ -10069,7 +10068,6 @@ static int32_t tDecodeTableTSMAInfo(SDecoder* pDecoder, STableTSMAInfo* pTsmaInf
|
||||||
if (!pTsmaInfo->pFuncs) return -1;
|
if (!pTsmaInfo->pFuncs) return -1;
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
STableTSMAFuncInfo funcInfo = {0};
|
STableTSMAFuncInfo funcInfo = {0};
|
||||||
// TODO free the array when decode failed
|
|
||||||
if (tDecodeI32(pDecoder, &funcInfo.funcId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &funcInfo.funcId) < 0) return -1;
|
||||||
if (tDecodeI16(pDecoder, &funcInfo.colId) < 0) return -1;
|
if (tDecodeI16(pDecoder, &funcInfo.colId) < 0) return -1;
|
||||||
if (!taosArrayPush(pTsmaInfo->pFuncs, &funcInfo)) return -1;
|
if (!taosArrayPush(pTsmaInfo->pFuncs, &funcInfo)) return -1;
|
||||||
|
@ -10123,7 +10121,7 @@ static int32_t tDecodeTableTSMAInfoRsp(SDecoder* pDecoder, STableTSMAInfoRsp* pR
|
||||||
pRsp->pTsmas = taosArrayInit(size, POINTER_BYTES);
|
pRsp->pTsmas = taosArrayInit(size, POINTER_BYTES);
|
||||||
if (!pRsp->pTsmas) return -1;
|
if (!pRsp->pTsmas) return -1;
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
// TODO add a test case when decode failed, to see if the array is freed
|
// TODO tsma add a test case when decode failed, to see if the array is freed
|
||||||
STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
|
STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
|
||||||
if (!pTsma) return -1;
|
if (!pTsma) return -1;
|
||||||
taosArrayPush(pRsp->pTsmas, &pTsma);
|
taosArrayPush(pRsp->pTsmas, &pTsma);
|
||||||
|
|
|
@ -1452,7 +1452,7 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
|
||||||
pCxt->pCreateStreamReq->igUpdate = 0;
|
pCxt->pCreateStreamReq->igUpdate = 0;
|
||||||
pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs;
|
pCxt->pCreateStreamReq->lastTs = pCxt->pCreateSmaReq->lastTs;
|
||||||
pCxt->pCreateStreamReq->smaId = pCxt->pSma->uid;
|
pCxt->pCreateStreamReq->smaId = pCxt->pSma->uid;
|
||||||
//TODO remove this log
|
//TODO tsma remove this log
|
||||||
mDebug("tsma create stream with last ts: %" PRId64 "vgversion size: %d", pCxt->pCreateSmaReq->lastTs,
|
mDebug("tsma create stream with last ts: %" PRId64 "vgversion size: %d", pCxt->pCreateSmaReq->lastTs,
|
||||||
pCxt->pCreateStreamReq->pVgroupVerList ? pCxt->pCreateStreamReq->pVgroupVerList->size : 0);
|
pCxt->pCreateStreamReq->pVgroupVerList ? pCxt->pCreateStreamReq->pVgroupVerList->size : 0);
|
||||||
pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast);
|
pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast);
|
||||||
|
@ -1476,7 +1476,7 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
|
||||||
static void mndCreateTSMABuildDropStreamReq(SCreateTSMACxt* pCxt) {
|
static void mndCreateTSMABuildDropStreamReq(SCreateTSMACxt* pCxt) {
|
||||||
tstrncpy(pCxt->pDropStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
|
tstrncpy(pCxt->pDropStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
|
||||||
pCxt->pDropStreamReq->igNotExists = false;
|
pCxt->pDropStreamReq->igNotExists = false;
|
||||||
// TODO fill sql
|
// TODO tsma fill sql
|
||||||
pCxt->pDropStreamReq->sql = strdup(pCxt->pDropSmaReq->name);
|
pCxt->pDropStreamReq->sql = strdup(pCxt->pDropSmaReq->name);
|
||||||
pCxt->pDropStreamReq->sqlLen = strlen(pCxt->pDropStreamReq->sql);
|
pCxt->pDropStreamReq->sqlLen = strlen(pCxt->pDropStreamReq->sql);
|
||||||
}
|
}
|
||||||
|
@ -1491,10 +1491,10 @@ static int32_t mndCreateTSMASetCreateStreamUndoAction(SMnode* pMnode) {
|
||||||
|
|
||||||
static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
|
static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
// TODO change the action name
|
// TODO tsma change the action name
|
||||||
STransAction redoAction = {0};
|
STransAction redoAction = {0};
|
||||||
STransAction undoAction = {0};
|
STransAction undoAction = {0};
|
||||||
// TODO trans conflicting setting, maybe conflict with myself
|
// TODO tsma trans conflicting setting, maybe conflict with myself
|
||||||
STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "create-tsma");
|
STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "create-tsma");
|
||||||
if (!pTrans) {
|
if (!pTrans) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1542,7 +1542,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
|
||||||
if (mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
|
if (mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
|
||||||
if (mndTransAppendRedoAction(pTrans, &redoAction) != 0) goto _OVER;
|
if (mndTransAppendRedoAction(pTrans, &redoAction) != 0) goto _OVER;
|
||||||
if (mndTransAppendUndoAction(pTrans, &undoAction) != 0) goto _OVER;
|
if (mndTransAppendUndoAction(pTrans, &undoAction) != 0) goto _OVER;
|
||||||
//TODO add drop stable undo action
|
//TODO tsma add drop stable undo action
|
||||||
if (mndTransPrepare(pCxt->pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransPrepare(pCxt->pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -1620,7 +1620,6 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
|
||||||
if (mndCheckCreateSmaReq(&createReq))
|
if (mndCheckCreateSmaReq(&createReq))
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
|
||||||
// TODO handle normal table
|
|
||||||
if (createReq.normSourceTbUid == 0) {
|
if (createReq.normSourceTbUid == 0) {
|
||||||
pStb = mndAcquireStb(pMnode, createReq.stb);
|
pStb = mndAcquireStb(pMnode, createReq.stb);
|
||||||
if (!pStb) {
|
if (!pStb) {
|
||||||
|
@ -1744,7 +1743,7 @@ static int32_t mndDropTSMA(SCreateTSMACxt* pCxt) {
|
||||||
SMDropStbReq dropStbReq = {0};
|
SMDropStbReq dropStbReq = {0};
|
||||||
dropStbReq.igNotExists = false;
|
dropStbReq.igNotExists = false;
|
||||||
tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
|
tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
|
||||||
// TODO fill sql, sql may be freed
|
// TODO tsma fill sql, sql may be freed
|
||||||
dropStbReq.sql = "drop";
|
dropStbReq.sql = "drop";
|
||||||
dropStbReq.sqlLen = 5;
|
dropStbReq.sqlLen = 5;
|
||||||
|
|
||||||
|
@ -1913,7 +1912,7 @@ static int32_t mndRetrieveTSMA(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char*)(&pSma->createdTime), false);
|
colDataSetVal(pColInfo, numOfRows, (const char*)(&pSma->createdTime), false);
|
||||||
|
|
||||||
// interval
|
// interval
|
||||||
// TODO replace 64
|
// TODO tsma replace 64
|
||||||
char interval[64 + VARSTR_HEADER_SIZE] = {0};
|
char interval[64 + VARSTR_HEADER_SIZE] = {0};
|
||||||
SDbObj* pSrcDb = mndAcquireDb(pMnode, pSma->db);
|
SDbObj* pSrcDb = mndAcquireDb(pMnode, pSma->db);
|
||||||
int32_t len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
|
int32_t len = snprintf(interval + VARSTR_HEADER_SIZE, 64, "%" PRId64 "%c", pSma->interval,
|
||||||
|
@ -2053,7 +2052,7 @@ static int32_t mndGetDeepestBaseForTsma(SMnode* pMnode, SSmaObj* pSma, SSmaObj**
|
||||||
return TSDB_CODE_MND_SMA_NOT_EXIST;
|
return TSDB_CODE_MND_SMA_NOT_EXIST;
|
||||||
}
|
}
|
||||||
while (pRecursiveTsma->baseSmaName[0]) {
|
while (pRecursiveTsma->baseSmaName[0]) {
|
||||||
// TODO test 2 level recursive tsma
|
// TODO tsma test 2 level recursive tsma
|
||||||
SSmaObj* pTmpSma = pRecursiveTsma;
|
SSmaObj* pTmpSma = pRecursiveTsma;
|
||||||
pRecursiveTsma = mndAcquireSma(pMnode, pTmpSma->baseSmaName);
|
pRecursiveTsma = mndAcquireSma(pMnode, pTmpSma->baseSmaName);
|
||||||
if (!pRecursiveTsma) {
|
if (!pRecursiveTsma) {
|
||||||
|
|
|
@ -562,7 +562,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
|
||||||
latest = pReader->pHead->head.ingestTs;
|
latest = pReader->pHead->head.ingestTs;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// TODO remove log
|
// TODO tsma remove log
|
||||||
tqInfo("------ver: %" PRId64 " fhFinished: %d max: %" PRId64 " cur: %" PRId64 " latest: %" PRId64
|
tqInfo("------ver: %" PRId64 " fhFinished: %d max: %" PRId64 " cur: %" PRId64 " latest: %" PRId64
|
||||||
"lastst-cur %" PRId64,
|
"lastst-cur %" PRId64,
|
||||||
ver, *fhFinished, verRange.maxVer, cur, latest, latest - cur);
|
ver, *fhFinished, verRange.maxVer, cur, latest, latest - cur);
|
||||||
|
|
|
@ -301,7 +301,7 @@ typedef STableTSMAInfo STSMACache;
|
||||||
typedef struct SCtgTbCache {
|
typedef struct SCtgTbCache {
|
||||||
SRWLatch metaLock;
|
SRWLatch metaLock;
|
||||||
SRWLatch indexLock;
|
SRWLatch indexLock;
|
||||||
SRWLatch tsmaLock;// TODO remove?
|
SRWLatch tsmaLock;// TODO tsma remove?
|
||||||
STableMeta* pMeta;
|
STableMeta* pMeta;
|
||||||
STableIndex* pIndex;
|
STableIndex* pIndex;
|
||||||
} SCtgTbCache;
|
} SCtgTbCache;
|
||||||
|
|
|
@ -1832,7 +1832,7 @@ _return:
|
||||||
|
|
||||||
int32_t ctgGetTbTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, SArray** ppRes) {
|
int32_t ctgGetTbTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, SArray** ppRes) {
|
||||||
STableTSMAInfoRsp tsmasRsp = {0};
|
STableTSMAInfoRsp tsmasRsp = {0};
|
||||||
//TODO get from cache first
|
//TODO tsma get from cache first
|
||||||
int32_t code = ctgGetTbTSMAFromMnode(pCtg, pConn, pTableName, &tsmasRsp, NULL, TDMT_MND_GET_TABLE_TSMA);
|
int32_t code = ctgGetTbTSMAFromMnode(pCtg, pConn, pTableName, &tsmasRsp, NULL, TDMT_MND_GET_TABLE_TSMA);
|
||||||
if (code == TSDB_CODE_MND_SMA_NOT_EXIST) {
|
if (code == TSDB_CODE_MND_SMA_NOT_EXIST) {
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
|
@ -785,7 +785,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tbTsmaNum > 0) {
|
if (tbTsmaNum > 0) {
|
||||||
// TODO when create recursive tsma, avoid get tb tsma task
|
// TODO tsma when create recursive tsma, avoid get tb tsma task
|
||||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_TSMA, pReq->pTableTSMAs, NULL));
|
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_TSMA, pReq->pTableTSMAs, NULL));
|
||||||
}
|
}
|
||||||
if (tsmaNum > 0) {
|
if (tsmaNum > 0) {
|
||||||
|
@ -2660,7 +2660,7 @@ int32_t ctgLaunchGetTbTSMATask(SCtgTask* pTask) {
|
||||||
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
|
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
|
||||||
int32_t fetchIdx = 0, baseResIdx = 0;
|
int32_t fetchIdx = 0, baseResIdx = 0;
|
||||||
|
|
||||||
// TODO test multi tables from multi dbs
|
// TODO tsma test multi tables from multi dbs
|
||||||
for (int32_t idx = 0; idx < dbNum; ++idx) {
|
for (int32_t idx = 0; idx < dbNum; ++idx) {
|
||||||
STablesReq* pReq = taosArrayGet(pCtx->pNames, idx);
|
STablesReq* pReq = taosArrayGet(pCtx->pNames, idx);
|
||||||
CTG_ERR_RET(ctgGetTbTSMAFromCache(pCtg, pCtx, idx, &fetchIdx, baseResIdx, pReq->pTables));
|
CTG_ERR_RET(ctgGetTbTSMAFromCache(pCtg, pCtx, idx, &fetchIdx, baseResIdx, pReq->pTables));
|
||||||
|
@ -2881,7 +2881,7 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_GET_STREAM_PROGRESS: {
|
case TDMT_VND_GET_STREAM_PROGRESS: {
|
||||||
// update progress into res
|
// update progress into res
|
||||||
// TODO pack all streams into one req, and handle all stream rsps together
|
// TODO tsma pack all streams into one req, and handle all stream rsps together
|
||||||
STableTSMAInfoRsp* pTsmasRsp = pRes->pRes;
|
STableTSMAInfoRsp* pTsmasRsp = pRes->pRes;
|
||||||
SArray* pTsmas = pTsmasRsp->pTsmas;
|
SArray* pTsmas = pTsmasRsp->pTsmas;
|
||||||
SStreamProgressRsp* pRsp = pMsgCtx->out;
|
SStreamProgressRsp* pRsp = pMsgCtx->out;
|
||||||
|
@ -2939,7 +2939,7 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
|
||||||
ctgRemoveTbMetaFromCache(pCtg, pTbName, false);
|
ctgRemoveTbMetaFromCache(pCtg, pTbName, false);
|
||||||
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
|
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
|
||||||
}
|
}
|
||||||
// TODO add tb meta to cache
|
// TODO tsma add tb meta to cache
|
||||||
if (META_TYPE_BOTH_TABLE == pOut->metaType) {
|
if (META_TYPE_BOTH_TABLE == pOut->metaType) {
|
||||||
// rewrite tsma fetch table with it's super table name
|
// rewrite tsma fetch table with it's super table name
|
||||||
sprintf(pFetch->tsmaSourceTbName.tname, "%s", pOut->tbName);
|
sprintf(pFetch->tsmaSourceTbName.tname, "%s", pOut->tbName);
|
||||||
|
|
|
@ -3028,7 +3028,7 @@ int32_t ctgRemoveTbMetaFromCache(SCatalog *pCtg, SName *pTableName, bool syncReq
|
||||||
ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
|
ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO TEST normal table
|
// TODO tsma TEST normal table
|
||||||
CTG_ERR_JRET(ctgDropTSMAForTbEnqueue(pCtg, pTableName, syncReq));
|
CTG_ERR_JRET(ctgDropTSMAForTbEnqueue(pCtg, pTableName, syncReq));
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
@ -3237,7 +3237,7 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
|
||||||
int32_t tbNum = taosArrayGetSize(pList);
|
int32_t tbNum = taosArrayGetSize(pList);
|
||||||
SCtgTbCache * pTbCache = NULL;
|
SCtgTbCache * pTbCache = NULL;
|
||||||
|
|
||||||
// TODO test sys db
|
// TODO tsma test sys db
|
||||||
if (IS_SYS_DBNAME(pName->dbname)) {
|
if (IS_SYS_DBNAME(pName->dbname)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -3247,7 +3247,7 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
|
||||||
CTG_ERR_RET(ctgAcquireDBCache(pCtg, dbFName, &dbCache));
|
CTG_ERR_RET(ctgAcquireDBCache(pCtg, dbFName, &dbCache));
|
||||||
if (!dbCache) {
|
if (!dbCache) {
|
||||||
ctgDebug("DB %s not in cache", dbFName);
|
ctgDebug("DB %s not in cache", dbFName);
|
||||||
// TODO test no db cache, select from another db
|
// TODO tsma test no db cache, select from another db
|
||||||
for (int32_t i = 0; i < tbNum; ++i) {
|
for (int32_t i = 0; i < tbNum; ++i) {
|
||||||
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TSMA_SOURCE_TB_META, NULL);
|
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TSMA_SOURCE_TB_META, NULL);
|
||||||
taosArrayPush(pCtx->pResList, &(SMetaData){0});
|
taosArrayPush(pCtx->pResList, &(SMetaData){0});
|
||||||
|
@ -3278,7 +3278,7 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
|
||||||
taosHashRelease(dbCache->stbCache, stbName);
|
taosHashRelease(dbCache->stbCache, stbName);
|
||||||
} else {
|
} else {
|
||||||
ctgDebug("stb in db: %s, uid: %" PRId64 " not in cache", dbFName, suid);
|
ctgDebug("stb in db: %s, uid: %" PRId64 " not in cache", dbFName, suid);
|
||||||
// TODO remove flag
|
// TODO tsma remove flag
|
||||||
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TSMA_SOURCE_TB_META, NULL);
|
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TSMA_SOURCE_TB_META, NULL);
|
||||||
taosArrayPush(pCtx->pResList, &(SMetaRes){0});
|
taosArrayPush(pCtx->pResList, &(SMetaRes){0});
|
||||||
continue;
|
continue;
|
||||||
|
@ -3308,7 +3308,7 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
|
||||||
|
|
||||||
CTG_CACHE_HIT_INC(CTG_CI_TBL_TSMA, 1);
|
CTG_CACHE_HIT_INC(CTG_CI_TBL_TSMA, 1);
|
||||||
|
|
||||||
// TODO use construct and destructor pattern
|
// TODO tsma use construct and destructor pattern
|
||||||
STableTSMAInfoRsp *pRsp = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp));
|
STableTSMAInfoRsp *pRsp = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp));
|
||||||
if (!pRsp) {
|
if (!pRsp) {
|
||||||
ctgReleaseTSMAToCache(pCtg, dbCache, pCache);
|
ctgReleaseTSMAToCache(pCtg, dbCache, pCache);
|
||||||
|
@ -3321,7 +3321,7 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
|
||||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
SMetaRes res = {0};
|
SMetaRes res = {0};
|
||||||
// TODO if pCache->pTsmas is empty, maybe we should get tsmas from mnode
|
// TODO tsma if pCache->pTsmas is empty, maybe we should get tsmas from mnode
|
||||||
for (int32_t i = 0; i < pCache->pTsmas->size; ++i) {
|
for (int32_t i = 0; i < pCache->pTsmas->size; ++i) {
|
||||||
STSMACache *pTsmaOut = NULL;
|
STSMACache *pTsmaOut = NULL;
|
||||||
STSMACache *pTsmaCache = taosArrayGetP(pCache->pTsmas, i);
|
STSMACache *pTsmaCache = taosArrayGetP(pCache->pTsmas, i);
|
||||||
|
@ -3446,7 +3446,7 @@ int32_t ctgDropTbTSMAEnqueue(SCatalog* pCtg, const STSMACache* pTsma, bool sync
|
||||||
msg->tbId = pTsma->suid;
|
msg->tbId = pTsma->suid;
|
||||||
msg->tsmaId = pTsma->tsmaId;
|
msg->tsmaId = pTsma->tsmaId;
|
||||||
tstrncpy(msg->dbFName, pTsma->dbFName, TSDB_DB_FNAME_LEN);
|
tstrncpy(msg->dbFName, pTsma->dbFName, TSDB_DB_FNAME_LEN);
|
||||||
// TODO use table name len, instead of TSDB_TABLE_FNAME_LEN
|
// TODO tsma use table name len, instead of TSDB_TABLE_FNAME_LEN
|
||||||
tstrncpy(msg->tbName, pTsma->tb, TSDB_TABLE_NAME_LEN);
|
tstrncpy(msg->tbName, pTsma->tb, TSDB_TABLE_NAME_LEN);
|
||||||
tstrncpy(msg->tsmaName, pTsma->name, TSDB_TABLE_NAME_LEN);
|
tstrncpy(msg->tsmaName, pTsma->name, TSDB_TABLE_NAME_LEN);
|
||||||
|
|
||||||
|
|
|
@ -1566,7 +1566,7 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO test errors
|
// TODO tsma test errors
|
||||||
int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTbName,
|
int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTbName,
|
||||||
SVgroupInfo* vgroupInfo, SStreamProgressRsp* out, SCtgTaskReq* tReq,
|
SVgroupInfo* vgroupInfo, SStreamProgressRsp* out, SCtgTaskReq* tReq,
|
||||||
void* bInput) {
|
void* bInput) {
|
||||||
|
|
|
@ -2396,7 +2396,7 @@ int32_t dupViewMetaFromRsp(SViewMetaRsp* pRsp, SViewMeta* pViewMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t ctgGetTbTSMACacheSize(STableTSMAInfo* pTsmaInfo) {
|
uint64_t ctgGetTbTSMACacheSize(STableTSMAInfo* pTsmaInfo) {
|
||||||
//TODO
|
//TODO tsma
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2420,7 +2420,7 @@ bool isCtgTSMACacheOutOfDate(STSMACache* pTsmaCache) {
|
||||||
pTsmaCache->dbFName, pTsmaCache->name, pTsmaCache->fillHistoryFinished,
|
pTsmaCache->dbFName, pTsmaCache->name, pTsmaCache->fillHistoryFinished,
|
||||||
30 * 1000 - pTsmaCache->delayDuration, now - pTsmaCache->reqTs);
|
30 * 1000 - pTsmaCache->delayDuration, now - pTsmaCache->reqTs);
|
||||||
} else {
|
} else {
|
||||||
// TODO remove log
|
// TODO tsma remove log
|
||||||
qDebug("tsma %s.%s in cache has been out of date, history finished: %d, remain valid after: %" PRId64
|
qDebug("tsma %s.%s in cache has been out of date, history finished: %d, remain valid after: %" PRId64
|
||||||
" passed: %" PRId64,
|
" passed: %" PRId64,
|
||||||
pTsmaCache->dbFName, pTsmaCache->name, pTsmaCache->fillHistoryFinished,
|
pTsmaCache->dbFName, pTsmaCache->name, pTsmaCache->fillHistoryFinished,
|
||||||
|
|
|
@ -3981,7 +3981,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.finalizeFunc = stddevPartialFinalize,
|
.finalizeFunc = stddevPartialFinalize,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
//TODO for outer use not only internal
|
//TODO tsma for outer use not only internal
|
||||||
.name = "_avg_state",
|
.name = "_avg_state",
|
||||||
.type = FUNCTION_TYPE_AVG_STATE,
|
.type = FUNCTION_TYPE_AVG_STATE,
|
||||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TSMA_FUNC,
|
||||||
|
|
|
@ -4455,10 +4455,6 @@ static int32_t realTableNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddTArray(pJson, jkRealTableSmaIndexes, tableIndexInfoToJson, pNode->pSmaIndexes);
|
code = tjsonAddTArray(pJson, jkRealTableSmaIndexes, tableIndexInfoToJson, pNode->pSmaIndexes);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
//TODO
|
|
||||||
//code = tjsonAddTArray(SJson *pJson, const char *pName, FToJson func, const SArray *pArray);
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -4484,9 +4480,6 @@ static int32_t jsonToRealTableNode(const SJson* pJson, void* pObj) {
|
||||||
code =
|
code =
|
||||||
tjsonToTArray(pJson, jkRealTableSmaIndexes, jsonToTableIndexInfo, &pNode->pSmaIndexes, sizeof(STableIndexInfo));
|
tjsonToTArray(pJson, jkRealTableSmaIndexes, jsonToTableIndexInfo, &pNode->pSmaIndexes, sizeof(STableIndexInfo));
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
//TODO
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2880,7 +2880,7 @@ SNode* createTSMAOptions(SAstCreateContext* pCxt, SNodeList* pFuncs) {
|
||||||
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "Out of memory");
|
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "Out of memory");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
// TODO check non supported funcs somewhere
|
// TODO tsma check non supported funcs somewhere
|
||||||
pOptions->pFuncs = pFuncs;
|
pOptions->pFuncs = pFuncs;
|
||||||
return (SNode*)pOptions;
|
return (SNode*)pOptions;
|
||||||
}
|
}
|
||||||
|
|
|
@ -771,7 +771,7 @@ static int32_t collectMetaKeyFromCreateTSMAStmt(SCollectMetaKeyCxt* pCxt, SCreat
|
||||||
code = reserveTSMAInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
|
code = reserveTSMAInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
char dstTbName[TSDB_TABLE_NAME_LEN] = {0};
|
char dstTbName[TSDB_TABLE_NAME_LEN] = {0};
|
||||||
// TODO check len
|
// TODO tsma check len
|
||||||
sprintf(dstTbName, "%s"TSMA_RES_STB_POSTFIX, pStmt->tableName);
|
sprintf(dstTbName, "%s"TSMA_RES_STB_POSTFIX, pStmt->tableName);
|
||||||
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, dstTbName, pCxt->pMetaCache);
|
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, dstTbName, pCxt->pMetaCache);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
|
|
@ -3673,7 +3673,7 @@ static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNo
|
||||||
STableTSMAInfo* pTsma = taosArrayGetP(pRealTable->pTsmas, i);
|
STableTSMAInfo* pTsma = taosArrayGetP(pRealTable->pTsmas, i);
|
||||||
SName tsmaTargetTbName = {0};
|
SName tsmaTargetTbName = {0};
|
||||||
toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, "", &tsmaTargetTbName);
|
toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, "", &tsmaTargetTbName);
|
||||||
int32_t len = snprintf(tsmaTargetTbName.tname, TSDB_TABLE_NAME_LEN, "%s.%s", pTsma->dbFName, pTsma->name); // TODO what if tsma name is too long
|
int32_t len = snprintf(tsmaTargetTbName.tname, TSDB_TABLE_NAME_LEN, "%s.%s", pTsma->dbFName, pTsma->name); // TODO tsma what if tsma name is too long
|
||||||
len = taosCreateMD5Hash(tsmaTargetTbName.tname, len);
|
len = taosCreateMD5Hash(tsmaTargetTbName.tname, len);
|
||||||
sprintf(tsmaTargetTbName.tname + len, "_%s", pRealTable->table.tableName);
|
sprintf(tsmaTargetTbName.tname + len, "_%s", pRealTable->table.tableName);
|
||||||
collectUseTable(&tsmaTargetTbName, pCxt->pTargetTables);
|
collectUseTable(&tsmaTargetTbName, pCxt->pTargetTables);
|
||||||
|
@ -10593,7 +10593,7 @@ static int32_t buildTSMAAstMakeConcatFuncNode(SCreateTSMAStmt* pStmt, SMCreateSm
|
||||||
code = nodesListMakeStrictAppend(&pSubstrFunc->pParameterList, nodesMakeNode(QUERY_NODE_VALUE));
|
code = nodesListMakeStrictAppend(&pSubstrFunc->pParameterList, nodesMakeNode(QUERY_NODE_VALUE));
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
SValueNode* pV = (SValueNode*)pSubstrFunc->pParameterList->pTail->pNode;
|
SValueNode* pV = (SValueNode*)pSubstrFunc->pParameterList->pTail->pNode;
|
||||||
pV->literal = strdup("34"); // TODO define this magic number
|
pV->literal = strdup("34"); // TODO tsma define this magic number
|
||||||
if (!pV->literal) code = TSDB_CODE_OUT_OF_MEMORY;
|
if (!pV->literal) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
pV->isDuration = false;
|
pV->isDuration = false;
|
||||||
pV->translate = false;
|
pV->translate = false;
|
||||||
|
@ -10765,7 +10765,7 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
STableMeta* pTableMeta = NULL;
|
STableMeta* pTableMeta = NULL;
|
||||||
// TODO 在使用该tableName时, 如果确定其其实是tsma name, 那么避免将此作为tbname进行catalog 获取.
|
// TODO tsma 在使用该tableName时, 如果确定其其实是tsma name, 那么避免将此作为tbname进行catalog 获取.
|
||||||
STableTSMAInfo* pRecursiveTsma = NULL;
|
STableTSMAInfo* pRecursiveTsma = NULL;
|
||||||
int32_t numOfCols = 0, numOfTags = 0;
|
int32_t numOfCols = 0, numOfTags = 0;
|
||||||
SSchema * pCols = NULL, *pTags = NULL;
|
SSchema * pCols = NULL, *pTags = NULL;
|
||||||
|
@ -10796,7 +10796,7 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm
|
||||||
memset(useTbName, 0, sizeof(SName));
|
memset(useTbName, 0, sizeof(SName));
|
||||||
memcpy(pStmt->originalTbName, pRecursiveTsma->tb, TSDB_TABLE_NAME_LEN);
|
memcpy(pStmt->originalTbName, pRecursiveTsma->tb, TSDB_TABLE_NAME_LEN);
|
||||||
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pRecursiveTsma->tb, useTbName), pReq->stb);
|
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pRecursiveTsma->tb, useTbName), pReq->stb);
|
||||||
numOfCols = pRecursiveTsma->pUsedCols->size; // TODO merge pUsedCols and pTags with one SSchema array
|
numOfCols = pRecursiveTsma->pUsedCols->size; // TODO tsma merge pUsedCols and pTags with one SSchema array
|
||||||
numOfTags = pRecursiveTsma->pTags->size;
|
numOfTags = pRecursiveTsma->pTags->size;
|
||||||
pCols = pRecursiveTsma->pUsedCols->pData;
|
pCols = pRecursiveTsma->pUsedCols->pData;
|
||||||
pTags = pRecursiveTsma->pTags->pData;
|
pTags = pRecursiveTsma->pTags->pData;
|
||||||
|
|
|
@ -5787,13 +5787,13 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode) {
|
||||||
default:
|
default:
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
// TODO may need to replace func conds in having
|
// TODO tsma may need to replace func conds in having
|
||||||
|
|
||||||
assert(pFuncs);
|
assert(pFuncs);
|
||||||
FOREACH(pTmpNode, pFuncs) {
|
FOREACH(pTmpNode, pFuncs) {
|
||||||
SFunctionNode* pFunc = (SFunctionNode*)pTmpNode;
|
SFunctionNode* pFunc = (SFunctionNode*)pTmpNode;
|
||||||
// TODO test other pseudo column funcs
|
// TODO tsma test other pseudo column funcs
|
||||||
// TODO test funcs with multi params
|
// TODO tsma test funcs with multi params
|
||||||
if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId) &&
|
if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId) &&
|
||||||
!fmIsGroupKeyFunc(pFunc->funcId)) {
|
!fmIsGroupKeyFunc(pFunc->funcId)) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -5878,8 +5878,8 @@ static void clearTSMAOptCtx(STSMAOptCtx* pTsmaOptCtx) {
|
||||||
static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, int8_t tsmaIntevalUnit, const STSMAOptCtx* pTsmaOptCtx) {
|
static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, int8_t tsmaIntevalUnit, const STSMAOptCtx* pTsmaOptCtx) {
|
||||||
if (!pTsmaOptCtx->queryInterval) return true;
|
if (!pTsmaOptCtx->queryInterval) return true;
|
||||||
|
|
||||||
// TODO save tsmaInterval in table precision to avoid convertions
|
// TODO tsma save tsmaInterval in table precision to avoid convertions
|
||||||
// TODO save the right unit
|
// TODO tsma save the right unit
|
||||||
bool validInterval = pTsmaOptCtx->queryInterval->interval % tsmaInterval == 0;
|
bool validInterval = pTsmaOptCtx->queryInterval->interval % tsmaInterval == 0;
|
||||||
bool validSliding = pTsmaOptCtx->queryInterval->sliding % tsmaInterval == 0;
|
bool validSliding = pTsmaOptCtx->queryInterval->sliding % tsmaInterval == 0;
|
||||||
bool validOffset = pTsmaOptCtx->queryInterval->offset % tsmaInterval == 0;
|
bool validOffset = pTsmaOptCtx->queryInterval->offset % tsmaInterval == 0;
|
||||||
|
@ -5893,7 +5893,7 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ
|
||||||
taosArrayClear(pTsmaScanCols);
|
taosArrayClear(pTsmaScanCols);
|
||||||
FOREACH(pNode, pQueryFuncs) {
|
FOREACH(pNode, pQueryFuncs) {
|
||||||
SFunctionNode* pQueryFunc = (SFunctionNode*)pNode;
|
SFunctionNode* pQueryFunc = (SFunctionNode*)pNode;
|
||||||
// TODO handle _wstart
|
// TODO tsma handle _wstart
|
||||||
if (fmIsPseudoColumnFunc(pQueryFunc->funcId) || fmIsGroupKeyFunc(pQueryFunc->funcId)) continue;
|
if (fmIsPseudoColumnFunc(pQueryFunc->funcId) || fmIsGroupKeyFunc(pQueryFunc->funcId)) continue;
|
||||||
if (nodeType(pQueryFunc->pParameterList->pHead->pNode) != QUERY_NODE_COLUMN) {
|
if (nodeType(pQueryFunc->pParameterList->pHead->pNode) != QUERY_NODE_COLUMN) {
|
||||||
failed = true;
|
failed = true;
|
||||||
|
@ -5903,7 +5903,7 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ
|
||||||
found = false;
|
found = false;
|
||||||
int32_t notMyStateFuncId = -1;
|
int32_t notMyStateFuncId = -1;
|
||||||
// iterate funcs
|
// iterate funcs
|
||||||
// TODO if func is count, skip checking cols, test count(*)
|
// TODO tsma if func is count, skip checking cols, test count(*)
|
||||||
for (int32_t i = 0; i < pTsmaFuncs->size; i++) {
|
for (int32_t i = 0; i < pTsmaFuncs->size; i++) {
|
||||||
STableTSMAFuncInfo* pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, i);
|
STableTSMAFuncInfo* pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, i);
|
||||||
if (pTsmaFuncInfo->funcId == notMyStateFuncId) continue;
|
if (pTsmaFuncInfo->funcId == notMyStateFuncId) continue;
|
||||||
|
@ -5944,7 +5944,7 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// filter with interval
|
// filter with interval
|
||||||
// TODO unit not right
|
// TODO tsma unit not right
|
||||||
if (!tsmaOptCheckValidInterval(pTsma->interval, pTsma->unit, pTsmaOptCtx)) {
|
if (!tsmaOptCheckValidInterval(pTsma->interval, pTsma->unit, pTsmaOptCtx)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -5958,7 +5958,7 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
|
||||||
taosArrayPush(pTsmaOptCtx->pUsefulTsmas, &usefulTsma);
|
taosArrayPush(pTsmaOptCtx->pUsefulTsmas, &usefulTsma);
|
||||||
}
|
}
|
||||||
if (pTsmaScanCols) taosArrayDestroy(pTsmaScanCols);
|
if (pTsmaScanCols) taosArrayDestroy(pTsmaScanCols);
|
||||||
// TODO filter smaller tsmas that not aligned with the biggest tsma
|
// TODO tsma filter smaller tsmas that not aligned with the biggest tsma
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5998,7 +5998,7 @@ static void tsmaOptInitIntervalFromTsma(SInterval* pInterval, const STableTSMAIn
|
||||||
pInterval->precision = precision;
|
pInterval->precision = precision;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO refactor, remove some params
|
// TODO tsma refactor, remove some params
|
||||||
static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pScanRange, uint32_t tsmaStartIdx) {
|
static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pScanRange, uint32_t tsmaStartIdx) {
|
||||||
bool needTailWindow = false;
|
bool needTailWindow = false;
|
||||||
bool isSkeyAlignedWithTsma = true, isEkeyAlignedWithTsma = true;
|
bool isSkeyAlignedWithTsma = true, isEkeyAlignedWithTsma = true;
|
||||||
|
@ -6039,7 +6039,7 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc
|
||||||
isEkeyAlignedWithTsma = ((pScanRange->ekey + 1 - startOfEkeyFirstWin) % tsmaInterval == 0);
|
isEkeyAlignedWithTsma = ((pScanRange->ekey + 1 - startOfEkeyFirstWin) % tsmaInterval == 0);
|
||||||
if (startOfEkeyFirstWin > startOfSkeyFirstWin) {
|
if (startOfEkeyFirstWin > startOfSkeyFirstWin) {
|
||||||
needTailWindow = true;
|
needTailWindow = true;
|
||||||
// TODO add some notes
|
// TODO tsma add some notes
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6100,7 +6100,7 @@ SNodeList* tsmaOptCreateTsmaScanCols(const STSMAOptUsefulTsma* pTsma, const SNod
|
||||||
const int32_t* idx = taosArrayGet(pTsma->pTsmaScanCols, i);
|
const int32_t* idx = taosArrayGet(pTsma->pTsmaScanCols, i);
|
||||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||||
if (pCol) {
|
if (pCol) {
|
||||||
// TODO why 2?
|
// TODO tsma why 2?
|
||||||
pCol->colId = *idx + 2;
|
pCol->colId = *idx + 2;
|
||||||
pCol->tableType = TSDB_SUPER_TABLE;
|
pCol->tableType = TSDB_SUPER_TABLE;
|
||||||
pCol->tableId = pTsma->targetTbUid;
|
pCol->tableId = pTsma->targetTbUid;
|
||||||
|
@ -6157,7 +6157,7 @@ static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbN
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTsma && code == TSDB_CODE_SUCCESS) {
|
if (pTsma && code == TSDB_CODE_SUCCESS) {
|
||||||
// TODO test child tbname too long
|
// TODO tsma test child tbname too long
|
||||||
// if with tsma, we replace func tbname with substr(tbname, 34)
|
// if with tsma, we replace func tbname with substr(tbname, 34)
|
||||||
pRewrittenFunc->funcId = fmGetFuncId("substr");
|
pRewrittenFunc->funcId = fmGetFuncId("substr");
|
||||||
snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "substr");
|
snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "substr");
|
||||||
|
@ -6284,12 +6284,10 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
nodesDestroyList(pNewScan->pScanCols);
|
nodesDestroyList(pNewScan->pScanCols);
|
||||||
// normal cols
|
// normal cols
|
||||||
// TODO last(ts), maybe i should put pk col after normal cols, if no pk col, then add it
|
// TODO tsma last(ts), maybe i should put pk col after normal cols, if no pk col, then add it
|
||||||
pNewScan->pScanCols = tsmaOptCreateTsmaScanCols(pTsma, pTsmaOptCtx->pAggFuncs);
|
pNewScan->pScanCols = tsmaOptCreateTsmaScanCols(pTsma, pTsmaOptCtx->pAggFuncs);
|
||||||
if (!pNewScan->pScanCols) code = TSDB_CODE_OUT_OF_MEMORY;
|
if (!pNewScan->pScanCols) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
if (pNewScan->tableType == TSDB_CHILD_TABLE) { // TODO remvoe it
|
|
||||||
}
|
|
||||||
if (code == TSDB_CODE_SUCCESS && pPkTsCol) {
|
if (code == TSDB_CODE_SUCCESS && pPkTsCol) {
|
||||||
tstrncpy(pPkTsCol->tableName, pTsma->targetTbName, TSDB_TABLE_NAME_LEN);
|
tstrncpy(pPkTsCol->tableName, pTsma->targetTbName, TSDB_TABLE_NAME_LEN);
|
||||||
tstrncpy(pPkTsCol->tableAlias, pTsma->targetTbName, TSDB_TABLE_NAME_LEN);
|
tstrncpy(pPkTsCol->tableAlias, pTsma->targetTbName, TSDB_TABLE_NAME_LEN);
|
||||||
|
@ -6297,7 +6295,6 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew
|
||||||
nodesListMakeStrictAppend(&pNewScan->pScanCols, nodesCloneNode((SNode*)pPkTsCol));
|
nodesListMakeStrictAppend(&pNewScan->pScanCols, nodesCloneNode((SNode*)pPkTsCol));
|
||||||
}
|
}
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
// TODO handle child tables
|
|
||||||
pNewScan->stableId = pTsma->pTsma->destTbUid;
|
pNewScan->stableId = pTsma->pTsma->destTbUid;
|
||||||
pNewScan->tableId = pTsma->targetTbUid;
|
pNewScan->tableId = pTsma->targetTbUid;
|
||||||
strcpy(pNewScan->tableName.tname, pTsma->targetTbName);
|
strcpy(pNewScan->tableName.tname, pTsma->targetTbName);
|
||||||
|
@ -6325,7 +6322,7 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// TODO rewrite tagcond?
|
// TODO tsma rewrite tagcond?
|
||||||
FOREACH(pNode, pNewScan->pGroupTags) {
|
FOREACH(pNode, pNewScan->pGroupTags) {
|
||||||
// rewrite tbname recursively
|
// rewrite tbname recursively
|
||||||
struct TsmaOptRewriteCtx ctx = {
|
struct TsmaOptRewriteCtx ctx = {
|
||||||
|
@ -6408,7 +6405,7 @@ static int32_t tsmaOptRevisePlan2(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent,
|
||||||
pColNode->node.resType = pPartial->node.resType;
|
pColNode->node.resType = pPartial->node.resType;
|
||||||
// currently we assume that the first parameter must be the scan column
|
// currently we assume that the first parameter must be the scan column
|
||||||
nodesListErase(pMerge->pParameterList, pMerge->pParameterList->pHead);
|
nodesListErase(pMerge->pParameterList, pMerge->pParameterList->pHead);
|
||||||
// TODO STRICT
|
// TODO tsma STRICT
|
||||||
nodesListPushFront(pMerge->pParameterList, nodesCloneNode((SNode*)pColNode));
|
nodesListPushFront(pMerge->pParameterList, nodesCloneNode((SNode*)pColNode));
|
||||||
|
|
||||||
nodesDestroyNode((SNode*)pPartial);
|
nodesDestroyNode((SNode*)pPartial);
|
||||||
|
@ -6531,7 +6528,7 @@ static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
|
||||||
SNodeList* pAggFuncs = NULL;
|
SNodeList* pAggFuncs = NULL;
|
||||||
bool hasSubPlan = false;
|
bool hasSubPlan = false;
|
||||||
|
|
||||||
// TODO if no used tsmas skip generating plans
|
// TODO tsma if no used tsmas skip generating plans
|
||||||
for (int32_t i = 0; i < pTsmaOptCtx->pUsedTsmas->size; ++i) {
|
for (int32_t i = 0; i < pTsmaOptCtx->pUsedTsmas->size; ++i) {
|
||||||
STSMAOptUsefulTsma* pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, i);
|
STSMAOptUsefulTsma* pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, i);
|
||||||
if (!pTsma->pTsma) continue;
|
if (!pTsma->pTsma) continue;
|
||||||
|
@ -6630,7 +6627,7 @@ static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
clearTSMAOptCtx(&tsmaOptCtx);
|
clearTSMAOptCtx(&tsmaOptCtx);
|
||||||
// TODO if any error occured, we should eat the error, skip the optimization, query with original table
|
// TODO tsma if any error occured, we should eat the error, skip the optimization, query with original table
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1197,7 +1197,7 @@ static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplit
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
// if slimit was pushed down to agg, agg will be pipelined mode, add sort merge before parent agg
|
// if slimit was pushed down to agg, agg will be pipelined mode, add sort merge before parent agg
|
||||||
if (pInfo->pSplitNode->forceCreateNonBlockingOptr)
|
if (pInfo->pSplitNode->forceCreateNonBlockingOptr)
|
||||||
code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg); //TODO test slimit
|
code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg); //TODO tsma test slimit
|
||||||
else {
|
else {
|
||||||
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
|
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1082,6 +1082,9 @@ class TDTestCase:
|
||||||
tdSql.waitedQuery('show tables like "%t3"', 0, 1)
|
tdSql.waitedQuery('show tables like "%t3"', 0, 1)
|
||||||
tdSql.waitedQuery('show tables like "%t4"', 0, 1)
|
tdSql.waitedQuery('show tables like "%t4"', 0, 1)
|
||||||
|
|
||||||
|
tdSql.query('show tables like "%tsma%"')
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
# TODO test drop stream
|
# TODO test drop stream
|
||||||
|
|
||||||
tdSql.execute('drop database test', queryTimes=1)
|
tdSql.execute('drop database test', queryTimes=1)
|
||||||
|
|
Loading…
Reference in New Issue