From c2a3c50f07bf238849c6f5f26dc6e0f2f9890e0a Mon Sep 17 00:00:00 2001
From: hjxilinx
Date: Tue, 24 Mar 2020 22:59:00 +0800
Subject: [PATCH 1/4] [td-32] fix memory leaks, and fix bugs in select * query.

---
 src/client/src/tscParseInsert.c |  11 ----
 src/client/src/tscSQLParser.c   |  22 ++++---
 src/client/src/tscServer.c      |  10 +--
 src/client/src/tscSql.c         |   7 +--
 src/client/src/tscStream.c      |   1 -
 src/client/src/tscUtil.c        |  22 +------
 src/query/src/queryExecutor.c   | 105 ++++++++++++++++++--------------
 src/vnode/tsdb/src/tsdbRead.c   |   9 +--
 8 files changed, 83 insertions(+), 104 deletions(-)

diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c
index ccd734ec7b..5651e5aa38 100644
--- a/src/client/src/tscParseInsert.c
+++ b/src/client/src/tscParseInsert.c
@@ -1238,22 +1238,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
     goto _clean;
   }
 
-  // submit to more than one vnode
   if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgId
     if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
       goto _error_clean;
     }
-
-    STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
-    if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
-      goto _error_clean;
-    }
-
-    pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
-
-    // set the next sent data vnode index in data block arraylist
-    pTableMetaInfo->vnodeIndex = 1;
   } else {
     pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
   }
 
diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c
index e0850a7139..6427578e82 100644
--- a/src/client/src/tscSQLParser.c
+++ b/src/client/src/tscSQLParser.c
@@ -1489,11 +1489,9 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema,
   SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, resColIdx, functionID, pColIndex, type, bytes, bytes);
   strncpy(pExpr->aliasName, columnName, tListLen(pExpr->aliasName));
 
-  // for point interpolation/last_row query, we need the timestamp column to be loaded
+  // for all queries, the timestamp column needs to be loaded
   SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
-  if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) {
-    tscColumnBaseInfoInsert(pQueryInfo, &index);
-  }
+  tscColumnBaseInfoInsert(pQueryInfo, &index);
 
   SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
   insertResultField(pQueryInfo, resColIdx, &ids, bytes, type, columnName, pExpr);
@@ -1581,7 +1579,10 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
           tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i]));
         }
       }
-
+
+      SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
+      tscColumnBaseInfoInsert(pQueryInfo, &tsCol);
+
       return TSDB_CODE_SUCCESS;
     }
     case TK_SUM:
@@ -1689,7 +1690,10 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
           tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i]));
         }
       }
-
+
+      SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
+      tscColumnBaseInfoInsert(pQueryInfo, &tsCol);
+
       return TSDB_CODE_SUCCESS;
     }
     case TK_FIRST:
@@ -1708,7 +1712,6 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
       }
 
       /* in first/last function, multiple columns can be add to resultset */
