generate tsma plan

This commit is contained in:
wangjiaming0909 2023-12-27 14:38:02 +08:00
parent 4b6fb0ffc6
commit 895a4584e6
11 changed files with 359 additions and 109 deletions

View File

@ -4217,6 +4217,7 @@ typedef struct {
int64_t interval;
int8_t unit;
SArray* pFuncs; // SArray<STableTSMAFuncInfo>
SArray* pTags; // SArray<SSchema>
} STableTSMAInfo;
typedef struct {

View File

@ -275,6 +275,7 @@ char* fmGetFuncName(int32_t funcId);
bool fmIsTSMASupportedFunc(func_id_t funcId);
int32_t fmCreateStateFuncs(SNodeList* pFuncs);
int32_t fmCreateStateMergeFuncs(SNodeList* pFuncs);
int32_t fmGetFuncId(const char* name);
bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId);

View File

@ -253,7 +253,9 @@ typedef struct SMergeLogicNode {
SNodeList* pMergeKeys;
SNodeList* pInputs;
int32_t numOfChannels;
int32_t numOfSubplans;
int32_t srcGroupId;
int32_t srcEndGroupId;
bool colsMerge;
bool needSort;
bool groupSort;
@ -607,7 +609,9 @@ typedef struct SMergePhysiNode {
SNodeList* pMergeKeys;
SNodeList* pTargets;
int32_t numOfChannels;
int32_t numOfSubplans;
int32_t srcGroupId;
int32_t srcEndGroupId;
bool groupSort;
bool ignoreGroupId;
bool inputWithGroupId;

View File

@ -9985,6 +9985,13 @@ static int32_t tEncodeTableTSMAInfo(SEncoder* pEncoder, const STableTSMAInfo* pT
if (tEncodeI32(pEncoder, pFuncInfo->funcId) < 0) return -1;
if (tEncodeI16(pEncoder, pFuncInfo->colId) < 0) return -1;
}
size = pTsmaInfo->pTags ? pTsmaInfo->pTags->size : 0;
if (tEncodeI32(pEncoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
const SSchema* pSchema = taosArrayGet(pTsmaInfo->pTags, i);
if (tEncodeSSchema(pEncoder, pSchema) < 0) return -1;
}
return 0;
}
@ -10003,7 +10010,7 @@ static int32_t tDecodeTableTSMAInfo(SDecoder* pDecoder, STableTSMAInfo* pTsmaInf
if (tDecodeI8(pDecoder, &pTsmaInfo->unit) < 0) return -1;
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) return -1;
if (size <= 0) return 0;
if (size > 0) {
pTsmaInfo->pFuncs = taosArrayInit(size, sizeof(STableTSMAFuncInfo));
if (!pTsmaInfo->pFuncs) return -1;
for (int32_t i = 0; i < size; ++i) {
@ -10013,6 +10020,18 @@ static int32_t tDecodeTableTSMAInfo(SDecoder* pDecoder, STableTSMAInfo* pTsmaInf
if (tDecodeI16(pDecoder, &funcInfo.colId) < 0) return -1;
if (!taosArrayPush(pTsmaInfo->pFuncs, &funcInfo)) return -1;
}
}
if (tDecodeI32(pDecoder, &size) < 0) return -1;
if (size > 0) {
pTsmaInfo->pTags = taosArrayInit(size, sizeof(SSchema));
if (!pTsmaInfo->pTags) return -1;
for (int32_t i = 0; i < size; ++i) {
SSchema schema = {0};
if(tDecodeSSchema(pDecoder, &schema) < 0) return -1;
taosArrayPush(pTsmaInfo->pTags, &schema);
}
}
return 0;
}
@ -10072,7 +10091,10 @@ int32_t tDeserializeTableTSMAInfoRsp(void* buf, int32_t bufLen, STableTSMAInfoRs
void tFreeTableTSMAInfo(void* p) {
STableTSMAInfo *pTsmaInfo = p;
if (pTsmaInfo) taosArrayDestroy(pTsmaInfo->pFuncs);
if (pTsmaInfo) {
taosArrayDestroy(pTsmaInfo->pFuncs);
taosArrayDestroy(pTsmaInfo->pTags);
}
}
void tFreeAndClearTableTSMAInfo(void* p) {
@ -10095,6 +10117,9 @@ int32_t tCloneTbTSMAInfo(STableTSMAInfo* pInfo, STableTSMAInfo** pRes) {
if (pInfo->pFuncs) {
pRet->pFuncs = taosArrayDup(pInfo->pFuncs, NULL);
}
if (pInfo->pTags) {
pRet->pTags = taosArrayDup(pInfo->pTags, NULL);
}
*pRes = pRet;
return code;
}

View File

@ -1964,6 +1964,14 @@ int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STa
}
nodesDestroyNode(pNode);
}
if (code == TSDB_CODE_SUCCESS) {
if (pDestStb->numOfTags > 0) {
pInfo->pTags = taosArrayInit(pDestStb->numOfTags, sizeof(SSchema));
for (int32_t i = 0; i < pDestStb->numOfTags; ++i) {
taosArrayPush(pInfo->pTags, &pDestStb->pTags[i]);
}
}
}
return code;
}

View File

@ -2583,6 +2583,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.invertFunc = avgInvertFunction,
#endif
.combineFunc = avgCombine,
.pPartialFunc = "_avg_state_merge",
.pMergeFunc = "_avg_merge",
},
{
.name = "percentile",

View File

@ -528,7 +528,7 @@ static int32_t fmCreateStateFunc(const SFunctionNode* pFunc, SFunctionNode** pSt
SNodeList* pParams = nodesCloneList(pFunc->pParameterList);
if (!pParams) return TSDB_CODE_OUT_OF_MEMORY;
*pStateFunc = createFunction(funcMgtBuiltins[pFunc->funcId].pStateFunc, pParams);
if (!pStateFunc) {
if (!*pStateFunc) {
nodesDestroyList(pParams);
return TSDB_CODE_FUNC_FUNTION_ERROR;
}
@ -568,6 +568,45 @@ int32_t fmCreateStateFuncs(SNodeList* pFuncs) {
return code;
}
static int32_t fmCreateStateMergeFunc(SFunctionNode* pFunc, SFunctionNode** pStateMergeFunc) {
if (funcMgtBuiltins[pFunc->funcId].pMergeFunc) {
SNodeList* pParams = nodesCloneList(pFunc->pParameterList);
if (!pParams) return TSDB_CODE_OUT_OF_MEMORY;
*pStateMergeFunc = createFunction(funcMgtBuiltins[pFunc->funcId].pMergeFunc, pParams);
if (!*pStateMergeFunc) {
nodesDestroyList(pParams);
return TSDB_CODE_FUNC_FUNTION_ERROR;
}
strcpy((*pStateMergeFunc)->node.aliasName, pFunc->node.aliasName);
strcpy((*pStateMergeFunc)->node.userAlias, pFunc->node.userAlias);
}
return TSDB_CODE_SUCCESS;
}
int32_t fmCreateStateMergeFuncs(SNodeList* pFuncs) {
int32_t code;
SNode* pNode;
char buf[128] = {0};
FOREACH(pNode, pFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
if (fmIsTSMASupportedFunc(pFunc->funcId)) {
SFunctionNode* pNewFunc = NULL;
code = fmCreateStateMergeFunc(pFunc, &pNewFunc);
if (code) {
// error
break;
} else if (!pNewFunc) {
// no state merge func
continue;
} else {
REPLACE_NODE(pNewFunc);
nodesDestroyNode(pNode);
}
}
}
return code;
}
int32_t fmGetFuncId(const char* name) {
if (NULL != gFunMgtService.pFuncNameHashTable) {
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, name, strlen(name));

View File

@ -572,6 +572,8 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst
CLONE_NODE_LIST_FIELD(pInputs);
COPY_SCALAR_FIELD(numOfChannels);
COPY_SCALAR_FIELD(srcGroupId);
COPY_SCALAR_FIELD(srcEndGroupId);
COPY_SCALAR_FIELD(numOfSubplans);
COPY_SCALAR_FIELD(colsMerge);
COPY_SCALAR_FIELD(needSort);
COPY_SCALAR_FIELD(groupSort);

View File

@ -5803,7 +5803,8 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode) {
FOREACH(pTmpNode, pFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pTmpNode;
// TODO test other pseudo column funcs
if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId)) {
if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId) &&
!fmIsGroupKeyFunc(pFunc->funcId)) {
return false;
}
}
@ -5816,18 +5817,18 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode) {
typedef struct STSMAOptUsefulTsma {
const STableTSMAInfo* pTsma; // NULL if no tsma available, which will use original data for calculation
STimeWindow scanRange; // scan time range for this tsma
STimeWindow windowRange; // window range used for window filtering
SArray* pTsmaScanCols; // SArray<int32_t> index of tsmaFuncs array
} STSMAOptUsefulTsma;
typedef struct STSMAOptCtx {
// input
const SScanLogicNode* pScan;
const SLogicNode* pParent; // parent of Table Scan, Agg or Interval
SScanLogicNode* pScan;
SLogicNode* pParent; // parent of Table Scan, Agg or Interval
const SNodeList* pAggFuncs;
const STimeWindow* pTimeRange;
const SArray* pTsmas;
SInterval* queryInterval; // not null with window logic node
int8_t precision;
// output
SArray* pUsefulTsmas; // SArray<STSMAOptUseFulTsma>, sorted by tsma interval from long to short
@ -5841,6 +5842,7 @@ static int32_t fillTSMAOptCtx (STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan)
pTsmaOptCtx->pParent = pScan->node.pParent;
pTsmaOptCtx->pTsmas = pScan->pTsmas;
pTsmaOptCtx->pTimeRange = &pScan->scanRange;
pTsmaOptCtx->precision = pScan->node.precision;
if (nodeType(pTsmaOptCtx->pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) {
pTsmaOptCtx->queryInterval = taosMemoryCalloc(1, sizeof(SInterval));
@ -5885,10 +5887,6 @@ static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, int8_t tsmaIntevalUn
// TODO save tsmaInterval in table precision to avoid convertions
// TODO save the right unit
int32_t code =
getDuration(convertTimeFromPrecisionToUnit(tsmaInterval, pTsmaOptCtx->queryInterval->precision, tsmaIntevalUnit),
tsmaIntevalUnit, &tsmaInterval, pTsmaOptCtx->queryInterval->precision);
ASSERT(code == TSDB_CODE_SUCCESS);
bool validInterval = pTsmaOptCtx->queryInterval->interval % tsmaInterval == 0;
bool validSliding = pTsmaOptCtx->queryInterval->sliding % tsmaInterval == 0;
bool validOffset = pTsmaOptCtx->queryInterval->offset % tsmaInterval == 0;
@ -5914,7 +5912,7 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ
FOREACH(pNode, pQueryFuncs) {
SFunctionNode* pQueryFunc = (SFunctionNode*)pNode;
// TODO handle _wstart
if (fmIsPseudoColumnFunc(pQueryFunc->funcId)) continue;
if (fmIsPseudoColumnFunc(pQueryFunc->funcId) || fmIsGroupKeyFunc(pQueryFunc->funcId)) continue;
if (1 != pQueryFunc->pParameterList->length ||
nodeType(pQueryFunc->pParameterList->pHead->pNode) != QUERY_NODE_COLUMN) {
failed = true;
@ -5956,9 +5954,7 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ
}
static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
STSMAOptUsefulTsma usefulTsma = {.pTsma = NULL,
.scanRange = {.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
.windowRange = {.skey = TSKEY_MIN, .ekey = TSKEY_MAX}};
STSMAOptUsefulTsma usefulTsma = {.pTsma = NULL, .scanRange = {.skey = TSKEY_MIN, .ekey = TSKEY_MAX}};
SArray* pTsmaScanCols = NULL;
for (int32_t i = 0; i < pTsmaOptCtx->pTsmas->size; ++i) {
@ -6022,8 +6018,7 @@ static void tsmaOptInitIntervalFromTsma(SInterval* pInterval, const STableTSMAIn
}
// TODO refactor, remove some params
static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pScanRange,
const STimeWindow* pWindowRange, uint32_t tsmaStartIdx) {
static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pScanRange, uint32_t tsmaStartIdx) {
bool needTailWindow = false;
bool isSkeyAlignedWithTsma = true, isEkeyAlignedWithTsma = true;
int64_t winSkey = TSKEY_MIN, winEkey = TSKEY_MAX;
@ -6032,23 +6027,22 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc
int64_t tsmaInterval;
SInterval interval;
STimeWindow scanRange = *pScanRange;
STimeWindow windowRange = *pWindowRange;
const SInterval* pInterval = pTsmaOptCtx->queryInterval;
const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pTsmaOptCtx->pUsefulTsmas, tsmaStartIdx);
const STableTSMAInfo* pTsma = pUsefulTsma->pTsma;
if (!pInterval) {
tsmaOptInitIntervalFromTsma(&interval, pTsma, 0);
tsmaOptInitIntervalFromTsma(&interval, pTsma, pTsmaOptCtx->precision);
pInterval = &interval;
}
getDuration(pTsma->interval, pTsma->unit, &tsmaInterval, pTsmaOptCtx->queryInterval->precision);
tsmaInterval = pTsma->interval;
// check for head windows
if (pScanRange->skey != TSKEY_MIN) {
startOfSkeyFirstWin = taosTimeTruncate(pScanRange->skey, pInterval);
endOfSkeyFirstWin =
taosTimeAdd(startOfSkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
taosTimeAdd(startOfSkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pTsmaOptCtx->precision);
isSkeyAlignedWithTsma = ((pScanRange->skey - startOfSkeyFirstWin) % tsmaInterval == 0);
} else {
endOfSkeyFirstWin = TSKEY_MIN;
@ -6057,7 +6051,8 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc
// check for tail windows
if (pScanRange->ekey != TSKEY_MAX) {
startOfEkeyFirstWin = taosTimeTruncate(pScanRange->ekey, pInterval);
endOfEkeyFirstWin = taosTimeAdd(startOfEkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
endOfEkeyFirstWin =
taosTimeAdd(startOfEkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pTsmaOptCtx->precision);
if (startOfEkeyFirstWin > startOfSkeyFirstWin) {
needTailWindow = true;
// TODO add some notes
@ -6067,33 +6062,24 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc
// add head tsma if possible
if (!isSkeyAlignedWithTsma) {
windowRange.ekey =
taosTimeAdd(startOfSkeyFirstWin, pInterval->interval * 2, pInterval->intervalUnit, pInterval->precision);
scanRange.ekey = TMIN(scanRange.ekey, windowRange.ekey);
scanRange.ekey = TMIN(scanRange.ekey, taosTimeAdd(startOfSkeyFirstWin, pInterval->interval * 1,
pInterval->intervalUnit, pTsmaOptCtx->precision));
const STSMAOptUsefulTsma* pTsmaFound = tsmaOptFindUsefulTsma(
pTsmaOptCtx->pUsefulTsmas, tsmaStartIdx + 1, pScanRange->skey - startOfSkeyFirstWin, pInterval->precision);
pTsmaOptCtx->pUsefulTsmas, tsmaStartIdx + 1, pScanRange->skey - startOfSkeyFirstWin, pTsmaOptCtx->precision);
STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsmaFound ? pTsmaFound->pTsma : NULL,
.scanRange = scanRange,
.windowRange = windowRange,
.pTsmaScanCols = pTsmaFound ? pTsmaFound->pTsmaScanCols : NULL};
taosArrayPush(pTsmaOptCtx->pUsedTsmas, &usefulTsma);
}
// the main tsma
if (endOfSkeyFirstWin < startOfEkeyFirstWin) {
scanRange.ekey = TMIN(pScanRange->ekey, endOfEkeyFirstWin);
scanRange.ekey = TMIN(pScanRange->ekey, startOfEkeyFirstWin - 1);
if (!isSkeyAlignedWithTsma) {
scanRange.skey = endOfSkeyFirstWin;
windowRange.skey = scanRange.skey;
}
windowRange.ekey = pWindowRange->ekey;
if (!isEkeyAlignedWithTsma) {
windowRange.ekey = endOfEkeyFirstWin;
}
STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsma,
.scanRange = scanRange,
.windowRange = windowRange,
.pTsmaScanCols = pUsefulTsma->pTsmaScanCols};
STSMAOptUsefulTsma usefulTsma = {
.pTsma = pTsma, .scanRange = scanRange, .pTsmaScanCols = pUsefulTsma->pTsmaScanCols};
taosArrayPush(pTsmaOptCtx->pUsedTsmas, &usefulTsma);
}
@ -6101,21 +6087,16 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc
if (!isEkeyAlignedWithTsma && needTailWindow) {
scanRange.skey = startOfEkeyFirstWin;
scanRange.ekey = pScanRange->ekey;
windowRange.skey = startOfEkeyFirstWin;
windowRange.ekey = pWindowRange->ekey;
const STSMAOptUsefulTsma* pTsmaFound = tsmaOptFindUsefulTsma(
pTsmaOptCtx->pUsefulTsmas, tsmaStartIdx + 1, pScanRange->ekey + 1 - startOfEkeyFirstWin, pInterval->precision);
const STSMAOptUsefulTsma* pTsmaFound =
tsmaOptFindUsefulTsma(pTsmaOptCtx->pUsefulTsmas, tsmaStartIdx + 1, pScanRange->ekey + 1 - startOfEkeyFirstWin,
pTsmaOptCtx->precision);
STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsmaFound ? pTsmaFound->pTsma : NULL,
.scanRange = scanRange,
.windowRange = windowRange,
.pTsmaScanCols = pTsmaFound ? pTsmaFound->pTsmaScanCols : NULL};
taosArrayPush(pTsmaOptCtx->pUsedTsmas, &usefulTsma);
}
}
static void tsmaOptRewriteAggFuncs(STSMAOptCtx* pTsmaOptCtx) {
}
SNodeList* tsmaOptCreateTsmaScanCols(const STSMAOptUsefulTsma* pTsma, const SNodeList* pAggFuncs) {
ASSERT(pTsma->pTsma);
ASSERT(pTsma->pTsmaScanCols);
@ -6127,7 +6108,7 @@ SNodeList* tsmaOptCreateTsmaScanCols(const STSMAOptUsefulTsma* pTsma, const SNod
FOREACH(pNode, pAggFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
if (fmIsPseudoColumnFunc(pFunc->funcId)) {
if (fmIsPseudoColumnFunc(pFunc->funcId) || fmIsGroupKeyFunc(pFunc->funcId)) {
continue;
}
const int32_t* idx = taosArrayGet(pTsma->pTsmaScanCols, i);
@ -6158,57 +6139,216 @@ SNodeList* tsmaOptCreateTsmaScanCols(const STSMAOptUsefulTsma* pTsma, const SNod
return pScanCols;
}
static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, const STSMAOptUsefulTsma* pTsma) {
static int32_t tsmaOptRewriteTags(STSMAOptCtx* pTsmaOptCtx, const STSMAOptUsefulTsma* pTsma, SColumnNode* pTagCol) {
bool found = false;
for (int32_t i = 0; i < pTsma->pTsma->pTags->size; ++i) {
const SSchema* pSchema = taosArrayGet(pTsma->pTsma->pTags, i);
if (strcmp(pTagCol->colName, pSchema->name) == 0) {
strcpy(pTagCol->tableName, pTsma->pTsma->targetTb);
strcpy(pTagCol->tableAlias, pTsma->pTsma->targetTb);
pTagCol->tableId = pTsma->pTsma->destTbUid;
pTagCol->tableType = TSDB_SUPER_TABLE;
pTagCol->colId = pSchema->colId;
found = true;
break;
}
}
ASSERT(found);
return 0;
}
static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNewScan, const STSMAOptUsefulTsma* pTsma) {
SNode* pNode;
int32_t code = 0;
SScanLogicNode* pScan = (SScanLogicNode*)nodesCloneNode((SNode*)pTsmaOptCtx->pScan);
if (!pScan) code = TSDB_CODE_OUT_OF_MEMORY;
if (code == TSDB_CODE_SUCCESS) {
nodesDestroyList(pScan->pScanCols);
pScan->scanRange.skey = pTsma->scanRange.skey;
pScan->scanRange.ekey = pTsma->scanRange.ekey;
pNewScan->scanRange.skey = pTsma->scanRange.skey;
pNewScan->scanRange.ekey = pTsma->scanRange.ekey;
if (pTsma->pTsma) {
// PK col
SColumnNode* pPkTsCol = NULL;
FOREACH(pNode, pNewScan->pScanCols) {
SColumnNode* pCol = (SColumnNode*)pNode;
ASSERT(pTsma->pTsmaScanCols);
pScan->pScanCols = tsmaOptCreateTsmaScanCols(pTsma, pTsmaOptCtx->pAggFuncs);
if (!pScan->pScanCols) code = TSDB_CODE_OUT_OF_MEMORY;
if (pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pPkTsCol = (SColumnNode*)nodesCloneNode((SNode*)pCol);
if (!pPkTsCol) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
break;
}
}
if (code == TSDB_CODE_SUCCESS) {
pScan->tableId = pTsma->pTsma->destTbUid;
pScan->tableType = TSDB_SUPER_TABLE;
strcpy(pScan->tableName.tname, pTsma->pTsma->targetTb); //TODO set dbName
nodesDestroyList(pNewScan->pScanCols);
// normal cols
// TODO last(ts), maybe i should put pk col after normal cols, if no pk col, then add it
pNewScan->pScanCols = tsmaOptCreateTsmaScanCols(pTsma, pTsmaOptCtx->pAggFuncs);
if (!pNewScan->pScanCols) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code == TSDB_CODE_SUCCESS && pPkTsCol) {
tstrncpy(pPkTsCol->tableName, pTsma->pTsma->targetTb, TSDB_TABLE_NAME_LEN);
tstrncpy(pPkTsCol->tableAlias, pTsma->pTsma->targetTb, TSDB_TABLE_NAME_LEN);
pPkTsCol->tableId = pTsma->pTsma->destTbUid;
pPkTsCol->tableType = TSDB_SUPER_TABLE;
nodesListMakeAppend(&pNewScan->pScanCols, (SNode*)pPkTsCol);
}
if (code == TSDB_CODE_SUCCESS) {
pNewScan->stableId = pTsma->pTsma->destTbUid;
pNewScan->tableId = pTsma->pTsma->destTbUid;
pNewScan->tableType = TSDB_SUPER_TABLE;
strcpy(pNewScan->tableName.tname, pTsma->pTsma->targetTb); // TODO set dbName
}
if (code == TSDB_CODE_SUCCESS) {
// pseudo columns
FOREACH(pNode, pNewScan->pScanPseudoCols) {
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
tsmaOptRewriteTags(pTsmaOptCtx, pTsma, (SColumnNode*)pNode);
}
}
FOREACH(pNode, pNewScan->pGroupTags) {
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
tsmaOptRewriteTags(pTsmaOptCtx, pTsma, (SColumnNode*)pNode);
}
}
}
if (code) {
nodesDestroyNode((SNode*)pScan);
}
return code;
}
static void tsmaOptRewriteWindow(STSMAOptCtx* pTsmaOptCtx, const STSMAOptUsefulTsma* pTsma) {
static int32_t tsmaOptCreateWStart(int8_t precision, SFunctionNode** pWStartOut) {
SFunctionNode* pWStart = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pWStart) {
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pWStart->functionName, "_wstart");
int64_t pointer = (int64_t)pWStart;
char name[TSDB_COL_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0};
int32_t len = snprintf(name, sizeof(name) - 1, "%s.%" PRId64 "", pWStart->functionName, pointer);
taosCreateMD5Hash(name, len);
strncpy(pWStart->node.aliasName, name, TSDB_COL_NAME_LEN - 1);
pWStart->node.resType.precision = precision;
int32_t code = fmGetFuncInfo(pWStart, NULL, 0);
if (code) {
nodesDestroyNode((SNode*)pWStart);
} else {
*pWStartOut = pWStart;
}
return code;
}
static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, SScanLogicNode* pScan) {
SNode * pStateFuncNode, *pAggFuncNode;
SColumnNode* pColNode;
int32_t code = 0;
SWindowLogicNode* pWindow = (SWindowLogicNode*)nodesCloneNode((SNode*)pTsmaOptCtx->pParent);
if (!pWindow) code = TSDB_CODE_OUT_OF_MEMORY;
SNodeList* pAggStateFuncs = NULL;
SNodeList* pAggFuncs = NULL;
SWindowLogicNode* pWindow = NULL;
bool isFirstMergeNode = pTsmaOptCtx->pScan == pScan;
bool hasWStart = false;
if (nodeType(pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) {
pWindow = (SWindowLogicNode*)pParent;
pAggFuncs = pWindow->pFuncs;
} else {
pAggFuncs = ((SAggLogicNode*)pParent)->pAggFuncs;
}
if (isFirstMergeNode) {
pAggStateFuncs = nodesCloneList(pAggFuncs);
if (!pAggStateFuncs) return TSDB_CODE_OUT_OF_MEMORY;
} else {
pAggStateFuncs = pAggFuncs;
}
code = fmCreateStateFuncs(pAggStateFuncs);
if (code) return code;
FORBOTH(pStateFuncNode, pAggStateFuncs, pAggFuncNode, pAggFuncs) {
SFunctionNode* pStateFunc = (SFunctionNode*)pStateFuncNode;
SFunctionNode* pAggFunc = (SFunctionNode*)pAggFuncNode;
if (fmIsGroupKeyFunc(pStateFunc->funcId)) {
continue;
} else if (fmIsPseudoColumnFunc(pStateFunc->funcId)) {
if (pStateFunc->funcType == FUNCTION_TYPE_WSTART) hasWStart = true;
continue;
}
pColNode = (SColumnNode*)pScan->pScanCols->pHead->pNode;
pColNode->node.resType = pStateFunc->node.resType;
nodesDestroyList(pAggFunc->pParameterList);
code = nodesListMakeStrictAppend(&pAggFunc->pParameterList, nodesCloneNode((SNode*)pColNode));
if (code) break;
}
if (code == TSDB_CODE_SUCCESS) code = fmCreateStateMergeFuncs(pAggFuncs);
if (pAggFuncs != pAggStateFuncs) nodesDestroyList(pAggStateFuncs);
if (code == TSDB_CODE_SUCCESS && pWindow) {
SColumnNode* pCol = (SColumnNode*)pScan->pScanCols->pTail->pNode;
assert(pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
nodesDestroyNode(pWindow->pTspk);
pWindow->pTspk = nodesCloneNode((SNode*)pCol);
if (!hasWStart && !isFirstMergeNode) {
SFunctionNode* pWStart = NULL;
code = tsmaOptCreateWStart(pWindow->node.precision, &pWStart);
if (TSDB_CODE_SUCCESS == code) {
nodesListAppend(pAggFuncs, (SNode*)pWStart);
}
}
}
if (code == TSDB_CODE_SUCCESS && pWindow) {
nodesDestroyList(pWindow->node.pTargets);
code = createColumnByRewriteExprs(pAggFuncs, &pWindow->node.pTargets);
}
if (code == TSDB_CODE_SUCCESS) {
nodesDestroyList(pScan->node.pTargets);
code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets);
}
if (code == TSDB_CODE_SUCCESS) {
code = createColumnByRewriteExprs(pScan->pScanPseudoCols, &pScan->node.pTargets);
}
return code;
}
static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
int32_t code = 0;
const STSMAOptUsefulTsma* pTsma = NULL;
SNodeList* pAggFuncs = NULL;
for (int32_t i = 1; i < pTsmaOptCtx->pUsedTsmas->size && code == TSDB_CODE_SUCCESS; ++i) {
pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, i);
SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
if (!pSubplan) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
// TODO what is 1?
//pSubplan->splitFlag = 1;
pTsmaOptCtx->generatedSubPlans[i - 1] = pSubplan;
SLogicNode* pParent = (SLogicNode*)nodesCloneNode((SNode*)pTsmaOptCtx->pParent);
if (!pParent) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
pSubplan->pNode = pParent;
pParent->pParent = NULL;
SScanLogicNode* pScan = (SScanLogicNode*)pParent->pChildren->pHead->pNode;
code = tsmaOptRewriteScan(pTsmaOptCtx, pScan, pTsma);
if (code == TSDB_CODE_SUCCESS && pTsma->pTsma) {
code = tsmaOptRevisePlan(pTsmaOptCtx, pParent, pScan);
}
}
static void tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
for (int32_t i = 0; i < pTsmaOptCtx->pUsedTsmas->size; ++i) {
STSMAOptUsefulTsma* pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, i);
tsmaOptRewriteScan(pTsmaOptCtx, pTsma);
ENodeType parentType = nodeType(pTsmaOptCtx->pParent);
if (QUERY_NODE_LOGIC_PLAN_WINDOW == parentType) {
tsmaOptRewriteWindow(pTsmaOptCtx, pTsma);
} else if (parentType == QUERY_NODE_LOGIC_PLAN_AGG) {
} else {
ASSERT(0);
if (code == TSDB_CODE_SUCCESS) {
pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, 0);
code = tsmaOptRewriteScan(pTsmaOptCtx, pTsmaOptCtx->pScan, pTsma);
if (code == TSDB_CODE_SUCCESS && pTsma->pTsma) {
code = tsmaOptRevisePlan(pTsmaOptCtx, pTsmaOptCtx->pParent, pTsmaOptCtx->pScan);
}
//tsmaOptRewriteAggFuncs(pTsmaOptCtx, pTsma);
}
return code;
}
static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
@ -6225,13 +6365,21 @@ static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan
if (code == TSDB_CODE_SUCCESS && tsmaOptCtx.pUsefulTsmas->size > 0) {
// 2. sort useful tsmas with interval
taosArraySort(tsmaOptCtx.pUsefulTsmas, tsmaInfoCompWithIntervalDesc);
// 3. generate and replace logic plans
// a. split windows
tsmaOptSplitWindows(&tsmaOptCtx, tsmaOptCtx.pTimeRange, &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, 0);
// b. create logic plan
tsmaOptGeneratePlan(&tsmaOptCtx);
// c. rewrite agg funcs
tsmaOptRewriteAggFuncs(&tsmaOptCtx);
// 3. split windows
tsmaOptSplitWindows(&tsmaOptCtx, tsmaOptCtx.pTimeRange, 0);
// 4. create logic plan
code = tsmaOptGeneratePlan(&tsmaOptCtx);
if (TSDB_CODE_SUCCESS == code) {
for (int32_t i = 0; i < 2; i++) {
SLogicSubplan* pSubplan = tsmaOptCtx.generatedSubPlans[i];
if (!pSubplan) continue;
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
pSubplan->id.groupId = pLogicSubplan->id.groupId + 1;
pSubplan->id.queryId = pLogicSubplan->id.queryId;
nodesListMakeAppend(&pLogicSubplan->pChildren, (SNode*)pSubplan);
}
}
}
}
clearTSMAOptCtx(&tsmaOptCtx);

View File

@ -2282,13 +2282,13 @@ static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
return code;
}
static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge, int32_t idx) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcStartGroupId = pMerge->srcGroupId;
pExchange->srcEndGroupId = pMerge->srcGroupId;
pExchange->srcStartGroupId = pMerge->srcGroupId + idx;
pExchange->srcEndGroupId = pMerge->srcGroupId + idx;
pExchange->singleChannel = true;
pExchange->node.pParent = (SPhysiNode*)pMerge;
pExchange->node.pOutputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pMerge->node.pOutputDataBlockDesc);
@ -2319,6 +2319,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildre
pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
pMerge->srcEndGroupId = pMergeLogicNode->srcEndGroupId;
pMerge->groupSort = pMergeLogicNode->groupSort;
pMerge->ignoreGroupId = pMergeLogicNode->ignoreGroupId;
pMerge->inputWithGroupId = pMergeLogicNode->inputWithGroupId;
@ -2327,13 +2328,15 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildre
code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);
if (TSDB_CODE_SUCCESS == code) {
for (int32_t j = 0; j < pMergeLogicNode->numOfSubplans; ++j) {
for (int32_t i = 0; i < pMerge->numOfChannels; ++i) {
code = createExchangePhysiNodeByMerge(pMerge);
code = createExchangePhysiNodeByMerge(pMerge, 0);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
}
}
if (TSDB_CODE_SUCCESS == code && NULL != pMergeLogicNode->pMergeKeys) {
code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,

View File

@ -725,10 +725,27 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
nodesDestroyList(pMergeKeys);
}
}
if (code == TSDB_CODE_SUCCESS) {
SNode* pNode;
FOREACH(pNode, pInfo->pSubplan->pChildren) {
SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
SMergeLogicNode* pMerge = (SMergeLogicNode*)pInfo->pSplitNode->pChildren->pHead->pNode;
//pMerge->numOfChannels += stbSplGetNumOfVgroups(pSubplan->pNode);
pSubplan->id.groupId = pCxt->groupId;
pSubplan->id.queryId = pCxt->queryId;
pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT;
TSWAP(((SScanLogicNode*)pSubplan->pNode->pChildren->pHead->pNode)->pVgroupList, pSubplan->pVgroupList);
//++(pCxt->groupId);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
}
if (TSDB_CODE_SUCCESS == code) {
((SMergeLogicNode*)pInfo->pSplitNode->pChildren->pHead->pNode)->srcEndGroupId = pCxt->groupId;
((SMergeLogicNode*)pInfo->pSplitNode->pChildren->pHead->pNode)->numOfSubplans = pInfo->pSubplan->pChildren->length;
}
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
++(pCxt->groupId);
return code;