From 550d276df5067d6541066f9de51697ffd2421237 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 25 Jan 2022 10:41:35 +0800 Subject: [PATCH] [td-11818] Fix bug in select * --- 2.0/src/query/src/queryMain.c | 4 +- source/libs/executor/inc/executorimpl.h | 13 +- source/libs/executor/src/dataDispatcher.c | 10 +- source/libs/executor/src/executorMain.c | 60 ++--- source/libs/executor/src/executorimpl.c | 313 +++------------------- source/libs/planner/src/logicPlan.c | 1 - source/libs/qworker/src/qworker.c | 2 +- 7 files changed, 79 insertions(+), 324 deletions(-) diff --git a/2.0/src/query/src/queryMain.c b/2.0/src/query/src/queryMain.c index 3bf031ebd5..e6d804cea3 100644 --- a/2.0/src/query/src/queryMain.c +++ b/2.0/src/query/src/queryMain.c @@ -461,7 +461,7 @@ int32_t qKillQuery(qinfo_t qinfo) { } qDebug("QInfo:0x%"PRIx64" query killed", pQInfo->qId); - setQueryKilled(pQInfo); + setTaskKilled(pQInfo); // Wait for the query executing thread being stopped/ // Once the query is stopped, the owner of qHandle will be cleared immediately. @@ -634,7 +634,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo return TSDB_CODE_QRY_INVALID_QHANDLE; } qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId); - setQueryKilled(pQInfo); + setTaskKilled(pQInfo); // wait query stop int32_t loop = 0; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ebf3d83a1a..b3f5370c0f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -250,9 +250,8 @@ typedef struct SExecTaskInfo { STaskCostInfo cost; int64_t owner; // if it is in execution int32_t code; - + uint64_t totalRows; // total number of rows STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure - pthread_mutex_t lock; // used to synchronize the rsp/query threads char *sql; // query sql string jmp_buf env; // struct SOperatorInfo *pRoot; @@ -622,8 +621,6 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq *pQueryMsg, int32_t nu int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters); SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableReq *pQueryMsg, SColIndex *pColIndex, int32_t *code); -SQInfo *createQInfoImpl(SQueryTableReq *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, - SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId, char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo); int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, int32_t prevResultLen, void* merger); @@ -645,20 +642,18 @@ void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status); bool onlyQueryTags(STaskAttr* pQueryAttr); //void destroyUdfInfo(struct SUdfInfo* pUdfInfo); -bool isValidQInfo(void *param); - int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen); size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows); -void setQueryKilled(SQInfo *pQInfo); +void setTaskKilled(SExecTaskInfo *pTaskInfo); void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType); void publishQueryAbortEvent(SExecTaskInfo * pTaskInfo, int32_t code); void calculateOperatorProfResults(SQInfo* pQInfo); -void queryCostStatis(SQInfo *pQInfo); +void queryCostStatis(SExecTaskInfo *pTaskInfo); -void doDestroyTask(SQInfo *pQInfo); +void doDestroyTask(SExecTaskInfo *pTaskInfo); void freeQueryAttr(STaskAttr *pQuery); int32_t getMaximumIdleDurationSec(); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index f9e61f91de..97a0557748 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -121,11 +121,19 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat } static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { - if (taosQueueSize(pDispatcher->pDataBlocks) >= pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) { + uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery; + if (taosQueueSize(pDispatcher->pDataBlocks) > capacity) { + qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity, + taosQueueSize(pDispatcher->pDataBlocks)); return false; } + pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->schema.resultRowSize * pInput->pData->info.rows; pBuf->pData = malloc(pBuf->allocSize); + if (pBuf->pData == NULL) { + qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); + } + return NULL != pBuf->pData; } diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index e5d56aca15..b60943d284 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -149,7 +149,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { int64_t curOwner = 0; if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { - qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, + qError("QID:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; return pTaskInfo->code; @@ -160,7 +160,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { } if (isTaskKilled(pTaskInfo)) { - qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo)); + qDebug("QID:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo)); return TSDB_CODE_SUCCESS; } @@ -169,12 +169,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { if (ret != TSDB_CODE_SUCCESS) { publishQueryAbortEvent(pTaskInfo, ret); pTaskInfo->code = ret; - qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), + qDebug("QID:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); return pTaskInfo->code; } - qDebug("QInfo:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo)); + qDebug("QID:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo)); bool newgroup = false; publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -190,8 +190,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { *useconds = pTaskInfo->cost.elapsedTime; } - qDebug("QInfo:0x%" PRIx64 " query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", - GET_TASKID(pTaskInfo), 0, 0L, 0); + int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0; + pTaskInfo->totalRows += current; + + qDebug("QID:0x%" PRIx64 " task paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", + GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0); atomic_store_64(&pTaskInfo->owner, 0); return pTaskInfo->code; @@ -200,14 +203,14 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) { SQInfo *pQInfo = (SQInfo *)qinfo; - if (pQInfo == NULL || !isValidQInfo(pQInfo)) { + if (pQInfo == NULL) { qError("QInfo invalid qhandle"); return TSDB_CODE_QRY_INVALID_QHANDLE; } *buildRes = false; if (IS_QUERY_KILLED(pQInfo)) { - qDebug("QInfo:0x%"PRIx64" query is killed, code:0x%08x", pQInfo->qId, pQInfo->code); + qDebug("QID:0x%"PRIx64" query is killed, code:0x%08x", pQInfo->qId, pQInfo->code); return pQInfo->code; } @@ -227,11 +230,11 @@ int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspCo assert(pQInfo->rspContext == NULL); if (pQInfo->dataReady == QUERY_RESULT_READY) { *buildRes = true; - qDebug("QInfo:0x%"PRIx64" retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo->qId, pQueryAttr->resultRowSize, + qDebug("QID:0x%"PRIx64" retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo->qId, pQueryAttr->resultRowSize, GET_NUM_OF_RESULTS(pRuntimeEnv), tstrerror(pQInfo->code)); } else { *buildRes = false; - qDebug("QInfo:0x%"PRIx64" retrieve req set query return result after paused", pQInfo->qId); + qDebug("QID:0x%"PRIx64" retrieve req set query return result after paused", pQInfo->qId); pQInfo->rspContext = pRspContext; assert(pQInfo->rspContext != NULL); } @@ -251,18 +254,18 @@ void* qGetResultRetrieveMsg(qTaskInfo_t qinfo) { } int32_t qKillTask(qTaskInfo_t qinfo) { - SQInfo *pQInfo = (SQInfo *)qinfo; + SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo; - if (pQInfo == NULL || !isValidQInfo(pQInfo)) { + if (pTaskInfo == NULL) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - qDebug("QInfo:0x%"PRIx64" query killed", pQInfo->qId); - setQueryKilled(pQInfo); + qDebug("QID:0x%"PRIx64" execTask killed", pTaskInfo->id.queryId); + setTaskKilled(pTaskInfo); // Wait for the query executing thread being stopped/ // Once the query is stopped, the owner of qHandle will be cleared immediately. - while (pQInfo->owner != 0) { + while (pTaskInfo->owner != 0) { taosMsleep(100); } @@ -270,14 +273,14 @@ int32_t qKillTask(qTaskInfo_t qinfo) { } int32_t qAsyncKillTask(qTaskInfo_t qinfo) { - SQInfo *pQInfo = (SQInfo *)qinfo; + SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo; - if (pQInfo == NULL || !isValidQInfo(pQInfo)) { + if (pTaskInfo == NULL) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - qDebug("QInfo:0x%"PRIx64" query async killed", pQInfo->qId); - setQueryKilled(pQInfo); + qDebug("QID:0x%"PRIx64" query async killed", pTaskInfo->id.queryId); + setTaskKilled(pTaskInfo); return TSDB_CODE_SUCCESS; } @@ -292,15 +295,12 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) { return isTaskKilled(pTaskInfo) || Q_STATUS_EQUAL(pTaskInfo->status, TASK_OVER); } -void qDestroyTask(qTaskInfo_t qHandle) { - SQInfo* pQInfo = (SQInfo*) qHandle; - if (!isValidQInfo(pQInfo)) { - return; - } +void qDestroyTask(qTaskInfo_t qTaskHandle) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle; + qDebug("QID:0x%"PRIx64" execTask completed, numOfRows:%"PRId64, pTaskInfo->id.queryId, pTaskInfo->totalRows); - qDebug("QInfo:0x%"PRIx64" query completed", pQInfo->qId); - queryCostStatis(pQInfo); // print the query cost summary - doDestroyTask(pQInfo); + queryCostStatis(pTaskInfo); // print the query cost summary + doDestroyTask(pTaskInfo); } void* qOpenTaskMgmt(int32_t vgId) { @@ -381,7 +381,7 @@ void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo) { STaskMgmt *pQueryMgmt = pMgmt; if (pQueryMgmt->qinfoPool == NULL) { - qError("QInfo:0x%"PRIx64"-%p failed to add qhandle into qMgmt, since qMgmt is closed", qId, (void*)qInfo); + qError("QID:0x%"PRIx64"-%p failed to add qhandle into qMgmt, since qMgmt is closed", qId, (void*)qInfo); terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; return NULL; } @@ -389,7 +389,7 @@ void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo) { pthread_mutex_lock(&pQueryMgmt->lock); if (pQueryMgmt->closed) { pthread_mutex_unlock(&pQueryMgmt->lock); - qError("QInfo:0x%"PRIx64"-%p failed to add qhandle into cache, since qMgmt is colsing", qId, (void*)qInfo); + qError("QID:0x%"PRIx64"-%p failed to add qhandle into cache, since qMgmt is colsing", qId, (void*)qInfo); terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; return NULL; } else { @@ -445,7 +445,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo return TSDB_CODE_QRY_INVALID_QHANDLE; } qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId); - setQueryKilled(pQInfo); + setTaskKilled(pQInfo); // wait query stop int32_t loop = 0; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index bfd1bce499..9a2bb75b34 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2432,9 +2432,9 @@ static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) { } bool isTaskKilled(SExecTaskInfo *pTaskInfo) { - if (IS_QUERY_KILLED(pTaskInfo)) { - return true; - } +// if (IS_QUERY_KILLED(pTaskInfo)) { +// return true; +// } // query has been executed more than tsShellActivityTimer, and the retrieve has not arrived // abort current query execution. @@ -2444,13 +2444,13 @@ bool isTaskKilled(SExecTaskInfo *pTaskInfo) { assert(pTaskInfo->cost.start != 0); // qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64 // ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec()); - return true; +// return true; } return false; } -void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;} +void setTaskKilled(SExecTaskInfo *pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;} //static bool isFixedOutputQuery(STaskAttr* pQueryAttr) { // if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { @@ -4420,33 +4420,32 @@ void calculateOperatorProfResults(SQInfo* pQInfo) { taosArrayDestroy(opStack); } -void queryCostStatis(SQInfo *pQInfo) { - STaskRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - STaskCostInfo *pSummary = &pQInfo->summary; +void queryCostStatis(SExecTaskInfo *pTaskInfo) { + STaskCostInfo *pSummary = &pTaskInfo->cost; - uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable); - hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map); - pSummary->hashSize = hashSize; +// uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable); +// hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map); +// pSummary->hashSize = hashSize; // add the merge time pSummary->elapsedTime += pSummary->firstStageMergeTime; - SResultRowPool* p = pQInfo->runtimeEnv.pool; - if (p != NULL) { - pSummary->winInfoSize = getResultRowPoolMemSize(p); - pSummary->numOfTimeWindows = getNumOfAllocatedResultRows(p); - } else { - pSummary->winInfoSize = 0; - pSummary->numOfTimeWindows = 0; - } - - calculateOperatorProfResults(pQInfo); - - //qDebug("QInfo:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, " -// "load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64, -// pQInfo->qId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis, -// pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows); +// SResultRowPool* p = pTaskInfo->pool; +// if (p != NULL) { +// pSummary->winInfoSize = getResultRowPoolMemSize(p); +// pSummary->numOfTimeWindows = getNumOfAllocatedResultRows(p); +// } else { +// pSummary->winInfoSize = 0; +// pSummary->numOfTimeWindows = 0; +// } +// +// calculateOperatorProfResults(pQInfo); + qDebug("QID:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, " + "load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64, + pTaskInfo->id.queryId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis, + pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows); +// //qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0, // pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0); @@ -7733,7 +7732,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) { SExecTaskInfo* pTaskInfo = calloc(1, sizeof(SExecTaskInfo)); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); - pthread_mutex_init(&pTaskInfo->lock, NULL); pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->id.queryId = queryId; return pTaskInfo; @@ -8673,229 +8671,6 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) { return ((SQInfo *)qHandle)->qId == qId; } -SQInfo* createQInfoImpl(SQueryTableReq* pQueryMsg, SGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs, - SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId, - char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo) { - int16_t numOfCols = pQueryMsg->numOfCols; - int16_t numOfOutput = pQueryMsg->numOfOutput; - - SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); - if (pQInfo == NULL) { - goto _cleanup_qinfo; - } - - pQInfo->qId = qId; - pQInfo->startExecTs = 0; - - pQInfo->runtimeEnv.pUdfInfo = pUdfInfo; - - // to make sure third party won't overwrite this structure - pQInfo->signature = pQInfo; - STaskAttr* pQueryAttr = &pQInfo->query; - pQInfo->runtimeEnv.pQueryAttr = pQueryAttr; - - pQueryAttr->tableGroupInfo = *pTableGroupInfo; - pQueryAttr->numOfCols = numOfCols; - pQueryAttr->numOfOutput = numOfOutput; - pQueryAttr->limit.limit = pQueryMsg->limit; - pQueryAttr->limit.offset = pQueryMsg->offset; - pQueryAttr->order.order = pQueryMsg->order; - pQueryAttr->order.col.info.colId = pQueryMsg->orderColId; - pQueryAttr->pExpr1 = pExprs; - pQueryAttr->pExpr2 = pSecExprs; - pQueryAttr->numOfExpr2 = pQueryMsg->secondStageOutput; - pQueryAttr->pGroupbyExpr = pGroupbyExpr; - memcpy(&pQueryAttr->interval, &pQueryMsg->interval, sizeof(pQueryAttr->interval)); - pQueryAttr->fillType = pQueryMsg->fillType; - pQueryAttr->numOfTags = pQueryMsg->numOfTags; - pQueryAttr->tagColList = pTagCols; - pQueryAttr->prjInfo.vgroupLimit = pQueryMsg->vgroupLimit; - pQueryAttr->prjInfo.ts = (pQueryMsg->order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX; -// pQueryAttr->sw = pQueryMsg->sw; - pQueryAttr->vgId = vgId; - - pQueryAttr->stableQuery = pQueryMsg->stableQuery; - pQueryAttr->topBotQuery = pQueryMsg->topBotQuery; - pQueryAttr->groupbyColumn = pQueryMsg->groupbyColumn; - pQueryAttr->hasTagResults = pQueryMsg->hasTagResults; - pQueryAttr->timeWindowInterpo = pQueryMsg->timeWindowInterpo; - pQueryAttr->queryBlockDist = pQueryMsg->queryBlockDist; - pQueryAttr->stabledev = pQueryMsg->stabledev; - pQueryAttr->tsCompQuery = pQueryMsg->tsCompQuery; - pQueryAttr->simpleAgg = pQueryMsg->simpleAgg; - pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery; - pQueryAttr->needReverseScan = pQueryMsg->needReverseScan; - pQueryAttr->stateWindow = pQueryMsg->stateWindow; - pQueryAttr->vgId = vgId; -// pQueryAttr->pFilters = pFilters; - - pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); - if (pQueryAttr->tableCols == NULL) { - goto _cleanup; - } - - pQueryAttr->srcRowSize = 0; - pQueryAttr->maxTableColumnWidth = 0; - for (int16_t i = 0; i < numOfCols; ++i) { - pQueryAttr->tableCols[i] = pQueryMsg->tableCols[i]; -// pQueryAttr->tableCols[i].flist.filterInfo = tFilterInfoDup(pQueryMsg->tableCols[i].flist.filterInfo, pQueryAttr->tableCols[i].flist.numOfFilters); - - pQueryAttr->srcRowSize += pQueryAttr->tableCols[i].bytes; - if (pQueryAttr->maxTableColumnWidth < pQueryAttr->tableCols[i].bytes) { - pQueryAttr->maxTableColumnWidth = pQueryAttr->tableCols[i].bytes; - } - } - - for (int16_t col = 0; col < numOfOutput; ++col) { - assert(pExprs[col].base.resSchema.bytes > 0); - pQueryAttr->resultRowSize += pExprs[col].base.resSchema.bytes; - - // keep the tag length - if (TSDB_COL_IS_TAG(pExprs[col].base.pColumns->flag)) { - pQueryAttr->tagLen += pExprs[col].base.resSchema.bytes; - } - -// if (pExprs[col].base.flist.filterInfo) { -// ++pQueryAttr->havingNum; -// } - } - - doUpdateExprColumnIndex(pQueryAttr); - - if (pSecExprs != NULL) { - int32_t resultRowSize = 0; - - // calculate the result row size - for (int16_t col = 0; col < pQueryAttr->numOfExpr2; ++col) { - assert(pSecExprs[col].base.resSchema.bytes > 0); - resultRowSize += pSecExprs[col].base.resSchema.bytes; - } - - if (resultRowSize > pQueryAttr->resultRowSize) { - pQueryAttr->resultRowSize = resultRowSize; - } - } - - if (pQueryAttr->fillType != TSDB_FILL_NONE) { - pQueryAttr->fillVal = malloc(sizeof(int64_t) * pQueryAttr->numOfOutput); - if (pQueryAttr->fillVal == NULL) { - goto _cleanup; - } - - // the first column is the timestamp - memcpy(pQueryAttr->fillVal, (char *)pQueryMsg->fillVal, pQueryAttr->numOfOutput * sizeof(int64_t)); - } - - size_t numOfGroups = 0; - if (pTableGroupInfo->pGroupList != NULL) { - numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList); - STableGroupInfo* pTableqinfo = &pQInfo->runtimeEnv.tableqinfoGroupInfo; - - pTableqinfo->pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); - pTableqinfo->numOfTables = pTableGroupInfo->numOfTables; - pTableqinfo->map = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - } - - pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); - if (pQInfo->pBuf == NULL) { - goto _cleanup; - } - - pQInfo->dataReady = QUERY_RESULT_NOT_READY; - pQInfo->rspContext = NULL; - pQInfo->sql = sql; - pthread_mutex_init(&pQInfo->lock, NULL); - tsem_init(&pQInfo->ready, 0, 0); - - pQueryAttr->window = pQueryMsg->window; - updateDataCheckOrder(pQInfo, pQueryMsg, pQueryAttr->stableQuery); - - STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - STimeWindow window = pQueryAttr->window; - - int32_t index = 0; - for(int32_t i = 0; i < numOfGroups; ++i) { - SArray* pa = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i); - - size_t s = taosArrayGetSize(pa); - SArray* p1 = taosArrayInit(s, POINTER_BYTES); - if (p1 == NULL) { - goto _cleanup; - } - - taosArrayPush(pRuntimeEnv->tableqinfoGroupInfo.pGroupList, &p1); - - for(int32_t j = 0; j < s; ++j) { -// STableKeyInfo* info = taosArrayGet(pa, j); -// window.skey = info->lastKey; -// -// void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo); -// STableQueryInfo* item = createTableQueryInfo(pQueryAttr, info->pTable, pQueryAttr->groupbyColumn, window, buf); -// if (item == NULL) { -// goto _cleanup; -// } -// -// item->groupIndex = i; -// taosArrayPush(p1, &item); - -// STableId* id = TSDB_TABLEID(info->pTable); -// taosHashPut(pRuntimeEnv->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES); -// index += 1; - } - } - - colIdCheck(pQueryAttr, pQInfo->qId); - -// int32_t functionId = getExprFunctionId(&pExpr[0]); -// pQInfo->query.queryBlockDist = (functionId == FUNCTION_BLKINFO); - - //qDebug("qmsg:%p vgId:%d, QInfo:0x%" PRIx64 "-%p created", pQueryMsg, pQInfo->query.vgId, pQInfo->qId, pQInfo); - return pQInfo; - -_cleanup_qinfo: -// tsdbDestroyTableGroup(pTableGroupInfo); - - if (pGroupbyExpr != NULL) { - taosArrayDestroy(pGroupbyExpr->columnInfo); - free(pGroupbyExpr); - } - - tfree(pTagCols); - for (int32_t i = 0; i < numOfOutput; ++i) { - SExprInfo* pExprInfo = &pExprs[i]; - if (pExprInfo->pExpr != NULL) { - tExprTreeDestroy(pExprInfo->pExpr, NULL); - pExprInfo->pExpr = NULL; - } - -// if (pExprInfo->base.flist.filterInfo) { -// freeColumnFilterInfo(pExprInfo->base.flist.filterInfo, pExprInfo->base.flist.numOfFilters); -// } - } - - tfree(pExprs); - -// filterFreeInfo(pFilters); - -_cleanup: - doDestroyTask(pQInfo); - return NULL; -} - -bool isValidQInfo(void *param) { - SQInfo *pQInfo = (SQInfo *)param; - if (pQInfo == NULL) { - return false; - } - - /* - * pQInfo->signature may be changed by another thread, so we assign value of signature - * into local variable, then compare by using local variable - */ - uint64_t sig = (uint64_t)pQInfo->signature; - return (sig == (uint64_t)pQInfo); -} - int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, int32_t prevResultLen, void* merger) { int32_t code = TSDB_CODE_SUCCESS; @@ -8957,7 +8732,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* _error: // table query ref will be decrease during error handling - doDestroyTask(pQInfo); +// doDestroyTask(pQInfo); return code; } @@ -9038,36 +8813,14 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) { return NULL; } -void doDestroyTask(SQInfo *pQInfo) { - if (!isValidQInfo(pQInfo)) { - return; - } +void doDestroyTask(SExecTaskInfo *pTaskInfo) { + qDebug("QID:0x%"PRIx64" start to free execTask", pTaskInfo->id.queryId); + doDestroyTableQueryInfo(&pTaskInfo->tableqinfoGroupInfo); +// taosArrayDestroy(pTaskInfo->summary.queryProfEvents); +// taosHashCleanup(pTaskInfo->summary.operatorProfResults); - //qDebug("QInfo:0x%"PRIx64" start to free QInfo", pQInfo->qId); - - STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - releaseQueryBuf(pRuntimeEnv->tableqinfoGroupInfo.numOfTables); - - doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo); - teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); - - STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; - freeQueryAttr(pQueryAttr); - -// tsdbDestroyTableGroup(&pQueryAttr->tableGroupInfo); - - tfree(pQInfo->pBuf); - tfree(pQInfo->sql); - - taosArrayDestroy(pQInfo->summary.queryProfEvents); - taosHashCleanup(pQInfo->summary.operatorProfResults); - - taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows); - pQInfo->signature = 0; - - //qDebug("QInfo:0x%"PRIx64" QInfo is freed", pQInfo->qId); - - tfree(pQInfo); + qDebug("QID:0x%"PRIx64" execTask is freed", pTaskInfo->id.queryId); + tfree(pTaskInfo); } int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen) { diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c index 620dd38360..de62f4a2ef 100644 --- a/source/libs/planner/src/logicPlan.c +++ b/source/libs/planner/src/logicPlan.c @@ -413,7 +413,6 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { tfree(pQueryTableInfo->tableName); } - printf("----------->Free:%p\n", pQueryNode->pExpr); taosArrayDestroy(pQueryNode->pExpr); tfree(pQueryNode->pExtInfo); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index b53fd9ff37..d09f749805 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -492,7 +492,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL}; code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); if (code) { - QW_TASK_ELOG("dsPutDataBlock failed, code:%x", code); + QW_TASK_ELOG("dsPutDataBlock failed, code:%s", tstrerror(code)); QW_ERR_JRET(code); }