tsma optimization

This commit is contained in:
wangjiaming0909 2023-12-22 14:05:31 +08:00
parent 0fd66d7e8a
commit 4b6fb0ffc6
11 changed files with 347 additions and 80 deletions

View File

@ -4211,6 +4211,7 @@ typedef struct {
char tb[TSDB_TABLE_NAME_LEN]; char tb[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
uint64_t suid; uint64_t suid;
uint64_t destTbUid;
uint64_t dbId; uint64_t dbId;
int32_t version; int32_t version;
int64_t interval; int64_t interval;
@ -4224,6 +4225,7 @@ typedef struct {
int32_t tSerializeTableTSMAInfoRsp(void* buf, int32_t bufLen, const STableTSMAInfoRsp* pRsp); int32_t tSerializeTableTSMAInfoRsp(void* buf, int32_t bufLen, const STableTSMAInfoRsp* pRsp);
int32_t tDeserializeTableTSMAInfoRsp(void* buf, int32_t bufLen, STableTSMAInfoRsp* pReq); int32_t tDeserializeTableTSMAInfoRsp(void* buf, int32_t bufLen, STableTSMAInfoRsp* pReq);
int32_t tCloneTbTSMAInfo(STableTSMAInfo* pInfo, STableTSMAInfo** pRes);
void tFreeTableTSMAInfo(void* p); void tFreeTableTSMAInfo(void* p);
void tFreeAndClearTableTSMAInfo(void* p); void tFreeAndClearTableTSMAInfo(void* p);
void tFreeTableTSMAInfoRsp(STableTSMAInfoRsp* pRsp); void tFreeTableTSMAInfoRsp(STableTSMAInfoRsp* pRsp);

View File

@ -273,9 +273,10 @@ bool fmIsInvertible(int32_t funcId);
char* fmGetFuncName(int32_t funcId); char* fmGetFuncName(int32_t funcId);
bool fmIsTSMASupportedFunc(func_id_t funcid); bool fmIsTSMASupportedFunc(func_id_t funcId);
int32_t rewriteFuncsForTSMA(SNodeList* pFuncs); int32_t fmCreateStateFuncs(SNodeList* pFuncs);
int32_t getFuncId(const char* name); int32_t fmGetFuncId(const char* name);
bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -9970,6 +9970,7 @@ static int32_t tEncodeTableTSMAInfo(SEncoder* pEncoder, const STableTSMAInfo* pT
if (tEncodeCStr(pEncoder, pTsmaInfo->tb) < 0) return -1; if (tEncodeCStr(pEncoder, pTsmaInfo->tb) < 0) return -1;
if (tEncodeCStr(pEncoder, pTsmaInfo->dbFName) < 0) return -1; if (tEncodeCStr(pEncoder, pTsmaInfo->dbFName) < 0) return -1;
if (tEncodeU64(pEncoder, pTsmaInfo->suid) < 0) return -1; if (tEncodeU64(pEncoder, pTsmaInfo->suid) < 0) return -1;
if (tEncodeU64(pEncoder, pTsmaInfo->destTbUid) < 0) return -1;
if (tEncodeU64(pEncoder, pTsmaInfo->dbId) < 0) return -1; if (tEncodeU64(pEncoder, pTsmaInfo->dbId) < 0) return -1;
if (tEncodeI32(pEncoder, pTsmaInfo->version) < 0) return -1; if (tEncodeI32(pEncoder, pTsmaInfo->version) < 0) return -1;
if (tEncodeCStr(pEncoder, pTsmaInfo->targetTb) < 0) return -1; if (tEncodeCStr(pEncoder, pTsmaInfo->targetTb) < 0) return -1;
@ -9993,6 +9994,7 @@ static int32_t tDecodeTableTSMAInfo(SDecoder* pDecoder, STableTSMAInfo* pTsmaInf
if (tDecodeCStrTo(pDecoder, pTsmaInfo->tb) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTsmaInfo->tb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTsmaInfo->dbFName) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTsmaInfo->dbFName) < 0) return -1;
if (tDecodeU64(pDecoder, &pTsmaInfo->suid) < 0) return -1; if (tDecodeU64(pDecoder, &pTsmaInfo->suid) < 0) return -1;
if (tDecodeU64(pDecoder, &pTsmaInfo->destTbUid) < 0) return -1;
if (tDecodeU64(pDecoder, &pTsmaInfo->dbId) < 0) return -1; if (tDecodeU64(pDecoder, &pTsmaInfo->dbId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTsmaInfo->version) < 0) return -1; if (tDecodeI32(pDecoder, &pTsmaInfo->version) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTsmaInfo->targetTb) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTsmaInfo->targetTb) < 0) return -1;
@ -10081,6 +10083,22 @@ void tFreeAndClearTableTSMAInfo(void* p) {
} }
} }
int32_t tCloneTbTSMAInfo(STableTSMAInfo* pInfo, STableTSMAInfo** pRes) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pInfo) {
return TSDB_CODE_SUCCESS;
}
STableTSMAInfo* pRet = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
if (!pRet) return TSDB_CODE_OUT_OF_MEMORY;
*pRet = *pInfo;
if (pInfo->pFuncs) {
pRet->pFuncs = taosArrayDup(pInfo->pFuncs, NULL);
}
*pRes = pRet;
return code;
}
void tFreeTableTSMAInfoRsp(STableTSMAInfoRsp *pRsp) { void tFreeTableTSMAInfoRsp(STableTSMAInfoRsp *pRsp) {
if (pRsp && pRsp->pTsmas) { if (pRsp && pRsp->pTsmas) {
taosArrayDestroyP(pRsp->pTsmas, tFreeAndClearTableTSMAInfo); taosArrayDestroyP(pRsp->pTsmas, tFreeAndClearTableTSMAInfo);

View File

@ -1394,7 +1394,7 @@ static void initSMAObj(SCreateTSMACxt* pCxt) {
pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN); pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN);
memcpy(pCxt->pSma->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN); memcpy(pCxt->pSma->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
pCxt->pSma->dstTbUid = mndGenerateUid(pCxt->pSma->dstTbName, TSDB_TABLE_FNAME_LEN); pCxt->pSma->dstTbUid = 0; // not used
pCxt->pSma->stbUid = pCxt->pSrcStb->uid; pCxt->pSma->stbUid = pCxt->pSrcStb->uid;
pCxt->pSma->dbUid = pCxt->pDb->uid; pCxt->pSma->dbUid = pCxt->pDb->uid;
pCxt->pSma->interval = pCxt->pCreateSmaReq->interval; pCxt->pSma->interval = pCxt->pCreateSmaReq->interval;
@ -1561,8 +1561,8 @@ static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
code = -1; code = -1;
goto _OVER; goto _OVER;
} else { } else {
mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 " dstTb:%s dstVg:%d", pCxt->pCreateSmaReq->name, sma.uid,
pCxt->pCreateSmaReq->name, sma.uid, sma.stbUid, sma.dstTbUid, sma.dstTbName, sma.dstVgId); sma.stbUid, sma.dstTbName, sma.dstVgId);
code = 0; code = 0;
} }
@ -1921,13 +1921,14 @@ static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) {
taosMemoryFree(p); taosMemoryFree(p);
} }
int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, STableTSMAInfo* pInfo) { int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STableTSMAInfo* pInfo) {
int32_t code = 0; int32_t code = 0;
pInfo->interval = pSma->interval; pInfo->interval = pSma->interval;
pInfo->unit = pSma->intervalUnit; pInfo->unit = pSma->intervalUnit;
pInfo->tsmaId = pSma->uid; pInfo->tsmaId = pSma->uid;
pInfo->version = pSma->version; pInfo->version = pSma->version;
pInfo->tsmaId = pSma->uid; pInfo->tsmaId = pSma->uid;
pInfo->destTbUid = pDestStb->uid;
SName sName = {0}; SName sName = {0};
tNameFromString(&sName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&sName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
tstrncpy(pInfo->name, sName.tname, TSDB_TABLE_NAME_LEN); tstrncpy(pInfo->name, sName.tname, TSDB_TABLE_NAME_LEN);
@ -1987,13 +1988,21 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp
continue; continue;
} }
pStb = mndAcquireStb(pMnode, pSma->dstTbName);
if (!pStb) {
sdbRelease(pSdb, pSma);
continue;
}
STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo)); STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
if (!pTsma) { if (!pTsma) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mndReleaseStb(pMnode, pStb);
sdbRelease(pSdb, pSma); sdbRelease(pSdb, pSma);
return code; return code;
} }
terrno = dumpTSMAInfoFromSmaObj(pSma, pTsma); terrno = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma);
mndReleaseStb(pMnode, pStb);
if (terrno) { if (terrno) {
sdbRelease(pSdb, pSma); sdbRelease(pSdb, pSma);
return code; return code;
@ -2120,16 +2129,25 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t
continue; continue;
} }
SStbObj* pDestStb = mndAcquireStb(pMnode, pSma->dstTbName);
if (!pDestStb) {
mInfo("tsma: %s.%" PRIx64 " dest stb: %s not found, maybe dropped", tsmaFName, pTsmaVer->tsmaId, pSma->dstTbName);
mndReleaseSma(pMnode, pSma);
continue;
}
// dump smaObj into rsp // dump smaObj into rsp
STableTSMAInfo * pInfo = NULL; STableTSMAInfo * pInfo = NULL;
pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo)); pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
if (!pInfo || (terrno = dumpTSMAInfoFromSmaObj(pSma, pInfo))) { if (!pInfo || (terrno = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo))) {
mndReleaseSma(pMnode, pSma); mndReleaseSma(pMnode, pSma);
mndReleaseStb(pMnode, pDestStb);
taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pInfo);
goto _OVER; goto _OVER;
} }
taosArrayPush(hbRsp.pTsmas, pInfo); taosArrayPush(hbRsp.pTsmas, pInfo);
mndReleaseStb(pMnode, pDestStb);
mndReleaseSma(pMnode, pSma); mndReleaseSma(pMnode, pSma);
} }

