diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 24dfa5958d..72232610d3 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -215,7 +215,7 @@ size_t blockDataGetSerialMetaSize(uint32_t numOfCols); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); -int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); +int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index c0c3fc7fbc..60e8f63c22 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -241,13 +241,6 @@ struct STag { memcpy(varDataVal(x), (str), __len); \ } while (0); -#define STR_TO_NET_VARSTR(x, str) \ - do { \ - VarDataLenT __len = (VarDataLenT)strlen(str); \ - *(VarDataLenT *)(x) = htons(__len); \ - memcpy(varDataVal(x), (str), __len); \ - } while (0); - #define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \ do { \ char *_e = stpncpy(varDataVal(x), (str), (_maxs)-VARSTR_HEADER_SIZE); \ diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 72248336f7..6f2a675466 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -182,7 +182,7 @@ struct SScalarParam { }; void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell); -int32_t getNumOfResult(SqlFunctionCtx *pCtx, int32_t num, SSDataBlock *pResBlock); +//int32_t getNumOfResult(SqlFunctionCtx *pCtx, int32_t num, SSDataBlock *pResBlock); bool isRowEntryCompleted(struct SResultRowEntryInfo *pEntry); bool isRowEntryInitialized(struct SResultRowEntryInfo *pEntry); diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index a50af18b44..81f63537e5 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -224,7 +224,7 @@ int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc typedef enum EFuncDataRequired { FUNC_DATA_REQUIRED_DATA_LOAD = 1, - FUNC_DATA_REQUIRED_STATIS_LOAD, + FUNC_DATA_REQUIRED_SMA_LOAD, FUNC_DATA_REQUIRED_NOT_LOAD, FUNC_DATA_REQUIRED_FILTEROUT, } EFuncDataRequired; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0b27d9b527..6a492a30c6 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1143,7 +1143,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) { } } -static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows) { +static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) { ASSERT(numOfRows > 0 && pBlockInfo->capacity >= pBlockInfo->rows); if (numOfRows < pBlockInfo->capacity) { return TSDB_CODE_SUCCESS; @@ -1182,8 +1182,10 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* return TSDB_CODE_OUT_OF_MEMORY; } - memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows)); pColumn->pData = tmp; + if (clearPayload) { + memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows)); + } } return TSDB_CODE_SUCCESS; @@ -1203,9 +1205,9 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { } } -int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) { +int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload) { SDataBlockInfo info = {0}; - return doEnsureCapacity(pColumn, &info, numOfRows); + return doEnsureCapacity(pColumn, &info, numOfRows, clearPayload); } int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { @@ -1221,7 +1223,7 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); - code = doEnsureCapacity(p, &pDataBlock->info, numOfRows); + code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, false); if (code) { return code; } diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index afd53b8dda..6ae52f1fd7 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -644,6 +644,13 @@ typedef struct SSttBlockLoadInfo { int16_t *colIds; int32_t numOfCols; bool sttBlockLoaded; + + // keep the last access position, this position may be used to reduce the binary times for + // starting last block data for a new table + struct { + int32_t blockIndex; + int32_t rowIndex; + } prevEndPos; } SSttBlockLoadInfo; typedef struct SMergeTree { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 14159c2da2..e20c8ee955 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -536,10 +536,10 @@ typedef struct SSysTableScanInfo { } SSysTableScanInfo; typedef struct SBlockDistInfo { - SSDataBlock* pResBlock; - STsdbReader* pHandle; - SReadHandle readHandle; - uint64_t uid; // table uid + SSDataBlock* pResBlock; + STsdbReader* pHandle; + SReadHandle readHandle; + uint64_t uid; // table uid } SBlockDistInfo; // todo remove this @@ -550,7 +550,6 @@ typedef struct SOptrBasicInfo { } SOptrBasicInfo; typedef struct SIntervalAggOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; // basic info SAggSupporter aggSup; // aggregate supporter SExprSupp scalarSupp; // supporter for perform scalar function @@ -571,7 +570,6 @@ typedef struct SIntervalAggOperatorInfo { typedef struct SMergeAlignedIntervalAggOperatorInfo { SIntervalAggOperatorInfo* intervalAggOperatorInfo; - // bool hasGroupId; uint64_t groupId; // current groupId int64_t curTs; // current ts SSDataBlock* prefetchedBlock; @@ -839,10 +837,6 @@ typedef struct SSortOperatorInfo { SNode* pCondition; } SSortOperatorInfo; -typedef struct STagFilterOperatorInfo { - SOptrBasicInfo binfo; -} STagFilterOperatorInfo; - typedef struct SJoinOperatorInfo { SSDataBlock* pRes; int32_t joinType; @@ -907,7 +901,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int void cleanupAggSup(SAggSupporter* pAggSup); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); -void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId); +void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name); int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts); int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index bcd59e83f0..95d3c5cf23 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -170,14 +170,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); pRes->info.rows = 1; - if (pInfo->pseudoExprSup.numOfExprs > 0) { - SExprSupp* pSup = &pInfo->pseudoExprSup; - int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, pRes->info.rows, - GET_TASKID(pTaskInfo)); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; - return NULL; - } + SExprSupp* pSup = &pInfo->pseudoExprSup; + int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, + pRes->info.rows, GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = code; + return NULL; } pRes->info.groupId = getTableGroupId(pTableList, pRes->info.uid); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 96a493f101..aa6df1ca1a 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -379,7 +379,7 @@ static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarPara pColumnData->info.scale = pType->scale; pColumnData->info.precision = pType->precision; - int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows); + int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true); if (code != TSDB_CODE_SUCCESS) { terrno = code; taosMemoryFree(pColumnData); @@ -1828,8 +1828,6 @@ static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) { } static int32_t sortTableGroup(STableListInfo* pTableListInfo) { - int32_t code = TSDB_CODE_SUCCESS; - taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); int32_t size = taosArrayGetSize(pTableListInfo->pTableList); @@ -1851,6 +1849,11 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) { pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList); pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups); + if (pTableListInfo->groupOffset == NULL) { + taosArrayDestroy(pList); + return TSDB_CODE_OUT_OF_MEMORY; + } + memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups); taosArrayDestroy(pList); return TDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 26abc2b90b..deb628f30e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -87,7 +87,7 @@ static void releaseQueryBuf(size_t numOfTables); static void destroyFillOperatorInfo(void* param); static void destroyProjectOperatorInfo(void* param); -static void destroyOrderOperatorInfo(void* param); +static void destroySortOperatorInfo(void* param); static void destroyAggOperatorInfo(void* param); static void destroyIntervalOperatorInfo(void* param); @@ -322,7 +322,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP; pColData->info.bytes = sizeof(int64_t); - colInfoDataEnsureCapacity(pColData, 5); + colInfoDataEnsureCapacity(pColData, 5, false); colDataAppendInt64(pColData, 0, &pQueryWindow->skey); colDataAppendInt64(pColData, 1, &pQueryWindow->ekey); @@ -439,7 +439,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc pColInfo = pInput->pData[paramIndex]; } - colInfoDataEnsureCapacity(pColInfo, numOfRows); + colInfoDataEnsureCapacity(pColInfo, numOfRows, false); int8_t type = pFuncParam->param.nType; if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) { @@ -1047,8 +1047,6 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pCol SFilterInfo* filter = pFilterInfo; int64_t st = taosGetTimestampUs(); - // pError("start filter"); - // todo move to the initialization function int32_t code = 0; bool needFree = false; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3352b2685a..be912c9f11 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -58,6 +58,18 @@ typedef struct { void* pVnode; } SSTabFltArg; +typedef struct STableMergeScanExecInfo { + SFileBlockLoadRecorder blockRecorder; + SSortExecInfo sortExecInfo; +} STableMergeScanExecInfo; + +typedef struct STableMergeScanSortSourceParam { + SOperatorInfo* pOperator; + int32_t readerIdx; + uint64_t uid; + SSDataBlock* inputBlock; +} STableMergeScanSortSourceParam; + static int32_t sysChkFilter__Comm(SNode* pNode); static int32_t sysChkFilter__DBName(SNode* pNode); static int32_t sysChkFilter__VgroupId(SNode* pNode); @@ -69,15 +81,15 @@ static int32_t sysChkFilter__STableName(SNode* pNode); static int32_t sysChkFilter__Uid(SNode* pNode); static int32_t sysChkFilter__Type(SNode* pNode); -static int32_t sysFilte__DbName(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__VgroupId(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__TableName(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__CreateTime(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__Ncolumn(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__Ttl(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__STableName(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__Uid(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__Type(void* pMeta, SNode* pNode, SArray* result); +static int32_t sysFilte__DbName(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__VgroupId(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__Ncolumn(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__Ttl(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__STableName(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__Uid(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__Type(void* arg, SNode* pNode, SArray* result); const SSTabFltFuncDef filterDict[] = { {.name = "table_name", .chkFunc = sysChkFilter__TableName, .fltFunc = sysFilte__TableName}, @@ -95,7 +107,7 @@ const SSTabFltFuncDef filterDict[] = { static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result); static int32_t optSysTabFilteImpl(void* arg, SNode* cond, SArray* result); static int32_t optSysCheckOper(SNode* pOpear); -static int32_t optSysMergeRslt(SArray* multiRslt, SArray* reslt); +static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt); static bool processBlockWithProbability(const SSampleExecInfo* pInfo); @@ -123,29 +135,6 @@ static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { } } -static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) { -#if 0 - int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv)); - for(int32_t i = 0; i < numOfGroups; ++i) { - SArray *group = GET_TABLEGROUP(pRuntimeEnv, i); - SArray *tableKeyGroup = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i); - - size_t t = taosArrayGetSize(group); - for (int32_t j = 0; j < t; ++j) { - STableQueryInfo *pCheckInfo = taosArrayGetP(group, j); - updateTableQueryInfoForReverseScan(pCheckInfo); - - // update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide - // the start check timestamp of tsdbQueryHandle -// STableKeyInfo *pTableKeyInfo = taosArrayGet(tableKeyGroup, j); -// pTableKeyInfo->lastKey = pCheckInfo->lastKey; -// -// assert(pCheckInfo->pTable == pTableKeyInfo->pTable); - } - } -#endif -} - static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) { int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order); if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') { @@ -291,7 +280,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* return TSDB_CODE_SUCCESS; } -static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols, +static bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols, int32_t numOfRows) { if (pColsAgg == NULL || pFilterNode == NULL) { return true; @@ -360,13 +349,13 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo // todo handle the slimit info void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) { SLimit* pLimit = &pLimitInfo->limit; + const char* id = GET_TASKID(pTaskInfo); if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) { if (pLimitInfo->remainOffset >= pBlock->info.rows) { pLimitInfo->remainOffset -= pBlock->info.rows; pBlock->info.rows = 0; - qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, - GET_TASKID(pTaskInfo)); + qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id); } else { blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); pLimitInfo->remainOffset = 0; @@ -379,7 +368,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo int32_t keep = pBlock->info.rows - overflowRows; blockDataKeepFirstNRows(pBlock, keep); - qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo)); + qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id); pOperator->status = OP_EXEC_DONE; } } @@ -429,10 +418,11 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca if (pTableScanInfo->pseudoSup.numOfExprs > 0) { ensureBlockCapacity(pBlock, pBlock->info.rows); } + doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); pCost->skipBlocks += 1; return TSDB_CODE_SUCCESS; - } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { + } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) { pCost->loadBlockStatis += 1; loadSMA = true; // mark the operation of load sma; bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo); @@ -536,7 +526,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock, int32_t rows, const char* idStr) { // currently only the tbname pseudo column - if (numOfPseudoExpr == 0) { + if (numOfPseudoExpr <= 0) { return TSDB_CODE_SUCCESS; } @@ -566,7 +556,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int // this is to handle the tbname if (fmIsScanPseudoColumnFunc(functionId)) { - setTbNameColData(pHandle->meta, pBlock, pColInfoData, functionId); + setTbNameColData(pBlock, pColInfoData, functionId, mr.me.name); } else { // these are tags STagVal tagVal = {0}; tagVal.cid = pExpr->base.pParam[0].pCol->colId; @@ -602,16 +592,20 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int return TSDB_CODE_SUCCESS; } -void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) { +void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) { struct SScalarFuncExecFuncs fpSet = {0}; fmGetScalarFuncExecFuncs(functionId, &fpSet); - SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_BIGINT, sizeof(uint64_t), 1); - colInfoDataEnsureCapacity(&infoData, 1); + size_t len = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE; + char buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(buf, name) - colDataAppendInt64(&infoData, 0, (int64_t*)&pBlock->info.uid); - SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData}; + SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, len, 1); + colInfoDataEnsureCapacity(&infoData, 1, false); + colDataAppend(&infoData, 0, buf, false); + + SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &infoData}; SScalarParam param = {.columnData = pColInfoData}; if (fpSet.process != NULL) { @@ -679,7 +673,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { return NULL; } -static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { +static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { STableScanInfo* pTableScanInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -746,7 +740,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); while (1) { - SSDataBlock* result = doTableScanGroup(pOperator); + SSDataBlock* result = doGroupedTableScan(pOperator); if (result) { return result; } @@ -784,7 +778,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } } - SSDataBlock* result = doTableScanGroup(pOperator); + SSDataBlock* result = doGroupedTableScan(pOperator); if (result != NULL) { ASSERT(result->info.uid != 0); return result; @@ -808,7 +802,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { tsdbReaderReset(pInfo->dataReader, &pInfo->cond); pInfo->scanTimes = 0; - result = doTableScanGroup(pOperator); + result = doGroupedTableScan(pOperator); if (result != NULL) { return result; } @@ -851,24 +845,25 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, goto _error; } - SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; + SScanPhysiNode* pScanNode = &pTableScanNode->scan; + SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc; int32_t numOfCols = 0; - int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, + int32_t code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } - initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); + initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->limitInfo); code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { goto _error; } - if (pTableScanNode->scan.pScanPseudoCols != NULL) { + if (pScanNode->pScanPseudoCols != NULL) { SExprSupp* pSup = &pInfo->pseudoSup; - pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs); + pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs); pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); } @@ -880,7 +875,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; pInfo->pResBlock = createResDataBlock(pDescNode); - pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + pInfo->pFilterNode = pScanNode->node.pConditions; if (pInfo->pFilterNode != NULL) { code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0); @@ -988,7 +983,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN}; - int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, &blockDistInfo.rowSize, + int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, (int32_t*)&blockDistInfo.rowSize, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); @@ -2430,7 +2425,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pHandle->initTableReader) { pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->dataReader = NULL; - int32_t code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL); + code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL); if (code != 0) { terrno = code; destroyTableScanOperatorInfo(pTableScanOp); @@ -4098,12 +4093,14 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan goto _error; } - SScanPhysiNode* pScanNode = &pScanPhyNode->scan; - + SScanPhysiNode* pScanNode = &pScanPhyNode->scan; SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc; int32_t num = 0; int32_t code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pInfo->accountId = pScanPhyNode->accountId; pInfo->pUser = taosMemoryStrDup((void*)pUser); @@ -4140,7 +4137,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan return pOperator; _error: - taosMemoryFreeClear(pInfo); + if (pInfo != NULL) { + destroySysScanOperator(pInfo); + } taosMemoryFreeClear(pOperator); terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; return NULL; @@ -4244,16 +4243,15 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc; - int32_t num = 0; int32_t numOfExprs = 0; SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); - int32_t code = - extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); if (code != TSDB_CODE_SUCCESS) { goto _error; } - code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); + int32_t num = 0; + code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4285,21 +4283,9 @@ _error: return NULL; } -int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo, - int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) { - for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) { - STableKeyInfo* pList = tableListGetInfo(pTableListInfo, i); - STsdbReader* pReader = NULL; - tsdbReaderOpen(pHandle->vnode, pQueryCond, pList, 1, &pReader, idstr); - taosArrayPush(arrayReader, &pReader); - } - - return TSDB_CODE_SUCCESS; -} - // todo refactor static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, - int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { + SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableMergeScanInfo* pInfo = pOperator->info; @@ -4334,12 +4320,11 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc } return TSDB_CODE_SUCCESS; - } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { + } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) { pCost->loadBlockStatis += 1; bool allColumnsHaveAgg = true; SColumnDataAgg** pColAgg = NULL; - // STsdbReader* reader = pTableScanInfo->pReader; // taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); if (allColumnsHaveAgg == true) { int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -4388,13 +4373,12 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); // currently only the tbname pseudo column - if (pTableScanInfo->pseudoSup.numOfExprs > 0) { - int32_t code = - addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, - pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } + SExprSupp* pSup = &pTableScanInfo->pseudoSup; + + int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, + pBlock->info.rows, GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); } if (pTableScanInfo->pFilterNode != NULL) { @@ -4416,13 +4400,6 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc return TSDB_CODE_SUCCESS; } -typedef struct STableMergeScanSortSourceParam { - SOperatorInfo* pOperator; - int32_t readerIdx; - uint64_t uid; - SSDataBlock* inputBlock; -} STableMergeScanSortSourceParam; - static SSDataBlock* getTableDataBlockImpl(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; @@ -4433,7 +4410,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { STableMergeScanInfo* pTableScanInfo = pOperator->info; SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx); - blockDataCleanup(pBlock); int64_t st = taosGetTimestampUs(); @@ -4443,13 +4419,13 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo)); if (code != 0) { - T_LONG_JMP(pOperator->pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } STsdbReader* reader = pInfo->pReader; while (tsdbNextDataBlock(reader)) { - if (isTaskKilled(pOperator->pTaskInfo)) { - T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + if (isTaskKilled(pTaskInfo)) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } // process this data block based on the probabilities @@ -4472,9 +4448,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { } uint32_t status = 0; - int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readIdx, pBlock, &status); + code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pOperator->pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } // current block is filter out according to filter condition, continue load the next block @@ -4482,71 +4458,21 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { continue; } - pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid); + pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); - pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows; + pOperator->resultInfo.totalRows += pBlock->info.rows; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; tsdbReaderClose(pInfo->pReader); pInfo->pReader = NULL; return pBlock; } + tsdbReaderClose(pInfo->pReader); pInfo->pReader = NULL; return NULL; } -static SSDataBlock* getTableDataBlock(void* param) { - STableMergeScanSortSourceParam* source = param; - SOperatorInfo* pOperator = source->pOperator; - int32_t readerIdx = source->readerIdx; - SSDataBlock* pBlock = source->inputBlock; - STableMergeScanInfo* pTableScanInfo = pOperator->info; - - int64_t st = taosGetTimestampUs(); - - blockDataCleanup(pBlock); - - STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); - while (tsdbNextDataBlock(reader)) { - if (isTaskKilled(pOperator->pTaskInfo)) { - T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); - } - - // process this data block based on the probabilities - bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); - if (!processThisBlock) { - continue; - } - - blockDataCleanup(pBlock); - - int32_t rows = 0; - tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.uid, &pBlock->info.window); - blockDataEnsureCapacity(pBlock, rows); - pBlock->info.rows = rows; - - uint32_t status = 0; - int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status); - // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pOperator->pTaskInfo->env, code); - } - - // current block is filter out according to filter condition, continue load the next block - if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { - continue; - } - - pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid); - pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; - pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - - return pBlock; - } - return NULL; -} - SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) { int32_t tsTargetSlotId = 0; for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) { @@ -4781,11 +4707,6 @@ void destroyTableMergeScanOperatorInfo(void* param) { taosMemoryFreeClear(param); } -typedef struct STableMergeScanExecInfo { - SFileBlockLoadRecorder blockRecorder; - SSortExecInfo sortExecInfo; -} STableMergeScanExecInfo; - int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { ASSERT(pOptr != NULL); // TODO: merge these two info into one struct @@ -4800,18 +4721,6 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla return TSDB_CODE_SUCCESS; } -int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) { - const STableKeyInfo* info1 = p1; - const STableKeyInfo* info2 = p2; - if (info1->groupId < info2->groupId) { - return -1; - } else if (info1->groupId > info2->groupId) { - return 1; - } else { - return 0; - } -} - SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); @@ -4825,6 +4734,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN int32_t numOfCols = 0; int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 26f1932b12..b2c926f1e9 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -20,7 +20,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator); static int32_t doOpenSortOperator(SOperatorInfo* pOperator); static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); -static void destroyOrderOperatorInfo(void* param); +static void destroySortOperatorInfo(void* param); // todo add limit/offset impl SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) { @@ -64,7 +64,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* // TODO dynamic set the available sort buffer pOperator->fpSet = - createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, getExplainExecInfo); + createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroySortOperatorInfo, getExplainExecInfo); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -249,7 +249,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { return blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL; } -void destroyOrderOperatorInfo(void* param) { +void destroySortOperatorInfo(void* param) { SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); @@ -729,12 +729,11 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = createResDataBlock(pDescNode); + int32_t rowSize = pInfo->binfo.pRes->info.rowSize; ASSERT(rowSize < 100 * 1024 * 1024); - SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); int32_t numOfOutputCols = 0; - code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); if (code != TSDB_CODE_SUCCESS) { @@ -746,7 +745,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->groupSort = pMergePhyNode->groupSort; - pInfo->pSortInfo = pSortInfo; + pInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); pInfo->pInputBlock = pInputBlock; pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. @@ -768,11 +767,11 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size return pOperator; _error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; if (pInfo != NULL) { destroyMultiwayMergeOperatorInfo(pInfo); } + pTaskInfo->code = code; taosMemoryFree(pOperator); return NULL; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index c8f0b3d826..4c0cb53ce7 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -489,7 +489,7 @@ EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWind if (QUERY_NODE_COLUMN == nodeType(pParam) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId) { return FUNC_DATA_REQUIRED_NOT_LOAD; } - return FUNC_DATA_REQUIRED_STATIS_LOAD; + return FUNC_DATA_REQUIRED_SMA_LOAD; } bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { @@ -1103,7 +1103,7 @@ int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { } EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) { - return FUNC_DATA_REQUIRED_STATIS_LOAD; + return FUNC_DATA_REQUIRED_SMA_LOAD; } bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 0d65e8d8f5..79f33f3ac3 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -279,7 +279,7 @@ static EFuncDataRequired scanPathOptPromoteDataRequired(EFuncDataRequired l, EFu switch (l) { case FUNC_DATA_REQUIRED_DATA_LOAD: return l; - case FUNC_DATA_REQUIRED_STATIS_LOAD: + case FUNC_DATA_REQUIRED_SMA_LOAD: return FUNC_DATA_REQUIRED_DATA_LOAD == r ? r : l; case FUNC_DATA_REQUIRED_NOT_LOAD: return FUNC_DATA_REQUIRED_FILTEROUT == r ? l : r; diff --git a/source/libs/scalar/CMakeLists.txt b/source/libs/scalar/CMakeLists.txt index c34c5e2877..193a6971e5 100644 --- a/source/libs/scalar/CMakeLists.txt +++ b/source/libs/scalar/CMakeLists.txt @@ -14,7 +14,6 @@ target_link_libraries(scalar PRIVATE nodes PRIVATE function PRIVATE qcom - PRIVATE vnode ) if(${BUILD_TEST}) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index ea1ce175e3..fa71009365 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -49,7 +49,7 @@ int32_t sclCreateColumnInfoData(SDataType *pType, int32_t numOfRows, SScalarPara pColumnData->info.scale = pType->scale; pColumnData->info.precision = pType->precision; - int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows); + int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true); if (code != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pColumnData); @@ -70,7 +70,7 @@ int32_t sclConvertValueToSclParam(SValueNode* pValueNode, SScalarParam* out, int colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false); - colInfoDataEnsureCapacity(out->columnData, 1); + colInfoDataEnsureCapacity(out->columnData, 1, true); code = vectorConvertSingleColImpl(&in, out, overflow, -1, -1); sclFreeParam(&in); @@ -88,7 +88,7 @@ int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockL pLeft->numOfRows = pb->info.rows; if (pDst->numOfRows < pb->info.rows) { - colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows); + colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows, true); } _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN); @@ -1604,7 +1604,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { if (1 == res->numOfRows) { SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList)); } else { - colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows); + colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows, true); colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL); pDst->numOfRows = res->numOfRows; pDst->numOfQualified = res->numOfQualified; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 7e93c9173c..c89072233f 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -6,7 +6,6 @@ #include "tdatablock.h" #include "tjson.h" #include "ttime.h" -#include "vnode.h" typedef float (*_float_fn)(float); typedef double (*_double_fn)(double); @@ -1718,12 +1717,9 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { ASSERT(inputNum == 1); + char* p = colDataGetVarData(pInput->columnData, 0); - uint64_t uid = *(uint64_t *)colDataGetData(pInput->columnData, 0); - - char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; - metaGetTableNameByUid(pInput->param, uid, str); - colDataAppendNItems(pOutput->columnData, pOutput->numOfRows, str, pInput->numOfRows); + colDataAppendNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows); pOutput->numOfRows += pInput->numOfRows; return TSDB_CODE_SUCCESS; }