-
      for (int32_t i = 0; i < pItem->pNode->pParam->nExpr; ++i) {
        tSQLExprItem* pParamElem = &(pItem->pNode->pParam->a[i]);
        if
(pParamElem->pNode->nSQLOptr != TK_ALL && pParamElem->pNode->nSQLOptr != TK_ID) { @@ -1753,7 +1756,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt } } } - + return TSDB_CODE_SUCCESS; } else { // select * from xxx int32_t numOfFields = 0; @@ -1773,6 +1776,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt numOfFields += tscGetNumOfColumns(pTableMetaInfo->pTableMeta); } + return TSDB_CODE_SUCCESS; } } @@ -1891,6 +1895,8 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt default: return TSDB_CODE_INVALID_SQL; } + + } // todo refactor diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 348c0709f1..9e1efcd6ff 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -341,14 +341,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { (*pSql->fp)(pSql->param, taosres, rpcMsg->code); if (shouldFree) { - // If it is failed, all objects allocated during execution taos_connect_a should be released - if (command == TSDB_SQL_CONNECT) { - taos_close(pObj); - tscTrace("%p Async sql close failed connection", pSql); - } else { - tscFreeSqlObj(pSql); - tscTrace("%p Async sql is automatically freed", pSql); - } + tscFreeSqlObj(pSql); + tscTrace("%p Async sql is automatically freed", pSql); } } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 4885cf7cc3..d62dac088b 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -594,11 +594,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { } if (numOfTableHasRes >= 2) { // do merge result - success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL); - // TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; - // TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; - // printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2); } else { // only one subquery SSqlObj *pSub = pSql->pSubs[0]; if (pSub == NULL) { @@ -674,14 +670,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { SSqlRes *pRes = &pSql->res; if (pRes->qhandle == 0 || - pRes->completed || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) { return NULL; } // current data are exhausted, fetch more data - if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && + if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true && (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) { taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 46e3ac2e60..0b464c362b 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -504,7 +504,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p } tsem_init(&pSql->rspSem, 0, 0); - tsem_init(&pSql->emptyRspSem, 0, 1); SSqlInfo SQLInfo = {0}; tSQLParse(&SQLInfo, pSql->sqlstr); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 2a9673b192..d079494dde 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -423,9 +423,6 @@ void tscFreeResData(SSqlObj* pSql) { } void tscFreeSqlResult(SSqlObj* pSql) { - //TODO not free - return; - tfree(pSql->res.pRsp); pSql->res.row = 0; pSql->res.numOfRows = 0; @@ -469,8 +466,6 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) { tscFreeSqlCmdData(pCmd); tscTrace("%p free sqlObj partial completed", pSql); - - 
tscFreeSqlCmdData(pCmd); } void tscFreeSqlObj(SSqlObj* pSql) { @@ -489,10 +484,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { pCmd->allocSize = 0; - if (pSql->fp == NULL) { - tsem_destroy(&pSql->rspSem); - tsem_destroy(&pSql->emptyRspSem); - } + tsem_destroy(&pSql->rspSem); free(pSql); } @@ -1751,16 +1743,8 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { } int32_t command = pSql->cmd.command; - if (pTscObj->pSql == pSql) { - /* - * in case of taos_connect_a query, the object should all be released, even it is the - * master sql object. Otherwise, the master sql should not be released - */ - if (command == TSDB_SQL_CONNECT && pSql->res.code != TSDB_CODE_SUCCESS) { - return true; - } - - return false; + if (command == TSDB_SQL_CONNECT) { + return true; } if (command == TSDB_SQL_INSERT) { diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index cbce5097ec..f233ac0d99 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -53,9 +53,9 @@ /* get the qinfo struct address from the query struct address */ #define GET_COLUMN_BYTES(query, colidx) \ - ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.bytes) + ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdx].info.bytes) #define GET_COLUMN_TYPE(query, colidx) \ - ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].info.type) + ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdx].info.type) typedef struct SPointInterpoSupporter { int32_t numOfCols; @@ -1498,16 +1498,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel SColIndexEx * pColIndexEx = &pSqlFuncMsg->colInfo; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; - - if (TSDB_COL_IS_TAG(pSqlFuncMsg->colInfo.flag)) { // process tag column info - SSchema *pSchema = getColumnModelSchema(pTagsSchema, pColIndexEx->colIdx); - - pCtx->inputType = pSchema->type; - pCtx->inputBytes = pSchema->bytes; - } else { - pCtx->inputType = GET_COLUMN_TYPE(pQuery, i); - pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i); - } + pCtx->inputType = GET_COLUMN_TYPE(pQuery, i); + pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i); pCtx->ptsOutputBuf = NULL; @@ -1891,8 +1883,6 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { pQuery->checkBufferInLoop = hasMultioutput ? 
1 : 0; } - - // pQuery->pointsOffset = pQuery->pointsToRead; } /* @@ -2552,7 +2542,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl // return DISK_DATA_LOAD_FAILED; } - if (pStatis == NULL) { + if (*pStatis == NULL) { pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); } } else { @@ -5025,11 +5015,8 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) { } static void tableMultiOutputProcessor(SQInfo *pQInfo) { -#if 0 - SQuery * pQuery = &pQInfo->query; - SMeterObj *pMeterObj = pQInfo->pObj; - - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->pTableQuerySupporter->runtimeEnv; + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery * pQuery = pRuntimeEnv->pQuery; // for ts_comp query, re-initialized is not allowed if (!isTSCompQuery(pQuery)) { @@ -5044,8 +5031,8 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) { return; } - pQuery->pointsRead = getNumOfResult(pRuntimeEnv); - if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->pointsRead > 0) { + pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv); + if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.pointsRead > 0) { doSkipResults(pRuntimeEnv); } @@ -5053,40 +5040,31 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) { * 1. if pQuery->pointsRead == 0, pQuery->limit.offset >= 0, still need to check data * 2. if pQuery->pointsRead > 0, pQuery->limit.offset must be 0 */ - if (pQuery->pointsRead > 0 || Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { + if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { break; } - TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); - assert(nextTimestamp > 0 || ((nextTimestamp < 0) && Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK))); - dTrace("QInfo:%p vid:%d sid:%d id:%s, skip current result, offset:%" PRId64 ", next qrange:%" PRId64 "-%" PRId64, - pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->limit.offset, pQuery->lastKey, - pQuery->ekey); + pQInfo, pQuery->limit.offset, pQuery->lastKey); resetCtxOutputBuf(pRuntimeEnv); } doRevisedResultsByLimit(pQInfo); - pQInfo->pointsRead += pQuery->pointsRead; + pQInfo->rec.pointsRead += pQuery->rec.pointsRead; - if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { - TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); - assert(nextTimestamp > 0 || ((nextTimestamp < 0) && Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK))); - - dTrace("QInfo:%p vid:%d sid:%d id:%s, query abort due to buffer limitation, next qrange:%" PRId64 "-%" PRId64, - pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->lastKey, pQuery->ekey); + if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { +// dTrace("QInfo:%p vid:%d sid:%d id:%s, query abort due to buffer limitation, next qrange:%" PRId64 "-%" PRId64, +// pQInfo, pQuery->lastKey, pQuery->ekey); } - dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode, - pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); +// dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode, +// pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); - pQuery->pointsOffset = pQuery->pointsToRead; // restore the available buffer - if (!isTSCompQuery(pQuery)) { - 
assert(pQuery->pointsRead <= pQuery->pointsToRead); - } - -#endif +// pQuery->pointsOffset = pQuery->pointsToRead; //restore the available buffer +// if (!isTSCompQuery(pQuery)) { +// assert(pQuery->pointsRead <= pQuery->pointsToRead); +// } } static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) { @@ -5127,10 +5105,8 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) { } } -/* handle time interval query on single table */ +// handle time interval query on table static void tableIntervalProcessor(SQInfo *pQInfo) { - // STable *pMeterObj = pQInfo->pObj; - SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv); SQuery * pQuery = pRuntimeEnv->pQuery; @@ -5839,6 +5815,39 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) { return TSDB_CODE_SUCCESS; } +static void doUpdateExprColumnIndex(SQuery* pQuery) { + assert(pQuery->pSelectExpr != NULL && pQuery != NULL); +// int32_t i = 0, j = 0; +// while (i < pQuery->numOfCols && j < pMeterObj->numOfColumns) { +// if (pQuery->colList[i].data.colId == pMeterObj->schema[j].colId) { +// pQuery->colList[i++].colIdx = (int16_t)j++; +// } else if (pQuery->colList[i].data.colId < pMeterObj->schema[j].colId) { +// pQuery->colList[i++].colIdx = -1; +// } else if (pQuery->colList[i].data.colId > pMeterObj->schema[j].colId) { +// j++; +// } +// } + +// while (i < pQuery->numOfCols) { +// pQuery->colList[i++].colIdx = -1; // not such column in current meter +// } + + for(int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + SSqlFuncExprMsg* pSqlExprMsg = &pQuery->pSelectExpr[k].pBase; + if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM || pSqlExprMsg->colInfo.flag == TSDB_COL_TAG) { + continue; + } + + SColIndexEx* pColIndexEx = &pSqlExprMsg->colInfo; + for(int32_t f = 0; f < pQuery->numOfCols; ++f) { + if (pColIndexEx->colId == pQuery->colList[f].info.colId) { + pColIndexEx->colIdx = f; + break; + } + } + } +} + static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs, SArray *pTableIdList) { SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); @@ -5897,6 +5906,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou assert(pExprs[col].resBytes > 0); pQuery->rowSize += pExprs[col].resBytes; } + + doUpdateExprColumnIndex(pQuery); int32_t ret = vnodeCreateFilterInfo(pQInfo, pQuery); if (ret != TSDB_CODE_SUCCESS) { @@ -5933,7 +5944,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou } // to make sure third party won't overwrite this structure - pQInfo->signature = (uint64_t)pQInfo; + pQInfo->signature = pQInfo; pQInfo->pTableIdList = pTableIdList; pQuery->pos = -1; diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 31cdd70f36..36472857fe 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -335,7 +335,6 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max SDataRow row = SL_GET_NODE_DATA(node); if (dataRowKey(row) > maxKey) break; - // Convert row data to column data if (*skey == INT64_MIN) { *skey = dataRowKey(row); @@ -345,13 +344,13 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max int32_t offset = 0; for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, 0); + SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, i); memcpy(pColInfo->pData + numOfRows*pColInfo->info.bytes, dataRowTuple(row) + offset, 
pColInfo->info.bytes); offset += pColInfo->info.bytes; } numOfRows++; - if (numOfRows > maxRowsToRead) break; + if (numOfRows >= maxRowsToRead) break; }; return numOfRows; @@ -392,7 +391,9 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SData } SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) { - + // in case of data in cache, all data has been kept in column info object. + STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; + return pHandle->pColumns; } int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow *window, tsdbpos_t position, int16_t order) {} From 737d4c1072e222dc55a0e281feec2704713c2780 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 24 Mar 2020 23:01:06 +0800 Subject: [PATCH 2/4] [td-32] suppress warnings --- src/client/src/tscUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d079494dde..e33fdc70a4 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -713,7 +713,7 @@ static void trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { char* p = pTableDataBlock->pData + sizeof(SSubmitBlk); - SSubmitBlk* pBlock = pTableDataBlock->pData; + SSubmitBlk* pBlock = (SSubmitBlk*) pTableDataBlock->pData; int32_t rows = htons(pBlock->numOfRows); for(int32_t i = 0; i < rows; ++i) { From f743478363c8ff636481f8ff14c4bc266006bf1b Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Wed, 25 Mar 2020 14:19:00 +0800 Subject: [PATCH 3/4] [td-32]use code to denote if query is killed or not --- src/query/src/queryExecutor.c | 43 +++++++++++------------- src/vnode/detail/src/vnodeQueryImpl.c | 4 +-- src/vnode/detail/src/vnodeQueryProcess.c | 24 ++++++------- 3 files changed, 33 insertions(+), 38 deletions(-) diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index f233ac0d99..a996c8467f 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -1599,7 +1599,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); } -static bool isQueryKilled(SQuery *pQuery) { +static bool isQueryKilled(SQInfo *pQInfo) { + return (pQInfo->code == TSDB_CODE_QUERY_CANCELLED); #if 0 /* * check if the queried meter is going to be deleted. 
@@ -1613,8 +1614,6 @@ static bool isQueryKilled(SQuery *pQuery) { return (pQInfo->killed == 1); #endif - - return 0; } static bool setQueryKilled(SQInfo* pQInfo) { @@ -2641,7 +2640,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { while (tsdbNextDataBlock(pQueryHandle)) { // check if query is killed or not set the status of query to pass the status check - if (isQueryKilled(pQuery)) { + if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { return cnt; } @@ -3607,7 +3606,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->scanFlag = REPEAT_SCAN; /* check if query is killed or not */ - if (isQueryKilled(pQuery)) { + if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { // setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } @@ -4394,14 +4393,10 @@ static void queryOnDataBlocks(SQInfo *pQInfo, STableDataInfo *pMeterDataInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - // SMeterObj * pTempMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pMeterSidExtInfo[0]->sid); - // __block_search_fn_t searchFn = vnodeSearchKeyFunc[pTempMeter->searchAlgorithm]; - // dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pVnodeFileInfo->numOfFiles); - tsdb_query_handle_t *pQueryHandle = pRuntimeEnv->pQueryHandle; while (tsdbNextDataBlock(pQueryHandle)) { - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { break; } @@ -4583,7 +4578,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pSupporter->meterIdx = start; for (int32_t k = start; k <= end; ++k, pSupporter->meterIdx++) { - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } @@ -4610,7 +4605,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pSupporter->subgroupIdx); for (int32_t k = start; k <= end; ++k) { - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } @@ -4661,7 +4656,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { while (pSupporter->meterIdx < pSupporter->numOfMeters) { int32_t k = pSupporter->meterIdx; - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } @@ -4955,7 +4950,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { doMultiMeterSupplementaryScan(pQInfo); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query killed, abort", pQInfo); return; } @@ -4994,7 +4989,7 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) { vnodeScanAllData(pRuntimeEnv); doFinalizeResult(pRuntimeEnv); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { return; } @@ -5027,7 +5022,7 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) { vnodeScanAllData(pRuntimeEnv); doFinalizeResult(pRuntimeEnv); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { return; } @@ -5074,7 +5069,7 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) { initCtxOutputBuf(pRuntimeEnv); vnodeScanAllData(pRuntimeEnv); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { return; } @@ -5177,7 +5172,7 @@ void qTableQuery(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pRuntimeEnv->pQuery; - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p it is already killed, abort", pQInfo); return; } @@ -5267,7 +5262,7 @@ void qTableQuery(SQInfo *pQInfo) { 
pQInfo->elapsedTime += (taosGetTimestampUs() - st); /* check if query is killed or not */ - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query is killed", pQInfo); } else { dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.pointsRead); @@ -5309,7 +5304,7 @@ void qSuperTableQuery(void *pReadMsg) { /* record the total elapsed time */ pQInfo->elapsedTime += (taosGetTimestampUs() - st); - pQuery->status = isQueryKilled(pQuery) ? 1 : 0; + pQuery->status = isQueryKilled(pQInfo) ? 1 : 0; // taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->pointsRead, // pQInfo->query.interpoType); @@ -6172,7 +6167,7 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *ro } SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); if (pQInfo->code == TSDB_CODE_SUCCESS) { return TSDB_CODE_QUERY_CANCELLED; @@ -6289,11 +6284,11 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c // has more data to return or need next round to execute addToTaskQueue(pQInfo); - } else if (isQueryKilled(pQuery)) { - code = TSDB_CODE_QUERY_CANCELLED; + } else { + code = pQInfo->code; } - if (isQueryKilled(pQuery) || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { (*pRsp)->completed = 1; // notify no more result to client vnodeFreeQInfo(pQInfo); } diff --git a/src/vnode/detail/src/vnodeQueryImpl.c b/src/vnode/detail/src/vnodeQueryImpl.c index 9eb3fb8b65..f3e5cc27b3 100644 --- a/src/vnode/detail/src/vnodeQueryImpl.c +++ b/src/vnode/detail/src/vnodeQueryImpl.c @@ -5312,7 +5312,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { while (1) { // check if query is killed or not set the status of query to pass the status check - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return cnt; } @@ -6375,7 +6375,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->scanFlag = REPEAT_SCAN; /* check if query is killed or not */ - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } diff --git a/src/vnode/detail/src/vnodeQueryProcess.c b/src/vnode/detail/src/vnodeQueryProcess.c index cedb76b4ac..23520f35a1 100644 --- a/src/vnode/detail/src/vnodeQueryProcess.c +++ b/src/vnode/detail/src/vnodeQueryProcess.c @@ -105,7 +105,7 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo int32_t start = pSupporter->pSidSet->starterPos[groupIdx]; int32_t end = pSupporter->pSidSet->starterPos[groupIdx + 1] - 1; - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { return; } @@ -276,7 +276,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo int64_t st = taosGetTimestampUs(); while (1) { - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { break; } @@ -363,7 +363,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo int32_t j = QUERY_IS_ASC_QUERY(pQuery) ? 
0 : numOfBlocks - 1; for (; j < numOfBlocks && j >= 0; j += step) { - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { break; } @@ -603,7 +603,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pSupporter->meterIdx = start; for (int32_t k = start; k <= end; ++k, pSupporter->meterIdx++) { - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } @@ -630,7 +630,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pSupporter->subgroupIdx); for (int32_t k = start; k <= end; ++k) { - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } @@ -681,7 +681,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { while (pSupporter->meterIdx < pSupporter->numOfMeters) { int32_t k = pSupporter->meterIdx; - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } @@ -958,7 +958,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { doMultiMeterSupplementaryScan(pQInfo); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query killed, abort", pQInfo); return; } @@ -998,7 +998,7 @@ static void vnodeSingleTableFixedOutputProcessor(SQInfo *pQInfo) { vnodeScanAllData(pRuntimeEnv); doFinalizeResult(pRuntimeEnv); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { return; } @@ -1033,7 +1033,7 @@ static void vnodeSingleTableMultiOutputProcessor(SQInfo *pQInfo) { vnodeScanAllData(pRuntimeEnv); doFinalizeResult(pRuntimeEnv); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { return; } @@ -1087,7 +1087,7 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter initCtxOutputBuf(pRuntimeEnv); vnodeScanAllData(pRuntimeEnv); - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { return; } @@ -1301,7 +1301,7 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) { pQInfo->useconds += (taosGetTimestampUs() - st); /* check if query is killed or not */ - if (isQueryKilled(pQuery)) { + if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query is killed", pQInfo); pQInfo->over = 1; } else { @@ -1345,7 +1345,7 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { /* record the total elapsed time */ pQInfo->useconds += (taosGetTimestampUs() - st); - pQInfo->over = isQueryKilled(pQuery) ? 1 : 0; + pQInfo->over = isQueryKilled(pQInfo) ? 
1 : 0; taosInterpoSetStartInfo(&pQInfo->pTableQuerySupporter->runtimeEnv.interpoInfo, pQuery->pointsRead, pQInfo->query.interpoType); From c0e31d6c235f4cb8cf3c1edeeb228f016b3fc6de Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Thu, 26 Mar 2020 10:34:56 +0800 Subject: [PATCH 4/4] [td-32] fix bugs in projection query --- src/dnode/src/dnodeRead.c | 58 ++++--- src/query/inc/queryExecutor.h | 28 ++-- src/query/src/queryExecutor.c | 275 +++++++++++++++------------------- src/vnode/tsdb/src/tsdbRead.c | 10 +- 4 files changed, 181 insertions(+), 190 deletions(-) diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 3419128c72..0ae093728f 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -226,47 +226,59 @@ static void dnodeProcessReadResult(SReadMsg *pRead) { rpcFreeCont(pRead->rpcMsg.pCont); // free the received message } +static void dnodeContinueExecuteQuery(void* qhandle, SReadMsg *pMsg) { + SReadMsg readMsg = { + .rpcMsg = {.msgType = TSDB_MSG_TYPE_QUERY}, + .pCont = qhandle, + .contLen = 0, + .pRpcContext = pMsg->pRpcContext, + .pVnode = pMsg->pVnode, + }; + + taos_queue queue = dnodeGetVnodeRworker(pMsg->pVnode); + taosWriteQitem(queue, &readMsg); +} + static void dnodeProcessQueryMsg(SReadMsg *pMsg) { SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont; SQInfo* pQInfo = NULL; - void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode); - int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo); + if (pMsg->rpcMsg.contLen != 0) { + void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, pMsg, &pQInfo); - SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); - pRsp->code = code; - pRsp->qhandle = htobe64((uint64_t) (pQInfo)); - - SRpcMsg rpcRsp = { - .handle = pMsg->rpcMsg.handle, - .pCont = pRsp, - .contLen = sizeof(SQueryTableRsp), - .code = code, - .msgType = 0 - }; + SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); + pRsp->code = code; + pRsp->qhandle = htobe64((uint64_t) (pQInfo)); - rpcSendResponse(&rpcRsp); + SRpcMsg rpcRsp = { + .handle = pMsg->rpcMsg.handle, + .pCont = pRsp, + .contLen = sizeof(SQueryTableRsp), + .code = code, + .msgType = 0 + }; + + rpcSendResponse(&rpcRsp); + } else { + pQInfo = pMsg->pCont; + } // do execute query qTableQuery(pQInfo); } -static int32_t c = 0; static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { SRetrieveTableMsg *pRetrieve = pMsg->pCont; void *pQInfo = (void*) htobe64(pRetrieve->qhandle); dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId); - if ((++c)%2 == 0) { - int32_t k = 1; - } - int32_t rowSize = 0; - int32_t numOfRows = 0; + int32_t contLen = 0; SRetrieveTableRsp *pRsp = NULL; - int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize); + int32_t code = qRetrieveQueryResultInfo(pQInfo); if (code != TSDB_CODE_SUCCESS) { contLen = sizeof(SRetrieveTableRsp); @@ -275,6 +287,10 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { } else { // todo check code and handle error in build result set code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen); + + if (qNeedFurtherExec(pQInfo)) { + dnodeContinueExecuteQuery(pQInfo, pMsg); + } } SRpcMsg rpcRsp = (SRpcMsg) { diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 4ce606f599..c30d47e261 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -68,8 +68,10 @@ typedef struct SWindowResult { } SWindowResult; typedef struct SResultRec { - int64_t 
pointsTotal; - int64_t pointsRead; + int64_t total; + int64_t size; + int64_t capacity; + int32_t threshold; // the threshold size, when the number of rows in result buffer, return to client } SResultRec; typedef struct SWindowResInfo { @@ -112,7 +114,7 @@ typedef struct STableQueryInfo { typedef struct STableDataInfo { int32_t numOfBlocks; - int32_t start; // start block index + int32_t start; // start block index int32_t tableIndex; void* pMeterObj; int32_t groupIdx; // group id in table list @@ -143,7 +145,6 @@ typedef struct SQuery { int32_t pos; int64_t pointsOffset; // the number of points offset to save read data SData** sdata; - int32_t capacity; SSingleColumnFilterInfo* pFilterInfo; } SQuery; @@ -171,15 +172,13 @@ typedef struct SQueryRuntimeEnv { typedef struct SQInfo { void* signature; - void* pVnode; + void* param; // pointer to the RpcReadMsg TSKEY startTime; TSKEY elapsedTime; - SResultRec rec; int32_t pointsInterpo; - int32_t code; // error code to returned to client -// int32_t killed; // denotes if current query is killed + int32_t code; // error code to returned to client sem_t dataReady; - SArray* pTableIdList; // table list + SArray* pTableIdList; // table id list SQueryRuntimeEnv runtimeEnv; int32_t subgroupIdx; int32_t offset; /* offset in group result set of subgroup */ @@ -204,7 +203,7 @@ typedef struct SQInfo { * @param pQInfo * @return */ -int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo); +int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, void* param, SQInfo** pQInfo); /** * query on single table @@ -222,7 +221,7 @@ void qSuperTableQuery(void* pReadMsg); * wait for the query completed, and retrieve final results to client * @param pQInfo */ -int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize); +int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo); /** * @@ -232,4 +231,11 @@ int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* ro */ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen); +/** + * + * @param pQInfo + * @return + */ +bool qNeedFurtherExec(SQInfo* pQInfo); + #endif // TDENGINE_QUERYEXECUTOR_H diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index a996c8467f..a4da2073ea 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -364,8 +364,8 @@ bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functio bool doRevisedResultsByLimit(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - if ((pQuery->limit.limit > 0) && (pQuery->rec.pointsRead + pQInfo->rec.pointsRead > pQuery->limit.limit)) { - pQuery->rec.pointsRead = pQuery->limit.limit - pQInfo->rec.pointsRead; + if ((pQuery->limit.limit > 0) && (pQuery->rec.size + pQuery->rec.size > pQuery->limit.limit)) { + pQuery->rec.size = pQuery->limit.limit - pQuery->rec.size; // query completed setQueryStatus(pQuery, QUERY_COMPLETED); @@ -1344,17 +1344,16 @@ static int32_t reviseForwardSteps(SQueryRuntimeEnv *pRuntimeEnv, int32_t forward static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, __block_search_fn_t searchFn, int32_t *numOfRes, SWindowResInfo *pWindowResInfo, SArray *pDataBlock) { - SQuery *pQuery = pRuntimeEnv->pQuery; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + SQuery *pQuery = pRuntimeEnv->pQuery; if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || 
isGroupbyNormalCol(pQuery->pGroupbyExpr)) { *numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); } else { *numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); } - + TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey; - pQuery->lastKey = lastKey + step; + pQuery->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); @@ -1368,12 +1367,8 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl assert(*numOfRes >= 0); // check if buffer is large enough for accommodating all qualified points - if (*numOfRes > 0 && pQuery->checkBufferInLoop == 1) { - pQuery->pointsOffset -= *numOfRes; - if (pQuery->pointsOffset <= 0) { // todo return correct numOfRes for ts_comp function - pQuery->pointsOffset = 0; - setQueryStatus(pQuery, QUERY_RESBUF_FULL); - } + if (*numOfRes > 0 && pQuery->checkBufferInLoop == 1 && ((*numOfRes) >= pQuery->rec.threshold)) { + setQueryStatus(pQuery, QUERY_RESBUF_FULL); } return 0; @@ -2302,7 +2297,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { pQuery->status = 0; - pQInfo->rec = (SResultRec){0}; + pQuery->rec = (SResultRec){0}; pQuery->rec = (SResultRec){0}; changeExecuteScanOrder(pQuery, true); @@ -2668,9 +2663,9 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } int32_t numOfRes = 0; - SDataStatis *pStatis = NULL; - SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis); + + SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis); int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes, &pRuntimeEnv->windowResInfo, pDataBlock); @@ -3035,9 +3030,9 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { offset += pData->numOfElems; } - assert(pQuery->rec.pointsRead == 0); + assert(pQuery->rec.size == 0); - pQuery->rec.pointsRead += rows; + pQuery->rec.size += rows; pQInfo->offset += 1; } @@ -3367,7 +3362,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; } - memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->capacity); + memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->rec.capacity); } initCtxOutputBuf(pRuntimeEnv); @@ -3414,14 +3409,14 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - if (pQuery->rec.pointsRead == 0 || pQuery->limit.offset == 0) { + if (pQuery->rec.size == 0 || pQuery->limit.offset == 0) { return; } - if (pQuery->rec.pointsRead <= pQuery->limit.offset) { - pQuery->limit.offset -= pQuery->rec.pointsRead; + if (pQuery->rec.size <= pQuery->limit.offset) { + pQuery->limit.offset -= pQuery->rec.size; - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; // pQuery->pointsOffset = pQuery->rec.pointsToRead; // clear all data in result buffer resetCtxOutputBuf(pRuntimeEnv); @@ -3430,13 +3425,13 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->status &= (~QUERY_RESBUF_FULL); } else { int32_t numOfSkip = (int32_t)pQuery->limit.offset; - pQuery->rec.pointsRead -= numOfSkip; + pQuery->rec.size -= numOfSkip; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t functionId = 
pQuery->pSelectExpr[i].pBase.functionId; int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; assert(0); - // memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->pointsRead * bytes); + // memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->size * bytes); pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip; if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { @@ -3999,8 +3994,9 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) { int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSQL_SO_ASC; int32_t numOfResult = doCopyToSData(pQInfo, result, orderType); - pQuery->rec.pointsRead += numOfResult; - // assert(pQuery->rec.pointsRead <= pQuery->pointsToRead); + pQuery->rec.size += numOfResult; + + assert(pQuery->rec.size <= pQuery->rec.capacity); } static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo *pTableDataInfo) { @@ -4038,31 +4034,6 @@ void stableApplyFunctionsOnBlock_(SQInfo *pQInfo, STableDataInfo *pTableDataInfo updatelastkey(pQuery, pTableQueryInfo); } -// we need to split the refstatsult into different packages. -int32_t vnodeGetResultSize(void *thandle, int32_t *numOfRows) { - SQInfo *pQInfo = (SQInfo *)thandle; - SQuery *pQuery = &pQInfo->runtimeEnv.pQuery; - - /* - * get the file size and set the numOfRows to be the file size, since for tsComp query, - * the returned row size is equalled to 1 - * - * TODO handle the case that the file is too large to send back one time - */ - if (isTSCompQuery(pQuery) && (*numOfRows) > 0) { - struct stat fstat; - if (stat(pQuery->sdata[0]->data, &fstat) == 0) { - *numOfRows = fstat.st_size; - return fstat.st_size; - } else { - dError("QInfo:%p failed to get file info, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno)); - return 0; - } - } else { - return pQuery->rowSize * (*numOfRows); - } -} - bool vnodeHasRemainResults(void *handle) { SQInfo *pQInfo = (SQInfo *)handle; @@ -4074,7 +4045,7 @@ bool vnodeHasRemainResults(void *handle) { SQuery * pQuery = pRuntimeEnv->pQuery; SInterpolationInfo *pInterpoInfo = &pRuntimeEnv->interpoInfo; - if (pQuery->limit.limit > 0 && pQInfo->rec.pointsRead >= pQuery->limit.limit) { + if (pQuery->limit.limit > 0 && pQuery->rec.size >= pQuery->limit.limit) { return false; } @@ -4147,6 +4118,11 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data memmove(data, pQuery->sdata[col]->data, bytes * numOfRows); data += bytes * numOfRows; } + + // all data returned, set query over + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + setQueryStatus(pQuery, QUERY_OVER); + } } int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage **pDataSrc, int32_t numOfRows, @@ -4255,8 +4231,6 @@ int32_t initQInfo(SQInfo *pQInfo, void *param, void* tsdb) { setScanLimitationByResultBuffer(pQuery); changeExecuteScanOrder(pQuery, false); - pQInfo->rec = (SResultRec){0}; - // dataInCache requires lastKey value pQuery->lastKey = pQuery->window.skey; @@ -4535,7 +4509,7 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start // accumulate the point interpolation result if (numOfRes > 0) { - pQuery->rec.pointsRead += numOfRes; + pQuery->rec.size += numOfRes; forwardCtxOutputBuf(pRuntimeEnv, numOfRes); } @@ -4623,7 +4597,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pSupporter->subgroupIdx++; // output buffer is full, return to client - 
if (pQuery->pointsRead >= pQuery->pointsToRead) { + if (pQuery->size >= pQuery->pointsToRead) { break; } } @@ -4639,9 +4613,9 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { */ if (pSupporter->subgroupIdx > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); - pQInfo->pointsRead += pQuery->pointsRead; + pQInfo->size += pQuery->size; - if (pQuery->pointsRead > 0) { + if (pQuery->size > 0) { return; } } @@ -4707,7 +4681,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { vnodeScanAllData(pRuntimeEnv); - pQuery->pointsRead = getNumOfResult(pRuntimeEnv); + pQuery->size = getNumOfResult(pRuntimeEnv); doSkipResults(pRuntimeEnv); // the limitation of output result is reached, set the query completed @@ -4742,7 +4716,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pQuery->skey = pQuery->lastKey; // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter - if (pQuery->pointsRead == 0) { + if (pQuery->size == 0) { assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); continue; } else { @@ -4789,17 +4763,17 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { } pQInfo->pTableQuerySupporter->subgroupIdx = 0; - pQuery->pointsRead = 0; + pQuery->size = 0; copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult); } - pQInfo->pointsRead += pQuery->pointsRead; + pQInfo->size += pQuery->size; pQuery->pointsOffset = pQuery->pointsToRead; dTrace( "QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d," "next skey:%" PRId64 ", offset:%" PRId64, - pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead, + pQInfo, vid, pTableIdList->numOfTables, pSupporter->meterIdx, pTableIdList->numOfSubSet, pQuery->size, pQInfo->size, pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset); #endif } @@ -4911,13 +4885,13 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); } - pQInfo->rec.pointsRead += pQuery->rec.pointsRead; + pQuery->rec.size += pQuery->rec.size; - if (pQuery->rec.pointsRead == 0) { + if (pQuery->rec.size == 0) { // vnodePrintQueryStatistics(pSupporter); } - dTrace("QInfo:%p current:%lldd, total:%lldd", pQInfo, pQuery->rec.pointsRead, pQInfo->rec.pointsTotal); + dTrace("QInfo:%p current:%lldd, total:%lldd", pQInfo, pQuery->rec.size, pQuery->rec.total); return; } #if 0 @@ -4970,8 +4944,8 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { } // handle the limitation of output buffer - pQInfo->pointsRead += pQuery->pointsRead; - dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead, + pQInfo->size += pQuery->size; + dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->size, pQInfo->size, pQInfo->pointsReturned); #endif } @@ -4994,8 +4968,8 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) { } // since the numOfOutputElems must be identical for all sql functions that are allowed to be executed simutanelously. 
- pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv); - // assert(pQuery->pointsRead <= pQuery->pointsToRead && + pQuery->rec.size = getNumOfResult(pRuntimeEnv); + // assert(pQuery->size <= pQuery->pointsToRead && // Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED)); // must be top/bottom query if offset > 0 @@ -5006,7 +4980,7 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) { doSkipResults(pRuntimeEnv); doRevisedResultsByLimit(pQInfo); - pQInfo->rec.pointsRead = pQuery->rec.pointsRead; + pQuery->rec.size = pQuery->rec.size; } static void tableMultiOutputProcessor(SQInfo *pQInfo) { @@ -5026,16 +5000,16 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) { return; } - pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv); - if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.pointsRead > 0) { + pQuery->rec.size = getNumOfResult(pRuntimeEnv); + if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.size > 0) { doSkipResults(pRuntimeEnv); } /* - * 1. if pQuery->pointsRead == 0, pQuery->limit.offset >= 0, still need to check data - * 2. if pQuery->pointsRead > 0, pQuery->limit.offset must be 0 + * 1. if pQuery->size == 0, pQuery->limit.offset >= 0, still need to check data + * 2. if pQuery->size > 0, pQuery->limit.offset must be 0 */ - if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + if (pQuery->rec.size > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { break; } @@ -5046,23 +5020,21 @@ static void tableMultiOutputProcessor(SQInfo *pQInfo) { } doRevisedResultsByLimit(pQInfo); - pQInfo->rec.pointsRead += pQuery->rec.pointsRead; - if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { -// dTrace("QInfo:%p vid:%d sid:%d id:%s, query abort due to buffer limitation, next qrange:%" PRId64 "-%" PRId64, -// pQInfo, pQuery->lastKey, pQuery->ekey); + dTrace("QInfo:%p query paused due to buffer limitation, next qrange:%" PRId64 "-%" PRId64, + pQInfo, pQuery->lastKey, pQuery->window.ekey); } // dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode, -// pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); +// pMeterObj->sid, pMeterObj->meterId, pQuery->size, pQInfo->size, pQInfo->pointsReturned); // pQuery->pointsOffset = pQuery->pointsToRead; //restore the available buffer // if (!isTSCompQuery(pQuery)) { -// assert(pQuery->pointsRead <= pQuery->pointsToRead); +// assert(pQuery->size <= pQuery->pointsToRead); // } } -static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) { +static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; while (1) { @@ -5088,13 +5060,7 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->limit.offset -= c; } - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - break; - } - - // load the data block for the next retrieve - // loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); - if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED|QUERY_RESBUF_FULL)) { break; } } @@ -5108,12 +5074,11 @@ static void tableIntervalProcessor(SQInfo *pQInfo) { int32_t numOfInterpo = 0; while (1) { - resetCtxOutputBuf(pRuntimeEnv); - vnodeSingleMeterIntervalMainLooper(pRuntimeEnv); + tableIntervalProcessImpl(pRuntimeEnv); if (pQuery->intervalTime > 0) { pQInfo->subgroupIdx = 0; // always start from 0 - 
pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); @@ -5124,43 +5089,43 @@ static void tableIntervalProcessor(SQInfo *pQInfo) { doRevisedResultsByLimit(pQInfo); break; } else { - taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.pointsRead, pQuery->interpoType); + taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.size, pQuery->interpoType); SData **pInterpoBuf = pRuntimeEnv->pInterpoBuf; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.pointsRead * pQuery->pSelectExpr[i].resBytes); + memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.size * pQuery->pSelectExpr[i].resBytes); } numOfInterpo = 0; - pQuery->rec.pointsRead = vnodeQueryResultInterpolate( - pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.pointsRead, &numOfInterpo); + pQuery->rec.size = vnodeQueryResultInterpolate( + pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.size, &numOfInterpo); - dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.pointsRead); - if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.size); + if (pQuery->rec.size > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { doRevisedResultsByLimit(pQInfo); break; } // no result generated yet, continue retrieve data - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; } } // all data scanned, the group by normal column can return if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // todo refactor with merge interval time result pQInfo->subgroupIdx = 0; - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); } - pQInfo->rec.pointsRead += pQuery->rec.pointsRead; + pQuery->rec.size += pQuery->rec.size; pQInfo->pointsInterpo += numOfInterpo; // dTrace("%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d // totalReturn:%d", - // pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo, - // pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned); + // pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->size, numOfInterpo, + // pQInfo->size - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned); } void qTableQuery(SQInfo *pQInfo) { @@ -5187,16 +5152,16 @@ void qTableQuery(SQInfo *pQInfo) { int32_t numOfInterpo = 0; int32_t remain = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); - pQuery->rec.pointsRead = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, + pQuery->rec.size = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo); doRevisedResultsByLimit(pQInfo); pQInfo->pointsInterpo += numOfInterpo; - pQInfo->rec.pointsRead += pQuery->rec.pointsRead; + pQuery->rec.size += pQuery->rec.size; // dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d", - // pQInfo, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo, + // pQInfo, pQuery->size, numOfInterpo, pQInfo->size, pQInfo->pointsInterpo, // pQInfo->pointsReturned); 
sem_post(&pQInfo->dataReady); return; @@ -5206,22 +5171,22 @@ void qTableQuery(SQInfo *pQInfo) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { // continue to get push data from the group result if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || - (pQuery->intervalTime > 0 && pQInfo->rec.pointsTotal < pQuery->limit.limit)) { + (pQuery->intervalTime > 0 && pQuery->rec.total < pQuery->limit.limit)) { // todo limit the output for interval query? - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; pQInfo->subgroupIdx = 0; // always start from 0 if (pRuntimeEnv->windowResInfo.size > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); - pQInfo->rec.pointsRead += pQuery->rec.pointsRead; + pQuery->rec.size += pQuery->rec.size; clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); - if (pQuery->rec.pointsRead > 0) { + if (pQuery->rec.size > 0) { // dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d // totalReturn:%d", - // pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, - // pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned); + // pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->size, + // pQInfo->size, pQInfo->pointsInterpo, pQInfo->pointsReturned); sem_post(&pQInfo->dataReady); return; @@ -5231,7 +5196,7 @@ void qTableQuery(SQInfo *pQInfo) { // dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, // pMeterObj->sid, - // pMeterObj->meterId, pQInfo->pointsRead); + // pMeterObj->meterId, pQInfo->size); // vnodePrintQueryStatistics(pSupporter); sem_post(&pQInfo->dataReady); @@ -5239,7 +5204,7 @@ void qTableQuery(SQInfo *pQInfo) { } // number of points returned during this query - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; int64_t st = taosGetTimestampUs(); @@ -5265,7 +5230,7 @@ void qTableQuery(SQInfo *pQInfo) { if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query is killed", pQInfo); } else { - dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.pointsRead); + dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.size); } sem_post(&pQInfo->dataReady); @@ -5288,7 +5253,7 @@ void qSuperTableQuery(void *pReadMsg) { // assert(pQInfo->refCount >= 1); #if 0 SQuery *pQuery = &pQInfo->runtimeEnv.pQuery; - pQuery->rec.pointsRead = 0; + pQuery->rec.size = 0; int64_t st = taosGetTimestampUs(); if (pQuery->intervalTime > 0 || @@ -5306,13 +5271,13 @@ void qSuperTableQuery(void *pReadMsg) { pQInfo->elapsedTime += (taosGetTimestampUs() - st); pQuery->status = isQueryKilled(pQInfo) ? 
1 : 0; -// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->pointsRead, +// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, // pQInfo->query.interpoType); - if (pQuery->rec.pointsRead == 0) { + if (pQuery->rec.size == 0) { // pQInfo->over = 1; // dTrace("QInfo:%p over, %d meters queried, %d points are returned", pQInfo, pSupporter->numOfMeters, -// pQInfo->pointsRead); +// pQInfo->size); // vnodePrintQueryStatistics(pSupporter); } @@ -5916,12 +5881,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou } // set the output buffer capacity - pQuery->capacity = 4096; + pQuery->rec.capacity = 4096; + pQuery->rec.threshold = 2; + for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { assert(pExprs[col].interResBytes >= pExprs[col].resBytes); // allocate additional memory for interResults that are usually larger then final results - size_t size = (pQuery->capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData); + size_t size = (pQuery->rec.capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData); pQuery->sdata[col] = (SData *)calloc(1, size); if (pQuery->sdata[col] == NULL) { goto _clean_memory; @@ -5943,9 +5910,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQInfo->pTableIdList = pTableIdList; pQuery->pos = -1; - // dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, - // pQInfo); - + + dTrace("QInfo %p is allocated", pQInfo); return pQInfo; _clean_memory: @@ -6098,7 +6064,7 @@ _error: return code; } -int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) { +int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, void* param, SQInfo **pQInfo) { assert(pQueryTableMsg != NULL); int32_t code = TSDB_CODE_SUCCESS; @@ -6136,6 +6102,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQ // pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code); } else { code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo); + (*pQInfo)->param = param; } _query_over: @@ -6161,7 +6128,7 @@ _query_over: return TSDB_CODE_SUCCESS; } -int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *rowsize) { +int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) { if (pQInfo == NULL || !isQInfoValid(pQInfo)) { return TSDB_CODE_INVALID_QHANDLE; } @@ -6177,11 +6144,8 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *ro } sem_wait(&pQInfo->dataReady); - - *numOfRows = pQInfo->rec.pointsRead; - *rowsize = pQuery->rowSize; - - dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, *rowsize, *numOfRows, pQInfo->code); + dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, pQuery->rowSize, pQuery->rec.size, + pQInfo->code); return (pQInfo->code >= 0)? 
pQInfo->code:(-pQInfo->code); } @@ -6208,7 +6172,7 @@ static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { } } -static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) { +static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { // the remained number of retrieved rows, not the interpolated result SQuery *pQuery = pQInfo->runtimeEnv.pQuery; @@ -6231,28 +6195,31 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) { pQuery->sdata[0]->data, strerror(errno)); } } else { - doCopyQueryResultToMsg(pQInfo, pQInfo->rec.pointsRead, data); + doCopyQueryResultToMsg(pQInfo, pQuery->rec.size, data); } - pQInfo->rec.pointsTotal += pQInfo->rec.pointsRead; - dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQInfo->rec.pointsRead, pQInfo->rec.pointsTotal); + pQuery->rec.total += pQuery->rec.size; + dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQuery->rec.size, pQuery->rec.total); - setQueryStatus(pQuery, QUERY_COMPLETED); return TSDB_CODE_SUCCESS; // todo if interpolation exists, the result may be dump to client by several rounds } -static void addToTaskQueue(SQInfo* pQInfo) { - // no error occurred, continue retrieving data - if (pQInfo->code == TSDB_CODE_SUCCESS) { -#ifdef _TD_ARM_ - dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:doDumpQueryResult", pQInfo, pQInfo->signature); -#else - dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:%s", pQInfo, pQInfo->signature, __FUNCTION__); -#endif - - // todo add to task queue +bool qNeedFurtherExec(SQInfo* pQInfo) { + if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) { + return false; + } + + SQuery* pQuery = pQInfo->runtimeEnv.pQuery; + if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { + return false; + } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { + return true; + } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + return true; + } else { + assert(0); } } @@ -6262,13 +6229,12 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c } SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - size_t size = getResultSize(pQInfo, &pQInfo->rec.pointsRead); + size_t size = getResultSize(pQInfo, &pQuery->rec.size); *contLen = size + sizeof(SRetrieveTableRsp); // todo handle failed to allocate memory *pRsp = (SRetrieveTableRsp *)rpcMallocCont(*contLen); - - (*pRsp)->numOfRows = htonl(pQInfo->rec.pointsRead); + (*pRsp)->numOfRows = htonl(pQuery->rec.size); int32_t code = pQInfo->code; if (code == TSDB_CODE_SUCCESS) { @@ -6279,16 +6245,13 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c (*pRsp)->useconds = 0; } - if (pQInfo->rec.pointsRead > 0 && code == TSDB_CODE_SUCCESS) { - code = doDumpQueryResult(pQInfo, (*pRsp)->data, NULL); - - // has more data to return or need next round to execute - addToTaskQueue(pQInfo); + if (pQuery->rec.size > 0 && code == TSDB_CODE_SUCCESS) { + code = doDumpQueryResult(pQInfo, (*pRsp)->data); } else { code = pQInfo->code; } - if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { (*pRsp)->completed = 1; // notify no more result to client vnodeFreeQInfo(pQInfo); } diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 36472857fe..2dc61f5107 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -124,6 +124,7 @@ typedef struct STsdbQueryHandle { int32_t tableIndex; bool isFirstSlot; void * 
qinfo; // query info handle, for debug purpose
+  SSkipListIterator* memIter;
 } STsdbQueryHandle;
 
 int32_t doAllocateBuf(STsdbQueryHandle *pQueryHandle, int32_t rowsPerFileBlock) {
@@ -367,8 +368,13 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
   int32_t rows = 0;
 
   if (pTable->mem != NULL) {
-    SSkipListIterator* iter = tSkipListCreateIter(pTable->mem->pData);
-    rows = tsdbReadRowsFromCache(iter, INT64_MAX, 4000, &skey, &ekey, pHandle);
+
+    // create mem table iterator if it is not created yet
+    if (pHandle->memIter == NULL) {
+      pHandle->memIter = tSkipListCreateIter(pTable->mem->pData);
+    }
+
+    rows = tsdbReadRowsFromCache(pHandle->memIter, INT64_MAX, 2, &skey, &ekey, pHandle);
   }
 
   SDataBlockInfo blockInfo = {