View File

@ -1120,7 +1120,6 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n
int32_t ctgUpdateTbTSMAEnqueue(SCatalog* pCtg, STSMACache** pTsma, bool syncOp); int32_t ctgUpdateTbTSMAEnqueue(SCatalog* pCtg, STSMACache** pTsma, bool syncOp);
int32_t ctgDropTSMAForTbEnqueue(SCatalog* pCtg, SName* pName, bool syncOp); int32_t ctgDropTSMAForTbEnqueue(SCatalog* pCtg, SName* pName, bool syncOp);
int32_t ctgDropTbTSMAEnqueue(SCatalog* pCtg, const STSMACache* pTsma, bool syncOp); int32_t ctgDropTbTSMAEnqueue(SCatalog* pCtg, const STSMACache* pTsma, bool syncOp);
int32_t ctgCloneTbTSMA(STSMACache* pTsmas, STSMACache** pRes);
int32_t ctgOpDropTbTSMA(SCtgCacheOperation* operation); int32_t ctgOpDropTbTSMA(SCtgCacheOperation* operation);
int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation* operation); int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation* operation);
uint64_t ctgGetTbTSMACacheSize(STSMACache* pTsmaInfo); uint64_t ctgGetTbTSMACacheSize(STSMACache* pTsmaInfo);

