From 2360f072a84dba8086dda533e8297d2821b5b8b9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 Jun 2022 11:43:11 +0800 Subject: [PATCH] fix(query): add indef process procedure. --- include/libs/executor/executor.h | 9 +-------- source/libs/executor/inc/executorimpl.h | 4 ++-- source/libs/executor/src/executor.c | 2 +- source/libs/executor/src/executorMain.c | 4 ++-- source/libs/executor/src/executorimpl.c | 17 ++++++++++++++++- source/libs/qworker/inc/qwMsg.h | 2 +- source/libs/qworker/src/qwMsg.c | 4 +--- source/libs/qworker/src/qworker.c | 6 +++--- 8 files changed, 27 insertions(+), 21 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 3f05c27453..fdedb947d7 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -95,7 +95,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo * @return */ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, - qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model); + qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model); /** * @@ -159,13 +159,6 @@ int64_t qGetQueriedTableUid(qTaskInfo_t tinfo); */ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList); -/** - * Update the table id list of a given query. - * @param uid child table uid - * @param type operation type: ADD|DROP - * @return - */ -int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 758c031028..b435446513 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -189,7 +189,7 @@ typedef struct SExecTaskInfo { } schemaVer; STableListInfo tableqinfoList; // this is a table list - char* sql; // query sql string + const char* sql; // query sql string jmp_buf env; // jump to this position when error happens. EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] struct SOperatorInfo* pRoot; @@ -894,7 +894,7 @@ int32_t decodeOperator(SOperatorInfo* ops, char* data, int32_t length); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, - EOPTR_EXEC_MODEL model); + const char* sql, EOPTR_EXEC_MODEL model); int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo); int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity, int32_t* resNum); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index bebfc75f17..b1d076e8f5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -121,7 +121,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, OPTR_EXEC_MODEL_STREAM); + code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); if (code != TSDB_CODE_SUCCESS) { // TODO: destroy SSubplan & pTaskInfo terrno = code; diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index b1125c8e80..00158d7024 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -31,13 +31,13 @@ static void initRefPool() { } int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, - qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) { + qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) { assert(readHandle != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; taosThreadOnce(&initPoolOnce, initRefPool); - int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, model); + int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d066284c56..9ce22431e1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -736,6 +736,20 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc // _rowts/_c0, not tbname column if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) { // do nothing + } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[k]); + pfCtx->fpSet.init(&pCtx[k], pResInfo); + + pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); + pfCtx->offset = createNewColModel ? 0 : pResult->info.rows; // set the start offset + + // set the timestamp(_rowts) output buffer + if (taosArrayGetSize(pPseudoList) > 0) { + int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); + pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; + } + + numOfRows = pfCtx->fpSet.process(pfCtx); } else { SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); taosArrayPush(pBlockList, &pSrcBlock); @@ -5183,7 +5197,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT } int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, - EOPTR_EXEC_MODEL model) { + const char* sql, EOPTR_EXEC_MODEL model) { uint64_t queryId = pPlan->id.queryId; int32_t code = TSDB_CODE_SUCCESS; @@ -5193,6 +5207,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead goto _complete; } + (*pTaskInfo)->sql = sql; (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond); if (NULL == (*pTaskInfo)->pRoot) { diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index 29861d87ac..ecff861f50 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -24,7 +24,7 @@ extern "C" { #include "dataSinkMgt.h" int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); -int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain); +int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql); int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 848a0420ca..5635ec8fc6 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -308,10 +308,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info}; char * sql = strndup(msg->msg, msg->sqlLen); QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql); - taosMemoryFreeClear(sql); - - QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain)); + QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain, sql)); QW_SCH_TASK_DLOG("processQuery end, node:%p", node); return TSDB_CODE_SUCCESS; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 451607e7d0..57ebb89ed2 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -510,7 +510,7 @@ _return: } -int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) { +int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql) { int32_t code = 0; bool queryRsped = false; SSubplan *plan = NULL; @@ -537,7 +537,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex ctx->plan = plan; - code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH); + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code); @@ -938,7 +938,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes ctx.plan = plan; - code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH); + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code);