diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fdf64b7af2..2aaa2168cc 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -986,6 +986,14 @@ typedef struct { char msg[]; } SSubQueryMsg; +typedef struct { + SMsgHead header; + uint64_t sId; + uint64_t queryId; + uint64_t taskId; +} SSinkDataReq; + + typedef struct { SMsgHead header; uint64_t sId; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 2356498bbe..71b014d025 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -21,6 +21,9 @@ extern "C" { #endif typedef void* qTaskInfo_t; +typedef void* DataSinkHandle; +struct SSubplan; + /** * Create the exec task object according to task json @@ -34,13 +37,14 @@ typedef void* qTaskInfo_t; int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo); /** - * the main task execution function, including query on both table and multiple tables, + * The main task execution function, including query on both table and multiple tables, * which are decided according to the tag or table name query conditions * - * @param qinfo + * @param tinfo + * @param handle * @return */ -bool qExecTask(qTaskInfo_t qTask, SSDataBlock** pRes); +int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle); /** * Retrieve the produced results information, if current query is not paused or completed, @@ -62,7 +66,7 @@ int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspCo * @param contLen payload length * @return */ -int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec); +//int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec); /** * return the transporter context (RPC) diff --git a/source/libs/executor/inc/dataSinkMgt.h b/source/libs/executor/inc/dataSinkMgt.h index d0057a213a..6c0cda3d0a 100644 --- a/source/libs/executor/inc/dataSinkMgt.h +++ b/source/libs/executor/inc/dataSinkMgt.h @@ -21,12 +21,13 @@ extern "C" { #endif #include "os.h" +#include "executor.h" #include "executorimpl.h" #define DS_CAPACITY_ENOUGH 1 -#define DS_CAPACITY_FULL 2 +#define DS_DATA_FULL 2 #define DS_NEED_SCHEDULE 3 -#define DS_END 4 +#define DS_QUERY_END 4 #define DS_IN_PROCESS 5 struct SDataSink; @@ -39,8 +40,6 @@ typedef struct SDataSinkMgtCfg { int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg); -typedef void* DataSinkHandle; - typedef struct SInputData { const SSDataBlock* pData; SHashObj* pTableRetrieveTsMap; @@ -68,6 +67,10 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH */ int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus); +/** + * + * @param handle + */ void dsEndPut(DataSinkHandle handle); /** diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index ef66b3f247..59db66becc 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -20,6 +20,7 @@ extern "C" { #endif + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 33fafb4074..d76f270392 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -20,14 +20,16 @@ #include "ttszip.h" #include "tvariant.h" -#include "thash.h" +#include "dataSinkMgt.h" #include "executil.h" +#include "planner.h" #include "taosdef.h" #include "tarray.h" #include "tfilter.h" +#include "thash.h" #include "tlockfree.h" #include "tpagedfile.h" -#include "planner.h" +#include "executor.h" struct SColumnFilterElem; @@ -256,6 +258,7 @@ typedef struct SExecTaskInfo { // void* rspContext; // response context char *sql; // query sql string jmp_buf env; // + DataSinkHandle dsHandle; struct SOperatorInfo *pRoot; } SExecTaskInfo; diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 3d8e51d04d..5fa35edb06 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -124,7 +124,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, static int32_t updateStatus(SDataDispatchHandle* pDispatcher) { pthread_mutex_lock(&pDispatcher->mutex); - int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_CAPACITY_FULL; + int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_DATA_FULL; pDispatcher->status = status; pthread_mutex_unlock(&pDispatcher->mutex); return status; @@ -152,14 +152,14 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, static void endPut(struct SDataSinkHandle* pHandle) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; pthread_mutex_lock(&pDispatcher->mutex); - pDispatcher->status = DS_END; + pDispatcher->status = DS_QUERY_END; pthread_mutex_unlock(&pDispatcher->mutex); } static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; if (taosQueueEmpty(pDispatcher->pDataBlocks)) { - *pStatus = getStatus(pDispatcher) ? DS_END : DS_IN_PROCESS; + *pStatus = getStatus(pDispatcher) ? DS_QUERY_END : DS_IN_PROCESS; return 0; } SDataDispatchBuf* pBuf = NULL; diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 8a96c5d05f..d9b122fbdd 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "tarray.h" #include "dataSinkMgt.h" #include "dataSinkInt.h" #include "planner.h" diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index ab9e7a5211..e8799542b2 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -13,9 +13,10 @@ * along with this program. If not, see . */ -#include -#include "exception.h" #include "os.h" +#include "tarray.h" +#include "dataSinkMgt.h" +#include "exception.h" #include "tcache.h" #include "tglobal.h" #include "tmsg.h" @@ -69,8 +70,9 @@ void freeParam(STaskParam *param) { int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo) { assert(tsdb != NULL && pSubplan != NULL); + SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - int32_t code = doCreateExecTaskInfo(pSubplan, (SExecTaskInfo**) pTaskInfo, tsdb); + int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -81,8 +83,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ goto _error; } - DataSinkHandle pHandle = NULL; - code = dsCreateDataSinker(pSubplan->pDataSink, &pHandle); + code = dsCreateDataSinker(pSubplan->pDataSink, (*pTask)->dsHandle); _error: // if failed to add ref for all tables in this query, abort current query @@ -134,64 +135,79 @@ int waitMoment(SQInfo* pQInfo){ } #endif -bool qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes) { - SExecTaskInfo *pTaskInfo = (SExecTaskInfo *) tinfo; - int64_t threadId = taosGetSelfPthreadId(); +int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + int64_t threadId = taosGetSelfPthreadId(); int64_t curOwner = 0; if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { - qError("QInfo:0x%"PRIx64"-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*) curOwner); + qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, + (void*)curOwner); pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; - return false; + return pTaskInfo->code; } - if(pTaskInfo->cost.start == 0) { + if (pTaskInfo->cost.start == 0) { pTaskInfo->cost.start = taosGetTimestampMs(); } if (isTaskKilled(pTaskInfo)) { - qDebug("QInfo:0x%"PRIx64" it is already killed, abort", GET_TASKID(pTaskInfo)); -// return doBuildResCheck(pTaskInfo); + qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo)); + return pTaskInfo->code; } -// STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv; -// if (pTaskInfo->tableqinfoGroupInfo.numOfTables == 0) { -// qDebug("QInfo:0x%"PRIx64" no table exists for query, abort", GET_TASKID(pTaskInfo)); -// setTaskStatus(pTaskInfo, TASK_COMPLETED); -// return doBuildResCheck(pTaskInfo); -// } + // STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv; + // if (pTaskInfo->tableqinfoGroupInfo.numOfTables == 0) { + // qDebug("QInfo:0x%"PRIx64" no table exists for query, abort", GET_TASKID(pTaskInfo)); + // setTaskStatus(pTaskInfo, TASK_COMPLETED); + // return doBuildResCheck(pTaskInfo); + // } // error occurs, record the error code and return to client int32_t ret = setjmp(pTaskInfo->env); if (ret != TSDB_CODE_SUCCESS) { publishQueryAbortEvent(pTaskInfo, ret); pTaskInfo->code = ret; - qDebug("QInfo:0x%"PRIx64" query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); -// return doBuildResCheck(pTaskInfo); + qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); + return pTaskInfo->code; } - qDebug("QInfo:0x%"PRIx64" query task is launched", GET_TASKID(pTaskInfo)); + qDebug("QInfo:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo)); bool newgroup = false; publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); + int64_t st = 0; - int64_t st = taosGetTimestampUs(); - *pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup); - // todo put the result into sink node. + handle = &pTaskInfo->dsHandle; - pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st); - publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC); + while(1) { + st = taosGetTimestampUs(); + SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup); - if (isTaskKilled(pTaskInfo)) { - qDebug("QInfo:0x%"PRIx64" query is killed", GET_TASKID(pTaskInfo)); -// } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) { -// qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pTaskInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, -// pRuntimeEnv->resultInfo.total); - } else { -// qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows", pTaskInfo->qId, -// GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total); + pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st); + publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC); + + if (pRes == NULL) { // no results generated yet, abort + return pTaskInfo->code; + } + + int32_t status = 0; + SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL}; + pTaskInfo->code = dsPutDataBlock(pTaskInfo->dsHandle, &inputData, &status); + + if (isTaskKilled(pTaskInfo)) { + qDebug("QInfo:0x%" PRIx64 " task is killed", GET_TASKID(pTaskInfo)); + // } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) { + // qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pTaskInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, + // pRuntimeEnv->resultInfo.total); + } + + if (status == DS_DATA_FULL) { + qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", GET_TASKID(pTaskInfo), + 0, 0L, 0); + return pTaskInfo->code; + } } -// return doBuildResCheck(pTaskInfo); } int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index db25e384be..9aeb979806 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -7181,6 +7181,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) { pthread_mutex_init(&pTaskInfo->lock, NULL); pTaskInfo->cost.created = taosGetTimestampMs(); + pTaskInfo->id.queryId = queryId; return pTaskInfo; } diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 691177fa83..7883079fbe 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -67,11 +67,12 @@ typedef struct SQWTaskStatus { bool drop; } SQWTaskStatus; -typedef struct SQWorkerTaskHandleCache { +typedef struct SQWorkerTaskHandlesCache { SRWLatch lock; + bool needRsp; qTaskInfo_t taskHandle; DataSinkHandle sinkHandle; -} SQWorkerTaskHandleCache; +} SQWorkerTaskHandlesCache; typedef struct SQWSchStatus { int32_t lastAccessTs; // timestamp in second diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 5f2f6214d8..91a27a831f 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -91,16 +91,16 @@ int32_t qwUpdateTaskInfo(SQWTaskStatus *task, int8_t type, void *data) { return TSDB_CODE_SUCCESS; } -int32_t qwAddTaskAndSinkToCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId, qTaskInfo_t taskHandle, DataSinkHandle sinkHandle) { +int32_t qwAddTaskHandlesToCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId, qTaskInfo_t taskHandle, DataSinkHandle sinkHandle) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); - SQWorkerResCache resCache = {0}; + SQWorkerTaskHandlesCache resCache = {0}; resCache.taskHandle = taskHandle; resCache.sinkHandle = sinkHandle; QW_LOCK(QW_WRITE, &mgmt->resLock); - if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerResCache))) { + if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerTaskHandlesCache))) { QW_UNLOCK(QW_WRITE, &mgmt->resLock); qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", qId, tId); return TSDB_CODE_QRY_APP_ERROR; @@ -249,13 +249,13 @@ static int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ QW_RET(code); } -static FORCE_INLINE int32_t qwAcquireTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWorkerResCache **res) { +static FORCE_INLINE int32_t qwAcquireTaskHandles(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWorkerTaskHandlesCache **handles) { char id[sizeof(queryId) + sizeof(taskId)] = {0}; QW_SET_QTID(id, queryId, taskId); QW_LOCK(rwType, &mgmt->resLock); - *res = taosHashGet(mgmt->resHash, id, sizeof(id)); - if (NULL == (*res)) { + *handles = taosHashGet(mgmt->resHash, id, sizeof(id)); + if (NULL == (*handles)) { QW_UNLOCK(rwType, &mgmt->resLock); return TSDB_CODE_QRY_RES_CACHE_NOT_EXIST; } @@ -605,19 +605,34 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) { return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, void *data) { - SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); +int32_t qwInitFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { + int32_t msgSize = sizeof(SRetrieveTableRsp) + length; + + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(msgSize); + if (NULL == pRsp) { + qError("rpcMallocCont %d failed", msgSize); + QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + memset(pRsp, 0, sizeof(SRetrieveTableRsp)); - //TODO fill msg - pRsp->completed = true; + return TSDB_CODE_SUCCESS; +} + + +int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { + if (NULL == pRsp) { + pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + memset(pRsp, 0, sizeof(SRetrieveTableRsp)); + dataLength = 0; + } SRpcMsg rpcRsp = { .handle = pMsg->handle, .ahandle = pMsg->ahandle, .pCont = pRsp, - .contLen = sizeof(*pRsp), - .code = 0, + .contLen = sizeof(*pRsp) + dataLength, + .code = code, }; rpcSendResponse(&rpcRsp); @@ -850,12 +865,16 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI return TSDB_CODE_SUCCESS; } -int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { +int32_t qwHandleFetch(SQWorkerTaskHandlesCache *handles, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; int32_t needRsp = true; void *data = NULL; + int32_t sinkStatus = 0; + int32_t dataLength = 0; + SRetrieveTableRsp *rsp = NULL; + bool queryEnd = false; QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); @@ -871,26 +890,61 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, u qError("invalid status %d for fetch", task->status); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } + + code = dsGetDataLength(handles->sinkHandle, &dataLength, &queryEnd); + if (TSDB_CODE_SUCCESS != code) { + qError("dsGetDataLength failed, code:%x", code); + QW_ERR_JRET(code); + } - if (QW_GOT_RES_DATA(res->data)) { - data = res->data; - if (QW_LOW_RES_DATA(res->data)) { - if (task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { - //TODO add query back to queue - } - } - } else { - if (task->status != JOB_TASK_STATUS_EXECUTING) { - qError("invalid status %d for fetch without res", task->status); - QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + if (dataLength > 0) { + SOutPutData output = {0}; + QW_ERR_JRET(qwInitFetchRsp(dataLength, &rsp)); + + output.pData = rsp->data; + + code = dsGetDataBlock(handles->sinkHandle, &output); + if (code) { + qError("dsGetDataBlock failed, code:%x", code); + QW_ERR_JRET(code); } - //TODO SET FLAG FOR QUERY TO SEND RSP WHEN RES READY + if (DS_BUF_EMPTY == output.bufStatus && output.queryEnd) { + rsp->completed = 1; + } - needRsp = false; + if (output.needSchedule) { + //TODO + } + + if ((!output.queryEnd) && DS_BUF_LOW == output.bufStatus) { + //TODO + //UPDATE STATUS TO EXECUTING + } + } else { + if (dataLength < 0) { + qError("invalid length from dsGetDataLength, length:%d", dataLength); + QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (queryEnd) { + QW_ERR_JRET(qwQueryPostProcess(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_SUCCEED, code)); + } else { + if (task->status != JOB_TASK_STATUS_EXECUTING) { + qError("invalid status %d for fetch without res", task->status); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + QW_LOCK(QW_WRITE, &handles->lock); + handles->needRsp = true; + QW_UNLOCK(QW_WRITE, &handles->lock); + + needRsp = false; + } } _return: + if (task) { QW_UNLOCK(QW_READ, &task->lock); qwReleaseTask(QW_READ, sch); @@ -901,7 +955,7 @@ _return: } if (needRsp) { - qwBuildAndSendFetchRsp(pMsg, res->data); + qwBuildAndSendFetchRsp(pMsg, rsp, dataLength, code); } QW_RET(code); @@ -1011,7 +1065,6 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { msg->taskId = htobe64(msg->taskId); msg->contentLen = ntohl(msg->contentLen); - bool queryDone = false; bool queryRsped = false; bool needStop = false; struct SSubplan *plan = NULL; @@ -1039,20 +1092,19 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS)); queryRsped = true; - - SSDataBlock* pRes = NULL; - code = qExecTask(pTaskInfo, &pRes); - queryDone = false; + DataSinkHandle sinkHandle = NULL; + code = qExecTask(pTaskInfo, &sinkHandle); if (code) { QW_ERR_JRET(code); } else { - QW_ERR_JRET(qwAddTaskAndSinkToCache(qWorkerMgmt, msg->queryId, msg->taskId, pTaskInfo, NULL)); + QW_ERR_JRET(qwAddTaskHandlesToCache(qWorkerMgmt, msg->queryId, msg->taskId, pTaskInfo, sinkHandle)); QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED)); } _return: + if (queryRsped) { code = qwCheckAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg, code); } else { @@ -1062,8 +1114,6 @@ _return: int8_t status = 0; if (TSDB_CODE_SUCCESS != code) { status = JOB_TASK_STATUS_FAILED; - } else if (queryDone) { - status = JOB_TASK_STATUS_SUCCEED; } else { status = JOB_TASK_STATUS_PARTIAL_SUCCEED; } @@ -1073,6 +1123,49 @@ _return: QW_RET(code); } +int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + int32_t code = 0; + int8_t status = 0; + bool queryDone = false; + uint64_t sId, qId, tId; + + //TODO call executer to continue execute subquery + code = 0; + void *data = NULL; + queryDone = false; + //TODO call executer to continue execute subquery + + if (TSDB_CODE_SUCCESS != code) { + status = JOB_TASK_STATUS_FAILED; + } else if (queryDone) { + status = JOB_TASK_STATUS_SUCCEED; + } else { + status = JOB_TASK_STATUS_PARTIAL_SUCCEED; + } + + code = qwQueryPostProcess(qWorkerMgmt, sId, qId, tId, status, code); + + QW_RET(code); +} + + + +int32_t qWorkerProcessSinkDataMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + SSinkDataReq *msg = pMsg->pCont; + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { + qError("invalid sink data msg"); + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + //TODO + + return TSDB_CODE_SUCCESS; +} + int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; @@ -1135,12 +1228,12 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->sId)); void *data = NULL; - SQWorkerResCache *res = NULL; + SQWorkerTaskHandlesCache *handles = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireTaskResCache(QW_READ, qWorkerMgmt, msg->queryId, msg->taskId, &res)); + QW_ERR_RET(qwAcquireTaskHandles(QW_READ, qWorkerMgmt, msg->queryId, msg->taskId, &handles)); - QW_ERR_JRET(qwHandleFetch(res, qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg)); + QW_ERR_JRET(qwHandleFetch(handles, qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg)); _return: @@ -1218,31 +1311,6 @@ int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) QW_ERR_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq)); } -int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { - int32_t code = 0; - int8_t status = 0; - bool queryDone = false; - uint64_t sId, qId, tId; - - //TODO call executer to continue execute subquery - code = 0; - void *data = NULL; - queryDone = false; - //TODO call executer to continue execute subquery - - if (TSDB_CODE_SUCCESS != code) { - status = JOB_TASK_STATUS_FAILED; - } else if (queryDone) { - status = JOB_TASK_STATUS_SUCCEED; - } else { - status = JOB_TASK_STATUS_PARTIAL_SUCCEED; - } - - code = qwQueryPostProcess(qWorkerMgmt, sId, qId, tId, status, code); - - QW_RET(code); -} - void qWorkerDestroy(void **qWorkerMgmt) { if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) { return; diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 65c8a45d00..130628e799 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -274,7 +274,7 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef); -bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle); +bool isTsdbCacheLastRow(TsdbQueryHandleT* pTsdbReadHandle); /** @@ -308,19 +308,19 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle); /** * move to next block if exists * - * @param pQueryHandle + * @param pTsdbReadHandle * @return */ -bool tsdbNextDataBlock(TsdbQueryHandleT pQueryHandle); +bool tsdbNextDataBlock(TsdbQueryHandleT pTsdbReadHandle); /** * Get current data block information * - * @param pQueryHandle + * @param pTsdbReadHandle * @param pBlockInfo * @return */ -void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *pBlockInfo); +void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); /** * @@ -332,7 +332,7 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *p * @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0 * @return */ -int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataStatis **pBlockStatis); +int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pTsdbReadHandle, SDataStatis **pBlockStatis); /** * @@ -340,11 +340,11 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataSta * the returned data block must be satisfied with the time window condition in any cases, * which means the SData data block is not actually the completed disk data blocks. * - * @param pQueryHandle query handle + * @param pTsdbReadHandle query handle * @param pColumnIdList required data columns id list * @return */ -SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdList); +SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pTsdbReadHandle, SArray *pColumnIdList); /** * Get the qualified table id for a super table according to the tag query expression. diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 19ca8e7ed8..0c0e3363c8 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -284,7 +284,7 @@ typedef struct SQueryRuntimeEnv { uint32_t status; // query status void* qinfo; uint8_t scanFlag; // denotes reversed scan of data or not - void* pQueryHandle; + void* pTsdbReadHandle; int32_t prevGroupId; // previous executed group id bool enableGroupData; @@ -418,7 +418,7 @@ typedef struct SQueryParam { } SQueryParam; typedef struct STableScanInfo { - void *pQueryHandle; + void *pTsdbReadHandle; int32_t numOfBlocks; int32_t numOfSkipped; int32_t numOfBlockStatis; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index be1bfb8143..a6a9115b2f 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2326,8 +2326,8 @@ _clean: static void doFreeQueryHandle(SQueryRuntimeEnv* pRuntimeEnv) { SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); - pRuntimeEnv->pQueryHandle = NULL; + tsdbCleanupQueryHandle(pRuntimeEnv->pTsdbReadHandle); + pRuntimeEnv->pTsdbReadHandle = NULL; SMemRef* pMemRef = &pQueryAttr->memRef; assert(pMemRef->ref == 0 && pMemRef->snapshot.imem == NULL && pMemRef->snapshot.mem == NULL); @@ -3148,10 +3148,10 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa } else if ((*status) == BLK_DATA_STATIS_NEEDED) { // this function never returns error? pCost->loadBlockStatis += 1; - tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis); + tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockStatis); if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block - pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); + pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL); pCost->totalCheckedRows += pBlock->info.rows; } } else { @@ -3159,7 +3159,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa // load the data block statistics to perform further filter pCost->loadBlockStatis += 1; - tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pBlock->pBlockStatis); + tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockStatis); if (pQueryAttr->topBotQuery && pBlock->pBlockStatis != NULL) { { // set previous window @@ -3205,7 +3205,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa pCost->totalCheckedRows += pBlockInfo->rows; pCost->loadBlocks += 1; - pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); + pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL); if (pBlock->pDataBlock == NULL) { return terrno; } @@ -4494,7 +4494,7 @@ void queryCostStatis(SQInfo *pQInfo) { // // assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1); // -// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); +// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL); // SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); // // // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value @@ -4521,15 +4521,15 @@ void queryCostStatis(SQInfo *pQInfo) { // int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); // // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; -// TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle; +// TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle; // // SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; -// while (tsdbNextDataBlock(pQueryHandle)) { +// while (tsdbNextDataBlock(pTsdbReadHandle)) { // if (isQueryKilled(pRuntimeEnv->qinfo)) { // longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); // } // -// tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); +// tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo); // // if (pQueryAttr->limit.offset > blockInfo.rows) { // pQueryAttr->limit.offset -= blockInfo.rows; @@ -4562,7 +4562,7 @@ void queryCostStatis(SQInfo *pQInfo) { // // // load the data block and check data remaining in current data block // // TODO optimize performance -// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); +// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL); // SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); // // tw = *win; @@ -4627,8 +4627,8 @@ void queryCostStatis(SQInfo *pQInfo) { // STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current; // // SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; -// while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { -// tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle, &blockInfo); +// while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) { +// tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo); // // if (QUERY_IS_ASC_QUERY(pQueryAttr)) { // if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) { @@ -4674,7 +4674,7 @@ void queryCostStatis(SQInfo *pQInfo) { // */ // if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) { // -// SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); +// SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL); // SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); // // if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) { @@ -4748,7 +4748,7 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64 terrno = TSDB_CODE_SUCCESS; if (isFirstLastRowQuery(pQueryAttr)) { - pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); + pRuntimeEnv->pTsdbReadHandle = tsdbQueryLastRow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); // update the query time window pQueryAttr->window = cond.twindow; @@ -4769,11 +4769,11 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64 } } } else if (isCachedLastQuery(pQueryAttr)) { - pRuntimeEnv->pQueryHandle = tsdbQueryCacheLast(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); + pRuntimeEnv->pTsdbReadHandle = tsdbQueryCacheLast(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); } else if (pQueryAttr->pointInterpQuery) { - pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); + pRuntimeEnv->pTsdbReadHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); } else { - pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); + pRuntimeEnv->pTsdbReadHandle = tsdbQueryTables(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef); } return terrno; @@ -4831,19 +4831,19 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr switch(tbScanner) { case OP_TableBlockInfoScan: { - pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); + pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv); break; } case OP_TableSeqScan: { - pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); + pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv); break; } case OP_DataBlocksOptScan: { - pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0); + pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0); break; } case OP_TableScan: { - pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr)); + pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr)); break; } default: { // do nothing @@ -4974,13 +4974,13 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { *newgroup = false; - while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { + while (tsdbNextDataBlock(pTableScanInfo->pTsdbReadHandle)) { if (isQueryKilled(pOperator->pRuntimeEnv->qinfo)) { longjmp(pOperator->pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); } pTableScanInfo->numOfBlocks += 1; - tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); + tsdbRetrieveDataBlockInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->info); // todo opt if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) { @@ -5037,7 +5037,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { } if (++pTableScanInfo->current >= pTableScanInfo->times) { - if (pTableScanInfo->reverseTimes <= 0 || isTsdbCacheLastRow(pTableScanInfo->pQueryHandle)) { + if (pTableScanInfo->reverseTimes <= 0 || isTsdbCacheLastRow(pTableScanInfo->pTsdbReadHandle)) { return NULL; } else { break; @@ -5046,7 +5046,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { // do prepare for the next round table scan operation STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); - tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond); + tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond); setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED); pRuntimeEnv->scanFlag = REPEAT_SCAN; @@ -5069,7 +5069,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { setupEnvForReverseScan(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput); STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); - tsdbResetQueryHandle(pTableScanInfo->pQueryHandle, &cond); + tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond); qDebug("QInfo:0x%"PRIx64" start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, GET_QID(pRuntimeEnv), cond.twindow.skey, cond.twindow.ekey); @@ -5112,8 +5112,8 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) { tableBlockDist.maxRows = INT_MIN; tableBlockDist.minRows = INT_MAX; - tsdbGetFileBlocksDistInfo(pTableScanInfo->pQueryHandle, &tableBlockDist); - tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->pQueryHandle); + tsdbGetFileBlocksDistInfo(pTableScanInfo->pTsdbReadHandle, &tableBlockDist); + tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->pTsdbReadHandle); SSDataBlock* pBlock = &pTableScanInfo->block; pBlock->info.rows = 1; @@ -5142,7 +5142,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pQueryHandle = pTsdbQueryHandle; + pInfo->pTsdbReadHandle = pTsdbQueryHandle; pInfo->times = repeatTime; pInfo->reverseTimes = 0; pInfo->order = pRuntimeEnv->pQueryAttr->order.order; @@ -5165,7 +5165,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pQueryHandle = pTsdbQueryHandle; + pInfo->pTsdbReadHandle = pTsdbQueryHandle; pInfo->times = 1; pInfo->reverseTimes = 0; pInfo->order = pRuntimeEnv->pQueryAttr->order.order; @@ -5189,7 +5189,7 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pQueryHandle = pTsdbQueryHandle; + pInfo->pTsdbReadHandle = pTsdbQueryHandle; pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); SColumnInfoData infoData = {{0}}; @@ -5271,7 +5271,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pQueryHandle = pTsdbQueryHandle; + pInfo->pTsdbReadHandle = pTsdbQueryHandle; pInfo->times = repeatTime; pInfo->reverseTimes = reverseTime; pInfo->current = 0;