View File

@ -2657,7 +2657,7 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
if (pOut->pTsmas && taosArrayGetSize(pOut->pTsmas) > 0) { if (pOut->pTsmas && taosArrayGetSize(pOut->pTsmas) > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pOut->pTsmas); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pOut->pTsmas); ++i) {
STableTSMAInfo* pInfo = taosArrayGetP(pOut->pTsmas, i); STableTSMAInfo* pInfo = taosArrayGetP(pOut->pTsmas, i);
CTG_ERR_JRET(ctgCloneTbTSMA(pInfo, &pTsma)); CTG_ERR_JRET(tCloneTbTSMAInfo(pInfo, &pTsma));
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pTask->pJob->pCtg, &pTsma, false)); CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pTask->pJob->pCtg, &pTsma, false));
} }
} }

View File

@ -3287,10 +3287,11 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
SMetaRes res = {0}; SMetaRes res = {0};
// TODO if pCache->pTsmas is empty, maybe we should get tsmas from mnode
for (int32_t i = 0; i < pCache->pTsmas->size; ++i) { for (int32_t i = 0; i < pCache->pTsmas->size; ++i) {
STSMACache *pTsmaOut = NULL; STSMACache *pTsmaOut = NULL;
STSMACache *pTsmaCache = taosArrayGetP(pCache->pTsmas, i); STSMACache *pTsmaCache = taosArrayGetP(pCache->pTsmas, i);
code = ctgCloneTbTSMA(pTsmaCache, &pTsmaOut); code = tCloneTbTSMAInfo(pTsmaCache, &pTsmaOut);
if (code) { if (code) {
ctgReleaseTSMAToCache(pCtg, dbCache, pCache); ctgReleaseTSMAToCache(pCtg, dbCache, pCache);
tFreeTableTSMAInfoRsp(pRsp); tFreeTableTSMAInfoRsp(pRsp);

View File

@ -2382,21 +2382,6 @@ int32_t dupViewMetaFromRsp(SViewMetaRsp* pRsp, SViewMeta* pViewMeta) {
} }
uint64_t ctgGetTbTSMACacheSize(STableTSMAInfo* pTsmaInfo) { uint64_t ctgGetTbTSMACacheSize(STableTSMAInfo* pTsmaInfo) {
//TODO
return 0; return 0;
} }
int32_t ctgCloneTbTSMA(STSMACache* pInfo, STSMACache** pRes) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pInfo) {
return TSDB_CODE_SUCCESS;
}
STSMACache* pCache = taosMemoryCalloc(1, sizeof(STSMACache));
if (!pCache) return TSDB_CODE_OUT_OF_MEMORY;
*pCache = *pInfo;
if (pInfo->pFuncs) {
pCache->pFuncs = taosArrayDup(pInfo->pFuncs, NULL);
}
*pRes = pCache;
return code;
}

View File

@ -538,11 +538,11 @@ static int32_t fmCreateStateFunc(const SFunctionNode* pFunc, SFunctionNode** pSt
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
bool fmIsTSMASupportedFunc(func_id_t funcid) { bool fmIsTSMASupportedFunc(func_id_t funcId) {
return fmIsAggFunc(funcid) && !fmIsForbidStreamFunc(funcid); return fmIsAggFunc(funcId) && !fmIsForbidStreamFunc(funcId);
} }
int32_t rewriteFuncsForTSMA(SNodeList* pFuncs) { int32_t fmCreateStateFuncs(SNodeList* pFuncs) {
int32_t code; int32_t code;
SNode* pNode; SNode* pNode;
char buf[128] = {0}; char buf[128] = {0};
@ -568,7 +568,7 @@ int32_t rewriteFuncsForTSMA(SNodeList* pFuncs) {
return code; return code;
} }
int32_t getFuncId(const char* name) { int32_t fmGetFuncId(const char* name) {
if (NULL != gFunMgtService.pFuncNameHashTable) { if (NULL != gFunMgtService.pFuncNameHashTable) {
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, name, strlen(name)); void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, name, strlen(name));
if (NULL != pVal) { if (NULL != pVal) {
@ -583,3 +583,12 @@ int32_t getFuncId(const char* name) {
} }
return -1; return -1;
} }
bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId) {
const SBuiltinFuncDefinition* pFunc = &funcMgtBuiltins[funcId];
const SBuiltinFuncDefinition* pStateFunc = &funcMgtBuiltins[stateFuncId];
if (!pFunc->pStateFunc) {
return false;
}
return strcmp(pFunc->pStateFunc, pStateFunc->name) == 0;
}

View File

@ -10448,7 +10448,7 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC
} }
if (code == TSDB_CODE_SUCCESS) if (code == TSDB_CODE_SUCCESS)
code = rewriteFuncsForTSMA(info.pFuncs); code = fmCreateStateFuncs(info.pFuncs);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = buildSampleAst(pCxt, &info, &pReq->ast, &pReq->astLen, &pReq->expr, &pReq->exprLen); code = buildSampleAst(pCxt, &info, &pReq->ast, &pReq->astLen, &pReq->expr, &pReq->exprLen);
@ -10505,7 +10505,7 @@ translateTSMAFuncs(STranslateContext * pCxt, SCreateTSMAStmt* pStmt, STableMeta*
SNode* pNode; SNode* pNode;
FOREACH(pNode, pStmt->pOptions->pFuncs) { FOREACH(pNode, pStmt->pOptions->pFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pNode; SFunctionNode* pFunc = (SFunctionNode*)pNode;
int32_t funcId = getFuncId(pFunc->functionName); int32_t funcId = fmGetFuncId(pFunc->functionName);
if (funcId < 0) { if (funcId < 0) {
return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION; return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
} }

View File

@ -5802,6 +5802,7 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode) {
assert(pFuncs); assert(pFuncs);
FOREACH(pTmpNode, pFuncs) { FOREACH(pTmpNode, pFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pTmpNode; SFunctionNode* pFunc = (SFunctionNode*)pTmpNode;
// TODO test other pseudo column funcs
if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId)) { if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId)) {
return false; return false;
} }
@ -5813,31 +5814,38 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode) {
} }
typedef struct STSMAOptUsefulTsma { typedef struct STSMAOptUsefulTsma {
const STableTSMAInfo* pTsma; const STableTSMAInfo* pTsma; // NULL if no tsma available, which will use original data for calculation
STimeWindow timeRange; // scan time range for this tsma STimeWindow scanRange; // scan time range for this tsma
STimeWindow windowRange; // window range used for window filtering
SArray* pTsmaScanCols; // SArray<int32_t> index of tsmaFuncs array
} STSMAOptUsefulTsma; } STSMAOptUsefulTsma;
typedef struct STSMAOptCtx { typedef struct STSMAOptCtx {
// input // input
const SLogicNode* pParent; // Agg or Interval const SScanLogicNode* pScan;
const SLogicNode* pParent; // parent of Table Scan, Agg or Interval
const SNodeList* pAggFuncs; const SNodeList* pAggFuncs;
const STimeWindow* pTimeRange; const STimeWindow* pTimeRange;
const SArray* pTsmas; const SArray* pTsmas;
SInterval* queryInterval; SInterval* queryInterval; // not null with window logic node
// output // output
SArray* usefulTsmas; // SArray<STSMAOptUseFulTsma>, sorted by tsma interval from long to short SArray* pUsefulTsmas; // SArray<STSMAOptUseFulTsma>, sorted by tsma interval from long to short
SArray* pUsedTsmas;
SLogicSubplan* generatedSubPlans[2];
} STSMAOptCtx; } STSMAOptCtx;
static int32_t fillTSMAOptCtx (STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan) { static int32_t fillTSMAOptCtx (STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan) {
int32_t code = 0; int32_t code = 0;
pTsmaOptCtx->pScan = pScan;
pTsmaOptCtx->pParent = pScan->node.pParent; pTsmaOptCtx->pParent = pScan->node.pParent;
pTsmaOptCtx->pTsmas = pScan->pTsmas; pTsmaOptCtx->pTsmas = pScan->pTsmas;
pTsmaOptCtx->pTimeRange = &pScan->scanRange; pTsmaOptCtx->pTimeRange = &pScan->scanRange;
if (nodeType(pTsmaOptCtx->pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) {
pTsmaOptCtx->queryInterval = taosMemoryCalloc(1, sizeof(SInterval)); pTsmaOptCtx->queryInterval = taosMemoryCalloc(1, sizeof(SInterval));
if (!pTsmaOptCtx->queryInterval) return TSDB_CODE_OUT_OF_MEMORY; if (!pTsmaOptCtx->queryInterval) return TSDB_CODE_OUT_OF_MEMORY;
if (nodeType(pTsmaOptCtx->pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) {
SWindowLogicNode* pWindow = (SWindowLogicNode*)pTsmaOptCtx->pParent; SWindowLogicNode* pWindow = (SWindowLogicNode*)pTsmaOptCtx->pParent;
pTsmaOptCtx->queryInterval->interval = pWindow->interval; pTsmaOptCtx->queryInterval->interval = pWindow->interval;
pTsmaOptCtx->queryInterval->intervalUnit = pWindow->intervalUnit; pTsmaOptCtx->queryInterval->intervalUnit = pWindow->intervalUnit;
@ -5853,15 +5861,22 @@ static int32_t fillTSMAOptCtx (STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan)
SAggLogicNode* pAgg = (SAggLogicNode*)pTsmaOptCtx->pParent; SAggLogicNode* pAgg = (SAggLogicNode*)pTsmaOptCtx->pParent;
pTsmaOptCtx->pAggFuncs = pAgg->pAggFuncs; pTsmaOptCtx->pAggFuncs = pAgg->pAggFuncs;
} }
pTsmaOptCtx->usefulTsmas = taosArrayInit(pScan->pTsmas->size, sizeof(STSMAOptUsefulTsma)); pTsmaOptCtx->pUsefulTsmas = taosArrayInit(pScan->pTsmas->size, sizeof(STSMAOptUsefulTsma));
if (!pTsmaOptCtx->usefulTsmas) { pTsmaOptCtx->pUsedTsmas = taosArrayInit(3, sizeof(STSMAOptUsefulTsma));
if (!pTsmaOptCtx->pUsefulTsmas || !pTsmaOptCtx->pUsedTsmas) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} }
return code; return code;
} }
static void tsmaOptFreeUsefulTsma(void* p) {
STSMAOptUsefulTsma* pTsma = p;
taosArrayDestroy(pTsma->pTsmaScanCols);
}
static void clearTSMAOptCtx(STSMAOptCtx* pTsmaOptCtx) { static void clearTSMAOptCtx(STSMAOptCtx* pTsmaOptCtx) {
taosArrayDestroy(pTsmaOptCtx->usefulTsmas); taosArrayDestroyEx(pTsmaOptCtx->pUsefulTsmas, tsmaOptFreeUsefulTsma);
taosArrayDestroy(pTsmaOptCtx->pUsedTsmas);
taosMemoryFreeClear(pTsmaOptCtx->queryInterval); taosMemoryFreeClear(pTsmaOptCtx->queryInterval);
} }
@ -5869,7 +5884,10 @@ static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, int8_t tsmaIntevalUn
if (!pTsmaOptCtx->queryInterval) return true; if (!pTsmaOptCtx->queryInterval) return true;
// TODO save tsmaInterval in table precision to avoid convertions // TODO save tsmaInterval in table precision to avoid convertions
int32_t code = getDuration(tsmaInterval, tsmaIntevalUnit, &tsmaInterval, pTsmaOptCtx->queryInterval->precision); // TODO save the right unit
int32_t code =
getDuration(convertTimeFromPrecisionToUnit(tsmaInterval, pTsmaOptCtx->queryInterval->precision, tsmaIntevalUnit),
tsmaIntevalUnit, &tsmaInterval, pTsmaOptCtx->queryInterval->precision);
ASSERT(code == TSDB_CODE_SUCCESS); ASSERT(code == TSDB_CODE_SUCCESS);
bool validInterval = pTsmaOptCtx->queryInterval->interval % tsmaInterval == 0; bool validInterval = pTsmaOptCtx->queryInterval->interval % tsmaInterval == 0;
bool validSliding = pTsmaOptCtx->queryInterval->sliding % tsmaInterval == 0; bool validSliding = pTsmaOptCtx->queryInterval->sliding % tsmaInterval == 0;
@ -5877,7 +5895,7 @@ static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, int8_t tsmaIntevalUn
return validInterval && validSliding && validOffset; return validInterval && validSliding && validOffset;
} }
static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQueryFuncs) { static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQueryFuncs, SArray* pTsmaScanCols) {
SNode* pNode; SNode* pNode;
int32_t tsmaColNum = 1; int32_t tsmaColNum = 1;
bool failed = false, found = false; bool failed = false, found = false;
@ -5892,8 +5910,11 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ
} }
} }
taosArrayClear(pTsmaScanCols);
FOREACH(pNode, pQueryFuncs) { FOREACH(pNode, pQueryFuncs) {
SFunctionNode* pQueryFunc = (SFunctionNode*)pNode; SFunctionNode* pQueryFunc = (SFunctionNode*)pNode;
// TODO handle _wstart
if (fmIsPseudoColumnFunc(pQueryFunc->funcId)) continue;
if (1 != pQueryFunc->pParameterList->length || if (1 != pQueryFunc->pParameterList->length ||
nodeType(pQueryFunc->pParameterList->pHead->pNode) != QUERY_NODE_COLUMN) { nodeType(pQueryFunc->pParameterList->pHead->pNode) != QUERY_NODE_COLUMN) {
failed = true; failed = true;
@ -5902,18 +5923,15 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ
int32_t queryColId = ((SColumnNode*)pQueryFunc->pParameterList->pHead->pNode)->colId; int32_t queryColId = ((SColumnNode*)pQueryFunc->pParameterList->pHead->pNode)->colId;
found = false; found = false;
// iterate funcs // iterate funcs
// TODO if func is count, skip checking cols
for (int32_t i = 0; i < pTsmaFuncs->size; i += tsmaColNum) { for (int32_t i = 0; i < pTsmaFuncs->size; i += tsmaColNum) {
STableTSMAFuncInfo* pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, i); STableTSMAFuncInfo* pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, i);
if (pQueryFunc->funcId < pTsmaFuncInfo->funcId) { if (!fmIsMyStateFunc(pQueryFunc->funcId, pTsmaFuncInfo->funcId)) {
failed = true;
break;
}
if (pQueryFunc->funcId > pTsmaFuncInfo->funcId) {
continue; continue;
} }
// iterate cols within a func // iterate cols within a func
for (int32_t j = i; j < tsmaColNum; ++j) { for (int32_t j = i; j < tsmaColNum + i; ++j) {
if (j > i) { if (j > i) {
pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, j); pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, j);
} }
@ -5925,6 +5943,7 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ
continue; continue;
} }
found= true; found= true;
taosArrayPush(pTsmaScanCols, &j);
break; break;
} }
break; break;
@ -5936,23 +5955,39 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ
return found; return found;
} }
static void tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) { static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
STSMAOptUsefulTsma usefulTsma = {.pTsma = NULL,
.scanRange = {.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
.windowRange = {.skey = TSKEY_MIN, .ekey = TSKEY_MAX}};
SArray* pTsmaScanCols = NULL;
for (int32_t i = 0; i < pTsmaOptCtx->pTsmas->size; ++i) { for (int32_t i = 0; i < pTsmaOptCtx->pTsmas->size; ++i) {
if (!pTsmaScanCols) {
pTsmaScanCols = taosArrayInit(pTsmaOptCtx->pAggFuncs->length, sizeof(int32_t));
if (!pTsmaScanCols) return TSDB_CODE_OUT_OF_MEMORY;
}
STableTSMAInfo* pTsma = taosArrayGetP(pTsmaOptCtx->pTsmas, i); STableTSMAInfo* pTsma = taosArrayGetP(pTsmaOptCtx->pTsmas, i);
// filter with interval // filter with interval
// TODO unit not right
if (!tsmaOptCheckValidInterval(pTsma->interval, pTsma->unit, pTsmaOptCtx)) { if (!tsmaOptCheckValidInterval(pTsma->interval, pTsma->unit, pTsmaOptCtx)) {
continue; continue;
} }
// filter with funcs, note that tsma funcs has been sorted by funcId and ColId // filter with funcs, note that tsma funcs has been sorted by funcId and ColId
if (!tsmaOptCheckValidFuncs(pTsma->pFuncs, pTsmaOptCtx->pAggFuncs)) { if (!tsmaOptCheckValidFuncs(pTsma->pFuncs, pTsmaOptCtx->pAggFuncs, pTsmaScanCols)) {
continue; continue;
} }
STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsma, .timeRange.skey = TSKEY_MIN, .timeRange.ekey = TSKEY_MAX}; usefulTsma.pTsma = pTsma;
taosArrayPush(pTsmaOptCtx->usefulTsmas, &usefulTsma); usefulTsma.pTsmaScanCols = pTsmaScanCols;
pTsmaScanCols = NULL;
taosArrayPush(pTsmaOptCtx->pUsefulTsmas, &usefulTsma);
} }
if (pTsmaScanCols) taosArrayDestroy(pTsmaScanCols);
// TODO filter smaller tsmas that not aligned with the biggest tsma
return TSDB_CODE_SUCCESS;
} }
static int tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRight) { static int32_t tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRight) {
const STSMAOptUsefulTsma* p = pLeft, *q = pRight; const STSMAOptUsefulTsma* p = pLeft, *q = pRight;
int64_t pInterval = p->pTsma->interval, qInterval = q->pTsma->interval; int64_t pInterval = p->pTsma->interval, qInterval = q->pTsma->interval;
int32_t code = getDuration(pInterval, p->pTsma->unit, &pInterval, TSDB_TIME_PRECISION_MILLI); int32_t code = getDuration(pInterval, p->pTsma->unit, &pInterval, TSDB_TIME_PRECISION_MILLI);
@ -5964,22 +5999,216 @@ static int tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRight) {
return 0; return 0;
} }
static void tsmaOptSplitWindows(STSMAOptCtx *pTsmaOptCtx) { static const STSMAOptUsefulTsma* tsmaOptFindUsefulTsma(const SArray* pUsefulTsmas, int32_t startIdx, int64_t alignInterval, int8_t precision) {
// head windows int64_t tsmaInterval;
if (pTsmaOptCtx->pTimeRange->skey != TSKEY_MIN) { for (int32_t i = startIdx; i < pUsefulTsmas->size; ++i) {
const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pUsefulTsmas, i);
getDuration(pUsefulTsma->pTsma->interval, pUsefulTsma->pTsma->unit, &tsmaInterval, precision);
if (alignInterval % tsmaInterval == 0) {
return pUsefulTsma;
}
}
return NULL;
}
static void tsmaOptInitIntervalFromTsma(SInterval* pInterval, const STableTSMAInfo* pTsma, int8_t precision) {
pInterval->interval = pTsma->interval;
pInterval->intervalUnit = pTsma->unit;
pInterval->sliding = pTsma->interval;
pInterval->slidingUnit = pTsma->unit;
pInterval->offset = 0;
pInterval->offsetUnit = pTsma->unit;
pInterval->precision = precision;
}
// TODO refactor, remove some params
static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pScanRange,
const STimeWindow* pWindowRange, uint32_t tsmaStartIdx) {
bool needTailWindow = false;
bool isSkeyAlignedWithTsma = true, isEkeyAlignedWithTsma = true;
int64_t winSkey = TSKEY_MIN, winEkey = TSKEY_MAX;
int64_t startOfSkeyFirstWin = pScanRange->skey, endOfSkeyFirstWin;
int64_t startOfEkeyFirstWin = pScanRange->ekey, endOfEkeyFirstWin;
int64_t tsmaInterval;
SInterval interval;
STimeWindow scanRange = *pScanRange;
STimeWindow windowRange = *pWindowRange;
const SInterval* pInterval = pTsmaOptCtx->queryInterval;
const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pTsmaOptCtx->pUsefulTsmas, tsmaStartIdx);
const STableTSMAInfo* pTsma = pUsefulTsma->pTsma;
if (!pInterval) {
tsmaOptInitIntervalFromTsma(&interval, pTsma, 0);
pInterval = &interval;
} }
// normal biggest tsma windows getDuration(pTsma->interval, pTsma->unit, &tsmaInterval, pTsmaOptCtx->queryInterval->precision);
// tail windows // check for head windows
if (pTsmaOptCtx->pTimeRange->ekey != TSKEY_MAX) { if (pScanRange->skey != TSKEY_MIN) {
startOfSkeyFirstWin = taosTimeTruncate(pScanRange->skey, pInterval);
endOfSkeyFirstWin =
taosTimeAdd(startOfSkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
isSkeyAlignedWithTsma = ((pScanRange->skey - startOfSkeyFirstWin) % tsmaInterval == 0);
} else {
endOfSkeyFirstWin = TSKEY_MIN;
}
// check for tail windows
if (pScanRange->ekey != TSKEY_MAX) {
startOfEkeyFirstWin = taosTimeTruncate(pScanRange->ekey, pInterval);
endOfEkeyFirstWin = taosTimeAdd(startOfEkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (startOfEkeyFirstWin > startOfSkeyFirstWin) {
needTailWindow = true;
// TODO add some notes
isEkeyAlignedWithTsma = ((pScanRange->ekey + 1 - startOfEkeyFirstWin) % tsmaInterval == 0);
}
}
// add head tsma if possible
if (!isSkeyAlignedWithTsma) {
windowRange.ekey =
taosTimeAdd(startOfSkeyFirstWin, pInterval->interval * 2, pInterval->intervalUnit, pInterval->precision);
scanRange.ekey = TMIN(scanRange.ekey, windowRange.ekey);
const STSMAOptUsefulTsma* pTsmaFound = tsmaOptFindUsefulTsma(
pTsmaOptCtx->pUsefulTsmas, tsmaStartIdx + 1, pScanRange->skey - startOfSkeyFirstWin, pInterval->precision);
STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsmaFound ? pTsmaFound->pTsma : NULL,
.scanRange = scanRange,
.windowRange = windowRange,
.pTsmaScanCols = pTsmaFound ? pTsmaFound->pTsmaScanCols : NULL};
taosArrayPush(pTsmaOptCtx->pUsedTsmas, &usefulTsma);
}
// the main tsma
if (endOfSkeyFirstWin < startOfEkeyFirstWin) {
scanRange.ekey = TMIN(pScanRange->ekey, endOfEkeyFirstWin);
if (!isSkeyAlignedWithTsma) {
scanRange.skey = endOfSkeyFirstWin;
windowRange.skey = scanRange.skey;
}
windowRange.ekey = pWindowRange->ekey;
if (!isEkeyAlignedWithTsma) {
windowRange.ekey = endOfEkeyFirstWin;
}
STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsma,
.scanRange = scanRange,
.windowRange = windowRange,
.pTsmaScanCols = pUsefulTsma->pTsmaScanCols};
taosArrayPush(pTsmaOptCtx->pUsedTsmas, &usefulTsma);
}
// add tail tsma if possible
if (!isEkeyAlignedWithTsma && needTailWindow) {
scanRange.skey = startOfEkeyFirstWin;
scanRange.ekey = pScanRange->ekey;
windowRange.skey = startOfEkeyFirstWin;
windowRange.ekey = pWindowRange->ekey;
const STSMAOptUsefulTsma* pTsmaFound = tsmaOptFindUsefulTsma(
pTsmaOptCtx->pUsefulTsmas, tsmaStartIdx + 1, pScanRange->ekey + 1 - startOfEkeyFirstWin, pInterval->precision);
STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsmaFound ? pTsmaFound->pTsma : NULL,
.scanRange = scanRange,
.windowRange = windowRange,
.pTsmaScanCols = pTsmaFound ? pTsmaFound->pTsmaScanCols : NULL};
taosArrayPush(pTsmaOptCtx->pUsedTsmas, &usefulTsma);
}
}
static void tsmaOptRewriteAggFuncs(STSMAOptCtx* pTsmaOptCtx) {
}
SNodeList* tsmaOptCreateTsmaScanCols(const STSMAOptUsefulTsma* pTsma, const SNodeList* pAggFuncs) {
ASSERT(pTsma->pTsma);
ASSERT(pTsma->pTsmaScanCols);
int32_t code;
SNode* pNode;
SNodeList* pScanCols = NULL;
int32_t i = 0;
FOREACH(pNode, pAggFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
if (fmIsPseudoColumnFunc(pFunc->funcId)) {
continue;
}
const int32_t* idx = taosArrayGet(pTsma->pTsmaScanCols, i);
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (pCol) {
// TODO why 2?
pCol->colId = *idx + 2;
pCol->tableType = TSDB_SUPER_TABLE;
pCol->tableId = pTsma->pTsma->destTbUid;
pCol->colType = COLUMN_TYPE_COLUMN;
strcpy(pCol->tableName, pTsma->pTsma->targetTb);
strcpy(pCol->dbName, pTsma->pTsma->targetDbFName);
strcpy(pCol->colName, pFunc->node.aliasName);
strcpy(pCol->node.aliasName, pFunc->node.aliasName);
pCol->node.resType.type = TSDB_DATA_TYPE_BINARY;
code = nodesListMakeStrictAppend(&pScanCols, (SNode*)pCol);
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code) break;
++i;
}
if (code) {
nodesDestroyList(pScanCols);
pScanCols = NULL;
}
return pScanCols;
}
static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, const STSMAOptUsefulTsma* pTsma) {
SNode* pNode;
int32_t code = 0;
SScanLogicNode* pScan = (SScanLogicNode*)nodesCloneNode((SNode*)pTsmaOptCtx->pScan);
if (!pScan) code = TSDB_CODE_OUT_OF_MEMORY;
if (code == TSDB_CODE_SUCCESS) {
nodesDestroyList(pScan->pScanCols);
pScan->scanRange.skey = pTsma->scanRange.skey;
pScan->scanRange.ekey = pTsma->scanRange.ekey;
if (pTsma->pTsma) {
ASSERT(pTsma->pTsmaScanCols);
pScan->pScanCols = tsmaOptCreateTsmaScanCols(pTsma, pTsmaOptCtx->pAggFuncs);
if (!pScan->pScanCols) code = TSDB_CODE_OUT_OF_MEMORY;
if (code == TSDB_CODE_SUCCESS) {
pScan->tableId = pTsma->pTsma->destTbUid;
pScan->tableType = TSDB_SUPER_TABLE;
strcpy(pScan->tableName.tname, pTsma->pTsma->targetTb); //TODO set dbName
}
}
}
if (code) {
nodesDestroyNode((SNode*)pScan);
}
return code;
}
static void tsmaOptRewriteWindow(STSMAOptCtx* pTsmaOptCtx, const STSMAOptUsefulTsma* pTsma) {
int32_t code = 0;
SWindowLogicNode* pWindow = (SWindowLogicNode*)nodesCloneNode((SNode*)pTsmaOptCtx->pParent);
if (!pWindow) code = TSDB_CODE_OUT_OF_MEMORY;
if (code == TSDB_CODE_SUCCESS) {
} }
} }
static void tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) { static void tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
for (int32_t i = 0; i < pTsmaOptCtx->pUsedTsmas->size; ++i) {
STSMAOptUsefulTsma* pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, i);
tsmaOptRewriteScan(pTsmaOptCtx, pTsma);
ENodeType parentType = nodeType(pTsmaOptCtx->pParent);
if (QUERY_NODE_LOGIC_PLAN_WINDOW == parentType) {
tsmaOptRewriteWindow(pTsmaOptCtx, pTsma);
} else if (parentType == QUERY_NODE_LOGIC_PLAN_AGG) {
} else {
ASSERT(0);
}
//tsmaOptRewriteAggFuncs(pTsmaOptCtx, pTsma);
}
} }
static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
@ -5988,18 +6217,23 @@ static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan
SScanLogicNode* pScan = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tsmaOptMayBeOptimized); SScanLogicNode* pScan = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tsmaOptMayBeOptimized);
if (!pScan) return code; if (!pScan) return code;
fillTSMAOptCtx(&tsmaOptCtx, pScan); code = fillTSMAOptCtx(&tsmaOptCtx, pScan);
if (code == TSDB_CODE_SUCCESS) {
// 1. extract useful tsmas // 1. extract useful tsmas
tsmaOptFilterTsmas(&tsmaOptCtx); code = tsmaOptFilterTsmas(&tsmaOptCtx);
if (code == TSDB_CODE_SUCCESS && tsmaOptCtx.pUsefulTsmas->size > 0) {
// 2. sort useful tsmas with interval // 2. sort useful tsmas with interval
taosArraySort(tsmaOptCtx.usefulTsmas, tsmaInfoCompWithIntervalDesc); taosArraySort(tsmaOptCtx.pUsefulTsmas, tsmaInfoCompWithIntervalDesc);
// 3. generate and replace logic plans // 3. generate and replace logic plans
// a. split windows // a. split windows
tsmaOptSplitWindows(&tsmaOptCtx); tsmaOptSplitWindows(&tsmaOptCtx, tsmaOptCtx.pTimeRange, &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, 0);
// b. create logic plan // b. create logic plan
tsmaOptGeneratePlan(&tsmaOptCtx); tsmaOptGeneratePlan(&tsmaOptCtx);
// c. rewrite agg funcs // c. rewrite agg funcs
// tsmaOptRewriteAggFuncs(&tsmaOptCtx);
}
}
clearTSMAOptCtx(&tsmaOptCtx); clearTSMAOptCtx(&tsmaOptCtx);
return code; return code;
} }