diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a96acc0f11..16433dc34e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -832,7 +832,7 @@ SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo); int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t vgId, char* sql, EOPTR_EXEC_MODEL model); -int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, STableListInfo* pTableListInfo, SReadHandle* readHandle); +int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle); int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList); STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c26c1bcb32..b4431a7c3b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -503,12 +503,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, if (handle) { void* pSinkParam = NULL; - - SArray* pInfoList = getTableListInfo(*pTask); - STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0); - taosArrayDestroy(pInfoList); - - code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTableListInfo, readHandle); + code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*pTask), readHandle); if (code != TSDB_CODE_SUCCESS) { qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str); goto _error; @@ -1099,14 +1094,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT SStreamScanInfo* pInfo = pOperator->info; STableScanInfo* pScanInfo = pInfo->pTableScanOp->info; STableScanBase* pScanBaseInfo = &pScanInfo->base; - STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo; + STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo; if (pOffset->type == TMQ_OFFSET__LOG) { tsdbReaderClose(pScanBaseInfo->dataReader); pScanBaseInfo->dataReader = NULL; - // set version to read for wal is next, so +1 - if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) { + // let's seek to the next version in wal file + if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) { qError("tqSeekVer failed ver:%"PRId64", %s", pOffset->version + 1, id); return -1; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 71bb1c7361..11753c181c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1586,7 +1586,7 @@ int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) { return -1; } -int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, STableListInfo* pTableListInfo, SReadHandle* readHandle) { +int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle) { switch (pNode->type) { case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: { SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam)); @@ -1604,23 +1604,26 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, STableListInfo* return TSDB_CODE_OUT_OF_MEMORY; } - int32_t tbNum = tableListGetSize(pTableListInfo); + SArray* pInfoList = getTableListInfo(pTask); + STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0); + taosArrayDestroy(pInfoList); + pDeleterParam->suid = tableListGetSuid(pTableListInfo); // TODO extract uid list - pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t)); + int32_t numOfTables = tableListGetSize(pTableListInfo); + pDeleterParam->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t)); if (NULL == pDeleterParam->pUidList) { taosMemoryFree(pDeleterParam); return TSDB_CODE_OUT_OF_MEMORY; } - for (int32_t i = 0; i < tbNum; ++i) { + for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i); taosArrayPush(pDeleterParam->pUidList, &pTable->uid); } *pParam = pDeleterParam; - break; } default: