tsma optimization
This commit is contained in:
parent
ef47fd57d6
commit
0fd66d7e8a
|
@ -88,6 +88,7 @@ char getPrecisionUnit(int32_t precision);
|
|||
int64_t convertTimePrecision(int64_t ts, int32_t fromPrecision, int32_t toPrecision);
|
||||
int64_t convertTimeFromPrecisionToUnit(int64_t ts, int32_t fromPrecision, char toUnit);
|
||||
int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec, int64_t* timeVal);
|
||||
int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision);
|
||||
|
||||
void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t ts, int32_t precision);
|
||||
|
||||
|
|
|
@ -123,7 +123,7 @@ typedef struct SMetaData {
|
|||
SArray* pTableTag; // pRes = SArray<STagVal>*
|
||||
SArray* pDnodeList; // pRes = SArray<SEpSet>*
|
||||
SArray* pView; // pRes = SViewMeta*
|
||||
SArray* pTableTsmas; // pRes = SArray<STableTSMAInfo>*
|
||||
SArray* pTableTsmas; // pRes = SArray<STableTSMAInfo*>
|
||||
SMetaRes* pSvrVer; // pRes = char*
|
||||
} SMetaData;
|
||||
|
||||
|
@ -410,6 +410,8 @@ int32_t catalogUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** ppTsma);
|
|||
|
||||
int32_t catalogRemoveTSMA(SCatalog* pCtg, const STableTSMAInfo* pTsma);
|
||||
|
||||
int32_t catalogGetTableTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes);
|
||||
|
||||
/**
|
||||
* Destroy catalog and relase all resources
|
||||
*/
|
||||
|
|
|
@ -275,6 +275,7 @@ char* fmGetFuncName(int32_t funcId);
|
|||
|
||||
bool fmIsTSMASupportedFunc(func_id_t funcid);
|
||||
int32_t rewriteFuncsForTSMA(SNodeList* pFuncs);
|
||||
int32_t getFuncId(const char* name);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -169,6 +169,7 @@ int32_t nodesMsgToNode(const char* pStr, int32_t len, SNode** pNode);
|
|||
int32_t nodesNodeToSQL(SNode* pNode, char* buf, int32_t bufSize, int32_t* len);
|
||||
char* nodesGetNameFromColumnNode(SNode* pNode);
|
||||
int32_t nodesGetOutputNumFromSlotList(SNodeList* pSlots);
|
||||
void nodesSortList(SNodeList** pList, bool (*)(SNode* pNode1, SNode* pNode2));
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -109,6 +109,7 @@ typedef struct SScanLogicNode {
|
|||
int8_t igExpired;
|
||||
int8_t igCheckUpdate;
|
||||
SArray* pSmaIndexes;
|
||||
SArray* pTsmas;
|
||||
SNodeList* pGroupTags;
|
||||
bool groupSort;
|
||||
SNodeList* pTags; // for create stream
|
||||
|
|
|
@ -198,6 +198,7 @@ typedef struct SRealTableNode {
|
|||
double ratio;
|
||||
SArray* pSmaIndexes;
|
||||
int8_t cacheLastMode;
|
||||
SArray* pTsmas;
|
||||
} SRealTableNode;
|
||||
|
||||
typedef struct STempTableNode {
|
||||
|
|
|
@ -581,7 +581,7 @@ int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision) {
|
||||
int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision) {
|
||||
switch (unit) {
|
||||
case 's':
|
||||
if (val > INT64_MAX / MILLISECOND_PER_SECOND) {
|
||||
|
|
|
@ -1603,6 +1603,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
|
|||
if (mndCheckCreateSmaReq(&createReq))
|
||||
goto _OVER;
|
||||
|
||||
// TODO handle normal table
|
||||
pStb = mndAcquireStb(pMnode, createReq.stb);
|
||||
if (!pStb) {
|
||||
mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
|
||||
|
|
|
@ -1830,6 +1830,46 @@ _return:
|
|||
CTG_API_LEAVE(code);
|
||||
}
|
||||
|
||||
int32_t ctgGetTbTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, SArray** ppRes) {
|
||||
STableTSMAInfoRsp tsmasRsp = {0};
|
||||
int32_t code = ctgGetTbTSMAFromMnode(pCtg, pConn, pTableName, &tsmasRsp, NULL);
|
||||
if (code == TSDB_CODE_MND_SMA_NOT_EXIST) {
|
||||
code = 0;
|
||||
goto _return;
|
||||
}
|
||||
CTG_ERR_JRET(code);
|
||||
assert(tsmasRsp.pTsmas);
|
||||
assert(tsmasRsp.pTsmas->size > 0);
|
||||
*ppRes = tsmasRsp.pTsmas;
|
||||
tsmasRsp.pTsmas = NULL;
|
||||
|
||||
for (int32_t i = 0; i < (*ppRes)->size; ++i) {
|
||||
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, taosArrayGet((*ppRes), i), false));
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
if (tsmasRsp.pTsmas) {
|
||||
tFreeTableTSMAInfoRsp(&tsmasRsp);
|
||||
}
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t catalogGetTableTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
if (NULL == pCtg || NULL == pConn || NULL == pTableName || NULL == pRes) {
|
||||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
CTG_ERR_JRET(ctgGetTbTsmas(pCtg, pConn, (SName*)pTableName, pRes));
|
||||
|
||||
_return:
|
||||
|
||||
CTG_API_LEAVE(code);
|
||||
}
|
||||
|
||||
int32_t catalogClearCache(void) {
|
||||
CTG_API_ENTER_NOLOCK();
|
||||
|
||||
|
|
|
@ -3274,12 +3274,18 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
|
|||
|
||||
CTG_CACHE_HIT_INC(CTG_CI_TBL_TSMA, 1);
|
||||
|
||||
STableTSMAInfoRsp rsp;
|
||||
rsp.pTsmas = taosArrayInit(pCache->pTsmas->size, POINTER_BYTES);
|
||||
if (!rsp.pTsmas) {
|
||||
// TODO use construct and destructor pattern
|
||||
STableTSMAInfoRsp *pRsp = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp));
|
||||
if (!pRsp) {
|
||||
ctgReleaseTSMAToCache(pCtg, dbCache, pCache);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
pRsp->pTsmas = taosArrayInit(pCache->pTsmas->size, POINTER_BYTES);
|
||||
if (!pRsp->pTsmas) {
|
||||
ctgReleaseTSMAToCache(pCtg, dbCache, pCache);
|
||||
taosMemoryFreeClear(pRsp);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
SMetaRes res = {0};
|
||||
for (int32_t i = 0; i < pCache->pTsmas->size; ++i) {
|
||||
STSMACache *pTsmaOut = NULL;
|
||||
|
@ -3287,12 +3293,13 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
|
|||
code = ctgCloneTbTSMA(pTsmaCache, &pTsmaOut);
|
||||
if (code) {
|
||||
ctgReleaseTSMAToCache(pCtg, dbCache, pCache);
|
||||
tFreeTableTSMAInfoRsp(&rsp);
|
||||
tFreeTableTSMAInfoRsp(pRsp);
|
||||
taosMemoryFreeClear(pRsp);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
taosArrayPush(rsp.pTsmas, &pTsmaOut);
|
||||
taosArrayPush(pRsp->pTsmas, &pTsmaOut);
|
||||
}
|
||||
res.pRes = rsp.pTsmas;
|
||||
res.pRes = pRsp;
|
||||
taosArrayPush(pCtx->pResList, &res);
|
||||
taosHashRelease(dbCache->tsmaCache, pCache);
|
||||
}
|
||||
|
|
|
@ -1485,7 +1485,7 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* na
|
|||
char* msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_GET_TABLE_TSMA;
|
||||
SCtgTask* pTask = tReq->pTask;
|
||||
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
|
||||
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(name, tbFName);
|
||||
|
|
|
@ -567,3 +567,19 @@ int32_t rewriteFuncsForTSMA(SNodeList* pFuncs) {
|
|||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t getFuncId(const char* name) {
|
||||
if (NULL != gFunMgtService.pFuncNameHashTable) {
|
||||
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, name, strlen(name));
|
||||
if (NULL != pVal) {
|
||||
return *(int32_t*)pVal;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) {
|
||||
if (0 == strcmp(funcMgtBuiltins[i].name, name)) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -4432,6 +4432,10 @@ static int32_t realTableNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddTArray(pJson, jkRealTableSmaIndexes, tableIndexInfoToJson, pNode->pSmaIndexes);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
//TODO
|
||||
//code = tjsonAddTArray(SJson *pJson, const char *pName, FToJson func, const SArray *pArray);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -4457,6 +4461,9 @@ static int32_t jsonToRealTableNode(const SJson* pJson, void* pObj) {
|
|||
code =
|
||||
tjsonToTArray(pJson, jkRealTableSmaIndexes, jsonToTableIndexInfo, &pNode->pSmaIndexes, sizeof(STableIndexInfo));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
//TODO
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -2630,3 +2630,70 @@ bool nodesIsTableStar(SNode* pNode) {
|
|||
return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' != ((SColumnNode*)pNode)->tableAlias[0]) &&
|
||||
(0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
|
||||
}
|
||||
|
||||
void nodesSortList(SNodeList** pList, bool (*comp)(SNode* pNode1, SNode* pNode2)) {
|
||||
if ((*pList)->length == 1) return;
|
||||
|
||||
uint32_t inSize = 1;
|
||||
SListCell* pHead = (*pList)->pHead;
|
||||
while (1) {
|
||||
SListCell* p = pHead;
|
||||
pHead = NULL;
|
||||
SListCell* pTail = NULL;
|
||||
|
||||
uint32_t nMerges = 0;
|
||||
while (p) {
|
||||
++nMerges;
|
||||
SListCell* q = p;
|
||||
uint32_t pSize = 0;
|
||||
for (uint32_t i = 0; i < inSize; ++i) {
|
||||
++pSize;
|
||||
q = q->pNext;
|
||||
if (!q) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t qSize = inSize;
|
||||
|
||||
while (pSize > 0 || (qSize > 0 && q)) {
|
||||
SListCell* pCell;
|
||||
if (pSize == 0) {
|
||||
pCell = q;
|
||||
q = q->pNext;
|
||||
--qSize;
|
||||
} else if (qSize == 0 || !q) {
|
||||
pCell = p;
|
||||
p = p->pNext;
|
||||
--pSize;
|
||||
} else if (!comp(q->pNode, p->pNode)) {
|
||||
pCell = p;
|
||||
p = p->pNext;
|
||||
--pSize;
|
||||
} else {
|
||||
pCell = q;
|
||||
q = q->pNext;
|
||||
--qSize;
|
||||
}
|
||||
|
||||
if (pTail) {
|
||||
pTail->pNext = pCell;
|
||||
pCell->pPrev = pTail;
|
||||
} else {
|
||||
pHead = pCell;
|
||||
pHead->pPrev = NULL;
|
||||
}
|
||||
pTail = pCell;
|
||||
}
|
||||
p = q;
|
||||
}
|
||||
pTail->pNext = NULL;
|
||||
|
||||
if (nMerges <= 1) {
|
||||
(*pList)->pHead = pHead;
|
||||
(*pList)->pTail = pTail;
|
||||
return;
|
||||
}
|
||||
inSize *= 2;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,6 +56,64 @@ TEST(NodesTest, traverseTest) {
|
|||
nodesDestroyNode(pRoot);
|
||||
}
|
||||
|
||||
bool compareValueNode(SNode* pNode1, SNode* pNode2) {
|
||||
SValueNode* p1 = (SValueNode*)pNode1;
|
||||
SValueNode* p2 = (SValueNode*)pNode2;
|
||||
|
||||
return p1->datum.i < p2->datum.i;
|
||||
}
|
||||
|
||||
void assert_sort_result(SNodeList* pList) {
|
||||
SNode* pNode;
|
||||
int32_t i = 0;
|
||||
FOREACH(pNode, pList) {
|
||||
SValueNode* p = (SValueNode*)pNode;
|
||||
ASSERT_EQ(p->datum.i, i++);
|
||||
}
|
||||
SListCell* pCell = pList->pHead;
|
||||
ASSERT_TRUE(pCell->pPrev == NULL);
|
||||
ASSERT_TRUE(pList->pTail->pNext == NULL);
|
||||
int32_t len = 1;
|
||||
while (pCell) {
|
||||
if (pCell->pNext) {
|
||||
ASSERT_TRUE(pCell->pNext->pPrev == pCell);
|
||||
}
|
||||
pCell = pCell->pNext;
|
||||
if (pCell) len++;
|
||||
}
|
||||
ASSERT_EQ(len, pList->length);
|
||||
}
|
||||
|
||||
TEST(NodesTest, sort) {
|
||||
SValueNode *vn1 = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
vn1->datum.i = 4;
|
||||
|
||||
SValueNode *vn2 = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
vn2->datum.i = 3;
|
||||
|
||||
SValueNode *vn3 = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
vn3->datum.i = 2;
|
||||
|
||||
SValueNode *vn4 = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
vn4->datum.i = 1;
|
||||
|
||||
SValueNode *vn5 = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
vn5->datum.i = 0;
|
||||
|
||||
SNodeList* l = NULL;
|
||||
nodesListMakeAppend(&l, (SNode*)vn1);
|
||||
nodesListMakeAppend(&l, (SNode*)vn2);
|
||||
nodesListMakeAppend(&l, (SNode*)vn3);
|
||||
nodesListMakeAppend(&l, (SNode*)vn4);
|
||||
nodesListMakeAppend(&l, (SNode*)vn5);
|
||||
|
||||
nodesSortList(&l, compareValueNode);
|
||||
|
||||
assert_sort_result(l);
|
||||
|
||||
nodesDestroyList(l);
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
|
|
|
@ -110,7 +110,7 @@ typedef struct SParseMetaCache {
|
|||
SHashObj* pTableIndex; // key is tbFName, element is SArray<STableIndexInfo>*
|
||||
SHashObj* pTableCfg; // key is tbFName, element is STableCfg*
|
||||
SHashObj* pViews; // key is viewFName, element is SViewMeta*
|
||||
SHashObj* pTableTSMAs; // key is tbFName, elements are SArray<STableTSMAInfo>*
|
||||
SHashObj* pTableTSMAs; // key is tbFName, elements are SArray<STableTSMAInfo*>
|
||||
SArray* pDnodes; // element is SEpSet
|
||||
bool dnodeRequired;
|
||||
} SParseMetaCache;
|
||||
|
@ -170,6 +170,7 @@ int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, ST
|
|||
int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
|
||||
void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request);
|
||||
SNode* createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode* pTable, SNodeList* pHint);
|
||||
int32_t getTableTsmasFromCache(SParseMetaCache* pMetaCache, const SName* pTbName, SArray** pTsmas);
|
||||
|
||||
/**
|
||||
* @brief return a - b with overflow check
|
||||
|
|
|
@ -707,6 +707,22 @@ static int32_t getDnodeList(STranslateContext* pCxt, SArray** pDnodes) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t getTableTsmas(STranslateContext* pCxt, const SName* pName, SArray** ppTsmas) {
|
||||
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||
int32_t code = 0;
|
||||
if (pParCxt->async) {
|
||||
code = getTableTsmasFromCache(pCxt->pMetaCache, pName, ppTsmas);
|
||||
} else {
|
||||
SRequestConnInfo conn = {.pTrans = pParCxt->pTransporter,
|
||||
.requestId = pParCxt->requestId,
|
||||
.requestObjRefId = pParCxt->requestRid,
|
||||
.mgmtEps = pParCxt->mgmtEpSet};
|
||||
code = catalogGetTableTsmas(pParCxt->pCatalog, &conn, pName, ppTsmas);
|
||||
}
|
||||
if (code) parserError("0x%" PRIx64 " getDnodeList error, code:%s", pCxt->pParseCxt->requestId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t initTranslateContext(SParseContext* pParseCxt, SParseMetaCache* pMetaCache, STranslateContext* pCxt) {
|
||||
pCxt->pParseCxt = pParseCxt;
|
||||
pCxt->errCode = TSDB_CODE_SUCCESS;
|
||||
|
@ -3612,13 +3628,23 @@ static int32_t setTableIndex(STranslateContext* pCxt, SName* pName, SRealTableNo
|
|||
if (pCxt->createStream || QUERY_SMA_OPTIMIZE_DISABLE == tsQuerySmaOptimize) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (isSelectStmt(pCxt->pCurrStmt) && NULL != ((SSelectStmt*)pCxt->pCurrStmt)->pWindow &&
|
||||
if (0 && isSelectStmt(pCxt->pCurrStmt) && NULL != ((SSelectStmt*)pCxt->pCurrStmt)->pWindow &&
|
||||
QUERY_NODE_INTERVAL_WINDOW == nodeType(((SSelectStmt*)pCxt->pCurrStmt)->pWindow)) {
|
||||
return getTableIndex(pCxt, pName, &pRealTable->pSmaIndexes);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
|
||||
if (pCxt->createStream || QUERY_SMA_OPTIMIZE_DISABLE == tsQuerySmaOptimize) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (isSelectStmt(pCxt->pCurrStmt)) {
|
||||
return getTableTsmas(pCxt, pName, &pRealTable->pTsmas);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t setTableCacheLastMode(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
if ((!pSelect->hasLastRowFunc && !pSelect->hasLastFunc) || QUERY_NODE_REAL_TABLE != nodeType(pSelect->pFromTable) ||
|
||||
TSDB_SYSTEM_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType) {
|
||||
|
@ -4211,6 +4237,9 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinPare
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setTableIndex(pCxt, &name, pRealTable);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setTableTsmas(pCxt, &name, pRealTable);
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pRealTable->table.precision = pRealTable->pMeta->tableInfo.precision;
|
||||
|
@ -4934,8 +4963,8 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindo
|
|||
if (IS_CALENDAR_TIME_DURATION(pSliding->unit)) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SLIDING_UNIT);
|
||||
}
|
||||
if ((pSliding->datum.i <
|
||||
convertTimeFromPrecisionToUnit(tsMinSlidingTime, TSDB_TIME_PRECISION_MILLI, pSliding->unit)) ||
|
||||
if ((pSliding->datum.i < convertTimePrecision(tsMinSlidingTime, TSDB_TIME_PRECISION_MILLI,
|
||||
precision)) ||
|
||||
(pInter->datum.i / pSliding->datum.i > INTERVAL_SLIDING_FACTOR)) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL);
|
||||
}
|
||||
|
@ -10371,6 +10400,18 @@ SNode* createColumnNodeWithName(const char* name) {
|
|||
return (SNode*)pCol;
|
||||
}
|
||||
|
||||
static bool sortFuncWithFuncId(SNode* pNode1, SNode* pNode2) {
|
||||
SFunctionNode* pFunc1 = (SFunctionNode*)pNode1;
|
||||
SFunctionNode* pFunc2 = (SFunctionNode*)pNode2;
|
||||
return pFunc1->funcId < pFunc2->funcId;
|
||||
}
|
||||
|
||||
static bool sortColWithColId(SNode* pNode1, SNode* pNode2) {
|
||||
SColumnNode* pCol1 = (SColumnNode*)pNode1;
|
||||
SColumnNode* pCol2 = (SColumnNode*)pNode2;
|
||||
return pCol1->colId < pCol2->colId;
|
||||
}
|
||||
|
||||
static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq,
|
||||
STableMeta* pTableMeta) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -10444,9 +10485,33 @@ translateTSMAFuncs(STranslateContext * pCxt, SCreateTSMAStmt* pStmt, STableMeta*
|
|||
break;
|
||||
}
|
||||
strcpy(pCol->colName, schema->name);
|
||||
pCol->colId = schema->colId;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pStmt->pOptions->pCols) {
|
||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||
for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
|
||||
const SSchema* pSchema = &pTableMeta->schema[i];
|
||||
if (strcmp(pSchema->name, pCol->colName) == 0) {
|
||||
pCol->colId = pSchema->colId;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
nodesSortList(&pStmt->pOptions->pCols, sortColWithColId);
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pStmt->pOptions->pFuncs) {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)pNode;
|
||||
int32_t funcId = getFuncId(pFunc->functionName);
|
||||
if (funcId < 0) {
|
||||
return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
|
||||
}
|
||||
pFunc->funcId = funcId;
|
||||
}
|
||||
nodesSortList(&pStmt->pOptions->pFuncs, sortFuncWithFuncId);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
// assemble funcs with columns
|
||||
SNode *pNode1, *pNode2;
|
||||
|
|
|
@ -1158,6 +1158,18 @@ int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName,
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t getTableTsmasFromCache(SParseMetaCache* pMetaCache, const SName* pTbName, SArray** pTsmas) {
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(pTbName, tbFName);
|
||||
STableTSMAInfoRsp *pTsmasRsp = NULL;
|
||||
STableTSMAInfo* pTsma = NULL;
|
||||
int32_t code = getMetaDataFromHash(tbFName, strlen(tbFName), pMetaCache->pTableTSMAs, (void**)&pTsmasRsp);
|
||||
if (TSDB_CODE_SUCCESS == code && pTsmasRsp) {
|
||||
*pTsmas = pTsmasRsp->pTsmas;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableCfg* tableCfgDup(STableCfg* pCfg) {
|
||||
STableCfg* pNew = taosMemoryMalloc(sizeof(*pNew));
|
||||
|
||||
|
|
|
@ -364,6 +364,7 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT
|
|||
|
||||
TSWAP(pScan->pVgroupList, pRealTable->pVgroupList);
|
||||
TSWAP(pScan->pSmaIndexes, pRealTable->pSmaIndexes);
|
||||
TSWAP(pScan->pTsmas, pRealTable->pTsmas);
|
||||
pScan->tableId = pRealTable->pMeta->uid;
|
||||
pScan->stableId = pRealTable->pMeta->suid;
|
||||
pScan->tableType = pRealTable->pMeta->tableType;
|
||||
|
|
|
@ -5767,6 +5767,243 @@ static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
|||
return code;
|
||||
}
|
||||
|
||||
static bool tsmaOptMayBeOptimized(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||
SNode* pTmpNode;
|
||||
SNodeList* pFuncs = NULL;
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
|
||||
SLogicNode* pParent = pScan->node.pParent;
|
||||
SNode* pConds = pScan->node.pConditions;
|
||||
|
||||
if (pScan->scanType != SCAN_TYPE_TABLE || !pParent || pConds) return false;
|
||||
|
||||
if (!pScan->pTsmas || pScan->pTsmas->size <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
switch (nodeType(pParent)) {
|
||||
case QUERY_NODE_LOGIC_PLAN_WINDOW: {
|
||||
SWindowLogicNode* pWindow = (SWindowLogicNode*)pParent;
|
||||
// only time window interval supported
|
||||
if (pWindow->winType != WINDOW_TYPE_INTERVAL) return false;
|
||||
pFuncs = pWindow->pFuncs;
|
||||
} break;
|
||||
case QUERY_NODE_LOGIC_PLAN_AGG: {
|
||||
SAggLogicNode* pAgg = (SAggLogicNode*)pParent;
|
||||
// group/partition by normal cols not supported
|
||||
if (pAgg->pGroupKeys) return false;
|
||||
pFuncs = pAgg->pAggFuncs;
|
||||
} break;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
// TODO may need to replace func conds in having
|
||||
|
||||
assert(pFuncs);
|
||||
FOREACH(pTmpNode, pFuncs) {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)pTmpNode;
|
||||
if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
typedef struct STSMAOptUsefulTsma {
|
||||
const STableTSMAInfo* pTsma;
|
||||
STimeWindow timeRange; // scan time range for this tsma
|
||||
} STSMAOptUsefulTsma;
|
||||
|
||||
typedef struct STSMAOptCtx {
|
||||
// input
|
||||
const SLogicNode* pParent; // Agg or Interval
|
||||
const SNodeList* pAggFuncs;
|
||||
const STimeWindow* pTimeRange;
|
||||
const SArray* pTsmas;
|
||||
SInterval* queryInterval;
|
||||
|
||||
// output
|
||||
SArray* usefulTsmas; // SArray<STSMAOptUseFulTsma>, sorted by tsma interval from long to short
|
||||
} STSMAOptCtx;
|
||||
|
||||
static int32_t fillTSMAOptCtx (STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan) {
|
||||
int32_t code = 0;
|
||||
pTsmaOptCtx->pParent = pScan->node.pParent;
|
||||
pTsmaOptCtx->pTsmas = pScan->pTsmas;
|
||||
pTsmaOptCtx->pTimeRange = &pScan->scanRange;
|
||||
pTsmaOptCtx->queryInterval = taosMemoryCalloc(1, sizeof(SInterval));
|
||||
if (!pTsmaOptCtx->queryInterval) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
if (nodeType(pTsmaOptCtx->pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) {
|
||||
SWindowLogicNode* pWindow = (SWindowLogicNode*)pTsmaOptCtx->pParent;
|
||||
pTsmaOptCtx->queryInterval->interval = pWindow->interval;
|
||||
pTsmaOptCtx->queryInterval->intervalUnit = pWindow->intervalUnit;
|
||||
pTsmaOptCtx->queryInterval->offset = pWindow->offset;
|
||||
pTsmaOptCtx->queryInterval->offsetUnit = pWindow->intervalUnit;
|
||||
pTsmaOptCtx->queryInterval->sliding = pWindow->sliding;
|
||||
pTsmaOptCtx->queryInterval->slidingUnit = pWindow->slidingUnit;
|
||||
pTsmaOptCtx->queryInterval->precision = pWindow->node.precision;
|
||||
pTsmaOptCtx->queryInterval->tz = tsTimezone;
|
||||
pTsmaOptCtx->pAggFuncs = pWindow->pFuncs;
|
||||
} else {
|
||||
ASSERT(nodeType(pTsmaOptCtx->pParent) == QUERY_NODE_LOGIC_PLAN_AGG);
|
||||
SAggLogicNode* pAgg = (SAggLogicNode*)pTsmaOptCtx->pParent;
|
||||
pTsmaOptCtx->pAggFuncs = pAgg->pAggFuncs;
|
||||
}
|
||||
pTsmaOptCtx->usefulTsmas = taosArrayInit(pScan->pTsmas->size, sizeof(STSMAOptUsefulTsma));
|
||||
if (!pTsmaOptCtx->usefulTsmas) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static void clearTSMAOptCtx(STSMAOptCtx* pTsmaOptCtx) {
|
||||
taosArrayDestroy(pTsmaOptCtx->usefulTsmas);
|
||||
taosMemoryFreeClear(pTsmaOptCtx->queryInterval);
|
||||
}
|
||||
|
||||
static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, int8_t tsmaIntevalUnit, const STSMAOptCtx* pTsmaOptCtx) {
|
||||
if (!pTsmaOptCtx->queryInterval) return true;
|
||||
|
||||
// TODO save tsmaInterval in table precision to avoid convertions
|
||||
int32_t code = getDuration(tsmaInterval, tsmaIntevalUnit, &tsmaInterval, pTsmaOptCtx->queryInterval->precision);
|
||||
ASSERT(code == TSDB_CODE_SUCCESS);
|
||||
bool validInterval = pTsmaOptCtx->queryInterval->interval % tsmaInterval == 0;
|
||||
bool validSliding = pTsmaOptCtx->queryInterval->sliding % tsmaInterval == 0;
|
||||
bool validOffset = pTsmaOptCtx->queryInterval->offset % tsmaInterval == 0;
|
||||
return validInterval && validSliding && validOffset;
|
||||
}
|
||||
|
||||
static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQueryFuncs) {
|
||||
SNode* pNode;
|
||||
int32_t tsmaColNum = 1;
|
||||
bool failed = false, found = false;
|
||||
int32_t firstFuncId = ((STableTSMAFuncInfo*)taosArrayGet(pTsmaFuncs, 0))->funcId;
|
||||
// find col num
|
||||
for (int32_t i = 1; i < pTsmaFuncs->size; ++i) {
|
||||
STableTSMAFuncInfo* pTsmaFunc = taosArrayGet(pTsmaFuncs, i);
|
||||
if (firstFuncId == pTsmaFunc->funcId) {
|
||||
tsmaColNum++;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
FOREACH(pNode, pQueryFuncs) {
|
||||
SFunctionNode* pQueryFunc = (SFunctionNode*)pNode;
|
||||
if (1 != pQueryFunc->pParameterList->length ||
|
||||
nodeType(pQueryFunc->pParameterList->pHead->pNode) != QUERY_NODE_COLUMN) {
|
||||
failed = true;
|
||||
break;
|
||||
}
|
||||
int32_t queryColId = ((SColumnNode*)pQueryFunc->pParameterList->pHead->pNode)->colId;
|
||||
found = false;
|
||||
// iterate funcs
|
||||
for (int32_t i = 0; i < pTsmaFuncs->size; i += tsmaColNum) {
|
||||
STableTSMAFuncInfo* pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, i);
|
||||
if (pQueryFunc->funcId < pTsmaFuncInfo->funcId) {
|
||||
failed = true;
|
||||
break;
|
||||
}
|
||||
if (pQueryFunc->funcId > pTsmaFuncInfo->funcId) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// iterate cols within a func
|
||||
for (int32_t j = i; j < tsmaColNum; ++j) {
|
||||
if (j > i) {
|
||||
pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, j);
|
||||
}
|
||||
if (queryColId < pTsmaFuncInfo->colId) {
|
||||
failed = true;
|
||||
break;
|
||||
}
|
||||
if (queryColId > pTsmaFuncInfo->colId) {
|
||||
continue;
|
||||
}
|
||||
found= true;
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
if (failed || !found) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return found;
|
||||
}
|
||||
|
||||
static void tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
|
||||
for (int32_t i = 0; i < pTsmaOptCtx->pTsmas->size; ++i) {
|
||||
STableTSMAInfo* pTsma = taosArrayGetP(pTsmaOptCtx->pTsmas, i);
|
||||
// filter with interval
|
||||
if (!tsmaOptCheckValidInterval(pTsma->interval, pTsma->unit, pTsmaOptCtx)) {
|
||||
continue;
|
||||
}
|
||||
// filter with funcs, note that tsma funcs has been sorted by funcId and ColId
|
||||
if (!tsmaOptCheckValidFuncs(pTsma->pFuncs, pTsmaOptCtx->pAggFuncs)) {
|
||||
continue;
|
||||
}
|
||||
STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsma, .timeRange.skey = TSKEY_MIN, .timeRange.ekey = TSKEY_MAX};
|
||||
taosArrayPush(pTsmaOptCtx->usefulTsmas, &usefulTsma);
|
||||
}
|
||||
}
|
||||
|
||||
static int tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRight) {
|
||||
const STSMAOptUsefulTsma* p = pLeft, *q = pRight;
|
||||
int64_t pInterval = p->pTsma->interval, qInterval = q->pTsma->interval;
|
||||
int32_t code = getDuration(pInterval, p->pTsma->unit, &pInterval, TSDB_TIME_PRECISION_MILLI);
|
||||
ASSERT(code == TSDB_CODE_SUCCESS);
|
||||
code = getDuration(qInterval, q->pTsma->unit, &qInterval, TSDB_TIME_PRECISION_MILLI);
|
||||
ASSERT(code == TSDB_CODE_SUCCESS);
|
||||
if (pInterval > qInterval) return -1;
|
||||
if (pInterval < qInterval) return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void tsmaOptSplitWindows(STSMAOptCtx *pTsmaOptCtx) {
|
||||
// head windows
|
||||
if (pTsmaOptCtx->pTimeRange->skey != TSKEY_MIN) {
|
||||
|
||||
}
|
||||
|
||||
// normal biggest tsma windows
|
||||
|
||||
// tail windows
|
||||
if (pTsmaOptCtx->pTimeRange->ekey != TSKEY_MAX) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
static void tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
|
||||
|
||||
}
|
||||
|
||||
static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||
int32_t code = 0;
|
||||
STSMAOptCtx tsmaOptCtx = {0};
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tsmaOptMayBeOptimized);
|
||||
if (!pScan) return code;
|
||||
|
||||
fillTSMAOptCtx(&tsmaOptCtx, pScan);
|
||||
// 1. extract useful tsmas
|
||||
tsmaOptFilterTsmas(&tsmaOptCtx);
|
||||
// 2. sort useful tsmas with interval
|
||||
taosArraySort(tsmaOptCtx.usefulTsmas, tsmaInfoCompWithIntervalDesc);
|
||||
// 3. generate and replace logic plans
|
||||
// a. split windows
|
||||
tsmaOptSplitWindows(&tsmaOptCtx);
|
||||
// b. create logic plan
|
||||
tsmaOptGeneratePlan(&tsmaOptCtx);
|
||||
// c. rewrite agg funcs
|
||||
//
|
||||
clearTSMAOptCtx(&tsmaOptCtx);
|
||||
return code;
|
||||
}
|
||||
|
||||
// clang-format off
|
||||
static const SOptimizeRule optimizeRuleSet[] = {
|
||||
{.pName = "ScanPath", .optimizeFunc = scanPathOptimize},
|
||||
|
@ -5791,6 +6028,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
|
|||
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
|
||||
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
|
||||
{.pName = "PartitionCols", .optimizeFunc = partitionColsOpt},
|
||||
{.pName = "Tsma", .optimizeFunc = tsmaOptimize},
|
||||
};
|
||||
// clang-format on
|
||||
|
||||
|
|
Loading…
Reference in New Issue