diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 94a87a018e..67c9db13ed 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -175,7 +175,6 @@ struct SExecTaskInfo { int64_t version; // used for stream to record wal version, why not move to sschemainfo SStreamTaskInfo streamInfo; SSchemaInfo schemaInfo; -// STableListInfo* pTableInfoList; // this is a table list const char* sql; // query sql string jmp_buf env; // jump to this position when error happens. EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] @@ -324,7 +323,7 @@ typedef struct STableScanBase { int32_t dataBlockLoadFlag; SLimitInfo limitInfo; // there are more than one table list exists in one task, if only one vnode exists. - STableListInfo* pTableInfoList; + STableListInfo* pTableListInfo; } STableScanBase; typedef struct STableScanInfo { @@ -370,7 +369,7 @@ typedef struct STagScanInfo { SColMatchInfo matchInfo; int32_t curPos; SReadHandle readHandle; - STableListInfo* pTableInfoList; + STableListInfo* pTableListInfo; } STagScanInfo; typedef enum EStreamScanMode { @@ -518,6 +517,16 @@ typedef struct SOptrBasicInfo { bool mergeResultBlock; } SOptrBasicInfo; +typedef struct SAggOperatorInfo { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + STableQueryInfo* current; + uint64_t groupId; + SGroupResInfo groupResInfo; + SExprSupp scalarExprSup; + bool groupKeyOptimized; +} SAggOperatorInfo; + typedef struct SIntervalAggOperatorInfo { SOptrBasicInfo binfo; // basic info SAggSupporter aggSup; // aggregate supporter @@ -734,6 +743,7 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int3 STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); +SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); @@ -837,9 +847,11 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); char* buildTaskId(uint64_t taskId, uint64_t queryId); +SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo); + int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t vgId, char* sql, EOPTR_EXEC_MODEL model); -int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); +int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, STableListInfo* pTableListInfo, SReadHandle* readHandle); int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList); STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, @@ -853,7 +865,6 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap); bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); -bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup); void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c8fce05341..b05d01f596 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -404,7 +404,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } } - STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableInfoList; + STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableListInfo; taosWLockLatch(&pTaskInfo->lock); for (int32_t i = 0; i < numOfQualifiedTables; ++i) { @@ -485,9 +485,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) { - assert(pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - taosThreadOnce(&initPoolOnce, initRefPool); qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId); @@ -507,7 +505,12 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, if (handle) { void* pSinkParam = NULL; - code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle); + + SArray* pInfoList = getTableListInfo(*pTask); + STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0); + taosArrayDestroy(pInfoList); + + code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTableListInfo, readHandle); if (code != TSDB_CODE_SUCCESS) { qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str); goto _error; @@ -1083,7 +1086,6 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; - STableListInfo* pTableListInfo = NULL;//pTaskInfo->pTableInfoList; const char* id = GET_TASKID(pTaskInfo); pTaskInfo->streamInfo.prepareStatus = *pOffset; @@ -1095,19 +1097,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT if (subType == TOPIC_SUB_TYPE__COLUMN) { pOperator->status = OP_OPENED; - - // TODO add more check - if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - if (pOperator->numOfDownstream != 1) { - qError("invalid operator, number of downstream:%d, %s", pOperator->numOfDownstream, id); - return -1; - } - pOperator = pOperator->pDownstream[0]; - } + pOperator = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id); SStreamScanInfo* pInfo = pOperator->info; STableScanInfo* pScanInfo = pInfo->pTableScanOp->info; STableScanBase* pScanBaseInfo = &pScanInfo->base; + STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo; if (pOffset->type == TMQ_OFFSET__LOG) { tsdbReaderClose(pScanBaseInfo->dataReader); @@ -1191,9 +1186,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } } else { // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB + if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { SStreamRawScanInfo* pInfo = pOperator->info; SSnapContext* sContext = pInfo->sContext; + + SOperatorInfo* p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id); + STableListInfo* pTableListInfo = ((STableScanInfo*)(p->info))->base.pTableListInfo; if (setForSnapShot(sContext, pOffset->uid) != 0) { qError("setDataForSnapShot error. uid:%" PRId64" , %s", pOffset->uid, id); return -1; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e29432b808..265f948bcd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -74,25 +74,12 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { #endif #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) -#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) - -typedef struct SAggOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - STableQueryInfo* current; - uint64_t groupId; - SGroupResInfo groupResInfo; - SExprSupp scalarExprSup; - bool groupKeyOptimized; -} SAggOperatorInfo; static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock); static void releaseQueryBuf(size_t numOfTables); -static void destroyAggOperatorInfo(void* param); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); -static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey); @@ -266,57 +253,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR return pResult; } -// a new buffer page for each table. Needs to opt this design -static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) { - if (pWindowRes->pageId != -1) { - return 0; - } - - SFilePage* pData = NULL; - - // in the first scan, new space needed for results - int32_t pageId = -1; - SArray* list = getDataBufPagesIdList(pResultBuf); - - if (taosArrayGetSize(list) == 0) { - pData = getNewBufPage(pResultBuf, &pageId); - pData->num = sizeof(SFilePage); - } else { - SPageInfo* pi = getLastPageInfo(list); - pData = getBufPage(pResultBuf, getPageId(pi)); - if (pData == NULL) { - qError("failed to get buffer, code:%s", tstrerror(terrno)); - return terrno; - } - - pageId = getPageId(pi); - - if (pData->num + size > getBufPageSize(pResultBuf)) { - // release current page first, and prepare the next one - releaseBufPageInfo(pResultBuf, pi); - - pData = getNewBufPage(pResultBuf, &pageId); - if (pData != NULL) { - pData->num = sizeof(SFilePage); - } - } - } - - if (pData == NULL) { - return -1; - } - - // set the number of rows in current disk page - if (pWindowRes->pageId == -1) { // not allocated yet, allocate new buffer - pWindowRes->pageId = pageId; - pWindowRes->offset = (int32_t)pData->num; - - pData->num += size; - } - - return 0; -} - // query_range_start, query_range_end, window_duration, window_start, window_end void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) { pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP; @@ -512,25 +448,6 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int return code; } -static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { - for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) { - if (functionNeedToExecute(&pCtx[k])) { - // todo add a dummy funtion to avoid process check - if (pCtx[k].fpSet.process == NULL) { - continue; - } - - int32_t code = pCtx[k].fpSet.process(&pCtx[k]); - if (code != TSDB_CODE_SUCCESS) { - qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); - return code; - } - } - } - - return TSDB_CODE_SUCCESS; -} - bool functionNeedToExecute(SqlFunctionCtx* pCtx) { struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); @@ -654,149 +571,6 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int return win; } -int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, - uint32_t* status) { - *status = BLK_DATA_NOT_LOAD; - - pBlock->pDataBlock = NULL; - pBlock->pBlockAgg = NULL; - - // int64_t groupId = pRuntimeEnv->current->groupIndex; - // bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); - - STaskCostInfo* pCost = &pTaskInfo->cost; - -// pCost->totalBlocks += 1; -// pCost->totalRows += pBlock->info.rows; -#if 0 - // Calculate all time windows that are overlapping or contain current data block. - // If current data block is contained by all possible time window, do not load current data block. - if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 || - (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pTaskInfo, &pBlock->info))) { - (*status) = BLK_DATA_DATA_LOAD; - } - - // check if this data block is required to load - if ((*status) != BLK_DATA_DATA_LOAD) { - bool needFilter = true; - - // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, - // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer - if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { - SResultRow* pResult = NULL; - - bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); - TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; - - STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); - if (pQueryAttr->pointInterpQuery) { - needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId, - pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, - pTableScanInfo->rowEntryInfoOffset); - } else { - if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.id.uid, &win, masterScan, &pResult, groupId, - pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, - pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_OUT_OF_MEMORY); - } - } - } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate - doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, - pTableScanInfo->rowEntryInfoOffset, pTableScanInfo->numOfOutput, - pRuntimeEnv->current->groupIndex); - } - - if (needFilter) { - (*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock); - } else { - (*status) = BLK_DATA_DATA_LOAD; - } - } - - SDataBlockInfo* pBlockInfo = &pBlock->info; -// *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status); - - if ((*status) == BLK_DATA_NOT_LOAD || (*status) == BLK_DATA_FILTEROUT) { - //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey, -// pBlockInfo->window.ekey, pBlockInfo->rows); - pCost->skipBlocks += 1; - } else if ((*status) == BLK_DATA_SMA_LOAD) { - // this function never returns error? - pCost->loadBlockStatis += 1; -// tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg); - - if (pBlock->pBlockAgg == NULL) { // data block statistics does not exist, load data block -// pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL); - pCost->totalCheckedRows += pBlock->info.rows; - } - } else { - assert((*status) == BLK_DATA_DATA_LOAD); - - // load the data block statistics to perform further filter - pCost->loadBlockStatis += 1; -// tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg); - - if (pQueryAttr->topBotQuery && pBlock->pBlockAgg != NULL) { - { // set previous window - if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { - SResultRow* pResult = NULL; - - bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); - TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; - - STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); - if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.id.uid, &win, masterScan, &pResult, groupId, - pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, - pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_OUT_OF_MEMORY); - } - } - } - bool load = false; - for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { - int32_t functionId = pTableScanInfo->pCtx[i].functionId; - if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM) { -// load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockAgg[i].min), -// (char*)&(pBlock->pBlockAgg[i].max)); - if (!load) { // current block has been discard due to filter applied - pCost->skipBlocks += 1; - //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, -// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - (*status) = BLK_DATA_FILTEROUT; - return TSDB_CODE_SUCCESS; - } - } - } - } - - // current block has been discard due to filter applied -// if (!doFilterByBlockSMA(pRuntimeEnv, pBlock->pBlockAgg, pTableScanInfo->pCtx, pBlockInfo->rows)) { -// pCost->skipBlocks += 1; -// qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey, -// pBlockInfo->window.ekey, pBlockInfo->rows); -// (*status) = BLK_DATA_FILTEROUT; -// return TSDB_CODE_SUCCESS; -// } - - pCost->totalCheckedRows += pBlockInfo->rows; - pCost->loadBlocks += 1; -// pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL); -// if (pBlock->pDataBlock == NULL) { -// return terrno; -// } - -// if (pQueryAttr->pFilters != NULL) { -// filterSetColFieldData(pQueryAttr->pFilters, taosArrayGetSize(pBlock->pDataBlock), pBlock->pDataBlock); -// } - -// if (pQueryAttr->pFilters != NULL || pRuntimeEnv->pTsBuf != NULL) { -// filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery); -// } - } -#endif - return TSDB_CODE_SUCCESS; -} - void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { if (status == TASK_NOT_COMPLETED) { pTaskInfo->status = status; @@ -1027,43 +801,6 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD } } -void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { - // for simple group by query without interval, all the tables belong to one group result. - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SAggOperatorInfo* pAggInfo = pOperator->info; - - SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo; - SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; - int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset; - - SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, - sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup, true); - /* - * not assign result buffer yet, add new result buffer - * all group belong to one result set, and each group result has different group id so set the id to be one - */ - if (pResultRow->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); - if (ret != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, terrno); - } - } - - setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); -} - -static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { - SAggOperatorInfo* pAggInfo = pOperator->info; - if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) { - return; - } - - doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId); - - // record the current active group id - pAggInfo->groupId = groupId; -} - void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) { bool returnNotNull = false; for (int32_t j = 0; j < numOfExprs; ++j) { @@ -1481,191 +1218,23 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan } } -static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) { - if (!tsCountAlwaysReturnValue) { - return TSDB_CODE_SUCCESS; - } - - SAggOperatorInfo* pAggInfo = pOperator->info; - if (pAggInfo->groupKeyOptimized) { - return TSDB_CODE_SUCCESS; - } - - SOperatorInfo* downstream = pOperator->pDownstream[0]; - if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION || - (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN && - ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) { - return TSDB_CODE_SUCCESS; - } - - SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; - bool hasCountFunc = false; - - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { - const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; - if ((strcmp(pName, "count") == 0) || (strcmp(pName, "hyperloglog") == 0) || - (strcmp(pName, "_hyperloglog_partial") == 0) || (strcmp(pName, "_hyperloglog_merge") == 0)) { - hasCountFunc = true; - break; - } - } - - if (!hasCountFunc) { - return TSDB_CODE_SUCCESS; - } - - SSDataBlock* pBlock = createDataBlock(); - pBlock->info.rows = 1; - pBlock->info.capacity = 0; - - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { - SColumnInfoData colInfo = {0}; - colInfo.hasNull = true; - colInfo.info.type = TSDB_DATA_TYPE_NULL; - colInfo.info.bytes = 1; - - SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i]; - for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) { - SFunctParam* pFuncParam = &pOneExpr->base.pParam[j]; - if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { - int32_t slotId = pFuncParam->pCol->slotId; - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - if (slotId >= numOfCols) { - taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1); - for (int32_t k = numOfCols; k < slotId + 1; ++k) { - taosArrayPush(pBlock->pDataBlock, &colInfo); - } - } - } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { - // do nothing - } - } - } - - blockDataEnsureCapacity(pBlock, pBlock->info.rows); - *ppBlock = pBlock; - - return TSDB_CODE_SUCCESS; -} - -static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) { - if (!blockAllocated) { - return; - } - - blockDataDestroy(*ppBlock); - *ppBlock = NULL; -} - -// this is a blocking operator -static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { - if (OPTR_IS_OPENED(pOperator)) { - return TSDB_CODE_SUCCESS; - } - - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SAggOperatorInfo* pAggInfo = pOperator->info; - - SExprSupp* pSup = &pOperator->exprSupp; - SOperatorInfo* downstream = pOperator->pDownstream[0]; - - int64_t st = taosGetTimestampUs(); - - int32_t order = TSDB_ORDER_ASC; - int32_t scanFlag = MAIN_SCAN; - - bool hasValidBlock = false; - bool blockAllocated = false; - - while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - if (pBlock == NULL) { - if (!hasValidBlock) { - createDataBlockForEmptyInput(pOperator, &pBlock); - if (pBlock == NULL) { - break; - } - blockAllocated = true; - } else { - break; - } - } - hasValidBlock = true; - - int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false); - if (code != TSDB_CODE_SUCCESS) { - destroyDataBlockForEmptyInput(blockAllocated, &pBlock); - T_LONG_JMP(pTaskInfo->env, code); - } - - // there is an scalar expression that needs to be calculated before apply the group aggregation. - if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) { - SExprSupp* pSup1 = &pAggInfo->scalarExprSup; - code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL); - if (code != TSDB_CODE_SUCCESS) { - destroyDataBlockForEmptyInput(blockAllocated, &pBlock); - T_LONG_JMP(pTaskInfo->env, code); - } - } - - // the pDataBlock are always the same one, no need to call this again - setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); - setInputDataBlock(pSup, pBlock, order, scanFlag, true); - code = doAggregateImpl(pOperator, pSup->pCtx); - if (code != 0) { - destroyDataBlockForEmptyInput(blockAllocated, &pBlock); - T_LONG_JMP(pTaskInfo->env, code); - } - - destroyDataBlockForEmptyInput(blockAllocated, &pBlock); - } - - // the downstream operator may return with error code, so let's check the code before generating results. - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); - } - - initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); - OPTR_SET_OPENED(pOperator); - - pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; - return pTaskInfo->code; -} - -static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { - SAggOperatorInfo* pAggInfo = pOperator->info; - SOptrBasicInfo* pInfo = &pAggInfo->binfo; - - if (pOperator->status == OP_EXEC_DONE) { +//QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN +SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id) { + if (pOperator == NULL) { + qError("invalid operator, failed to find tableScanOperator %s", id); return NULL; } - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - pTaskInfo->code = pOperator->fpSet._openFn(pOperator); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - setOperatorCompleted(pOperator); - return NULL; - } - - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - while (1) { - doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); - doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); - - if (!hasRemainResults(&pAggInfo->groupResInfo)) { - setOperatorCompleted(pOperator); - break; + if (pOperator->operatorType == type) { + return pOperator; + } else { + if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { + qError("invalid operator, failed to find tableScanOperator %s", id); + return NULL; } - if (pInfo->pRes->info.rows > 0) { - break; - } + return extractOperatorInTree(pOperator->pDownstream[0], type, id); } - - size_t rows = blockDataGetNumOfRows(pInfo->pRes); - pOperator->resultInfo.totalRows += rows; - - return (rows == 0) ? NULL : pInfo->pRes; } void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { @@ -1814,7 +1383,7 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) { initResultRowInfo(&pInfo->resultRowInfo); } -void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { +static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { if (pCtx == NULL) { return NULL; } @@ -1866,99 +1435,8 @@ void cleanupExprSupp(SExprSupp* pSupp) { taosMemoryFree(pSupp->rowEntryInfoOffset); } -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, - SExecTaskInfo* pTaskInfo) { - SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL) { - goto _error; - } - - SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); - initBasicInfo(&pInfo->binfo, pResBlock); - - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(&pOperator->resultInfo, 4096); - - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); - int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, - pTaskInfo->streamInfo.pState); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = NULL; - if (pAggNode->pExprs != NULL) { - pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); - } - - code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; - pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized; - pInfo->groupId = UINT64_MAX; - - setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, - pTaskInfo); - pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, - optrDefaultBufFn, NULL); - - if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { - STableScanInfo* pTableScanInfo = downstream->info; - pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp; - pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup; - } - - code = appendDownstream(pOperator, &downstream, 1); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - return pOperator; - -_error: - if (pInfo != NULL) { - destroyAggOperatorInfo(pInfo); - } - - if (pOperator != NULL) { - cleanupExprSupp(&pOperator->exprSupp); - } - - taosMemoryFreeClear(pOperator); - pTaskInfo->code = code; - return NULL; -} - void cleanupBasicInfo(SOptrBasicInfo* pInfo) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } -static void freeItem(void* pItem) { - void** p = pItem; - if (*p != NULL) { - taosMemoryFreeClear(*p); - } -} - -void destroyAggOperatorInfo(void* param) { - SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param; - cleanupBasicInfo(&pInfo->binfo); - - cleanupAggSup(&pInfo->aggSup); - cleanupExprSupp(&pInfo->scalarExprSup); - cleanupGroupResInfo(&pInfo->groupResInfo); - taosMemoryFreeClear(param); -} - char* buildTaskId(uint64_t taskId, uint64_t queryId) { char* p = taosMemoryMalloc(64); @@ -2147,6 +1625,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); if (code) { pTaskInfo->code = code; + tableListDestroy(pTableListInfo); qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); return NULL; } @@ -2154,12 +1633,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo); if (code) { pTaskInfo->code = terrno; + tableListDestroy(pTableListInfo); return NULL; } pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo); if (NULL == pOperator) { pTaskInfo->code = terrno; + tableListDestroy(pTableListInfo); return NULL; } @@ -2178,19 +1659,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); if (code) { pTaskInfo->code = code; + tableListDestroy(pTableListInfo); qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); return NULL; } - -#ifndef NDEBUG - int32_t sz = tableListGetSize(pTableListInfo); - qDebug("vgId:%d create stream task, total qualified tables:%d, %s", pTaskInfo->id.vgId, sz, idstr); - - for (int32_t i = 0; i < sz; i++) { - STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i); - qDebug("add table uid:%" PRIu64 ", gid:%" PRIu64, pKeyInfo->uid, pKeyInfo->groupId); - } -#endif } pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); @@ -2262,7 +1734,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - if (pOperator != NULL) { + if (pOperator != NULL) { // todo moved away pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId; } @@ -2403,9 +1875,7 @@ int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) { return -1; } -int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) { - SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo; - +int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, STableListInfo* pTableListInfo, SReadHandle* readHandle) { switch (pNode->type) { case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: { SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam)); @@ -2422,9 +1892,9 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT if (NULL == pDeleterParam) { return TSDB_CODE_OUT_OF_MEMORY; } -#if 0 - int32_t tbNum = tableListGetSize(pTask->pTableInfoList); - pDeleterParam->suid = tableListGetSuid(pTask->pTableInfoList); + + int32_t tbNum = tableListGetSize(pTableListInfo); + pDeleterParam->suid = tableListGetSuid(pTableListInfo); // TODO extract uid list pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t)); @@ -2434,12 +1904,11 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT } for (int32_t i = 0; i < tbNum; ++i) { - STableKeyInfo* pTable = tableListGetInfo(pTask->pTableInfoList, i); + STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i); taosArrayPush(pDeleterParam->pUidList, &pTable->uid); } *pParam = pDeleterParam; -#endif break; } @@ -2816,3 +2285,21 @@ void qStreamCloseTsdbReader(void* task) { } } } + +void extractTableList(SArray* pList, const SOperatorInfo* pOperator) { + if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + STableScanInfo* pScanInfo = pOperator->info; + taosArrayPush(pList, &pScanInfo->base.pTableListInfo); + } else { + if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) { + extractTableList(pList, pOperator->pDownstream[0]); + } + } +} + +SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) { + SArray* pArray = taosArrayInit(0, POINTER_BYTES); + SOperatorInfo* pOperator = pTaskInfo->pRoot; + extractTableList(pArray, pOperator); + return pArray; +} \ No newline at end of file diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4ce9ed0726..14edf9f13c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -676,7 +676,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { } if (pBlock->info.id.uid) { - pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableInfoList, pBlock->info.id.uid); + pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); } uint32_t status = 0; @@ -771,7 +771,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // scan table one by one sequentially if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { - int32_t numOfTables = 0;//tableListGetSize(pTaskInfo->pTableInfoList); + int32_t numOfTables = 0;//tableListGetSize(pTaskInfo->pTableListInfo); STableKeyInfo tInfo = {0}; while (1) { @@ -784,7 +784,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { pInfo->currentTable++; taosRLockLatch(&pTaskInfo->lock); - numOfTables = tableListGetSize(pInfo->base.pTableInfoList); + numOfTables = tableListGetSize(pInfo->base.pTableListInfo); if (pInfo->currentTable >= numOfTables) { qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo)); @@ -792,7 +792,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return NULL; } - tInfo = *(STableKeyInfo*) tableListGetInfo(pInfo->base.pTableInfoList, pInfo->currentTable); + tInfo = *(STableKeyInfo*) tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); taosRUnLockLatch(&pTaskInfo->lock); tsdbSetTableList(pInfo->base.dataReader, &tInfo, 1); @@ -804,14 +804,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } } else { // scan table group by group sequentially if (pInfo->currentGroupId == -1) { - if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableInfoList)) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { setOperatorCompleted(pOperator); return NULL; } int32_t num = 0; STableKeyInfo* pList = NULL; - tableListGetGroupList(pInfo->base.pTableInfoList, pInfo->currentGroupId, &pList, &num); + tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); ASSERT(pInfo->base.dataReader == NULL); int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, @@ -830,7 +830,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return result; } - if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableInfoList)) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { setOperatorCompleted(pOperator); return NULL; } @@ -841,7 +841,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { int32_t num = 0; STableKeyInfo* pList = NULL; - tableListGetGroupList(pInfo->base.pTableInfoList, pInfo->currentGroupId, &pList, &num); + tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); tsdbSetTableList(pInfo->base.dataReader, pList, num); tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond); @@ -876,7 +876,7 @@ static void destroyTableScanBase(STableScanBase* pBase) { taosArrayDestroy(pBase->matchInfo.pList); } - tableListDestroy(pBase->pTableInfoList); + tableListDestroy(pBase->pTableListInfo); taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache); cleanupExprSupp(&pBase->pseudoSup); } @@ -945,7 +945,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; - pInfo->base.pTableInfoList = pTableListInfo; + pInfo->base.pTableListInfo = pTableListInfo; pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) { code = terrno; @@ -1065,7 +1065,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU if (hasNext) { /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); - pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableInfoList, pBlock->info.id.uid); + pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); } tsdbReaderClose(pReader); @@ -1087,7 +1087,7 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) { STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - return getTableGroupId(pTableScanInfo->base.pTableInfoList, uid); + return getTableGroupId(pTableScanInfo->base.pTableListInfo, uid); } static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { @@ -1562,7 +1562,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.version = pBlock->info.version; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableInfoList, pBlock->info.id.uid); + pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); // todo extract method for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { @@ -2326,6 +2326,7 @@ static void destroyStreamScanOperatorInfo(void* param) { if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { destroyOperatorInfo(pStreamScan->pTableScanOp); } + if (pStreamScan->tqReader) { tqCloseReader(pStreamScan->tqReader); } @@ -2359,6 +2360,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pInfo == NULL || pOperator == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + tableListDestroy(pTableListInfo); goto _error; } @@ -2372,6 +2374,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys int32_t code = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); if (code != TSDB_CODE_SUCCESS) { + tableListDestroy(pTableListInfo); goto _error; } @@ -2391,11 +2394,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo)); if (pSubTableExpr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + tableListDestroy(pTableListInfo); goto _error; } + pInfo->tbnameCalSup.pExprInfo = pSubTableExpr; createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0); if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) { + tableListDestroy(pTableListInfo); goto _error; } } @@ -2405,10 +2411,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags); if (pTagExpr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + tableListDestroy(pTableListInfo); goto _error; } if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; + tableListDestroy(pTableListInfo); goto _error; } } @@ -2416,6 +2424,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData)); if (pInfo->pBlockLists == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + tableListDestroy(pTableListInfo); goto _error; } @@ -2459,16 +2468,18 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys // set the extract column id to streamHandle tqReaderSetColIdList(pInfo->tqReader, pColIds); - SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableInfoList); + SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo); code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList); if (code != 0) { taosArrayDestroy(tableIdList); goto _error; } + taosArrayDestroy(tableIdList); memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond)); } else { taosArrayDestroy(pColIds); + tableListDestroy(pTableListInfo); pColIds = NULL; } @@ -2503,7 +2514,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; + __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL); @@ -2534,7 +2545,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->pRes; blockDataCleanup(pRes); - int32_t size = tableListGetSize(pInfo->pTableInfoList); + int32_t size = tableListGetSize(pInfo->pTableListInfo); if (size == 0) { setTaskStatus(pTaskInfo, TASK_COMPLETED); return NULL; @@ -2546,7 +2557,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { metaReaderInit(&mr, pInfo->readHandle.meta, 0); while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { - STableKeyInfo* item = tableListGetInfo(pInfo->pTableInfoList, pInfo->curPos); + STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos); int32_t code = metaGetTableEntryByUid(&mr, item->uid); tDecoderClear(&mr.coder); if (code != TSDB_CODE_SUCCESS) { @@ -2607,7 +2618,7 @@ static void destroyTagScanOperatorInfo(void* param) { STagScanInfo* pInfo = (STagScanInfo*)param; pInfo->pRes = blockDataDestroy(pInfo->pRes); taosArrayDestroy(pInfo->matchInfo.pList); - pInfo->pTableInfoList = tableListDestroy(pInfo->pTableInfoList); + pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo); taosMemoryFreeClear(param); } @@ -2634,7 +2645,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi goto _error; } - pInfo->pTableInfoList = pTableListInfo; + pInfo->pTableListInfo = pTableListInfo; pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->readHandle = *pReadHandle; pInfo->curPos = 0; @@ -2668,7 +2679,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx); int64_t st = taosGetTimestampUs(); - void* p = tableListGetInfo(pInfo->base.pTableInfoList, readIdx + pInfo->tableStartIndex); + void* p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex); SReadHandle* pHandle = &pInfo->base.readHandle; if (NULL == source->dataReader || !source->multiReader) { @@ -2725,7 +2736,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { continue; } - pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableInfoList, pBlock->info.id.uid); + pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -2781,10 +2792,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; { - size_t numOfTables = tableListGetSize(pInfo->base.pTableInfoList); + size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); int32_t i = pInfo->tableStartIndex + 1; for (; i < numOfTables; ++i) { - STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableInfoList, i); + STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i); if (tableKeyInfo->groupId != pInfo->groupId) { break; } @@ -2918,7 +2929,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } - size_t tableListSize = tableListGetSize(pInfo->base.pTableInfoList); + size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo); if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; @@ -2927,7 +2938,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { return NULL; } pInfo->tableStartIndex = 0; - pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableInfoList, pInfo->tableStartIndex))->groupId; + pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId; startGroupTableMergeScan(pOperator); } @@ -2952,7 +2963,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { } pInfo->tableStartIndex = pInfo->tableEndIndex + 1; - pInfo->groupId = tableListGetInfo(pInfo->base.pTableInfoList, pInfo->tableStartIndex)->groupId; + pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId; startGroupTableMergeScan(pOperator); resetLimitInfoForNextGroup(&pInfo->limitInfo); } @@ -3050,7 +3061,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->base.limitInfo.limit.limit = -1; pInfo->base.limitInfo.slimit.limit = -1; - pInfo->base.pTableInfoList = pTableListInfo; + pInfo->base.pTableListInfo = pTableListInfo; pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 70febcaf6a..4a52683c3a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2175,13 +2175,6 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S } } -bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { - SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId); - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, - GET_RES_WINDOW_KEY_LEN(sizeof(int64_t))); - return p1 == NULL; -} - bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) { if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) { SWinKey key = {.ts = pWin->skey, .groupId = groupId};