From 1bccf8a71a628ac5a72dcce5faf3bf4f4c1599bb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 23 Aug 2021 17:00:40 +0800 Subject: [PATCH 01/10] [td-6260]: Optimize the client-side query performance when multiple group result exists. --- src/client/inc/tscUtil.h | 2 +- src/client/src/tscGlobalmerge.c | 374 ++++++++---------- src/client/src/tscSQLParser.c | 1 - src/query/inc/qExecutor.h | 15 +- src/query/src/qExecutor.c | 198 ++++++---- src/query/src/qFill.c | 2 +- .../general/parser/columnValue_float.sim | 8 +- 7 files changed, 313 insertions(+), 287 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 8c1037127a..82929e787e 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -36,7 +36,7 @@ extern "C" { (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE)) #define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \ - (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo))) + (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo))) #define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE)) diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index e696d54abd..ed99fcbabf 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -35,6 +35,7 @@ typedef struct SCompareParam { static bool needToMerge(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) { int32_t ret = 0; + size_t size = taosArrayGetSize(columnIndexList); if (size > 0) { ret = compare_aRv(pBlock, columnIndexList, (int32_t) size, index, buf, TSDB_ORDER_ASC); @@ -564,9 +565,11 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc (*hasPrev) = true; } +// tsdb_func_tag function only produce one row of result. Therefore, we need to copy the +// output value to multiple rows static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t numOfRows) { if (numOfRows <= 1) { - return ; + return; } for (int32_t k = 0; k < numOfOutput; ++k) { @@ -574,12 +577,49 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput continue; } - int32_t inc = numOfRows - 1; // tsdb_func_tag function only produce one row of result - char* src = pCtx[k].pOutput; + char* src = pCtx[k].pOutput; + char* dst = pCtx[k].pOutput + pCtx[k].outputBytes; - for (int32_t i = 0; i < inc; ++i) { - pCtx[k].pOutput += pCtx[k].outputBytes; - memcpy(pCtx[k].pOutput, src, (size_t)pCtx[k].outputBytes); + // Let's start from the second row, as the first row has result value already. + for (int32_t i = 1; i < numOfRows; ++i) { + memcpy(dst, src, (size_t)pCtx[k].outputBytes); + dst += pCtx[k].outputBytes; + } + } +} + +static void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex, char** pDataPtr) { + for (int32_t j = 0; j < numOfExpr; ++j) { + pCtx[j].pInput = pDataPtr[j] + pCtx[j].inputBytes * rowIndex; + } + + for (int32_t j = 0; j < numOfExpr; ++j) { + int32_t functionId = pCtx[j].functionId; + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + + if (functionId < 0) { + SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); + doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); + } else { + aAggs[functionId].mergeFunc(&pCtx[j]); + } + } +} + +static void doFinalizeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr) { + for(int32_t j = 0; j < numOfExpr; ++j) { + int32_t functionId = pCtx[j].functionId; + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + + if (functionId < 0) { + SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); + doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); + } else { + aAggs[functionId].xFinalize(&pCtx[j]); } } } @@ -588,52 +628,18 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD SMultiwayMergeInfo* pInfo = pOperator->info; SQLFunctionCtx* pCtx = pInfo->binfo.pCtx; - char** add = calloc(pBlock->info.numOfCols, POINTER_BYTES); + char** addrPtr = calloc(pBlock->info.numOfCols, POINTER_BYTES); for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - add[i] = pCtx[i].pInput; + addrPtr[i] = pCtx[i].pInput; pCtx[i].size = 1; } for(int32_t i = 0; i < pBlock->info.rows; ++i) { if (pInfo->hasPrev) { if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) { - for (int32_t j = 0; j < numOfExpr; ++j) { - pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i; - } - - for (int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - - doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); - - continue; - } - - aAggs[functionId].mergeFunc(&pCtx[j]); - } + doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr); } else { - for(int32_t j = 0; j < numOfExpr; ++j) { // TODO refactor - int32_t functionId = pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - - doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); - - continue; - } - - aAggs[functionId].xFinalize(&pCtx[j]); - } + doFinalizeResultImpl(pInfo, pCtx, numOfExpr); int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput); setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); @@ -655,48 +661,10 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo); } - for (int32_t j = 0; j < numOfExpr; ++j) { - pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i; - } - - for (int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - - doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); - - continue; - } - - aAggs[functionId].mergeFunc(&pCtx[j]); - } + doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr); } } else { - for (int32_t j = 0; j < numOfExpr; ++j) { - pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i; - } - - for (int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - - doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); - - continue; - } - - aAggs[functionId].mergeFunc(&pCtx[j]); - } + doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr); } savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev); @@ -704,11 +672,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD { for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - pCtx[i].pInput = add[i]; + pCtx[i].pInput = addrPtr[i]; } } - tfree(add); + tfree(addrPtr); } static bool isAllSourcesCompleted(SGlobalMerger *pMerger) { @@ -816,6 +784,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) { SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index]; bool sameGroup = true; if (pInfo->hasPrev) { + + // todo refactor extract method int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList); // if this row belongs to current result set group @@ -955,9 +925,10 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { break; } + bool sameGroup = true; if (pAggInfo->hasGroupColData) { - bool sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData); - if (!sameGroup) { + sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData); + if (!sameGroup && !pAggInfo->multiGroupResults) { *newgroup = true; pAggInfo->hasDataBlockForNewGroup = true; pAggInfo->pExistBlock = pBlock; @@ -976,26 +947,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { } if (handleData) { // data in current group is all handled - for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { - int32_t functionId = pAggInfo->binfo.pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pAggInfo->udfInfo, -1 * functionId - 1); - - doInvokeUdf(pUdfInfo, &pAggInfo->binfo.pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); - - continue; - } - - aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]); - } + doFinalizeResultImpl(pAggInfo, pAggInfo->binfo.pCtx, pOperator->numOfOutput); int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput); - pAggInfo->binfo.pRes->info.rows += numOfRows; + pAggInfo->binfo.pRes->info.rows += numOfRows; setTagValueForMultipleRows(pAggInfo->binfo.pCtx, pOperator->numOfOutput, numOfRows); } @@ -1019,71 +975,112 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { return (pRes->info.rows != 0)? pRes:NULL; } -static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { - SSLimitOperatorInfo *pInfo = pOperator->info; - assert(pInfo->currentGroupOffset >= 0); +static void doHandleDataInCurrentGroup(SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex) { + if (pInfo->currentOffset > 0) { + pInfo->currentOffset -= 1; + } else { + // discard the data rows in current group + if (pInfo->limit.limit < 0 || (pInfo->limit.limit >= 0 && pInfo->rowsTotal < pInfo->limit.limit)) { + int32_t num1 = taosArrayGetSize(pInfo->pRes->pDataBlock); + for (int32_t i = 0; i < num1; ++i) { + SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + SColumnInfoData *pDstInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i); - SSDataBlock* pBlock = NULL; - if (pInfo->currentGroupOffset == 0) { - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - } + SColumnInfo *pColInfo = &pColInfoData->info; + + char *pSrc = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData; + char *pDst = (char *)pDstInfoData->pData + (pInfo->pRes->info.rows * pColInfo->bytes); - if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) { - while ((*newgroup) == false) { // ignore the remain blocks - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; - } + memcpy(pDst, pSrc, pColInfo->bytes); } - } - return pBlock; + pInfo->rowsTotal += 1; + pInfo->pRes->info.rows += 1; + } } +} - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); +static void ensureOutputBuf(SSLimitOperatorInfo * pInfo, SSDataBlock *pResultBlock, int32_t numOfRows) { + if (pInfo->capacity < pResultBlock->info.rows + numOfRows) { + int32_t total = pResultBlock->info.rows + numOfRows; - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; - } + int32_t num = taosArrayGetSize(pResultBlock->pDataBlock); + for (int32_t i = 0; i < num; ++i) { + SColumnInfoData *pInfoData = taosArrayGet(pResultBlock->pDataBlock, i); - while(1) { - if (*newgroup) { - pInfo->currentGroupOffset -= 1; - *newgroup = false; + char *tmp = realloc(pInfoData->pData, total * pInfoData->info.bytes); + if (tmp != NULL) { + pInfoData->pData = tmp; + } else { + // todo handle the malloc failure } - while ((*newgroup) == false) { - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + pInfo->capacity = total; + pInfo->threshold = total * 0.8; + } + } +} - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; +static void doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock) { + int32_t rowIndex = 0; + + while (rowIndex < pBlock->info.rows) { + int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList); + + bool samegroup = true; + if (pInfo->hasPrev) { + for (int32_t i = 0; i < numOfCols; ++i) { + SColIndex * pIndex = taosArrayGet(pInfo->orderColumnList, i); + SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex); + + SColumnInfo *pColInfo = &pColInfoData->info; + + char * d = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData; + int32_t ret = columnValueAscendingComparator(pInfo->prevRow[i], d, pColInfo->type, pColInfo->bytes); + if (ret != 0) { // it is a new group + samegroup = false; + break; } } - - // now we have got the first data block of the next group. - if (pInfo->currentGroupOffset == 0) { - return pBlock; - } } - return NULL; + if (!samegroup || !pInfo->hasPrev) { + pInfo->ignoreCurrentGroup = false; + savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, rowIndex, &pInfo->hasPrev); + + pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group + pInfo->rowsTotal = 0; + + if (pInfo->currentGroupOffset > 0) { + pInfo->ignoreCurrentGroup = true; + pInfo->currentGroupOffset -= 1; // now we are in the next group data + rowIndex += 1; + continue; + } + + // A new group has arrived according to the result rows, and the group limitation has already reached. + // Let's jump out of current loop and return immediately. + if (pInfo->slimit.limit >= 0 && pInfo->groupTotal >= pInfo->slimit.limit) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + pOperator->status = OP_EXEC_DONE; + return; + } + + pInfo->groupTotal += 1; + doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex); + + } else { // handle the offset in the same group + // All the data in current group needs to be discarded, due to the limit parameter in the SQL statement + if (pInfo->ignoreCurrentGroup) { + rowIndex += 1; + continue; + } + + doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex); + } + + rowIndex += 1; + } } SSDataBlock* doSLimit(void* param, bool* newgroup) { @@ -1093,63 +1090,28 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) { } SSLimitOperatorInfo *pInfo = pOperator->info; + pInfo->pRes->info.rows = 0; + + assert(pInfo->currentGroupOffset >= 0); + + while(1) { + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); - SSDataBlock *pBlock = NULL; - while (1) { - pBlock = skipGroupBlock(pOperator, newgroup); if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; + return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; } - if (*newgroup) { // a new group arrives - pInfo->groupTotal += 1; - pInfo->rowsTotal = 0; - pInfo->currentOffset = pInfo->limit.offset; + ensureOutputBuf(pInfo, pInfo->pRes, pBlock->info.rows); + doSlimitImpl(pOperator, pInfo, pBlock); + if (pOperator->status == OP_EXEC_DONE) { + return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; } - assert(pInfo->currentGroupOffset == 0); - - if (pInfo->currentOffset >= pBlock->info.rows) { - pInfo->currentOffset -= pBlock->info.rows; - } else { - if (pInfo->currentOffset == 0) { - break; - } - - int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); - pBlock->info.rows = remain; - - // move the remain rows of this data block to the front. - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - - int16_t bytes = pColInfoData->info.bytes; - memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); - } - - pInfo->currentOffset = 0; - break; + // now the number of rows in current group is enough, let's return to the invoke function + if (pInfo->pRes->info.rows > pInfo->threshold) { + return pInfo->pRes; } } - - if (pInfo->slimit.limit > 0 && pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort - return NULL; - } - - if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) { - pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal); - pInfo->rowsTotal = pInfo->limit.limit; - - if (pInfo->slimit.limit > 0 && pInfo->groupTotal >= pInfo->slimit.limit) { - pOperator->status = OP_EXEC_DONE; - } - - // setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - } else { - pInfo->rowsTotal += pBlock->info.rows; - } - - return pBlock; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 612a3d4798..ad5e78dba6 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -7172,7 +7172,6 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo const char* msg1 = "interval not allowed in group by normal column"; STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); SSchema* tagSchema = NULL; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 6e8eec2456..611c235c86 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -479,6 +479,11 @@ typedef struct SSLimitOperatorInfo { char **prevRow; SArray *orderColumnList; + bool hasPrev; + bool ignoreCurrentGroup; + SSDataBlock *pRes; // result buffer + int64_t capacity; + int64_t threshold; } SSLimitOperatorInfo; typedef struct SFilterOperatorInfo { @@ -490,7 +495,7 @@ typedef struct SFillOperatorInfo { SFillInfo *pFillInfo; SSDataBlock *pRes; int64_t totalInputRows; - + void **p; SSDataBlock *existNewGroupBlock; } SFillOperatorInfo; @@ -553,9 +558,9 @@ typedef struct SMultiwayMergeInfo { bool hasDataBlockForNewGroup; SSDataBlock *pExistBlock; - bool hasPrev; - bool groupMix; SArray *udfInfo; + bool hasPrev; + bool multiGroupResults; } SMultiwayMergeInfo; // todo support the disk-based sort @@ -586,8 +591,8 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, - int32_t numOfRows, void* merger, bool groupMix); -SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo); + int32_t numOfRows, void* merger); +SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp); SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 0e2aba1d24..651cb8f07d 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -38,15 +38,12 @@ #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) #define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey)) - #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0} #define MULTI_KEY_DELIM "-" -#define HASH_CAPACITY_LIMIT 10000000 - #define TIME_WINDOW_COPY(_dst, _src) do {\ (_dst).skey = (_src).skey;\ (_dst).ekey = (_src).ekey;\ @@ -2255,25 +2252,21 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } case OP_MultiwayMergeSort: { - bool groupMix = true; - if (pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) { - groupMix = false; - } - - pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, - 4096, merger, groupMix); // TODO hack it + pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger); break; } - case OP_GlobalAggregate: { + case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock. + bool groupResultMixedUp = (pQueryAttr->fillType == TSDB_FILL_NONE); pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, - pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo); + pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, groupResultMixedUp); break; } case OP_SLimit: { - pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, - pQueryAttr->numOfExpr3, merger); + int32_t num = pRuntimeEnv->proot->numOfOutput; + SExprInfo* pExpr = pRuntimeEnv->proot->pExpr; + pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger); break; } @@ -3623,7 +3616,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows; - // re-estabilish output buffer pointer. + // set the correct pointer after the memory buffer reallocated. int32_t functionId = pBInfo->pCtx[i].functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) { pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput; @@ -4158,6 +4151,7 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti // refactor : extract method SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0); + //add condition (pBlock->info.rows >= 1) just to runtime happy if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pBlock->info.rows >= 1) { STimeWindow* w = &pBlock->info.window; @@ -4272,15 +4266,15 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } } -int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity) { - void** p = calloc(pFillInfo->numOfCols, POINTER_BYTES); +int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity, void** p) { for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i); - p[i] = pColInfoData->pData; + p[i] = pColInfoData->pData + (pColInfoData->info.bytes * pOutput->info.rows); } - pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity); - tfree(p); + int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity - pOutput->info.rows); + pOutput->info.rows += numOfRows; + return pOutput->info.rows; } @@ -5324,11 +5318,12 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; taosArrayDestroy(pInfo->orderColumnList); + pInfo->pRes = destroyOutputBuf(pInfo->pRes); tfree(pInfo->prevRow); } SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, - SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo) { + SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); pInfo->resultRowFactor = @@ -5336,15 +5331,14 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx - pInfo->pMerge = param; - pInfo->bufCapacity = 4096; - pInfo->udfInfo = pUdfInfo; - - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor); - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - - pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); - pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); + pInfo->multiGroupResults = groupResultMixedUp; + pInfo->pMerge = param; + pInfo->bufCapacity = 4096; + pInfo->udfInfo = pUdfInfo; + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor); + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); + pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); // TODO refactor int32_t len = 0; @@ -5397,17 +5391,15 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, } SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput, - int32_t numOfRows, void *merger, bool groupMix) { + int32_t numOfRows, void *merger) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); - pInfo->pMerge = merger; - pInfo->groupMix = groupMix; - pInfo->bufCapacity = numOfRows; - + pInfo->pMerge = merger; + pInfo->bufCapacity = numOfRows; pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); - { + { // todo extract method to create prev compare buffer int32_t len = 0; for(int32_t i = 0; i < numOfOutput; ++i) { len += pExpr[i].base.colBytes; @@ -5415,8 +5407,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); - int32_t offset = POINTER_BYTES * numOfCols; + int32_t offset = POINTER_BYTES * numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; @@ -5432,7 +5424,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; + pOperator->numOfOutput = numOfOutput; + pOperator->pExpr = pExpr; pOperator->exec = doMultiwayMergeSort; pOperator->cleanup = destroyGlobalAggOperatorInfo; return pOperator; @@ -6348,19 +6341,13 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { return pInfo->binfo.pRes; } -static SSDataBlock* doFill(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SFillOperatorInfo *pInfo = pOperator->info; - SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; - +static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRuntimeEnv *pRuntimeEnv, bool *newgroup) { if (taosFillHasMoreResults(pInfo->pFillInfo)) { *newgroup = false; - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); - return pInfo->pRes; + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p); + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { + return; + } } // handle the cached new group data block @@ -6372,11 +6359,47 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); pInfo->existNewGroupBlock = NULL; *newgroup = true; - return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } +} + +static SSDataBlock* doFill(void* param, bool* newgroup) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + + SFillOperatorInfo *pInfo = pOperator->info; + pInfo->pRes->info.rows = 0; + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; + doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { + return pInfo->pRes; + } +// if (taosFillHasMoreResults(pInfo->pFillInfo)) { +// *newgroup = false; +// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); +// return pInfo->pRes; +// } +// +// // handle the cached new group data block +// if (pInfo->existNewGroupBlock) { +// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; +// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; +// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); +// +// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); +// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); +// +// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); +// pInfo->existNewGroupBlock = NULL; +// *newgroup = true; +// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; +// } while(1) { publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -6404,28 +6427,60 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); } else { pInfo->totalInputRows += pBlock->info.rows; - - int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey - : */pBlock->info.window.ekey; - - taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, ekey); + taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey); taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock); } } - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); - if (pInfo->pRes->info.rows > 0) { // current group has no more result to return - return pInfo->pRes; + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); + + // current group has no more result to return + if (pInfo->pRes->info.rows > 0) { + // the result in current group not reach the threshold of output result, continue + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) { + return pInfo->pRes; + } + + doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) { + return pInfo->pRes; + } + +// if (taosFillHasMoreResults(pInfo->pFillInfo)) { +// *newgroup = false; +// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); +// return pInfo->pRes; +// } +// +// // handle the cached new group data block +// if (pInfo->existNewGroupBlock) { +// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; +// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; +// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); +// +// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); +// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); +// +// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); +// pInfo->existNewGroupBlock = NULL; +// *newgroup = true; +// +// if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { +// return pInfo->pRes; +// } +// +//// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; +// } + } else if (pInfo->existNewGroupBlock) { // try next group pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; - int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey - :*/ pInfo->existNewGroupBlock->info.window.ekey; + int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey; taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); pInfo->existNewGroupBlock = NULL; *newgroup = true; @@ -6433,7 +6488,6 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { } else { return NULL; } - // return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } } @@ -6534,6 +6588,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); pInfo->pRes = destroyOutputBuf(pInfo->pRes); + tfree(pInfo->p); } static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { @@ -6895,6 +6950,8 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput, pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, (int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo); + + pInfo->p = calloc(pInfo->pFillInfo->numOfCols, POINTER_BYTES); } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -6922,9 +6979,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr); pInfo->slimit = pQueryAttr->slimit; pInfo->limit = pQueryAttr->limit; - - pInfo->currentGroupOffset = pQueryAttr->slimit.offset; - pInfo->currentOffset = pQueryAttr->limit.offset; + pInfo->capacity = pRuntimeEnv->resultInfo.capacity; + pInfo->threshold = pInfo->capacity * 0.8; + pInfo->currentOffset = pQueryAttr->limit.offset; + pInfo->currentGroupOffset = pQueryAttr->slimit.offset; // TODO refactor int32_t len = 0; @@ -6932,10 +6990,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator len += pExpr[i].base.resBytes; } - int32_t numOfCols = pInfo->orderColumnList != NULL? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; + int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); - int32_t offset = POINTER_BYTES * numOfCols; + int32_t offset = POINTER_BYTES * numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; @@ -6943,6 +7001,8 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator offset += pExpr[index->colIndex].base.resBytes; } + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SLimitOperator"; diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index 1a86bbae36..cdcc164152 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -430,7 +430,7 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i); pFillInfo->pData[i] = pColData->pData; - if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { // copy the tag value to tag value buffer + if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; assert (pTag->col.colId == pCol->col.colId); memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy?? diff --git a/tests/script/general/parser/columnValue_float.sim b/tests/script/general/parser/columnValue_float.sim index c7008d0b13..1832f7f847 100644 --- a/tests/script/general/parser/columnValue_float.sim +++ b/tests/script/general/parser/columnValue_float.sim @@ -150,13 +150,13 @@ if $data00 != 0.00150 then print expect 0.00150, actual: $data00 return -1 endi -sql create table st_float_15_0 using mt_float tags (3.40282347e+38) -sql select tagname from st_float_15_0 +#sql create table st_float_15_0 using mt_float tags (3.40282347e+38) +#sql select tagname from st_float_15_0 #if $data00 != 0.001500 then # return -1 #endi -sql create table st_float_16_0 using mt_float tags (-3.40282347e+38) -sql select tagname from st_float_16_0 +#sql create table st_float_16_0 using mt_float tags (-3.40282347e+38) +#sql select tagname from st_float_16_0 #if $data00 != 0.001500 then # return -1 #endi From c2fa9ce29c37c5099abfca7c5f52b3de2727d6cf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 Aug 2021 17:20:00 +0800 Subject: [PATCH 02/10] [td-6260]fix the bug found by regression test. --- src/client/src/tscGlobalmerge.c | 38 ++++++++++++++++++++++++++++----- src/client/src/tscSQLParser.c | 15 ++++++++++--- src/client/src/tscUtil.c | 5 ++++- src/query/inc/qExecutor.h | 28 +++++++++++++----------- src/query/inc/qTableMeta.h | 1 + src/query/src/qExecutor.c | 34 +++++++++++++++++------------ 6 files changed, 86 insertions(+), 35 deletions(-) diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index ed99fcbabf..3669d84459 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -1021,7 +1021,13 @@ static void ensureOutputBuf(SSLimitOperatorInfo * pInfo, SSDataBlock *pResultBlo } } -static void doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock) { +enum { + BLOCK_NEW_GROUP = 1, + BLOCK_NO_GROUP = 2, + BLOCK_SAME_GROUP = 3, +}; + +static int32_t doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock) { int32_t rowIndex = 0; while (rowIndex < pBlock->info.rows) { @@ -1030,12 +1036,12 @@ static void doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, S bool samegroup = true; if (pInfo->hasPrev) { for (int32_t i = 0; i < numOfCols; ++i) { - SColIndex * pIndex = taosArrayGet(pInfo->orderColumnList, i); + SColIndex *pIndex = taosArrayGet(pInfo->orderColumnList, i); SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex); SColumnInfo *pColInfo = &pColInfoData->info; - char * d = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData; + char *d = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData; int32_t ret = columnValueAscendingComparator(pInfo->prevRow[i], d, pColInfo->type, pColInfo->bytes); if (ret != 0) { // it is a new group samegroup = false; @@ -1063,10 +1069,17 @@ static void doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, S if (pInfo->slimit.limit >= 0 && pInfo->groupTotal >= pInfo->slimit.limit) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; - return; + return BLOCK_NO_GROUP; } pInfo->groupTotal += 1; + + // data in current group not allowed, return if current result does not belong to the previous group.And there + // are results exists in current SSDataBlock + if (!pInfo->multigroupResult && !samegroup && pInfo->pRes->info.rows > 0) { + return BLOCK_NEW_GROUP; + } + doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex); } else { // handle the offset in the same group @@ -1081,6 +1094,8 @@ static void doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, S rowIndex += 1; } + + return BLOCK_SAME_GROUP; } SSDataBlock* doSLimit(void* param, bool* newgroup) { @@ -1092,6 +1107,14 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) { SSLimitOperatorInfo *pInfo = pOperator->info; pInfo->pRes->info.rows = 0; + if (pInfo->pPrevBlock != NULL) { + ensureOutputBuf(pInfo, pInfo->pRes, pInfo->pPrevBlock->info.rows); + int32_t ret = doSlimitImpl(pOperator, pInfo, pInfo->pPrevBlock); + assert(ret != BLOCK_NEW_GROUP); + + pInfo->pPrevBlock = NULL; + } + assert(pInfo->currentGroupOffset >= 0); while(1) { @@ -1104,7 +1127,12 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) { } ensureOutputBuf(pInfo, pInfo->pRes, pBlock->info.rows); - doSlimitImpl(pOperator, pInfo, pBlock); + int32_t ret = doSlimitImpl(pOperator, pInfo, pBlock); + if (ret == BLOCK_NEW_GROUP) { + pInfo->pPrevBlock = pBlock; + return pInfo->pRes; + } + if (pOperator->status == OP_EXEC_DONE) { return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index ad5e78dba6..6e9506c576 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -925,7 +925,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { pQueryInfo = pCmd->active; pQueryInfo->pUdfInfo = pUdfInfo; pQueryInfo->udfCopy = true; - } } @@ -8696,6 +8695,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS if (taosArrayGetSize(subInfo->pSubquery) >= 2) { return invalidOperationMsg(msgBuf, "not support union in subquery"); } + SQueryInfo* pSub = calloc(1, sizeof(SQueryInfo)); tscInitQueryInfo(pSub); @@ -8713,12 +8713,12 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS return code; } - // create dummy table meta info STableMetaInfo* pTableMetaInfo1 = calloc(1, sizeof(STableMetaInfo)); if (pTableMetaInfo1 == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } + pTableMetaInfo1->pTableMeta = extractTempTableMetaFromSubquery(pSub); pTableMetaInfo1->tableMetaCapacity = tscGetTableMetaSize(pTableMetaInfo1->pTableMeta); @@ -8802,7 +8802,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf // check if there is 3 level select SRelElementPair* subInfo = taosArrayGet(pSqlNode->from->list, i); SSqlNode* p = taosArrayGetP(subInfo->pSubquery, 0); - if (p->from->type == SQL_NODE_FROM_SUBQUERY){ + if (p->from->type == SQL_NODE_FROM_SUBQUERY) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9); } @@ -8895,6 +8895,15 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf } } + // disable group result mixed up if interval/session window query exists. + if (isTimeWindowQuery(pQueryInfo)) { + size_t num = taosArrayGetSize(pQueryInfo->pUpstream); + for(int32_t i = 0; i < num; ++i) { + SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, i); + pUp->multigroupResult = false; + } + } + // parse the having clause in the first place int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1); if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) != diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 591a6bba34..3f737d1589 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3128,6 +3128,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { pQueryInfo->slimit.offset = 0; pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES); pQueryInfo->window = TSWINDOW_INITIALIZER; + pQueryInfo->multigroupResult = true; } int32_t tscAddQueryInfo(SSqlCmd* pCmd) { @@ -3139,7 +3140,6 @@ int32_t tscAddQueryInfo(SSqlCmd* pCmd) { } tscInitQueryInfo(pQueryInfo); - pQueryInfo->msg = pCmd->payload; // pointer to the parent error message buffer if (pCmd->pQueryInfo == NULL) { @@ -3222,6 +3222,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) { pQueryInfo->window = pSrc->window; pQueryInfo->sessionWindow = pSrc->sessionWindow; pQueryInfo->pTableMetaInfo = NULL; + pQueryInfo->multigroupResult = pSrc->multigroupResult; pQueryInfo->bufLen = pSrc->bufLen; pQueryInfo->orderProjectQuery = pSrc->orderProjectQuery; @@ -3623,6 +3624,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pNewQueryInfo->pTableMetaInfo = NULL; pNewQueryInfo->bufLen = pQueryInfo->bufLen; pNewQueryInfo->distinct = pQueryInfo->distinct; + pNewQueryInfo->multigroupResult = pQueryInfo->multigroupResult; pNewQueryInfo->buf = malloc(pQueryInfo->bufLen); if (pNewQueryInfo->buf == NULL) { @@ -4736,6 +4738,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->distinct = pQueryInfo->distinct; pQueryAttr->sw = pQueryInfo->sessionWindow; pQueryAttr->stateWindow = pQueryInfo->stateWindow; + pQueryAttr->multigroupResult = pQueryInfo->multigroupResult; pQueryAttr->numOfCols = numOfCols; pQueryAttr->numOfOutput = numOfOutput; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 611c235c86..f07aac4b93 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -223,6 +223,7 @@ typedef struct SQueryAttr { bool distinct; // distinct query or not bool stateWindow; // window State on sub/normal table bool createFilterOperator; // if filter operator is needed + bool multigroupResult; // multigroup result can exist in one SSDataBlock int32_t interBufSize; // intermediate buffer sizse int32_t havingNum; // having expr number @@ -469,19 +470,21 @@ typedef struct SLimitOperatorInfo { } SLimitOperatorInfo; typedef struct SSLimitOperatorInfo { - int64_t groupTotal; - int64_t currentGroupOffset; + int64_t groupTotal; + int64_t currentGroupOffset; - int64_t rowsTotal; - int64_t currentOffset; - SLimitVal limit; - SLimitVal slimit; + int64_t rowsTotal; + int64_t currentOffset; + SLimitVal limit; + SLimitVal slimit; - char **prevRow; - SArray *orderColumnList; - bool hasPrev; - bool ignoreCurrentGroup; + char **prevRow; + SArray *orderColumnList; + bool hasPrev; + bool ignoreCurrentGroup; + bool multigroupResult; SSDataBlock *pRes; // result buffer + SSDataBlock *pPrevBlock; int64_t capacity; int64_t threshold; } SSLimitOperatorInfo; @@ -497,6 +500,7 @@ typedef struct SFillOperatorInfo { int64_t totalInputRows; void **p; SSDataBlock *existNewGroupBlock; + bool multigroupResult; } SFillOperatorInfo; typedef struct SGroupbyOperatorInfo { @@ -582,7 +586,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); @@ -594,7 +598,7 @@ SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SEx int32_t numOfRows, void* merger); SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp); SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); +SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult); SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); diff --git a/src/query/inc/qTableMeta.h b/src/query/inc/qTableMeta.h index d6b04b0330..746c5f8569 100644 --- a/src/query/inc/qTableMeta.h +++ b/src/query/inc/qTableMeta.h @@ -165,6 +165,7 @@ typedef struct SQueryInfo { bool orderProjectQuery; bool stateWindow; bool globalMerge; + bool multigroupResult; } SQueryInfo; /** diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 651cb8f07d..2ec87075c6 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2247,7 +2247,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_Fill: { SOperatorInfo* pInfo = pRuntimeEnv->proot; - pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput); + pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult); break; } @@ -2257,16 +2257,20 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock. - bool groupResultMixedUp = (pQueryAttr->fillType == TSDB_FILL_NONE); + bool multigroupResult = pQueryAttr->multigroupResult; + if (pQueryAttr->multigroupResult) { + multigroupResult = (pQueryAttr->fillType == TSDB_FILL_NONE); + } + pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, - pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, groupResultMixedUp); + pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult); break; } case OP_SLimit: { int32_t num = pRuntimeEnv->proot->numOfOutput; SExprInfo* pExpr = pRuntimeEnv->proot->pExpr; - pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger); + pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult); break; } @@ -6345,7 +6349,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRunt if (taosFillHasMoreResults(pInfo->pFillInfo)) { *newgroup = false; doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p); - if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult)) { return; } } @@ -6377,7 +6381,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); - if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) { return pInfo->pRes; } // if (taosFillHasMoreResults(pInfo->pFillInfo)) { @@ -6414,8 +6418,8 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { pInfo->existNewGroupBlock = pBlock; *newgroup = false; - // fill the previous group data block - // before handle a new data block, close the fill operation for previous group data block + // Fill the previous group data block, before handle the data block of new group. + // Close the fill operation for previous group data block taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); } else { if (pBlock == NULL) { @@ -6436,8 +6440,9 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { // current group has no more result to return if (pInfo->pRes->info.rows > 0) { - // the result in current group not reach the threshold of output result, continue - if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) { + // 1. The result in current group not reach the threshold of output result, continue + // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL || (!pInfo->multigroupResult)) { return pInfo->pRes; } @@ -6932,10 +6937,10 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato return pOperator; } -SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, - int32_t numOfOutput) { +SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) { SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + pInfo->multigroupResult = multigroupResult; { SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -6971,7 +6976,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn return pOperator; } -SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger) { +SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) { SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -6982,7 +6987,8 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator pInfo->capacity = pRuntimeEnv->resultInfo.capacity; pInfo->threshold = pInfo->capacity * 0.8; pInfo->currentOffset = pQueryAttr->limit.offset; - pInfo->currentGroupOffset = pQueryAttr->slimit.offset; + pInfo->currentGroupOffset = pQueryAttr->slimit.offset; + pInfo->multigroupResult= multigroupResult; // TODO refactor int32_t len = 0; From 430ab357bfc6e64a949144fa7bccf15fb4c3f1e8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 Aug 2021 19:14:47 +0800 Subject: [PATCH 03/10] [td-6260]fix compiler error. --- src/query/src/qExecutor.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 2ec87075c6..d4341956b0 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6985,7 +6985,7 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator pInfo->slimit = pQueryAttr->slimit; pInfo->limit = pQueryAttr->limit; pInfo->capacity = pRuntimeEnv->resultInfo.capacity; - pInfo->threshold = pInfo->capacity * 0.8; + pInfo->threshold = (int64_t)(pInfo->capacity * 0.8); pInfo->currentOffset = pQueryAttr->limit.offset; pInfo->currentGroupOffset = pQueryAttr->slimit.offset; pInfo->multigroupResult= multigroupResult; From 9183a6578b50b121218f302573a61d89395d19e7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 25 Aug 2021 13:02:21 +0800 Subject: [PATCH 04/10] [td-6260]fix compiler error. --- src/client/src/tscGlobalmerge.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index 3669d84459..71be8f8cbc 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -981,7 +981,7 @@ static void doHandleDataInCurrentGroup(SSLimitOperatorInfo* pInfo, SSDataBlock* } else { // discard the data rows in current group if (pInfo->limit.limit < 0 || (pInfo->limit.limit >= 0 && pInfo->rowsTotal < pInfo->limit.limit)) { - int32_t num1 = taosArrayGetSize(pInfo->pRes->pDataBlock); + size_t num1 = taosArrayGetSize(pInfo->pRes->pDataBlock); for (int32_t i = 0; i < num1; ++i) { SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData *pDstInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i); From a3a274d0c18f4afc66bcee27666fa447304ab725 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 25 Aug 2021 14:01:02 +0800 Subject: [PATCH 05/10] [td-255] refactor --- src/client/src/tscServer.c | 48 ++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 3c5d1a5786..19efa229af 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -331,12 +331,37 @@ int tscSendMsgToServer(SSqlObj *pSql) { .handle = NULL, .code = 0 }; - rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid); return TSDB_CODE_SUCCESS; } +// handle three situation +// 1. epset retry, only return last failure ep +// 2. no epset retry, like 'taos -h invalidFqdn', return invalidFqdn +// 3. other situation, no expected +void tscSetFqdnErrorMsg(SSqlObj* pSql, SRpcEpSet* pEpSet) { + SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; + + char* msgBuf = tscGetErrorMsgPayload(pCmd); + + if (pEpSet) { + sprintf(msgBuf, "%s\"%s\"", tstrerror(pRes->code),pEpSet->fqdn[(pEpSet->inUse)%(pEpSet->numOfEps)]); + } else if (pCmd->command >= TSDB_SQL_MGMT) { + SRpcEpSet tEpset; + + SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet; + taosCorBeginRead(&pCorEpSet->version); + tEpset = pCorEpSet->epSet; + taosCorEndRead(&pCorEpSet->version); + + sprintf(msgBuf, "%s\"%s\"", tstrerror(pRes->code),tEpset.fqdn[(tEpset.inUse)%(tEpset.numOfEps)]); + } else { + sprintf(msgBuf, "%s", tstrerror(pRes->code)); + } +} + void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); @@ -499,26 +524,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code; - if (pRes->code == TSDB_CODE_RPC_FQDN_ERROR) { + if (rpcMsg->code == TSDB_CODE_RPC_FQDN_ERROR) { tscAllocPayload(pCmd, TSDB_FQDN_LEN + 64); - // handle three situation - // 1. epset retry, only return last failure ep - // 2. no epset retry, like 'taos -h invalidFqdn', return invalidFqdn - // 3. other situation, no expected - if (pEpSet) { - sprintf(tscGetErrorMsgPayload(pCmd), "%s\"%s\"", tstrerror(pRes->code),pEpSet->fqdn[(pEpSet->inUse)%(pEpSet->numOfEps)]); - } else if (pCmd->command >= TSDB_SQL_MGMT) { - SRpcEpSet tEpset; - - SRpcCorEpSet *pCorEpSet = pSql->pTscObj->tscCorMgmtEpSet; - taosCorBeginRead(&pCorEpSet->version); - tEpset = pCorEpSet->epSet; - taosCorEndRead(&pCorEpSet->version); - - sprintf(tscGetErrorMsgPayload(pCmd), "%s\"%s\"", tstrerror(pRes->code),tEpset.fqdn[(tEpset.inUse)%(tEpset.numOfEps)]); - } else { - sprintf(tscGetErrorMsgPayload(pCmd), "%s", tstrerror(pRes->code)); - } + tscSetFqdnErrorMsg(pSql, pEpSet); } (*pSql->fp)(pSql->param, pSql, rpcMsg->code); From 787076327737e5790b87ae024be6dee67860b49f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 25 Aug 2021 14:01:21 +0800 Subject: [PATCH 06/10] [td-255] merge develop --- src/connector/python/taos/bind.py | 49 ++++++++++++---------- src/connector/python/taos/cinterface.py | 2 +- src/connector/python/taos/constants.py | 7 +++- src/connector/python/tests/test-td6231.py | 50 +++++++++++++++++++++++ 4 files changed, 83 insertions(+), 25 deletions(-) create mode 100644 src/connector/python/tests/test-td6231.py diff --git a/src/connector/python/taos/bind.py b/src/connector/python/taos/bind.py index ede6381628..083ddc99ae 100644 --- a/src/connector/python/taos/bind.py +++ b/src/connector/python/taos/bind.py @@ -10,7 +10,8 @@ import sys _datetime_epoch = datetime.utcfromtimestamp(0) def _is_not_none(obj): - obj != None + return obj != None + class TaosBind(ctypes.Structure): _fields_ = [ ("buffer_type", c_int), @@ -299,27 +300,14 @@ class TaosMultiBind(ctypes.Structure): self.buffer = cast(buffer, c_void_p) self.num = len(values) - def binary(self, values): + def _str_to_buffer(self, values): self.num = len(values) - self.buffer = cast(c_char_p("".join(filter(_is_not_none, values)).encode("utf-8")), c_void_p) - self.length = (c_int * len(values))(*[len(value) if value is not None else 0 for value in values]) - self.buffer_type = FieldType.C_BINARY - self.is_null = cast((c_byte * self.num)(*[1 if v == None else 0 for v in values]), c_char_p) - - def timestamp(self, values, precision=PrecisionEnum.Milliseconds): - try: - buffer = cast(values, c_void_p) - except: - buffer_type = c_int64 * len(values) - buffer = buffer_type(*[_datetime_to_timestamp(value, precision) for value in values]) - - self.buffer_type = FieldType.C_TIMESTAMP - self.buffer = cast(buffer, c_void_p) - self.buffer_length = sizeof(c_int64) - self.num = len(values) - - def nchar(self, values): - # type: (list[str]) -> None + is_null = [1 if v == None else 0 for v in values] + self.is_null = cast((c_byte * self.num)(*is_null), c_char_p) + + if sum(is_null) == self.num: + self.length = (c_int32 * len(values))(0 * self.num) + return if sys.version_info < (3, 0): _bytes = [bytes(value) if value is not None else None for value in values] buffer_length = max(len(b) + 1 for b in _bytes if b is not None) @@ -347,9 +335,26 @@ class TaosMultiBind(ctypes.Structure): ) self.length = (c_int32 * len(values))(*[len(b) if b is not None else 0 for b in _bytes]) self.buffer_length = buffer_length + def binary(self, values): + self.buffer_type = FieldType.C_BINARY + self._str_to_buffer(values) + + def timestamp(self, values, precision=PrecisionEnum.Milliseconds): + try: + buffer = cast(values, c_void_p) + except: + buffer_type = c_int64 * len(values) + buffer = buffer_type(*[_datetime_to_timestamp(value, precision) for value in values]) + + self.buffer_type = FieldType.C_TIMESTAMP + self.buffer = cast(buffer, c_void_p) + self.buffer_length = sizeof(c_int64) self.num = len(values) - self.is_null = cast((c_byte * self.num)(*[1 if v == None else 0 for v in values]), c_char_p) + + def nchar(self, values): + # type: (list[str]) -> None self.buffer_type = FieldType.C_NCHAR + self._str_to_buffer(values) def tinyint_unsigned(self, values): self.buffer_type = FieldType.C_TINYINT_UNSIGNED diff --git a/src/connector/python/taos/cinterface.py b/src/connector/python/taos/cinterface.py index 51e9a8667d..42dac3c2e8 100644 --- a/src/connector/python/taos/cinterface.py +++ b/src/connector/python/taos/cinterface.py @@ -49,7 +49,7 @@ def _load_taos(): try: return load_func[platform.system()]() except: - sys.exit("unsupported platform to TDengine connector") + raise InterfaceError('unsupported platform or failed to load taos client library') _libtaos = _load_taos() diff --git a/src/connector/python/taos/constants.py b/src/connector/python/taos/constants.py index b500df627c..8ad5b69fc0 100644 --- a/src/connector/python/taos/constants.py +++ b/src/connector/python/taos/constants.py @@ -3,6 +3,9 @@ """Constants in TDengine python """ +import ctypes, struct + + class FieldType(object): """TDengine Field Types""" @@ -33,8 +36,8 @@ class FieldType(object): C_INT_UNSIGNED_NULL = 4294967295 C_BIGINT_NULL = -9223372036854775808 C_BIGINT_UNSIGNED_NULL = 18446744073709551615 - C_FLOAT_NULL = float("nan") - C_DOUBLE_NULL = float("nan") + C_FLOAT_NULL = ctypes.c_float(struct.unpack(" Date: Wed, 25 Aug 2021 14:25:01 +0800 Subject: [PATCH 07/10] [td-255] fix compiler error. --- src/client/src/tscGlobalmerge.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index bacd938b85..d192f743e1 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -1016,7 +1016,7 @@ static void ensureOutputBuf(SSLimitOperatorInfo * pInfo, SSDataBlock *pResultBlo } pInfo->capacity = total; - pInfo->threshold = total * 0.8; + pInfo->threshold = (int64_t)(total * 0.8); } } } From d2c2010053d6b0c0ea84d6a1a0b0b0a5ba2d2f64 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 25 Aug 2021 16:57:06 +0800 Subject: [PATCH 08/10] [td-255] fix compiler error. --- src/client/src/tscGlobalmerge.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index d192f743e1..cfcc5d8b0a 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -1004,7 +1004,7 @@ static void ensureOutputBuf(SSLimitOperatorInfo * pInfo, SSDataBlock *pResultBlo if (pInfo->capacity < pResultBlock->info.rows + numOfRows) { int32_t total = pResultBlock->info.rows + numOfRows; - int32_t num = taosArrayGetSize(pResultBlock->pDataBlock); + size_t num = taosArrayGetSize(pResultBlock->pDataBlock); for (int32_t i = 0; i < num; ++i) { SColumnInfoData *pInfoData = taosArrayGet(pResultBlock->pDataBlock, i); From b822d971ccb7ea8adbbc134f0c40fe90de9357c5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 25 Aug 2021 20:24:43 +0800 Subject: [PATCH 09/10] [td-255] fix invalid free error. --- src/client/src/tscServer.c | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 19efa229af..d36a3124db 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -367,8 +367,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); if (pSql == NULL) { rpcFreeCont(rpcMsg->pCont); - free(rpcMsg); - free(pEpSet); return; } @@ -386,8 +384,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { taosRemoveRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); - free(rpcMsg); - free(pEpSet); return; } @@ -399,8 +395,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { taosRemoveRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); - free(rpcMsg); - free(pEpSet); return; } @@ -454,8 +448,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); - free(rpcMsg); - free(pEpSet); return; } } @@ -539,8 +531,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); - free(rpcMsg); - free(pEpSet); } int doBuildAndSendMsg(SSqlObj *pSql) { From 0a3b524229dfd5e72a79cce1c11773483ca9f629 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 26 Aug 2021 13:19:11 +0800 Subject: [PATCH 10/10] [td-255] fix a memory leak. --- src/client/src/tscUtil.c | 33 ++++++++++++++++----------------- src/util/src/tutil.c | 2 ++ 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7b199aa337..586b929c92 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3241,6 +3241,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) { taosArrayDestroy(pQueryInfo->pUpstream); pQueryInfo->pUpstream = NULL; + pQueryInfo->bufLen = 0; } void tscClearSubqueryInfo(SSqlCmd* pCmd) { @@ -3661,27 +3662,25 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pnCmd->active = pNewQueryInfo; memcpy(&pNewQueryInfo->interval, &pQueryInfo->interval, sizeof(pNewQueryInfo->interval)); - pNewQueryInfo->type = pQueryInfo->type; - pNewQueryInfo->window = pQueryInfo->window; - pNewQueryInfo->limit = pQueryInfo->limit; - pNewQueryInfo->slimit = pQueryInfo->slimit; - pNewQueryInfo->order = pQueryInfo->order; - pNewQueryInfo->vgroupLimit = pQueryInfo->vgroupLimit; - pNewQueryInfo->tsBuf = NULL; - pNewQueryInfo->fillType = pQueryInfo->fillType; - pNewQueryInfo->fillVal = NULL; + pNewQueryInfo->type = pQueryInfo->type; + pNewQueryInfo->window = pQueryInfo->window; + pNewQueryInfo->limit = pQueryInfo->limit; + pNewQueryInfo->slimit = pQueryInfo->slimit; + pNewQueryInfo->order = pQueryInfo->order; + pNewQueryInfo->tsBuf = NULL; + pNewQueryInfo->fillType = pQueryInfo->fillType; + pNewQueryInfo->fillVal = NULL; + pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit; + pNewQueryInfo->prjOffset = pQueryInfo->prjOffset; pNewQueryInfo->numOfFillVal = 0; - pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit; - pNewQueryInfo->prjOffset = pQueryInfo->prjOffset; - pNewQueryInfo->numOfTables = 0; + pNewQueryInfo->numOfTables = 0; pNewQueryInfo->pTableMetaInfo = NULL; - pNewQueryInfo->bufLen = pQueryInfo->bufLen; - pNewQueryInfo->buf = malloc(pQueryInfo->bufLen); - - pNewQueryInfo->distinct = pQueryInfo->distinct; + pNewQueryInfo->bufLen = pQueryInfo->bufLen; + pNewQueryInfo->vgroupLimit = pQueryInfo->vgroupLimit; + pNewQueryInfo->distinct = pQueryInfo->distinct; pNewQueryInfo->multigroupResult = pQueryInfo->multigroupResult; - pNewQueryInfo->buf = malloc(pQueryInfo->bufLen); + pNewQueryInfo->buf = malloc(pQueryInfo->bufLen); if (pNewQueryInfo->buf == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 7bcdb69cf2..02ea7091a6 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -165,6 +165,8 @@ char *strnchr(char *haystack, char needle, int32_t len, bool skipquote) { return NULL; } + + char* strtolower(char *dst, const char *src) { int esc = 0; char quote = 0, *p = dst, c;