From 9863879951d28bab71553f07bb9b425e01c52095 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Nov 2022 09:21:43 +0800 Subject: [PATCH 01/14] refactor: do some internal refactor. --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executorimpl.c | 77 ++++++++----------------- 2 files changed, 26 insertions(+), 52 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index cd9f29978d..94361554d0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -265,6 +265,7 @@ typedef struct SExchangeInfo { SLoadRemoteDataInfo loadInfo; uint64_t self; SLimitInfo limitInfo; + int64_t openedTs; // start exec time stamp } SExchangeInfo; typedef struct SScanInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b7c3eed069..00733fa21f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1847,40 +1847,41 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) { return NULL; } -static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo, - SExecTaskInfo* pTaskInfo) { - int32_t code = 0; - int64_t startTs = taosGetTimestampUs(); - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + +static int32_t getCompletedSources(const SArray* pArray) { + size_t total = taosArrayGetSize(pArray); int32_t completed = 0; - for (int32_t k = 0; k < totalSources; ++k) { - SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); + for (int32_t k = 0; k < total; ++k) { + SSourceDataInfo* p = taosArrayGet(pArray, k); if (p->status == EX_SOURCE_DATA_EXHAUSTED) { completed += 1; } } + return completed; +} + +static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo, + SExecTaskInfo* pTaskInfo) { + int32_t code = 0; + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo); + int32_t completed = getCompletedSources(pExchangeInfo->pSourceDataInfo); if (completed == totalSources) { - setAllSourcesCompleted(pOperator, startTs); + setAllSourcesCompleted(pOperator, pExchangeInfo->openedTs); return; } while (1) { -// printf("1\n"); tsem_wait(&pExchangeInfo->ready); -// printf("2\n"); for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { -// printf("========:%d is completed\n", i); continue; } -// printf("index:%d - status:%d\n", i, pDataInfo->status); if (pDataInfo->status != EX_SOURCE_DATA_READY) { -// printf("-----------%d, status:%d, continue\n", i, pDataInfo->status); continue; } @@ -1896,27 +1897,18 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; -// printf("%d completed, try next\n", i); - qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu, + ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, - pExchangeInfo->loadInfo.totalRows, completed, i + 1, totalSources); + pExchangeInfo->loadInfo.totalRows, i + 1, totalSources); taosMemoryFreeClear(pDataInfo->pRsp); - -// if (completed == totalSources) { -// return; -// } else { -// break; -// } - break; + break; } SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; int32_t index = 0; char* pStart = pRetrieveRsp->data; while (index++ < pRetrieveRsp->numOfBlocks) { - printf("results, numOfBLock: %d\n", pRetrieveRsp->numOfBlocks); SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart); if (code != 0) { @@ -1927,25 +1919,16 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn taosArrayPush(pExchangeInfo->pResultBlockList, &pb); } - updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator); + updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pExchangeInfo->openedTs, pOperator); -// int32_t completed = 0; if (pRsp->completed == 1) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; - -// for (int32_t k = 0; k < totalSources; ++k) { -// SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); -// if (p->status == EX_SOURCE_DATA_EXHAUSTED) { -// completed += 1; -// } -// } - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 - ", total:%.2f Kb, completed:%d try next %d/%" PRIzu, + ", total:%.2f Kb, try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, - completed, i + 1, totalSources); + i + 1, totalSources); } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", @@ -1963,23 +1946,12 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn goto _error; } } - -// if (completed == totalSources) { -// setAllSourcesCompleted(pOperator, startTs); -// } - return; - } + } // end loop - int32_t completed = 0; - for (int32_t k = 0; k < totalSources; ++k) { - SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); - if (p->status == EX_SOURCE_DATA_EXHAUSTED) { - completed += 1; - } - } - - if (completed == totalSources) { + int32_t complete1 = getCompletedSources(pExchangeInfo->pSourceDataInfo); + if (complete1 == totalSources) { + qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo)); return; } } @@ -2098,6 +2070,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { if (code != TSDB_CODE_SUCCESS) { return code; } + pExchangeInfo->openedTs = taosGetTimestampUs(); } OPTR_SET_OPENED(pOperator); From 2621dec4aeefe14ee647cfc714a4e82c7b174b6b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Nov 2022 12:02:44 +0800 Subject: [PATCH 02/14] enh(query): improve the perf. --- source/libs/function/src/builtinsimpl.c | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1501bb6d67..d24393a766 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1555,6 +1555,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { pBuf->assign = true; } else { // ignore the equivalent data value +#if 0 if ((*val) == pData[i]) { continue; } @@ -1565,6 +1566,23 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } +#endif + // NOTE: An faster version to avoid one additional comparison with FPU. + if (isMinFunc) { // min + if (*val < pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } else { // max + if (*val > pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } } numOfElems += 1; From 032208e36ab272c425a6a74eed0ca42d52ba7f78 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Nov 2022 14:01:29 +0800 Subject: [PATCH 03/14] enh(query): optimize the query perf. --- source/libs/function/src/builtinsimpl.c | 199 +++++++++++++++--------- 1 file changed, 126 insertions(+), 73 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index d24393a766..ab972bdab0 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1273,14 +1273,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { pBuf->assign = true; } else { // ignore the equivalent data value - if ((*val) == pData[i]) { - continue; - } - - if ((*val < pData[i]) ^ isMinFunc) { - *val = pData[i]; - if (pCtx->subsidiaries.num > 0) { - updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + // NOTE: An faster version to avoid one additional comparison with FPU. + if (isMinFunc) { // min + if (*val < pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } else { // max + if (*val > pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } } } } @@ -1304,14 +1310,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { pBuf->assign = true; } else { // ignore the equivalent data value - if ((*val) == pData[i]) { - continue; - } - - if ((*val < pData[i]) ^ isMinFunc) { - *val = pData[i]; - if (pCtx->subsidiaries.num > 0) { - updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + // NOTE: An faster version to avoid one additional comparison with FPU. + if (isMinFunc) { // min + if (*val < pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } else { // max + if (*val > pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } } } } @@ -1335,14 +1347,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { pBuf->assign = true; } else { // ignore the equivalent data value - if ((*val) == pData[i]) { - continue; - } - - if ((*val < pData[i]) ^ isMinFunc) { - *val = pData[i]; - if (pCtx->subsidiaries.num > 0) { - updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + // NOTE: An faster version to avoid one additional comparison with FPU. + if (isMinFunc) { // min + if (*val < pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } else { // max + if (*val > pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } } } } @@ -1366,14 +1384,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { pBuf->assign = true; } else { // ignore the equivalent data value - if ((*val) == pData[i]) { - continue; - } - - if ((*val < pData[i]) ^ isMinFunc) { - *val = pData[i]; - if (pCtx->subsidiaries.num > 0) { - updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + // NOTE: An faster version to avoid one additional comparison with FPU. + if (isMinFunc) { // min + if (*val < pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } else { // max + if (*val > pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } } } } @@ -1399,14 +1423,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { pBuf->assign = true; } else { // ignore the equivalent data value - if ((*val) == pData[i]) { - continue; - } - - if ((*val < pData[i]) ^ isMinFunc) { - *val = pData[i]; - if (pCtx->subsidiaries.num > 0) { - updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + // NOTE: An faster version to avoid one additional comparison with FPU. + if (isMinFunc) { // min + if (*val < pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } else { // max + if (*val > pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } } } } @@ -1430,14 +1460,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { pBuf->assign = true; } else { // ignore the equivalent data value - if ((*val) == pData[i]) { - continue; - } - - if ((*val < pData[i]) ^ isMinFunc) { - *val = pData[i]; - if (pCtx->subsidiaries.num > 0) { - updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + // NOTE: An faster version to avoid one additional comparison with FPU. + if (isMinFunc) { // min + if (*val < pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } else { // max + if (*val > pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } } } } @@ -1461,14 +1497,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { pBuf->assign = true; } else { // ignore the equivalent data value - if ((*val) == pData[i]) { - continue; - } - - if ((*val < pData[i]) ^ isMinFunc) { - *val = pData[i]; - if (pCtx->subsidiaries.num > 0) { - updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + // NOTE: An faster version to avoid one additional comparison with FPU. + if (isMinFunc) { // min + if (*val < pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } else { // max + if (*val > pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } } } } @@ -1492,14 +1534,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { pBuf->assign = true; } else { // ignore the equivalent data value - if ((*val) == pData[i]) { - continue; - } - - if ((*val < pData[i]) ^ isMinFunc) { - *val = pData[i]; - if (pCtx->subsidiaries.num > 0) { - updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + // NOTE: An faster version to avoid one additional comparison with FPU. + if (isMinFunc) { // min + if (*val < pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } else { // max + if (*val > pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } } } } @@ -1524,14 +1572,20 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { pBuf->assign = true; } else { // ignore the equivalent data value - if ((*val) == pData[i]) { - continue; - } - - if ((*val < pData[i]) ^ isMinFunc) { - *val = pData[i]; - if (pCtx->subsidiaries.num > 0) { - updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + // NOTE: An faster version to avoid one additional comparison with FPU. + if (isMinFunc) { // min + if (*val < pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } + } + } else { // max + if (*val > pData[i]) { + *val = pData[i]; + if (pCtx->subsidiaries.num > 0) { + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + } } } } @@ -1554,7 +1608,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { } pBuf->assign = true; } else { - // ignore the equivalent data value #if 0 if ((*val) == pData[i]) { continue; From b0a54d3fab495963f8c253b5332b9e17b5ad77eb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Nov 2022 14:54:42 +0800 Subject: [PATCH 04/14] enh(query): improve the perf. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 ++ source/libs/function/src/builtinsimpl.c | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index bfde5b3076..41d50f119d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -985,6 +985,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex; memcpy(pColData->pData, p, remain * tDataTypes[pData->type].bytes); + ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); // make sure it is aligned to 8bit + // null value exists, check one-by-one if (pData->flag != HAS_VALUE) { for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step, rowIndex++) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index ab972bdab0..00a148ea25 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -906,6 +906,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { case TSDB_DATA_TYPE_FLOAT: { float* plist = (float*)pCol->pData; + float val = 0; for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { continue; @@ -913,8 +914,9 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { numOfElem += 1; pAvgRes->count += 1; - pAvgRes->sum.dsum += plist[i]; + val += plist[i]; } + pAvgRes->sum.dsum = val; break; } From f85d66f5eee9846157a5d100c96e96fb7319f6ee Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Nov 2022 16:59:00 +0800 Subject: [PATCH 05/14] enh(query): opt query perf. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 3 ++- source/libs/function/src/builtinsimpl.c | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 41d50f119d..a20742c55f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -985,7 +985,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex; memcpy(pColData->pData, p, remain * tDataTypes[pData->type].bytes); - ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); // make sure it is aligned to 8bit + // make sure it is aligned to 8bit + ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); // null value exists, check one-by-one if (pData->flag != HAS_VALUE) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 00a148ea25..541e2e1954 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3096,7 +3096,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { numOfElems++; char* data = colDataGetData(pInputCol, i); - TSKEY cts = getRowPTs(pInput->pPTS, i); + TSKEY cts = *(TSKEY*) colDataGetData(pInput->pPTS, i); if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { doSaveCurrentVal(pCtx, i, cts, type, data); pResInfo->numOfRes = 1; From 6e1a2b1e54ddca0372e1995a896e6d9cb4d9ff1d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Nov 2022 17:38:09 +0800 Subject: [PATCH 06/14] enh(query): improve the perf. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 14 +++++++++----- source/libs/function/src/builtinsimpl.c | 3 ++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index a20742c55f..77e6dc5649 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -957,11 +957,15 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn int32_t i = 0; SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i); if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - if (asc) { - memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], remain * sizeof(int64_t)); - } else { - for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) { - colDataAppendInt64(pColData, rowIndex++, &pBlockData->aTSKEY[j]); + memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], remain * sizeof(int64_t)); + + if (!asc) { // reverse the array list + int32_t mid = remain / 2; + TSKEY* pts = (int64_t*)pColData->pData; + for (int32_t j = 0; j < mid; ++j) { + int64_t t = pts[i]; + pts[j] = pts[remain - j - 1]; + pts[remain - j - 1] = t; } } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 541e2e1954..1f2fe5c658 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3103,8 +3103,9 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } } #endif + + // save selectivity value for column consisted of all null values if (numOfElems == 0) { - // save selectivity value for column consisted of all null values firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); } SET_VAL(pResInfo, numOfElems, 1); From cc3e97da36bbe3bea44428f24c4a0ff4320bff21 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Nov 2022 17:41:29 +0800 Subject: [PATCH 07/14] fix(query): fix an typo. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 77e6dc5649..f185e31de6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -959,11 +959,13 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], remain * sizeof(int64_t)); + + // todo: opt perf by extract the loop if (!asc) { // reverse the array list - int32_t mid = remain / 2; + int32_t mid = remain >> 1u; TSKEY* pts = (int64_t*)pColData->pData; for (int32_t j = 0; j < mid; ++j) { - int64_t t = pts[i]; + int64_t t = pts[j]; pts[j] = pts[remain - j - 1]; pts[remain - j - 1] = t; } From 663138ba1ef0959b9a77bb199070445d3c764db2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Nov 2022 18:33:57 +0800 Subject: [PATCH 08/14] enh(query): optimize the query perf. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 129 +++++++++++++++++++------ 1 file changed, 99 insertions(+), 30 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index f185e31de6..2ffbe78803 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -891,7 +891,7 @@ static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int or } } -int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) { +static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) { // NOTE: reverse the order to find the end position in data block int32_t endPos = -1; bool asc = ASCENDING_TRAVERSE(pReader->order); @@ -908,6 +908,26 @@ int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SData return endPos; } +static void copyPrimaryTsCol(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData, + int32_t dumpedRows, bool asc) { + if (asc) { + memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], dumpedRows * sizeof(int64_t)); + } else { + int32_t startIndex = pDumpInfo->rowIndex - dumpedRows + 1; + memcpy(pColData->pData, &pBlockData->aTSKEY[startIndex], dumpedRows * sizeof(int64_t)); + + // todo: opt perf by extract the loop + // reverse the array list + int32_t mid = dumpedRows >> 1u; + int64_t* pts = (int64_t*)pColData->pData; + for (int32_t j = 0; j < mid; ++j) { + int64_t t = pts[j]; + pts[j] = pts[dumpedRows - j - 1]; + pts[dumpedRows - j - 1] = t; + } + } +} + static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; @@ -947,30 +967,17 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn } endIndex += step; - int32_t remain = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex); - if (remain > pReader->capacity) { // output buffer check - remain = pReader->capacity; + int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex); + if (dumpedRows > pReader->capacity) { // output buffer check + dumpedRows = pReader->capacity; } + int32_t i = 0; int32_t rowIndex = 0; - int32_t i = 0; SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i); if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], remain * sizeof(int64_t)); - - - // todo: opt perf by extract the loop - if (!asc) { // reverse the array list - int32_t mid = remain >> 1u; - TSKEY* pts = (int64_t*)pColData->pData; - for (int32_t j = 0; j < mid; ++j) { - int64_t t = pts[j]; - pts[j] = pts[remain - j - 1]; - pts[remain - j - 1] = t; - } - } - + copyPrimaryTsCol(pBlockData, pDumpInfo, pColData, dumpedRows, asc); i += 1; } @@ -985,18 +992,80 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn colIndex += 1; } else if (pData->cid == pColData->info.colId) { if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL | HAS_NONE)) { - colDataAppendNNULL(pColData, 0, remain); + colDataAppendNNULL(pColData, 0, dumpedRows); } else { - if (IS_NUMERIC_TYPE(pColData->info.type) && asc) { - uint8_t* p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex; - memcpy(pColData->pData, p, remain * tDataTypes[pData->type].bytes); + if (IS_MATHABLE_TYPE(pColData->info.type)) { + + uint8_t* p = NULL; + if (asc) { + p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex; + } else { + int32_t startIndex = pDumpInfo->rowIndex - dumpedRows + 1; + p = pData->pData + tDataTypes[pData->type].bytes * startIndex; + } // make sure it is aligned to 8bit ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); + memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes); + + if (!asc) { + switch(pColData->info.type) { + case TSDB_DATA_TYPE_TIMESTAMP: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: + { + int32_t mid = dumpedRows >> 1u; + int64_t* pts = (int64_t*)pColData->pData; + for (int32_t j = 0; j < mid; ++j) { + int64_t t = pts[j]; + pts[j] = pts[dumpedRows - j - 1]; + pts[dumpedRows - j - 1] = t; + } + break; + } + + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: { + int32_t mid = dumpedRows >> 1u; + int8_t* pts = (int8_t*)pColData->pData; + for (int32_t j = 0; j < mid; ++j) { + int64_t t = pts[j]; + pts[j] = pts[dumpedRows - j - 1]; + pts[dumpedRows - j - 1] = t; + } + break; + } + + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: { + int32_t mid = dumpedRows >> 1u; + int16_t* pts = (int16_t*)pColData->pData; + for (int32_t j = 0; j < mid; ++j) { + int64_t t = pts[j]; + pts[j] = pts[dumpedRows - j - 1]; + pts[dumpedRows - j - 1] = t; + } + break; + } + + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: { + int32_t mid = dumpedRows >> 1u; + int32_t* pts = (int32_t*)pColData->pData; + for (int32_t j = 0; j < mid; ++j) { + int64_t t = pts[j]; + pts[j] = pts[dumpedRows - j - 1]; + pts[dumpedRows - j - 1] = t; + } + break; + } + } + } // null value exists, check one-by-one if (pData->flag != HAS_VALUE) { - for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step, rowIndex++) { + for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step, rowIndex++) { uint8_t v = tColDataGetBitValue(pData, j); if (v == 0 || v == 1) { colDataSetNull_f(pColData->nullbitmap, rowIndex); @@ -1004,7 +1073,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn } } } else { - for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step) { + for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step) { tColDataGetValue(pData, j, &cv); doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo); } @@ -1014,7 +1083,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn colIndex += 1; i += 1; } else { // the specified column does not exist in file block, fill with null data - colDataAppendNNULL(pColData, 0, remain); + colDataAppendNNULL(pColData, 0, dumpedRows); i += 1; } } @@ -1022,12 +1091,12 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn // fill the mis-matched columns with null value while (i < numOfOutputCols) { pColData = taosArrayGet(pResBlock->pDataBlock, i); - colDataAppendNNULL(pColData, 0, remain); + colDataAppendNNULL(pColData, 0, dumpedRows); i += 1; } - pResBlock->info.rows = remain; - pDumpInfo->rowIndex += step * remain; + pResBlock->info.rows = dumpedRows; + pDumpInfo->rowIndex += step * dumpedRows; // check if current block are all handled if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) { @@ -1046,7 +1115,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1; tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", - pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, + pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows, unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); return TSDB_CODE_SUCCESS; From ecf3e2c00064e1ca7cc985d5989253d0655293b4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Nov 2022 18:44:28 +0800 Subject: [PATCH 09/14] enh(query): optimize the perf. --- source/libs/function/src/builtinsimpl.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1f2fe5c658..198892a99e 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3096,9 +3096,9 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { numOfElems++; char* data = colDataGetData(pInputCol, i); - TSKEY cts = *(TSKEY*) colDataGetData(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { - doSaveCurrentVal(pCtx, i, cts, type, data); + TSKEY* cts = (TSKEY*) colDataGetData(pInput->pPTS, i); + if (pResInfo->numOfRes == 0 || pInfo->ts < (*cts)) { + doSaveCurrentVal(pCtx, i, *cts, type, data); pResInfo->numOfRes = 1; } } From dc2c419f2212c36b26a183fc7187fc6d10d46ef1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Nov 2022 19:01:35 +0800 Subject: [PATCH 10/14] enh(query): optimize the query perf. --- source/libs/function/src/builtinsimpl.c | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 198892a99e..5f3d9c138e 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3088,6 +3088,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } } #else + int64_t* pts = (int64_t*)pInput->pPTS->pData; for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { continue; @@ -3096,9 +3097,9 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { numOfElems++; char* data = colDataGetData(pInputCol, i); - TSKEY* cts = (TSKEY*) colDataGetData(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || pInfo->ts < (*cts)) { - doSaveCurrentVal(pCtx, i, *cts, type, data); + TSKEY cts = pts[i]; + if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { + doSaveCurrentVal(pCtx, i, cts, type, data); pResInfo->numOfRes = 1; } } @@ -3285,11 +3286,13 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { } } #else + + int64_t* pts = (int64_t*)pInput->pPTS->pData; for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { char* data = colDataGetData(pInputCol, i); - TSKEY cts = getRowPTs(pInput->pPTS, i); - numOfElems++; + TSKEY cts = pts[i]; + numOfElems++; if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { doSaveLastrow(pCtx, data, i, cts, pInfo); pResInfo->numOfRes = 1; From 68f2f9211691be18871802d48233115a730edec1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 9 Nov 2022 09:07:21 +0800 Subject: [PATCH 11/14] fix(query): fix the syntax errors. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- source/libs/executor/src/executorimpl.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4914deed30..0f970b6830 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1117,7 +1117,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1; tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", - pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, + pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows, unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 52c038c500..7d662f8784 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1897,7 +1897,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn if (pRsp->numOfRows == 0) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu, + ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources); taosMemoryFreeClear(pDataInfo->pRsp); From e3aabacf9a8a6f7e3da1cb65e7454cc3622b3ad4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 9 Nov 2022 09:47:21 +0800 Subject: [PATCH 12/14] fix(query): fix error in min/max functions. --- source/libs/function/src/builtinsimpl.c | 46 ++++++++++++------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 58e41d5abb..e3a76ad33a 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -911,7 +911,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { case TSDB_DATA_TYPE_FLOAT: { float* plist = (float*)pCol->pData; - float val = 0; +// float val = 0; for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { continue; @@ -919,9 +919,9 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { numOfElem += 1; pAvgRes->count += 1; - val += plist[i]; + pAvgRes->sum.dsum += plist[i]; } - pAvgRes->sum.dsum = val; +// pAvgRes->sum.dsum = val; break; } @@ -1282,14 +1282,14 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { // ignore the equivalent data value // NOTE: An faster version to avoid one additional comparison with FPU. if (isMinFunc) { // min - if (*val < pData[i]) { + if (*val > pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } else { // max - if (*val > pData[i]) { + if (*val < pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); @@ -1319,14 +1319,14 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { // ignore the equivalent data value // NOTE: An faster version to avoid one additional comparison with FPU. if (isMinFunc) { // min - if (*val < pData[i]) { + if (*val > pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } else { // max - if (*val > pData[i]) { + if (*val < pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); @@ -1356,14 +1356,14 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { // ignore the equivalent data value // NOTE: An faster version to avoid one additional comparison with FPU. if (isMinFunc) { // min - if (*val < pData[i]) { + if (*val > pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } else { // max - if (*val > pData[i]) { + if (*val < pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); @@ -1393,14 +1393,14 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { // ignore the equivalent data value // NOTE: An faster version to avoid one additional comparison with FPU. if (isMinFunc) { // min - if (*val < pData[i]) { + if (*val > pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } else { // max - if (*val > pData[i]) { + if (*val < pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); @@ -1432,14 +1432,14 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { // ignore the equivalent data value // NOTE: An faster version to avoid one additional comparison with FPU. if (isMinFunc) { // min - if (*val < pData[i]) { + if (*val > pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } else { // max - if (*val > pData[i]) { + if (*val < pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); @@ -1469,14 +1469,14 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { // ignore the equivalent data value // NOTE: An faster version to avoid one additional comparison with FPU. if (isMinFunc) { // min - if (*val < pData[i]) { + if (*val > pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } else { // max - if (*val > pData[i]) { + if (*val < pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); @@ -1506,14 +1506,14 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { // ignore the equivalent data value // NOTE: An faster version to avoid one additional comparison with FPU. if (isMinFunc) { // min - if (*val < pData[i]) { + if (*val > pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } else { // max - if (*val > pData[i]) { + if (*val < pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); @@ -1543,14 +1543,14 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { // ignore the equivalent data value // NOTE: An faster version to avoid one additional comparison with FPU. if (isMinFunc) { // min - if (*val < pData[i]) { + if (*val > pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } else { // max - if (*val > pData[i]) { + if (*val < pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); @@ -1581,14 +1581,14 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { // ignore the equivalent data value // NOTE: An faster version to avoid one additional comparison with FPU. if (isMinFunc) { // min - if (*val < pData[i]) { + if (*val > pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } else { // max - if (*val > pData[i]) { + if (*val < pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); @@ -1629,14 +1629,14 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { #endif // NOTE: An faster version to avoid one additional comparison with FPU. if (isMinFunc) { // min - if (*val < pData[i]) { + if (*val > pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } else { // max - if (*val > pData[i]) { + if (*val < pData[i]) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); From 0c427b5f5b63408f4d764b096b46ebe9b8882e76 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 9 Nov 2022 10:39:32 +0800 Subject: [PATCH 13/14] refactor: do some internal refactor. --- include/common/tdataformat.h | 2 +- source/common/src/tdataformat.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 173 +++++++++++++----------- source/libs/function/src/builtinsimpl.c | 1 + 4 files changed, 97 insertions(+), 81 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 71e260c001..2eda2f66cc 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -130,7 +130,7 @@ void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn) void tColDataClear(SColData *pColData); int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal); -uint8_t tColDataGetBitValue(SColData *pColData, int32_t iVal); +uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal); int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest); extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max, int64_t *min, int16_t *numOfNull); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 8c003066dc..73b887342c 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -1557,7 +1557,7 @@ void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) { tColDataGetValueImpl[pColData->flag](pColData, iVal, pColVal); } -uint8_t tColDataGetBitValue(SColData *pColData, int32_t iVal) { +uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal) { uint8_t v; switch (pColData->flag) { case HAS_NONE: diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0f970b6830..9dab1e1d33 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -910,7 +910,7 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData return endPos; } -static void copyPrimaryTsCol(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData, +static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData, int32_t dumpedRows, bool asc) { if (asc) { memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], dumpedRows * sizeof(int64_t)); @@ -930,6 +930,97 @@ static void copyPrimaryTsCol(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpIn } } +// a faster version of copy procedure. +static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData, + int32_t dumpedRows, bool asc) { + uint8_t* p = NULL; + if (asc) { + p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex; + } else { + int32_t startIndex = pDumpInfo->rowIndex - dumpedRows + 1; + p = pData->pData + tDataTypes[pData->type].bytes * startIndex; + } + + int32_t step = asc? 1:-1; + + // make sure it is aligned to 8bit + ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); + + // 1. copy data in a batch model + memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes); + + // 2. reverse the array list in case of descending order scan data block + if (!asc) { + switch(pColData->info.type) { + case TSDB_DATA_TYPE_TIMESTAMP: + case TSDB_DATA_TYPE_DOUBLE: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: + { + int32_t mid = dumpedRows >> 1u; + int64_t* pts = (int64_t*)pColData->pData; + for (int32_t j = 0; j < mid; ++j) { + int64_t t = pts[j]; + pts[j] = pts[dumpedRows - j - 1]; + pts[dumpedRows - j - 1] = t; + } + break; + } + + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: { + int32_t mid = dumpedRows >> 1u; + int8_t* pts = (int8_t*)pColData->pData; + for (int32_t j = 0; j < mid; ++j) { + int64_t t = pts[j]; + pts[j] = pts[dumpedRows - j - 1]; + pts[dumpedRows - j - 1] = t; + } + break; + } + + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: { + int32_t mid = dumpedRows >> 1u; + int16_t* pts = (int16_t*)pColData->pData; + for (int32_t j = 0; j < mid; ++j) { + int64_t t = pts[j]; + pts[j] = pts[dumpedRows - j - 1]; + pts[dumpedRows - j - 1] = t; + } + break; + } + + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: { + int32_t mid = dumpedRows >> 1u; + int32_t* pts = (int32_t*)pColData->pData; + for (int32_t j = 0; j < mid; ++j) { + int64_t t = pts[j]; + pts[j] = pts[dumpedRows - j - 1]; + pts[dumpedRows - j - 1] = t; + } + break; + } + } + } + + // 3. if the null value exists, check items one-by-one + if (pData->flag != HAS_VALUE) { + int32_t rowIndex = 0; + + for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step, rowIndex++) { + uint8_t v = tColDataGetBitValue(pData, j); + if (v == 0 || v == 1) { + colDataSetNull_f(pColData->nullbitmap, rowIndex); + pColData->hasNull = true; + } + } + } +} + static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; @@ -997,84 +1088,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn colDataAppendNNULL(pColData, 0, dumpedRows); } else { if (IS_MATHABLE_TYPE(pColData->info.type)) { - - uint8_t* p = NULL; - if (asc) { - p = pData->pData + tDataTypes[pData->type].bytes * pDumpInfo->rowIndex; - } else { - int32_t startIndex = pDumpInfo->rowIndex - dumpedRows + 1; - p = pData->pData + tDataTypes[pData->type].bytes * startIndex; - } - - // make sure it is aligned to 8bit - ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); - memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes); - - if (!asc) { - switch(pColData->info.type) { - case TSDB_DATA_TYPE_TIMESTAMP: - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_UBIGINT: - { - int32_t mid = dumpedRows >> 1u; - int64_t* pts = (int64_t*)pColData->pData; - for (int32_t j = 0; j < mid; ++j) { - int64_t t = pts[j]; - pts[j] = pts[dumpedRows - j - 1]; - pts[dumpedRows - j - 1] = t; - } - break; - } - - case TSDB_DATA_TYPE_BOOL: - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_UTINYINT: { - int32_t mid = dumpedRows >> 1u; - int8_t* pts = (int8_t*)pColData->pData; - for (int32_t j = 0; j < mid; ++j) { - int64_t t = pts[j]; - pts[j] = pts[dumpedRows - j - 1]; - pts[dumpedRows - j - 1] = t; - } - break; - } - - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_USMALLINT: { - int32_t mid = dumpedRows >> 1u; - int16_t* pts = (int16_t*)pColData->pData; - for (int32_t j = 0; j < mid; ++j) { - int64_t t = pts[j]; - pts[j] = pts[dumpedRows - j - 1]; - pts[dumpedRows - j - 1] = t; - } - break; - } - - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_UINT: { - int32_t mid = dumpedRows >> 1u; - int32_t* pts = (int32_t*)pColData->pData; - for (int32_t j = 0; j < mid; ++j) { - int64_t t = pts[j]; - pts[j] = pts[dumpedRows - j - 1]; - pts[dumpedRows - j - 1] = t; - } - break; - } - } - } - - // null value exists, check one-by-one - if (pData->flag != HAS_VALUE) { - for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step, rowIndex++) { - uint8_t v = tColDataGetBitValue(pData, j); - if (v == 0 || v == 1) { - colDataSetNull_f(pColData->nullbitmap, rowIndex); - } - } - } - } else { + copyNumericCols(pData, pDumpInfo, pColData, dumpedRows, asc); + } else { // varchar/nchar type for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step) { tColDataGetValue(pData, j, &cv); doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index e3a76ad33a..eb30945109 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3022,6 +3022,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { } } #endif + if (numOfElems == 0) { // save selectivity value for column consisted of all null values firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); From 459bed5f86fd60bc1d615d40fe04a3d08e7c6c1c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 9 Nov 2022 10:41:53 +0800 Subject: [PATCH 14/14] enh(query): improve the query perf. --- source/libs/function/src/builtinsimpl.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index eb30945109..6eb57c1a18 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3007,6 +3007,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { } } #else + int64_t* pts = (int64_t*) pInput->pPTS->pData; for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) { if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { continue; @@ -3015,7 +3016,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { numOfElems++; char* data = colDataGetData(pInputCol, i); - TSKEY cts = getRowPTs(pInput->pPTS, i); + TSKEY cts = pts[i]; if (pResInfo->numOfRes == 0 || pInfo->ts > cts) { doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data); pResInfo->numOfRes = 1;