From 52457838256af3b3fa4d9f3fabb4a0a83eaefd7b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jan 2022 10:51:23 +0800 Subject: [PATCH] [td-11818] add data into sink node --- include/libs/executor/executor.h | 12 ++-- source/libs/executor/inc/dataSinkMgt.h | 7 +- source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/inc/executorimpl.h | 7 +- source/libs/executor/src/dataSinkMgt.c | 1 + source/libs/executor/src/executorMain.c | 88 +++++++++++++++---------- source/libs/executor/src/executorimpl.c | 1 + source/libs/qworker/src/qworker.c | 8 +-- src/inc/tsdb.h | 16 ++--- src/query/inc/qExecutor.h | 4 +- src/query/src/qExecutor.c | 66 +++++++++---------- 11 files changed, 120 insertions(+), 91 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 2356498bbe..71b014d025 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -21,6 +21,9 @@ extern "C" { #endif typedef void* qTaskInfo_t; +typedef void* DataSinkHandle; +struct SSubplan; + /** * Create the exec task object according to task json @@ -34,13 +37,14 @@ typedef void* qTaskInfo_t; int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo); /** - * the main task execution function, including query on both table and multiple tables, + * The main task execution function, including query on both table and multiple tables, * which are decided according to the tag or table name query conditions * - * @param qinfo + * @param tinfo + * @param handle * @return */ -bool qExecTask(qTaskInfo_t qTask, SSDataBlock** pRes); +int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle); /** * Retrieve the produced results information, if current query is not paused or completed, @@ -62,7 +66,7 @@ int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspCo * @param contLen payload length * @return */ -int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec); +//int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec); /** * return the transporter context (RPC) diff --git a/source/libs/executor/inc/dataSinkMgt.h b/source/libs/executor/inc/dataSinkMgt.h index d0057a213a..35a25f5104 100644 --- a/source/libs/executor/inc/dataSinkMgt.h +++ b/source/libs/executor/inc/dataSinkMgt.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "os.h" +#include "executor.h" #include "executorimpl.h" #define DS_CAPACITY_ENOUGH 1 @@ -39,8 +40,6 @@ typedef struct SDataSinkMgtCfg { int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg); -typedef void* DataSinkHandle; - typedef struct SInputData { const SSDataBlock* pData; SHashObj* pTableRetrieveTsMap; @@ -68,6 +67,10 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH */ int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus); +/** + * + * @param handle + */ void dsEndPut(DataSinkHandle handle); /** diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index ef66b3f247..59db66becc 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -20,6 +20,7 @@ extern "C" { #endif + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 33fafb4074..d76f270392 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -20,14 +20,16 @@ #include "ttszip.h" #include "tvariant.h" -#include "thash.h" +#include "dataSinkMgt.h" #include "executil.h" +#include "planner.h" #include "taosdef.h" #include "tarray.h" #include "tfilter.h" +#include "thash.h" #include "tlockfree.h" #include "tpagedfile.h" -#include "planner.h" +#include "executor.h" struct SColumnFilterElem; @@ -256,6 +258,7 @@ typedef struct SExecTaskInfo { // void* rspContext; // response context char *sql; // query sql string jmp_buf env; // + DataSinkHandle dsHandle; struct SOperatorInfo *pRoot; } SExecTaskInfo; diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 8a96c5d05f..d9b122fbdd 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "tarray.h" #include "dataSinkMgt.h" #include "dataSinkInt.h" #include "planner.h" diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index ab9e7a5211..0c82bd1dac 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -13,9 +13,10 @@ * along with this program. If not, see . */ -#include -#include "exception.h" #include "os.h" +#include "tarray.h" +#include "dataSinkMgt.h" +#include "exception.h" #include "tcache.h" #include "tglobal.h" #include "tmsg.h" @@ -69,8 +70,9 @@ void freeParam(STaskParam *param) { int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo) { assert(tsdb != NULL && pSubplan != NULL); + SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - int32_t code = doCreateExecTaskInfo(pSubplan, (SExecTaskInfo**) pTaskInfo, tsdb); + int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -81,8 +83,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ goto _error; } - DataSinkHandle pHandle = NULL; - code = dsCreateDataSinker(pSubplan->pDataSink, &pHandle); + code = dsCreateDataSinker(pSubplan->pDataSink, (*pTask)->dsHandle); _error: // if failed to add ref for all tables in this query, abort current query @@ -134,64 +135,79 @@ int waitMoment(SQInfo* pQInfo){ } #endif -bool qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes) { - SExecTaskInfo *pTaskInfo = (SExecTaskInfo *) tinfo; - int64_t threadId = taosGetSelfPthreadId(); +int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + int64_t threadId = taosGetSelfPthreadId(); int64_t curOwner = 0; if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { - qError("QInfo:0x%"PRIx64"-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*) curOwner); + qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, + (void*)curOwner); pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; - return false; + return pTaskInfo->code; } - if(pTaskInfo->cost.start == 0) { + if (pTaskInfo->cost.start == 0) { pTaskInfo->cost.start = taosGetTimestampMs(); } if (isTaskKilled(pTaskInfo)) { - qDebug("QInfo:0x%"PRIx64" it is already killed, abort", GET_TASKID(pTaskInfo)); -// return doBuildResCheck(pTaskInfo); + qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo)); + return pTaskInfo->code; } -// STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv; -// if (pTaskInfo->tableqinfoGroupInfo.numOfTables == 0) { -// qDebug("QInfo:0x%"PRIx64" no table exists for query, abort", GET_TASKID(pTaskInfo)); -// setTaskStatus(pTaskInfo, TASK_COMPLETED); -// return doBuildResCheck(pTaskInfo); -// } + // STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv; + // if (pTaskInfo->tableqinfoGroupInfo.numOfTables == 0) { + // qDebug("QInfo:0x%"PRIx64" no table exists for query, abort", GET_TASKID(pTaskInfo)); + // setTaskStatus(pTaskInfo, TASK_COMPLETED); + // return doBuildResCheck(pTaskInfo); + // } // error occurs, record the error code and return to client int32_t ret = setjmp(pTaskInfo->env); if (ret != TSDB_CODE_SUCCESS) { publishQueryAbortEvent(pTaskInfo, ret); pTaskInfo->code = ret; - qDebug("QInfo:0x%"PRIx64" query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); -// return doBuildResCheck(pTaskInfo); + qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); + return pTaskInfo->code; } - qDebug("QInfo:0x%"PRIx64" query task is launched", GET_TASKID(pTaskInfo)); + qDebug("QInfo:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo)); bool newgroup = false; publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); + int64_t st = 0; - int64_t st = taosGetTimestampUs(); - *pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup); - // todo put the result into sink node. + handle = &pTaskInfo->dsHandle; - pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st); - publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC); + while(1) { + st = taosGetTimestampUs(); + SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup); - if (isTaskKilled(pTaskInfo)) { - qDebug("QInfo:0x%"PRIx64" query is killed", GET_TASKID(pTaskInfo)); -// } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) { -// qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pTaskInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, -// pRuntimeEnv->resultInfo.total); - } else { -// qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows", pTaskInfo->qId, -// GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total); + pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st); + publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC); + + if (pRes == NULL) { // no results generated yet, abort + return pTaskInfo->code; + } + + int32_t status = 0; + SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL}; + pTaskInfo->code = dsPutDataBlock(pTaskInfo->dsHandle, &inputData, &status); + + if (isTaskKilled(pTaskInfo)) { + qDebug("QInfo:0x%" PRIx64 " task is killed", GET_TASKID(pTaskInfo)); + // } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) { + // qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pTaskInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, + // pRuntimeEnv->resultInfo.total); + } + + if (status == DS_CAPACITY_FULL) { + qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", GET_TASKID(pTaskInfo), + 0, 0L, 0); + return pTaskInfo->code; + } } -// return doBuildResCheck(pTaskInfo); } int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index db25e384be..9aeb979806 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -7181,6 +7181,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) { pthread_mutex_init(&pTaskInfo->lock, NULL); pTaskInfo->cost.created = taosGetTimestampMs(); + pTaskInfo->id.queryId = queryId; return pTaskInfo; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 7351af83c5..ada534de7e 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1038,9 +1038,9 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS)); queryRsped = true; - - SSDataBlock* pRes = NULL; - code = qExecTask(pTaskInfo, &pRes); + + DataSinkHandle handle = NULL; + code = qExecTask(pTaskInfo, &handle); queryDone = false; //TODO call executer to execute subquery @@ -1048,7 +1048,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { if (code) { QW_ERR_JRET(code); } else { - QW_ERR_JRET(qwAddTaskResCache(qWorkerMgmt, msg->queryId, msg->taskId, pRes)); +// QW_ERR_JRET(qwAddTaskResCache(qWorkerMgmt, msg->queryId, msg->taskId, pRes)); QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED)); } diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 65c8a45d00..130628e799 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -274,7 +274,7 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef); -bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle); +bool isTsdbCacheLastRow(TsdbQueryHandleT* pTsdbReadHandle); /** @@ -308,19 +308,19 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle); /** * move to next block if exists * - * @param pQueryHandle + * @param pTsdbReadHandle * @return */ -bool tsdbNextDataBlock(TsdbQueryHandleT pQueryHandle); +bool tsdbNextDataBlock(TsdbQueryHandleT pTsdbReadHandle); /** * Get current data block information * - * @param pQueryHandle + * @param pTsdbReadHandle * @param pBlockInfo * @return */ -void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *pBlockInfo); +void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); /** * @@ -332,7 +332,7 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *p * @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0 * @return */ -int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataStatis **pBlockStatis); +int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pTsdbReadHandle, SDataStatis **pBlockStatis); /** * @@ -340,11 +340,11 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataSta * the returned data block must be satisfied with the time window condition in any cases, * which means the SData data block is not actually the completed disk data blocks. * - * @param pQueryHandle query handle + * @param pTsdbReadHandle query handle * @param pColumnIdList required data columns id list * @return */ -SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdList); +SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pTsdbReadHandle, SArray *pColumnIdList); /** * Get the qualified table id for a super table according to the tag query expression. diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 19ca8e7ed8..0c0e3363c8 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -284,7 +284,7 @@ typedef struct SQueryRuntimeEnv { uint32_t status; // query status void* qinfo; uint8_t scanFlag; // denotes reversed scan of data or not - void* pQueryHandle; + void* pTsdbReadHandle; int32_t prevGroupId; // previous executed group id bool enableGroupData; @@ -418,7 +418,7 @@ typedef struct SQueryParam { } SQueryParam; typedef struct STableScanInfo { - void *pQueryHandle; + void *pTsdbReadHandle; int32_t numOfBlocks; int32_t numOfSkipped; int32_t numOfBlockStatis; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index be1bfb8143..a6a9115b2f 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2326,8 +2326,8 @@ _clean: static void doFreeQueryHandle(SQueryRuntimeEnv* pRuntimeEnv) { SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); - pRuntimeEnv->pQueryHandle = NULL; + tsdbCleanupQueryHandle(pRuntimeEnv->pTsdbReadHandle); + pRuntimeEnv->pTsdbReadHandle = NULL; SMemRef* pMemRef = &pQueryAttr->memRef; assert(pMemRef->ref == 0 && pMemRef->snapshot.imem == NULL && pMemRef->snapshot.mem == NULL); @@ -3148,10 +3148,10 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa } else if ((*status) == BLK_DATA_STATIS_NEEDED) { // this function never returns error? pCost->loadBlockStatis += 1; - tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis); + tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockStatis); if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block - pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); + pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL); pCost->totalCheckedRows += pBlock->info.rows; } } else { @@ -3159,7 +3159,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa // load the data block statistics to perform further filter pCost->loadBlockStatis += 1; - tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis); + tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockStatis); if (pQueryAttr->topBotQuery && pBlock->pBlockStatis != NULL) { { // set previous window @@ -3205,7 +3205,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa pCost->totalCheckedRows += pBlockInfo->rows; pCost->loadBlocks += 1; - pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); + pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL); if (pBlock->pDataBlock == NULL) { return terrno; } @@ -4494,7 +4494,7 @@ void queryCostStatis(SQInfo *pQInfo) { // // assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1); // -// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); +// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL); // SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); // // // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value @@ -4521,15 +4521,15 @@ void queryCostStatis(SQInfo *pQInfo) { // int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); // // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; -// TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle; +// TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle; // // SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; -// while (tsdbNextDataBlock(pQueryHandle)) { +// while (tsdbNextDataBlock(pTsdbReadHandle)) { // if (isQueryKilled(pRuntimeEnv->qinfo)) { // longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); // } // -// tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); +// tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo); // // if (pQueryAttr->limit.offset > blockInfo.rows) { // pQueryAttr->limit.offset -= blockInfo.rows; @@ -4562,7 +4562,7 @@ void queryCostStatis(SQInfo *pQInfo) { // // // load the data block and check data remaining in current data block // // TODO optimize performance -// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); +// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL); // SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); // // tw = *win; @@ -4627,8 +4627,8 @@ void queryCostStatis(SQInfo *pQInfo) { // STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current; // // SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; -// while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { -// tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle, &blockInfo); +// while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) { +// tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo); // // if (QUERY_IS_ASC_QUERY(pQueryAttr)) { // if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) { @@ -4674,7 +4674,7 @@ void queryCostStatis(SQInfo *pQInfo) { // */ // if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) { // -// SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); +// SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL); // SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); // // if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) { @@ -4748,7 +4748,7 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64 terrno = TSDB_CODE_SUCCESS; if (isFirstLastRowQuery(pQueryAttr)) { - pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); + pRuntimeEnv->pTsdbReadHandle = tsdbQueryLastRow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); // update the query time window pQueryAttr->window = cond.twindow; @@ -4769,11 +4769,11 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64 } } } else if (isCachedLastQuery(pQueryAttr)) { - pRuntimeEnv->pQueryHandle = tsdbQueryCacheLast(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); + pRuntimeEnv->pTsdbReadHandle = tsdbQueryCacheLast(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); } else if (pQueryAttr->pointInterpQuery) { - pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); + pRuntimeEnv->pTsdbReadHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); } else { - pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); + pRuntimeEnv->pTsdbReadHandle = tsdbQueryTables(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); } return terrno; @@ -4831,19 +4831,19 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr switch(tbScanner) { case OP_TableBlockInfoScan: { - pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); + pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv); break; } case OP_TableSeqScan: { - pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); + pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv); break; } case OP_DataBlocksOptScan: { - pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0); + pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0); break; } case OP_TableScan: { - pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr)); + pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr)); break; } default: { // do nothing @@ -4974,13 +4974,13 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { *newgroup = false; - while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { + while (tsdbNextDataBlock(pTableScanInfo->pTsdbReadHandle)) { if (isQueryKilled(pOperator->pRuntimeEnv->qinfo)) { longjmp(pOperator->pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } pTableScanInfo->numOfBlocks += 1; - tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); + tsdbRetrieveDataBlockInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->info); // todo opt if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) { @@ -5037,7 +5037,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { } if (++pTableScanInfo->current >= pTableScanInfo->times) { - if (pTableScanInfo->reverseTimes <= 0 || isTsdbCacheLastRow(pTableScanInfo->pQueryHandle)) { + if (pTableScanInfo->reverseTimes <= 0 || isTsdbCacheLastRow(pTableScanInfo->pTsdbReadHandle)) { return NULL; } else { break; @@ -5046,7 +5046,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { // do prepare for the next round table scan operation STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); - tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond); + tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond); setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED); pRuntimeEnv->scanFlag = REPEAT_SCAN; @@ -5069,7 +5069,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { setupEnvForReverseScan(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput); STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); - tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond); + tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond); qDebug("QInfo:0x%"PRIx64" start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, GET_QID(pRuntimeEnv), cond.twindow.skey, cond.twindow.ekey); @@ -5112,8 +5112,8 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) { tableBlockDist.maxRows = INT_MIN; tableBlockDist.minRows = INT_MAX; - tsdbGetFileBlocksDistInfo(pTableScanInfo->pQueryHandle, &tableBlockDist); - tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->pQueryHandle); + tsdbGetFileBlocksDistInfo(pTableScanInfo->pTsdbReadHandle, &tableBlockDist); + tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->pTsdbReadHandle); SSDataBlock* pBlock = &pTableScanInfo->block; pBlock->info.rows = 1; @@ -5142,7 +5142,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pQueryHandle = pTsdbQueryHandle; + pInfo->pTsdbReadHandle = pTsdbQueryHandle; pInfo->times = repeatTime; pInfo->reverseTimes = 0; pInfo->order = pRuntimeEnv->pQueryAttr->order.order; @@ -5165,7 +5165,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pQueryHandle = pTsdbQueryHandle; + pInfo->pTsdbReadHandle = pTsdbQueryHandle; pInfo->times = 1; pInfo->reverseTimes = 0; pInfo->order = pRuntimeEnv->pQueryAttr->order.order; @@ -5189,7 +5189,7 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pQueryHandle = pTsdbQueryHandle; + pInfo->pTsdbReadHandle = pTsdbQueryHandle; pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); SColumnInfoData infoData = {{0}}; @@ -5271,7 +5271,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pQueryHandle = pTsdbQueryHandle; + pInfo->pTsdbReadHandle = pTsdbQueryHandle; pInfo->times = repeatTime; pInfo->reverseTimes = reverseTime; pInfo->current = 0;