From 0354ac2f331f0a3162038f2e045cda799cbca21e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 18 Mar 2022 11:12:49 +0800 Subject: [PATCH] [td-13039] refactor. --- source/libs/executor/inc/executorimpl.h | 48 ++-- source/libs/executor/src/executorMain.c | 2 +- source/libs/executor/src/executorimpl.c | 337 ++++++++++-------------- 3 files changed, 166 insertions(+), 221 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 8bcb97c6ae..bc7d7353af 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -253,11 +253,6 @@ typedef struct STaskIdInfo { char* str; } STaskIdInfo; -typedef struct STaskBufInfo { - int32_t bufSize; // total available buffer size in bytes - int32_t remainBuf; // remain buffer size -} STaskBufInfo; - typedef struct SExecTaskInfo { STaskIdInfo id; char* content; @@ -269,8 +264,7 @@ typedef struct SExecTaskInfo { uint64_t totalRows; // total number of rows STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure char* sql; // query sql string - jmp_buf env; // when error occurs, abort - STaskBufInfo bufInfo; // available buffer info this task + jmp_buf env; // struct SOperatorInfo* pRoot; } SExecTaskInfo; @@ -313,11 +307,9 @@ typedef struct STaskRuntimeEnv { } STaskRuntimeEnv; enum { - OP_NOT_OPENED = 0x0, - OP_OPENED = 0x1, - OP_IN_EXECUTING = 0x3, - OP_RES_TO_RETURN = 0x5, - OP_EXEC_DONE = 0x9, + OP_IN_EXECUTING = 1, + OP_RES_TO_RETURN = 2, + OP_EXEC_DONE = 3, }; typedef struct SOperatorInfo { @@ -330,14 +322,12 @@ typedef struct SOperatorInfo { SExprInfo* pExpr; STaskRuntimeEnv* pRuntimeEnv; // todo remove it SExecTaskInfo* pTaskInfo; - SOperatorCostInfo cost; struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator - __optr_fn_t getNextFn; - __optr_fn_t cleanupFn; + __optr_open_fn_t openFn; + __optr_fn_t nextDataFn; __optr_close_fn_t closeFn; - __optr_open_fn_t _openFn; // DO NOT invoke this function directly } SOperatorInfo; typedef struct { @@ -366,9 +356,9 @@ typedef struct SQInfo { } SQInfo; enum { - EX_SOURCE_DATA_NOT_READY = 0x1, - EX_SOURCE_DATA_READY = 0x2, - EX_SOURCE_DATA_EXHAUSTED = 0x3, + DATA_NOT_READY = 0x1, + DATA_READY = 0x2, + DATA_EXHAUSTED = 0x3, }; typedef struct SSourceDataInfo { @@ -385,6 +375,12 @@ typedef struct SLoadRemoteDataInfo { uint64_t totalElapsed; // total elapsed time } SLoadRemoteDataInfo; +enum { + EX_SOURCE_DATA_NOT_READY = 0x1, + EX_SOURCE_DATA_READY = 0x2, + EX_SOURCE_DATA_EXHAUSTED = 0x3, +}; + typedef struct SExchangeInfo { SArray* pSources; SArray* pSourceDataInfo; @@ -439,13 +435,14 @@ typedef struct SSysTableScanInfo { }; SRetrieveMetaTableRsp *pRsp; - void *pCur; // cursor SRetrieveTableReq req; SEpSet epSet; - int32_t type; // show type - SName name; tsem_t ready; - SSchema* pSchema; + + SNode* pCondition; // db_name filter condition, to discard data that are not in current database + void *pCur; // cursor for iterate the local table meta store. + int32_t type; // show type, TODO remove it + SName name; SSDataBlock* pRes; int32_t capacity; int64_t numOfBlocks; // extract basic running information. @@ -630,14 +627,12 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SEpSet epset, +SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SNode* pCondition, SEpSet epset, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); - SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, @@ -682,6 +677,7 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); +void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOfInputRows); void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 7e55a4b3e1..fabaa2d31d 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -158,7 +158,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { int64_t st = 0; st = taosGetTimestampUs(); - *pRes = pTaskInfo->pRoot->getNextFn(pTaskInfo->pRoot, &newgroup); + *pRes = pTaskInfo->pRoot->nextDataFn(pTaskInfo->pRoot, &newgroup); uint64_t el = (taosGetTimestampUs() - st); pTaskInfo->cost.elapsedTime += el; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0b0f5779d3..8ae8ac00d7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -13,8 +13,10 @@ * along with this program. If not, see . */ +#include #include #include +#include #include #include "os.h" @@ -211,9 +213,6 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); -static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput); -static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput); - static void destroyOperatorInfo(SOperatorInfo* pOperator); static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput); @@ -224,17 +223,6 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) { } } -#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) -#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) - -static int32_t operatorDummyOpenFn(void* param) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; - OPTR_SET_OPENED(pOperator); - return TSDB_CODE_SUCCESS; -} - -static void operatorDummyCloseFn(void* param, int32_t numOfCols) {} - static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity); static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); @@ -4737,11 +4725,6 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { STableScanInfo *pTableScanInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; - pTaskInfo->code = pOperator->_openFn(pOperator); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - return NULL; - } - // The read handle is not initialized yet, since no qualified tables exists if (pTableScanInfo->pTsdbReadHandle == NULL) { return NULL; @@ -4859,11 +4842,6 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamBlockScanInfo* pInfo = pOperator->info; - pTaskInfo->code = pOperator->_openFn(pOperator); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - return NULL; - } - SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; pBlockInfo->rows = 0; @@ -4977,20 +4955,15 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total) { -// char* pData = pRsp->data; + blockDataEnsureCapacity(pRes, numOfRows); for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); - char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * numOfRows); - if (tmp == NULL) { - return TSDB_CODE_QRY_OUT_OF_MEMORY; + for(int32_t j = 0; j < numOfRows; ++j) { + colDataAppend(pColInfoData, j, pData, false); + pData += pColInfoData->info.bytes; } - - size_t len = numOfRows * pColInfoData->info.bytes; - memcpy(tmp, pData, len); - pColInfoData->pData = tmp; - pData += len; } pRes->info.rows = numOfRows; @@ -5034,12 +5007,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); - if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { + if (pDataInfo->status == DATA_EXHAUSTED) { completed += 1; continue; } - if (pDataInfo->status != EX_SOURCE_DATA_READY) { + if (pDataInfo->status != DATA_READY) { continue; } @@ -5052,7 +5025,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows); - pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; + pDataInfo->status = DATA_EXHAUSTED; completed += 1; continue; } @@ -5070,15 +5043,15 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, i + 1, totalSources); - pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; + pDataInfo->status = DATA_EXHAUSTED; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows, pLoadInfo->totalSize); } - if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) { - pDataInfo->status = EX_SOURCE_DATA_NOT_READY; + if (pDataInfo->status != DATA_EXHAUSTED) { + pDataInfo->status = DATA_NOT_READY; code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -5098,9 +5071,13 @@ _error: return NULL; } -static int32_t prepareConcurrentlyLoad(SOperatorInfo *pOperator) { +static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo *pOperator) { SExchangeInfo *pExchangeInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; + + if (pOperator->status == OP_RES_TO_RETURN) { + return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); + } size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); int64_t startTs = taosGetTimestampUs(); @@ -5109,8 +5086,7 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo *pOperator) { for(int32_t i = 0; i < totalSources; ++i) { int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; - return code; + return NULL; } } @@ -5118,9 +5094,9 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo *pOperator) { qDebug("%s send all fetch request to %"PRIzu" sources completed, elapsed:%"PRId64, GET_TASKID(pTaskInfo), totalSources, endTs - startTs); tsem_wait(&pExchangeInfo->ready); - pOperator->cost.openCost = taosGetTimestampUs() - startTs; - return TSDB_CODE_SUCCESS; + pOperator->status = OP_RES_TO_RETURN; + return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); } static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { @@ -5150,7 +5126,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows); - pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; + pDataInfo->status = DATA_EXHAUSTED; pExchangeInfo->current += 1; continue; } @@ -5167,7 +5143,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources); - pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; + pDataInfo->status = DATA_EXHAUSTED; pExchangeInfo->current += 1; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, @@ -5178,26 +5154,6 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { } } -static int32_t prepareLoadRemoteData(void* param) { - SOperatorInfo *pOperator = (SOperatorInfo*) param; - if (OPTR_IS_OPENED(pOperator)) { - return TSDB_CODE_SUCCESS; - } - - SExchangeInfo *pExchangeInfo = pOperator->info; - if (pExchangeInfo->seqLoadData) { - // do nothing for sequentially load data - } else { - int32_t code = prepareConcurrentlyLoad(pOperator); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - - OPTR_SET_OPENED(pOperator); - return TSDB_CODE_SUCCESS; -} - static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SOperatorInfo *pOperator = (SOperatorInfo*) param; @@ -5206,6 +5162,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; + if (pOperator->status == OP_EXEC_DONE) { qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed/1000.0); @@ -5213,10 +5170,11 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { } *newgroup = false; + if (pExchangeInfo->seqLoadData) { return seqLoadRemoteData(pOperator); } else { - return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); + return concurrentlyLoadRemoteData(pOperator); } #if 0 @@ -5229,35 +5187,16 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { #endif } -static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { - pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); - if (pInfo->pSourceDataInfo == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - for(int32_t i = 0; i < numOfSources; ++i) { - SSourceDataInfo dataInfo = {0}; - dataInfo.status = EX_SOURCE_DATA_NOT_READY; - dataInfo.pEx = pInfo; - dataInfo.index = i; - - void* ret = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); - if (ret == NULL) { - taosArrayDestroy(pInfo->pSourceDataInfo); - return TSDB_CODE_OUT_OF_MEMORY; - } - } - - return TSDB_CODE_SUCCESS; -} - // TODO handle the error SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - goto _error; + tfree(pInfo); + tfree(pOperator); + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; } size_t numOfSources = LIST_LENGTH(pSources); @@ -5284,28 +5223,29 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock return NULL; } - int32_t code = initDataSource(numOfSources, pInfo); - if (code != TSDB_CODE_SUCCESS) { - goto _error; + for(int32_t i = 0; i < numOfSources; ++i) { + SSourceDataInfo dataInfo = {0}; + dataInfo.status = DATA_NOT_READY; + dataInfo.pEx = pInfo; + dataInfo.index = i; + + taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); } size_t size = pBlock->info.numOfCols; pInfo->pResult = pBlock; pInfo->seqLoadData = true; - pInfo->seqLoadData = true; // sequentially load data from the source node tsem_init(&pInfo->ready, 0, 0); pOperator->name = "ExchangeOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = size; + pOperator->nextDataFn = doLoadRemoteData; pOperator->pTaskInfo = pTaskInfo; - pOperator->_openFn = prepareLoadRemoteData; // assign a dummy function. - pOperator->getNextFn = doLoadRemoteData; - pOperator->closeFn = destroyExchangeOperatorInfo; #if 1 { // todo refactor @@ -5331,16 +5271,6 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock #endif return pOperator; - - _error: - if (pInfo != NULL) { - destroyExchangeOperatorInfo(pInfo, 0); - } - - tfree(pInfo); - tfree(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; } SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { @@ -5391,12 +5321,10 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, pOperator->name = "TableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doTableScan; - pOperator->closeFn = operatorDummyCloseFn; + pOperator->nextDataFn = doTableScan; pOperator->pTaskInfo = pTaskInfo; return pOperator; @@ -5417,11 +5345,11 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim pOperator->name = "TableSeqScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = doTableScanImpl; + pOperator->nextDataFn = doTableScanImpl; return pOperator; } @@ -5442,10 +5370,10 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt pOperator->name = "TableBlockInfoScanOperator"; // pOperator->operatorType = OP_TableBlockInfoScan; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; // pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; - pOperator->getNextFn = doBlockInfoScan; + pOperator->nextDataFn = doBlockInfoScan; return pOperator; } @@ -5475,13 +5403,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock* pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doStreamBlockScan; - pOperator->closeFn = operatorDummyCloseFn; - + pOperator->nextDataFn = doStreamBlockScan; pOperator->pTaskInfo = pTaskInfo; return pOperator; } @@ -5571,6 +5496,41 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL); + // do filter the qualified results + { + SFilterInfo *filter = NULL; + code = filterInitFromNode(pInfo->pCondition, &filter, 0); + + SFilterColumnParam param1 = {.numOfCols= pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock}; + code = filterSetDataFromSlotId(filter, ¶m1); + + int8_t *rowRes = NULL; + bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); + printf("%d, %d\n", rowRes[0], rowRes[1]); + + SSDataBlock* px = createOneDataBlock(pInfo->pRes); + blockDataEnsureCapacity(px, pInfo->pRes->info.rows); + int32_t numOfRow = 0; + + for(int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) { + SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i); + SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i); + + numOfRow = 0; + for(int32_t j = 0; j < pInfo->pRes->info.rows; ++j) { + if (rowRes[j] == 0) { + continue; + } + + colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false); + numOfRow += 1; + } + } + + px->info.rows = numOfRow; + pInfo->pRes = px; + } + return pInfo->pRes; } @@ -5578,7 +5538,7 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { } SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, - SEpSet epset, SExecTaskInfo* pTaskInfo) { + SNode* pCondition, SEpSet epset, SExecTaskInfo* pTaskInfo) { SSysTableScanInfo* pInfo = calloc(1, sizeof(SSysTableScanInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5588,9 +5548,11 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB return NULL; } - pInfo->pRes = pResBlock; - pInfo->capacity = 4096; + pInfo->pRes = pResBlock; + pInfo->capacity = 4096; + pInfo->pCondition = pCondition; + // TODO remove it int32_t tableType = 0; const char* name = tNameGetTableName(pName); if (strncasecmp(name, TSDB_INS_TABLE_USER_DATABASES, tListLen(pName->tname)) == 0) { @@ -5640,7 +5602,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->getNextFn = doSysTableScan; + pOperator->nextDataFn = doSysTableScan; pOperator->closeFn = destroySysTableScannerOperatorInfo; pOperator->pTaskInfo = pTaskInfo; @@ -5833,7 +5795,7 @@ SSDataBlock* loadNextDataBlock(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; bool newgroup = false; - return pOperator->getNextFn(pOperator, &newgroup); + return pOperator->nextDataFn(pOperator, &newgroup); } static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char **buf, int32_t rowIndex) { @@ -6147,13 +6109,13 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pOperator->name = "SortedMerge"; // pOperator->operatorType = OP_SortedMerge; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doSortedMerge; + pOperator->nextDataFn = doSortedMerge; pOperator->closeFn = destroySortedMergeOperatorInfo; code = appendDownstream(pOperator, downstream, numOfDownstream); @@ -6245,11 +6207,11 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI pOperator->name = "Order"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doSort; + pOperator->nextDataFn = doSort; pOperator->closeFn = destroyOrderOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -6275,7 +6237,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6325,7 +6287,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6411,7 +6373,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { // The downstream exec may change the value of the newgroup, so use a local variable instead. publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); + SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6469,7 +6431,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { SSDataBlock* pBlock = NULL; while (1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); + pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6520,7 +6482,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) { while (1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock *pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); + SSDataBlock *pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6563,7 +6525,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6623,7 +6585,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6686,7 +6648,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6741,7 +6703,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6876,7 +6838,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -6938,7 +6900,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -6991,7 +6953,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -7076,7 +7038,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); + SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (*newgroup) { @@ -7233,13 +7195,13 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doAggregate; + pOperator->nextDataFn = doAggregate; pOperator->closeFn = destroyAggOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7288,17 +7250,17 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { tfree(pInfo->prevData); } -void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } -void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { STagScanInfo* pInfo = (STagScanInfo*) param; pInfo->pRes = blockDataDestroy(pInfo->pRes); } -void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); @@ -7328,17 +7290,6 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput) } } -void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { - SExchangeInfo* pExInfo = (SExchangeInfo*) param; - taosArrayDestroy(pExInfo->pSources); - taosArrayDestroy(pExInfo->pSourceDataInfo); - if (pExInfo->pResult != NULL) { - blockDataDestroy(pExInfo->pResult); - } - - tsem_destroy(&pExInfo->ready); -} - SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); @@ -7353,13 +7304,13 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray pOperator->name = "MultiTableAggregate"; // pOperator->operatorType = OP_MultiTableAggregate; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->numOfOutput = numOfOutput; pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doMultiTableAggregate; + pOperator->nextDataFn = doMultiTableAggregate; pOperator->closeFn = destroyAggOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7379,12 +7330,12 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp pOperator->name = "ProjectOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->numOfOutput = taosArrayGetSize(pExprInfo); - pOperator->getNextFn = doProjectOperation; + pOperator->nextDataFn = doProjectOperation; pOperator->pTaskInfo = pTaskInfo; pOperator->closeFn = destroyProjectOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7434,11 +7385,11 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn pOperator->name = "LimitOperator"; // pOperator->operatorType = OP_Limit; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->getNextFn = doLimit; + pOperator->status = OP_IN_EXECUTING; + pOperator->nextDataFn = doLimit; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - int32_t code = appendDownstream(pOperator, &downstream, 1); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7466,13 +7417,13 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pEx pOperator->name = "TimeIntervalAggOperator"; // pOperator->operatorType = OP_TimeWindow; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pTaskInfo = pTaskInfo; pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->info = pInfo; - pOperator->getNextFn = doIntervalAgg; + pOperator->nextDataFn = doIntervalAgg; pOperator->closeFn = destroyBasicOperatorInfo; code = appendDownstream(pOperator, &downstream, 1); @@ -7491,12 +7442,12 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S pOperator->name = "AllTimeIntervalAggOperator"; // pOperator->operatorType = OP_AllTimeWindow; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = doAllIntervalAgg; + pOperator->nextDataFn = doAllIntervalAgg; pOperator->closeFn = destroyBasicOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7515,12 +7466,12 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper pOperator->name = "StateWindowOperator"; // pOperator->operatorType = OP_StateWindow; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = doStateWindowAgg; + pOperator->nextDataFn = doStateWindowAgg; pOperator->closeFn = destroyStateWindowOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7540,12 +7491,12 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->name = "SessionWindowAggOperator"; // pOperator->operatorType = OP_SessionWindow; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = doSessionWindowAgg; + pOperator->nextDataFn = doSessionWindowAgg; pOperator->closeFn = destroySWindowOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7563,13 +7514,13 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim pOperator->name = "MultiTableTimeIntervalOperator"; // pOperator->operatorType = OP_MultiTableTimeInterval; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = doSTableIntervalAgg; + pOperator->nextDataFn = doSTableIntervalAgg; pOperator->closeFn = destroyBasicOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7587,13 +7538,13 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun pOperator->name = "AllMultiTableTimeIntervalOperator"; // pOperator->operatorType = OP_AllMultiTableTimeInterval; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = doAllSTableIntervalAgg; + pOperator->nextDataFn = doAllSTableIntervalAgg; pOperator->closeFn = destroyBasicOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7619,13 +7570,13 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "GroupbyAggOperator"; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; // pOperator->operatorType = OP_Groupby; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = hashGroupbyAggregate; + pOperator->nextDataFn = hashGroupbyAggregate; pOperator->closeFn = destroyGroupbyOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7658,13 +7609,13 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf pOperator->name = "FillOperator"; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; // pOperator->operatorType = OP_Fill; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = doFill; + pOperator->nextDataFn = doFill; pOperator->closeFn = destroySFillOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -7709,12 +7660,13 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI pOperator->name = "SLimitOperator"; // pOperator->operatorType = OP_SLimit; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; // pOperator->exec = doSLimit; pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->closeFn = destroySlimitOperatorInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7848,7 +7800,6 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { return (pRes->info.rows == 0)? NULL:pInfo->pRes; #endif - return 0; } SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { @@ -7865,9 +7816,9 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo pOperator->name = "SeqTableTagScan"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; - pOperator->getNextFn = doTagScan; + pOperator->nextDataFn = doTagScan; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->pRuntimeEnv = pRuntimeEnv; @@ -7937,7 +7888,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { while(1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); + pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -8003,13 +7954,13 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "DistinctOperator"; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; + pOperator->status = OP_IN_EXECUTING; // pOperator->operatorType = OP_Distinct; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = hashDistinct; + pOperator->nextDataFn = hashDistinct; pOperator->pExpr = pExpr; pOperator->closeFn = destroyDistinctOperatorInfo; @@ -8174,9 +8125,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId); - if (NULL == pDataReader) { - return NULL; - } + int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); @@ -8202,7 +8151,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SSystemTableScanPhysiNode * pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc); - SOperatorInfo* pOperator = createSysTableScanOperatorInfo(NULL, pResBlock, &pSysScanPhyNode->scan.tableName, pSysScanPhyNode->mgmtEpSet, pTaskInfo); + SOperatorInfo* pOperator = createSysTableScanOperatorInfo(NULL, pResBlock, &pSysScanPhyNode->scan.tableName, pSysScanPhyNode->scan.node.pConditions, pSysScanPhyNode->mgmtEpSet, pTaskInfo); return pOperator; } else { ASSERT(0);