From 503300a2bd99fbc3904772fab163c1dc7268ce77 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Mar 2022 14:21:14 +0800 Subject: [PATCH] [td-13039] update test. --- include/libs/function/function.h | 2 +- source/libs/executor/inc/executorimpl.h | 3 +- source/libs/executor/src/executorimpl.c | 103 +++++++++++++----------- source/libs/function/inc/tfill.h | 2 +- source/libs/function/src/tfill.c | 4 +- 5 files changed, 63 insertions(+), 51 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index e33805437a..6c4774c3be 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -327,7 +327,7 @@ bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, - struct SFillColInfo* pFillCol, void* handle); + struct SFillColInfo* pFillCol, const char* id); void* taosDestroyFillInfo(struct SFillInfo *pFillInfo); int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 819c0a74f7..04e24dcc79 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -659,7 +659,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); -SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, bool multigroupResult, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, + int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1b555e396b..4400351b50 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -7083,31 +7083,32 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou return pInfo->binfo.pRes; } -static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, STaskRuntimeEnv* pRuntimeEnv, bool* newgroup) { +static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SResultInfo* pResultInfo, bool* newgroup, SExecTaskInfo* pTaskInfo) { pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; - int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, TASK_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; + +// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, TASK_COMPLETED)? pTaskInfo->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo)); - taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); +// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity, pInfo->p); pInfo->existNewGroupBlock = NULL; *newgroup = true; } -static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, STaskRuntimeEnv *pRuntimeEnv, bool *newgroup) { +static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SResultInfo *pResultInfo, bool *newgroup) { if (taosFillHasMoreResults(pInfo->pFillInfo)) { *newgroup = false; - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p); - if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult)) { + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity, pInfo->p); + if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult)) { return; } } // handle the cached new group data block if (pInfo->existNewGroupBlock) { - doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup); +// doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup); } } @@ -7120,10 +7121,10 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) { return NULL; } -// doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); -// if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) { -// return pInfo->pRes; -// } + doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup); + if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) { + return pInfo->pRes; + } SOperatorInfo* pDownstream = pOperator->pDownstream[0]; while(1) { @@ -7827,7 +7828,33 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx return NULL; } -SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, bool multigroupResult, SExecTaskInfo* pTaskInfo) { +static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal, + STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) { + struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, (int64_t*)fillVal); + + TSKEY sk = TMIN(win.skey, win.ekey); + TSKEY ek = TMAX(win.skey, win.ekey); + + // TODO set correct time precision + STimeWindow w = TSWINDOW_INITIALIZER; + getAlignQueryTimeWindow(pInterval, TSDB_TIME_PRECISION_MILLI, win.skey, sk, ek, &w); + + int32_t order = TSDB_ORDER_ASC; + pInfo->pFillInfo = + taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval->sliding, + pInterval->slidingUnit, (int8_t)pInterval->precision, fillType, pColInfo, id); + + pInfo->p = calloc(numOfCols, POINTER_BYTES); + + if (pInfo->pFillInfo == NULL || pInfo->p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } else { + return TSDB_CODE_SUCCESS; + } +} + +SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, + int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo) { SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -7836,23 +7863,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp pInfo->intervalInfo = *pInterval; SResultInfo* pResultInfo = &pOperator->resultInfo; - { -// struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pQueryAttr->fillVal); - STimeWindow w = TSWINDOW_INITIALIZER; - - TSKEY sk = TMIN(pTaskInfo->window.skey, pTaskInfo->window.ekey); - TSKEY ek = TMAX(pTaskInfo->window.skey, pTaskInfo->window.ekey); - getAlignQueryTimeWindow(pInterval, pInterval->precision, pTaskInfo->window.skey, sk, ek, &w); - - int32_t order = TSDB_ORDER_ASC; - -// pInfo->pFillInfo = -// taosCreateFillInfo(order, w.skey, 0, (int32_t)pResultInfo->capacity, numOfCols, -// pInterval->sliding, pInterval->slidingUnit, -// (int8_t)pInterval->precision, pQueryAttr->fillType, pColInfo, pTaskInfo->id.str); - - pInfo->p = calloc(numOfCols, POINTER_BYTES); - } +// int32_t code = initFillInfo(pInfo, pExpr, numOfCols, fillVal, , pResultInfo->capacity, pTaskInfo->id.str, pInterval, fillType); +// if (code != TSDB_CODE_SUCCESS) { +// goto _error; +// } pOperator->name = "FillOperator"; pOperator->blockingOptr = false; @@ -7869,6 +7883,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; + + _error: + tfree(pOperator); + tfree(pInfo); + return NULL; } SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) { @@ -8377,7 +8396,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) { return pTaskInfo; } -static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId); +static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableGroupInfo *pTableGroupInfo, uint64_t queryId, uint64_t taskId); static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId); static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo); @@ -8385,16 +8404,12 @@ static SArray* extractScanColumnId(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList); SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { -// if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node -// pPhyNode = nodesListGetNode(pPhyNode->pChildren, 0); -// } - if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); - tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId); + tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; @@ -8403,10 +8418,8 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. - STableGroupInfo groupInfo = {0}; - - int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); - SArray* tableIdList = extractTableIdList(&groupInfo); + int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); + SArray* tableIdList = extractTableIdList(pTableGroupInfo); SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc); SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols); @@ -8599,22 +8612,20 @@ SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo) { return tableIdList; } -tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId) { - STableGroupInfo groupInfo = {0}; - +tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableGroupInfo *pTableGroupInfo, uint64_t queryId, uint64_t taskId) { uint64_t uid = pTableScanNode->scan.uid; - int32_t code = doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, &groupInfo, queryId, taskId); + int32_t code = doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, taskId); if (code != TSDB_CODE_SUCCESS) { goto _error; } - if (groupInfo.numOfTables == 0) { + if (pTableGroupInfo->numOfTables == 0) { code = 0; qDebug("no table qualified for query, TID:0x%"PRIx64", QID:0x%"PRIx64, taskId, queryId); goto _error; } - return createDataReaderImpl(pTableScanNode, &groupInfo, pHandle->reader, queryId, taskId); + return createDataReaderImpl(pTableScanNode, pTableGroupInfo, pHandle->reader, queryId, taskId); _error: terrno = code; diff --git a/source/libs/function/inc/tfill.h b/source/libs/function/inc/tfill.h index 81348fba1d..b90dbf7799 100644 --- a/source/libs/function/inc/tfill.h +++ b/source/libs/function/inc/tfill.h @@ -61,7 +61,7 @@ typedef struct SFillInfo { SFillColInfo* pFillCol; // column info for fill operations SFillTagColInfo* pTags; // tags value for filling gap - void* handle; // for debug purpose + const char* id; } SFillInfo; int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); diff --git a/source/libs/function/src/tfill.c b/source/libs/function/src/tfill.c index 46d82aa6fb..9b3dca7393 100644 --- a/source/libs/function/src/tfill.c +++ b/source/libs/function/src/tfill.c @@ -342,7 +342,7 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, - struct SFillColInfo* pCol, void* handle) { + struct SFillColInfo* pCol, const char* id) { if (fillType == TSDB_FILL_NONE) { return NULL; } @@ -357,7 +357,7 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag pFillInfo->numOfCols = numOfCols; pFillInfo->precision = precision; pFillInfo->alloc = capacity; - pFillInfo->handle = handle; + pFillInfo->id = id; pFillInfo->interval.interval = slidingTime; pFillInfo->interval.intervalUnit = slidingUnit;