diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 9e28911784..16e1a1c395 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -64,7 +64,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers); * @param SReadHandle * @return */ -qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers); +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols); /** * Set the input data block for the stream scan. diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 757749a9b6..07bee22a1f 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -88,7 +88,7 @@ typedef struct { STqExecTb execTb; STqExecDb execDb; }; - + int32_t numOfCols; // number of out pout column, temporarily used } STqExecHandle; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1802f9be1a..f6862621f9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -506,7 +506,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { .initTqReader = true, .version = ver, }; - pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); + pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols); ASSERT(pHandle->execHandle.execCol.task[i]); void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index f18b25bef4..3a6afb22ac 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -15,7 +15,7 @@ #include "tq.h" -static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp) { +static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) { int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); if (buf == NULL) return -1; @@ -29,7 +29,7 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp) // TODO enable compress int32_t actualLen = 0; - blockEncode(pBlock, pRetrieve->data, &actualLen, taosArrayGetSize(pBlock->pDataBlock), false); + blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false); actualLen += sizeof(SRetrieveTableRsp); ASSERT(actualLen <= dataStrLen); taosArrayPush(pRsp->blockDataLen, &actualLen); @@ -87,7 +87,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa tqDebug("task execute end, get %p", pDataBlock); if (pDataBlock != NULL) { - tqAddBlockDataToRsp(pDataBlock, pRsp); + tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); pRsp->blockNum++; if (pRsp->withTbName) { if (pOffset->type == TMQ_OFFSET__LOG) { @@ -195,7 +195,7 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; ASSERT(0); } - tqAddBlockDataToRsp(&block, pRsp); + tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); if (pRsp->withTbName) { int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; tqAddTbNameToRsp(pTq, uid, pRsp); @@ -213,7 +213,7 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; ASSERT(0); } - tqAddBlockDataToRsp(&block, pRsp); + tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); if (pRsp->withTbName) { int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; tqAddTbNameToRsp(pTq, uid, pRsp); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index e6df58696d..468490350a 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -92,7 +92,8 @@ int32_t tqMetaOpen(STQ* pTq) { .initTqReader = true, .version = handle.snapshotVer, }; - handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader); + + handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols); ASSERT(handle.execHandle.execCol.task[i]); void* scanner = NULL; qExtractStreamScanner(handle.execHandle.execCol.task[i], &scanner); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ad12ee60f1..6c3d0648e0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1961,9 +1961,11 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* SDelIdx idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid}; SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ); - code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL); - if (code != TSDB_CODE_SUCCESS) { - goto _err; + if (pIdx != NULL) { + code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } } } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a7e033e547..d7c283c70d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -159,27 +159,28 @@ typedef struct { int64_t recoverEndVer; } SStreamTaskInfo; +typedef struct { + char* tablename; + char* dbname; + int32_t tversion; + SSchemaWrapper* sw; +} SSchemaInfo; + typedef struct SExecTaskInfo { - STaskIdInfo id; - uint32_t status; - STimeWindow window; - STaskCostInfo cost; - int64_t owner; // if it is in execution - int32_t code; + STaskIdInfo id; + uint32_t status; + STimeWindow window; + STaskCostInfo cost; + int64_t owner; // if it is in execution + int32_t code; - SStreamTaskInfo streamInfo; - - struct { - char* tablename; - char* dbname; - int32_t tversion; - SSchemaWrapper* sw; - } schemaVer; - - STableListInfo tableqinfoList; // this is a table list - const char* sql; // query sql string - jmp_buf env; // jump to this position when error happens. - EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + SStreamTaskInfo streamInfo; + SSchemaInfo schemaInfo; + STableListInfo tableqinfoList; // this is a table list + const char* sql; // query sql string + jmp_buf env; // jump to this position when error happens. + EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + SSubplan* pSubplan; struct SOperatorInfo* pRoot; } SExecTaskInfo; @@ -248,13 +249,13 @@ typedef struct SLoadRemoteDataInfo { } SLoadRemoteDataInfo; typedef struct SLimitInfo { - SLimit limit; - SLimit slimit; - uint64_t currentGroupId; - int64_t remainGroupOffset; - int64_t numOfOutputGroups; - int64_t remainOffset; - int64_t numOfOutputRows; + SLimit limit; + SLimit slimit; + uint64_t currentGroupId; + int64_t remainGroupOffset; + int64_t numOfOutputGroups; + int64_t remainOffset; + int64_t numOfOutputRows; } SLimitInfo; typedef struct SExchangeInfo { diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 395b42aa86..c46485a332 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -227,14 +227,14 @@ int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInf for (int32_t i = 0; i < numOfCols; ++i) { SColMatchInfo* pColMatch = taosArrayGet(pColMatchInfo, i); - for (int32_t j = 0; j < pTaskInfo->schemaVer.sw->nCols; ++j) { - if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId && + for (int32_t j = 0; j < pTaskInfo->schemaInfo.sw->nCols; ++j) { + if (pColMatch->colId == pTaskInfo->schemaInfo.sw->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { (*pSlotIds)[pColMatch->targetSlotId] = -1; break; } - if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId) { + if (pColMatch->colId == pTaskInfo->schemaInfo.sw->pSchema[j].colId) { (*pSlotIds)[pColMatch->targetSlotId] = j; break; } diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index b8495faffd..f0fb5852a0 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -67,15 +67,15 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { return false; } +// clang-format off // data format: -// +----------------+--------------+----------+--------------------------------------------+--------------------------------------+-------------+-----------+-------------+-----------+ -// |SDataCacheEntry | total length | group id | col1_schema | col2_schema | col3_schema ...| column#1 length, column#2 -// length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes) -// |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | | -// actual size | | -// +----------------+--------------+----------+--------------------------------------------+--------------------------------------+-------------+-----------+-------------+-----------+ +// +----------------+--------------+-----------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+ +// |SDataCacheEntry | total length | group id | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes) +// | |sizeof(int32) |sizeof(uint64_t) |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | | +// +----------------+--------------+-----------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+ // The length of bitmap is decided by number of rows of this data block, and the length of each column data is // recorded in the first segment, next to the struct header +// clang-format on static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { int32_t numOfCols = 0; SNode* pNode; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 90952d5786..0bf294f33d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -104,27 +104,40 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO return code; } -qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers) { +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols) { if (msg == NULL) { // TODO create raw scan return NULL; } - struct SSubplan* plan = NULL; - int32_t code = qStringToSubplan(msg, &plan); + struct SSubplan* pPlan = NULL; + int32_t code = qStringToSubplan(msg, &pPlan); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(readers, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE); + code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE); if (code != TSDB_CODE_SUCCESS) { - // TODO: destroy SSubplan & pTaskInfo + nodesDestroyNode((SNode*)pPlan); + qDestroyTask(pTaskInfo); terrno = code; return NULL; } + // extract the number of output columns + SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc; + *numOfCols = 0; + + SNode* pNode; + FOREACH(pNode, pDescNode->pSlots) { + SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; + if (pSlotDesc->output) { + ++(*numOfCols); + } + } + return pTaskInfo; } @@ -135,17 +148,18 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) { /*qDebugL("stream task string %s", (const char*)msg);*/ - struct SSubplan* plan = NULL; - int32_t code = qStringToSubplan(msg, &plan); + struct SSubplan* pPlan = NULL; + int32_t code = qStringToSubplan(msg, &pPlan); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(readers, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); + code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); if (code != TSDB_CODE_SUCCESS) { - // TODO: destroy SSubplan & pTaskInfo + nodesDestroyNode((SNode*)pPlan); + qDestroyTask(pTaskInfo); terrno = code; return NULL; } @@ -227,19 +241,19 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL); SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - if (pTaskInfo->schemaVer.sw == NULL) { + if (pTaskInfo->schemaInfo.sw == NULL) { return TSDB_CODE_SUCCESS; } - *sversion = pTaskInfo->schemaVer.sw->version; - *tversion = pTaskInfo->schemaVer.tversion; - if (pTaskInfo->schemaVer.dbname) { - strcpy(dbName, pTaskInfo->schemaVer.dbname); + *sversion = pTaskInfo->schemaInfo.sw->version; + *tversion = pTaskInfo->schemaInfo.tversion; + if (pTaskInfo->schemaInfo.dbname) { + strcpy(dbName, pTaskInfo->schemaInfo.dbname); } else { dbName[0] = 0; } - if (pTaskInfo->schemaVer.tablename) { - strcpy(tableName, pTaskInfo->schemaVer.tablename); + if (pTaskInfo->schemaInfo.tablename) { + strcpy(tableName, pTaskInfo->schemaInfo.tablename); } else { tableName[0] = 0; } diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 9d3d62cabf..c3efbf9336 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -39,6 +39,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, taosThreadOnce(&initPoolOnce, initRefPool); atexit(cleanupRefPool); + int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2bbee57faf..7ff9e90c75 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4123,7 +4123,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); - pTaskInfo->schemaVer.dbname = strdup(dbFName); + pTaskInfo->schemaInfo.dbname = strdup(dbFName); pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->id.queryId = queryId; pTaskInfo->execModel = model; @@ -4149,35 +4149,35 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo return terrno; } - pTaskInfo->schemaVer.tablename = strdup(mr.me.name); + pTaskInfo->schemaInfo.tablename = strdup(mr.me.name); if (mr.me.type == TSDB_SUPER_TABLE) { - pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); - pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version; + pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); + pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version; } else if (mr.me.type == TSDB_CHILD_TABLE) { tDecoderClear(&mr.coder); tb_uid_t suid = mr.me.ctbEntry.suid; metaGetTableEntryByUid(&mr, suid); - pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); - pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version; + pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); + pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version; } else { - pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); + pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); } metaReaderClear(&mr); return TSDB_CODE_SUCCESS; } -static void cleanupTableSchemaInfo(SExecTaskInfo* pTaskInfo) { - taosMemoryFreeClear(pTaskInfo->schemaVer.dbname); - if (pTaskInfo->schemaVer.sw == NULL) { +static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { + taosMemoryFreeClear(pSchemaInfo->dbname); + if (pSchemaInfo->sw == NULL) { return; } - taosMemoryFree(pTaskInfo->schemaVer.sw->pSchema); - taosMemoryFree(pTaskInfo->schemaVer.sw); - taosMemoryFree(pTaskInfo->schemaVer.tablename); + taosMemoryFree(pSchemaInfo->tablename); + taosMemoryFree(pSchemaInfo->sw->pSchema); + taosMemoryFree(pSchemaInfo->sw); } static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) { @@ -4935,6 +4935,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead } (*pTaskInfo)->sql = sql; + (*pTaskInfo)->pSubplan = pPlan; (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); if (NULL == (*pTaskInfo)->pRoot) { @@ -4973,7 +4974,9 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { doDestroyTableList(&pTaskInfo->tableqinfoList); destroyOperatorInfo(pTaskInfo->pRoot); - cleanupTableSchemaInfo(pTaskInfo); + cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); + + nodesDestroyNode((SNode*)pTaskInfo->pSubplan); taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->id.str); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 026222f83a..b0a74c3002 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1543,6 +1543,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pPullWins); blockDataDestroy(pInfo->pPullDataRes); taosArrayDestroy(pInfo->pRecycledPages); + blockDataDestroy(pInfo->pUpdateRes); if (pInfo->pChildren) { int32_t size = taosArrayGetSize(pInfo->pChildren); diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index c8e5204e91..8f036714c9 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -150,7 +150,6 @@ typedef struct SQWTaskCtx { void *taskHandle; void *sinkHandle; - SSubplan *plan; STbVerInfo tbInfo; } SQWTaskCtx; diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index b56cb29628..c7bf7ab7e7 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -306,11 +306,6 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) { dsDestroyDataSinker(ctx->sinkHandle); ctx->sinkHandle = NULL; } - - if (ctx->plan) { - nodesDestroyNode((SNode*)ctx->plan); - ctx->plan = NULL; - } } int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { @@ -327,7 +322,6 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { atomic_store_ptr(&ctx->taskHandle, NULL); atomic_store_ptr(&ctx->sinkHandle, NULL); - atomic_store_ptr(&ctx->plan, NULL); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index e99695e962..ebccb7950c 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -522,8 +522,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) { QW_ERR_JRET(code); } - ctx->plan = plan; - code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -928,8 +926,6 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { QW_ERR_JRET(code); } - ctx.plan = plan; - code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));