From dc9c5e769e18bd54a6953b68e7aa696c9d031569 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Dec 2022 11:24:21 +0800 Subject: [PATCH 01/14] enh: dynamically check the support of fma/avx/sse42/avx2 in cmake files. --- cmake/cmake.define | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index e34785cba6..d73c42e375 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -123,21 +123,31 @@ ELSE () SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") ENDIF () - IF (TD_INTEL_64 OR TD_INTEL_32) - ADD_DEFINITIONS("-msse4.2") - IF("${FMA_SUPPORT}" MATCHES "true") - MESSAGE(STATUS "fma function supported") - ADD_DEFINITIONS("-mfma") - ELSE () - MESSAGE(STATUS "fma function NOT supported") - ENDIF() + INCLUDE(CheckCCompilerFlag) + CHECK_C_COMPILER_FLAG("-msse4.2" GCC_SUPPORT_SSE42) + CHECK_C_COMPILER_FLAG("-mfma" GCC_SUPPORT_FMA) + CHECK_C_COMPILER_FLAG("-mavx" GCC_SUPPORT_AVX) + CHECK_C_COMPILER_FLAG("-mavx2" GCC_SUPPORT_AVX2) - IF("${SIMD_SUPPORT}" MATCHES "true") - ADD_DEFINITIONS("-mavx -mavx2") - MESSAGE(STATUS "SIMD instructions (AVX/AVX2) is ACTIVATED") - ELSE() - MESSAGE(STATUS "SIMD instruction (AVX/AVX2)is NOT ACTIVATED") + IF (GCC_SUPPORT_SSE42) + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -msse4.2") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -msse4.2") + ENDIF() + IF (GCC_SUPPORT_FMA) + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfma") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mfma") + ENDIF() + + IF ("${SIMD_SUPPORT}" MATCHES "true") + IF (GCC_SUPPORT_AVX) + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx") ENDIF() - ENDIF () + IF (GCC_SUPPORT_AVX2) + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2") + ENDIF() + MESSAGE(STATUS "SIMD instructions (AVX/AVX2) is ACTIVATED") + ENDIF() ENDIF () From 4b5f4e5175af12019837f431fce217a05fb0f000 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Dec 2022 13:07:30 +0800 Subject: [PATCH 02/14] fix(query): fix error in check if load block is needed --- source/libs/executor/src/scanoperator.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a0f11e9a47..d5239b340c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -110,9 +110,9 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn if (order == TSDB_ORDER_ASC) { w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey); - assert(w.ekey >= pBlockInfo->window.skey); + ASSERT(w.ekey >= pBlockInfo->window.skey); - if (TMAX(w.skey, pBlockInfo->window.skey) <= TMIN(w.ekey, pBlockInfo->window.ekey)) { + if (w.ekey < pBlockInfo->window.ekey) { return true; } @@ -122,16 +122,16 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn break; } - assert(w.ekey > pBlockInfo->window.ekey); + ASSERT(w.ekey > pBlockInfo->window.ekey); if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) { return true; } } } else { w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey); - assert(w.skey <= pBlockInfo->window.ekey); + ASSERT(w.skey <= pBlockInfo->window.ekey); - if (TMAX(w.skey, pBlockInfo->window.skey) <= TMIN(w.ekey, pBlockInfo->window.ekey)) { + if (w.skey > pBlockInfo->window.skey) { return true; } From 6283581cb82509030c1ef0686ec02e704c7f8f7e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Dec 2022 13:19:59 +0800 Subject: [PATCH 03/14] fix(query): remove invalid update block timestamp range. --- source/libs/executor/src/timewindowoperator.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d9a011a892..30b7f6b8fb 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1073,8 +1073,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, pInfo->inputOrder, scanFlag, true); - blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); - hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag); } From ba50a7c30dc0dca158f2d6e4b1388457cc4387e0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Dec 2022 14:25:26 +0800 Subject: [PATCH 04/14] refactor(query): add additional attribute to denote if current block has already loaded data. --- include/common/tcommon.h | 1 + source/common/src/tdatablock.c | 8 +++++--- source/dnode/vnode/src/tsdb/tsdbRead.c | 4 ++++ 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 6ec3d5db10..f74795a250 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -195,6 +195,7 @@ typedef struct SDataBlockInfo { uint32_t capacity; SBlockID id; int16_t hasVarCol; + int16_t dataLoad; // denote if the data is loaded or not // TODO: optimize and remove following int64_t version; // used for stream, and need serialization diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index dfd0b68039..0e89ecdd68 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -358,7 +358,7 @@ size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return taosArrayGetSiz size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; } int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) { - if (pDataBlock == NULL || pDataBlock->info.rows <= 0) { + if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0) { return 0; } @@ -1157,13 +1157,14 @@ void blockDataEmpty(SSDataBlock* pDataBlock) { } pInfo->rows = 0; + pInfo->dataLoad = 0; pInfo->window.ekey = 0; pInfo->window.skey = 0; } // todo temporarily disable it static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) { - ASSERT(numOfRows > 0 /*&& pBlockInfo->capacity >= pBlockInfo->rows*/); + ASSERT(numOfRows > 0); if (numOfRows <= pBlockInfo->capacity) { return TSDB_CODE_SUCCESS; } @@ -1220,7 +1221,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* return TSDB_CODE_SUCCESS; } -void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { +void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { pColumn->hasNull = false; if (IS_VAR_DATA_TYPE(pColumn->info.type)) { @@ -2427,6 +2428,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { pStart += colLen[i]; } + pBlock->info.dataLoad = 1; pBlock->info.rows = numOfRows; ASSERT(pStart - pData == dataLen); return pStart; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index a0334684a3..e38f9401ac 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1143,6 +1143,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn i += 1; } + pResBlock->info.dataLoad = 1; pResBlock->info.rows = dumpedRows; pDumpInfo->rowIndex += step * dumpedRows; @@ -2538,6 +2539,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { _end: pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; + pResBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]); setComposedBlockFlag(pReader, true); @@ -3622,6 +3624,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* i += 1; } + pBlock->info.dataLoad = 1; pBlock->info.rows += 1; pScanInfo->lastKey = pTSRow->ts; return TSDB_CODE_SUCCESS; @@ -3669,6 +3672,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S i += 1; } + pResBlock->info.dataLoad = 1; pResBlock->info.rows += 1; return TSDB_CODE_SUCCESS; } From 1d9514d6d8e582cee34e3f0f3a9b0195f013d7cb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Dec 2022 14:28:08 +0800 Subject: [PATCH 05/14] refactor(query): add an assert. --- source/libs/executor/src/timewindowoperator.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 30b7f6b8fb..b2ace0fbc0 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1027,9 +1027,10 @@ SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SRe int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) { TSKEY* tsCols = NULL; - if (pBlock->pDataBlock != NULL) { + if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad == 1) { SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); tsCols = (int64_t*)pColDataInfo->pData; + ASSERT(tsCols[0] != 0); // no data in primary ts if (tsCols[0] == 0 && tsCols[pBlock->info.rows - 1] == 0) { From 2bf21e6a176623c8276ba6266f1d27d6ea391858 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Dec 2022 14:33:39 +0800 Subject: [PATCH 06/14] fix(query): set flag for merged block. --- source/libs/executor/src/sortoperator.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 005b794f0b..7c7ff9be86 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -105,6 +105,7 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { } } + pBlock->info.dataLoad = 1; pBlock->info.rows += 1; } From c31f36d84104f988069bccbe156f4eb9c5bb55d6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Dec 2022 15:06:40 +0800 Subject: [PATCH 07/14] fix(query): set data load flag for multi-way merge . --- source/libs/executor/src/sortoperator.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 7c7ff9be86..7ac007b7cb 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -699,6 +699,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData pInfo->limitInfo.numOfOutputRows += p->info.rows; pDataBlock->info.rows = p->info.rows; pDataBlock->info.id.groupId = pInfo->groupId; + pDataBlock->info.dataLoad = 1; } qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId, From 8f02297890f8585950b74f8dff07cf48c4800e9f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Dec 2022 17:53:04 +0800 Subject: [PATCH 08/14] refactor: do some internal refactor. --- include/os/osEnv.h | 2 +- source/common/src/tglobal.c | 2 +- source/libs/function/src/detail/tavgfunction.c | 2 +- source/libs/function/src/detail/tminmax.c | 10 +++++----- source/os/src/osEnv.c | 2 +- source/os/src/osSysinfo.c | 10 +++++----- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/include/os/osEnv.h b/include/os/osEnv.h index a3bd209693..533d989ffc 100644 --- a/include/os/osEnv.h +++ b/include/os/osEnv.h @@ -36,7 +36,7 @@ extern int64_t tsStreamMax; extern float tsNumOfCores; extern int64_t tsTotalMemoryKB; extern char *tsProcPath; -extern char tsSIMDEnable; +extern char tsSIMDBuiltins; extern char tsSSE42Enable; extern char tsAVXEnable; extern char tsAVX2Enable; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3bcfddb8b2..552d7335f5 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -340,7 +340,7 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "AVX", tsAVXEnable, 0) != 0) return -1; if (cfgAddBool(pCfg, "AVX2", tsAVX2Enable, 0) != 0) return -1; if (cfgAddBool(pCfg, "FMA", tsFMAEnable, 0) != 0) return -1; - if (cfgAddBool(pCfg, "SIMD-Supported", tsSIMDEnable, 0) != 0) return -1; + if (cfgAddBool(pCfg, "SIMD-builtins", tsSIMDBuiltins, 0) != 0) return -1; if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, 1) != 0) return -1; if (cfgAddInt64(pCfg, "streamMax", tsStreamMax, 0, INT64_MAX, 1) != 0) return -1; diff --git a/source/libs/function/src/detail/tavgfunction.c b/source/libs/function/src/detail/tavgfunction.c index 1c74d22a82..60bf30d8ed 100644 --- a/source/libs/function/src/detail/tavgfunction.c +++ b/source/libs/function/src/detail/tavgfunction.c @@ -514,7 +514,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { numOfElem = pInput->numOfRows; pAvgRes->count += pInput->numOfRows; - bool simdAvailable = tsAVXEnable && tsSIMDEnable && (numOfRows > THRESHOLD_SIZE); + bool simdAvailable = tsAVXEnable && tsSIMDBuiltins && (numOfRows > THRESHOLD_SIZE); switch(type) { case TSDB_DATA_TYPE_UTINYINT: diff --git a/source/libs/function/src/detail/tminmax.c b/source/libs/function/src/detail/tminmax.c index b2cb36cba0..cb5cea3cc8 100644 --- a/source/libs/function/src/detail/tminmax.c +++ b/source/libs/function/src/detail/tminmax.c @@ -369,7 +369,7 @@ static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start, static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc, bool signVal) { // AVX2 version to speedup the loop - if (tsAVX2Enable && tsSIMDEnable) { + if (tsAVX2Enable && tsSIMDBuiltins) { pBuf->v = i8VectorCmpAVX2(data, numOfRows, isMinFunc, signVal); } else { if (!pBuf->assign) { @@ -403,7 +403,7 @@ static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SM static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc, bool signVal) { // AVX2 version to speedup the loop - if (tsAVX2Enable && tsSIMDEnable) { + if (tsAVX2Enable && tsSIMDBuiltins) { pBuf->v = i16VectorCmpAVX2(data, numOfRows, isMinFunc, signVal); } else { if (!pBuf->assign) { @@ -437,7 +437,7 @@ static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, S static void handleInt32Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc, bool signVal) { // AVX2 version to speedup the loop - if (tsAVX2Enable && tsSIMDEnable) { + if (tsAVX2Enable && tsSIMDBuiltins) { pBuf->v = i32VectorCmpAVX2(data, numOfRows, isMinFunc, signVal); } else { if (!pBuf->assign) { @@ -500,7 +500,7 @@ static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRo float* val = (float*)&pBuf->v; // AVX version to speedup the loop - if (tsAVXEnable && tsSIMDEnable) { + if (tsAVXEnable && tsSIMDBuiltins) { *val = floatVectorCmpAVX(pData, numOfRows, isMinFunc); } else { if (!pBuf->assign) { @@ -530,7 +530,7 @@ static void handleDoubleCol(SColumnInfoData* pCol, int32_t start, int32_t numOfR double* val = (double*)&pBuf->v; // AVX version to speedup the loop - if (tsAVXEnable && tsSIMDEnable) { + if (tsAVXEnable && tsSIMDBuiltins) { *val = (double)doubleVectorCmpAVX(pData, numOfRows, isMinFunc); } else { if (!pBuf->assign) { diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c index 7063d1f574..c4627ffb75 100644 --- a/source/os/src/osEnv.c +++ b/source/os/src/osEnv.c @@ -37,7 +37,7 @@ float tsNumOfCores = 0; int64_t tsTotalMemoryKB = 0; char *tsProcPath = NULL; -char tsSIMDEnable = 0; +char tsSIMDBuiltins = 0; char tsSSE42Enable = 0; char tsAVXEnable = 0; char tsAVX2Enable = 0; diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 7ec1da0530..35f4566a06 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -484,11 +484,11 @@ int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma) { #ifdef _TD_X86_ // Since the compiler is not support avx/avx2 instructions, the global variables always need to be // set to be false -#if __AVX__ || __AVX2__ - tsSIMDEnable = true; -#else - tsSIMDEnable = false; -#endif +//#if __AVX__ || __AVX2__ +// tsSIMDBuiltins = true; +//#else +// tsSIMDBuiltins = false; +//#endif uint32_t eax = 0, ebx = 0, ecx = 0, edx = 0; From d7edcfd207b50c9ce5b6481384e8c306ea37387d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Dec 2022 18:33:44 +0800 Subject: [PATCH 09/14] fix(query): set dataload flag. --- source/libs/executor/src/executorimpl.c | 2 +- source/libs/executor/src/joinoperator.c | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 043cc396b5..41e3d890cd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1080,7 +1080,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, pBlock->info.id.groupId); - + pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, 0); return 0; } diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index d460af971c..8a097a23ce 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -87,11 +87,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t } int32_t numOfCols = 0; - SSDataBlock* pResBlock = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); initResultSizeInfo(&pOperator->resultInfo, 4096); - - pInfo->pRes = pResBlock; + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.pExprInfo = pExprInfo; @@ -401,6 +401,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) // the pDataBlock are always the same one, no need to call this again pRes->info.rows = nrows; + pRes->info.dataLoad = 1; if (pRes->info.rows >= pOperator->resultInfo.threshold) { break; } @@ -412,7 +413,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { SSDataBlock* pRes = pJoinInfo->pRes; blockDataCleanup(pRes); - blockDataEnsureCapacity(pRes, 4096); + while (true) { int32_t numOfRowsBefore = pRes->info.rows; doMergeJoinImpl(pOperator, pRes); From e3e42cdfbdfdc19c948524e696d944ddcd6e6598 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Dec 2022 00:16:40 +0800 Subject: [PATCH 10/14] fix(query): set update ts flag for stream. --- source/common/src/tdatablock.c | 4 ++++ source/dnode/vnode/src/tq/tqRead.c | 1 + source/libs/executor/src/exchangeoperator.c | 1 + source/libs/executor/src/executorimpl.c | 2 ++ source/libs/executor/src/scanoperator.c | 5 +++++ 5 files changed, 13 insertions(+) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0e89ecdd68..aeea9b2681 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -358,6 +358,10 @@ size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return taosArrayGetSiz size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; } int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) { + if (pDataBlock->info.rows > 0) { + ASSERT(pDataBlock->info.dataLoad == 1); + } + if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0) { return 0; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index c3a4cefc66..46b31bc5b0 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -533,6 +533,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { pBlock->info.id.uid = pReader->msgIter.uid; pBlock->info.rows = pReader->msgIter.numOfRows; pBlock->info.version = pReader->pMsg->version; + pBlock->info.dataLoad = 1; while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) { tdSTSRowIterReset(&iter, row); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 8423b77906..4103ca82dc 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -510,6 +510,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo blockDataEnsureCapacity(pRes, pBlock->info.rows); // data from mnode + pRes->info.dataLoad = 1; pRes->info.rows = pBlock->info.rows; relocateColumnData(pRes, pColList, pBlock->pDataBlock, false); blockDataDestroy(pBlock); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 41e3d890cd..1c80eff685 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2546,6 +2546,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat pBlock->info.rows += pRow->numOfRows; releaseOutputBuf(pState, &key, pRow); } + pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, 0); return TSDB_CODE_SUCCESS; } @@ -2635,6 +2636,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta } } + pBlock->info.dataLoad = 1; pBlock->info.rows += pRow->numOfRows; // saveSessionDiscBuf(pState, pKey, pVal, size); releaseOutputBuf(pState, NULL, pRow); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d5239b340c..018e969b35 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1313,6 +1313,7 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, } pDestBlock->info.type = STREAM_CLEAR; pDestBlock->info.version = pSrcBlock->info.version; + pDestBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pDestBlock, 0); return code; } @@ -1421,6 +1422,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock } if (out && pInfo->pUpdateDataRes->info.rows > 0) { pInfo->pUpdateDataRes->info.version = pBlock->info.version; + pInfo->pUpdateDataRes->info.dataLoad = 1; blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0); pInfo->pUpdateDataRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR; } @@ -1483,6 +1485,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); } + pInfo->pRes->info.dataLoad = 1; blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataFreeRes((SSDataBlock*)pBlock); @@ -1771,6 +1774,7 @@ FETCH_NEXT_BLOCK: // TODO move into scan pBlock->info.calWin.skey = INT64_MIN; pBlock->info.calWin.ekey = INT64_MAX; + pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, 0); switch (pBlock->info.type) { case STREAM_NORMAL: @@ -1948,6 +1952,7 @@ FETCH_NEXT_BLOCK: } doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); + pInfo->pRes->info.dataLoad = 1; blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { From e4ad9890705d387acd05dc3b510ddabf15f3313d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Dec 2022 00:17:11 +0800 Subject: [PATCH 11/14] fix(query): disable assert. --- source/common/src/tdatablock.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index aeea9b2681..339a73efee 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -359,7 +359,7 @@ size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.ro int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) { if (pDataBlock->info.rows > 0) { - ASSERT(pDataBlock->info.dataLoad == 1); +// ASSERT(pDataBlock->info.dataLoad == 1); } if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0) { From 9afdee5fd3bb699d041d052526a0761c91883610 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Dec 2022 01:30:10 +0800 Subject: [PATCH 12/14] fix(query): set data load flag. --- source/libs/executor/src/groupoperator.c | 2 ++ source/libs/executor/src/projectoperator.c | 1 + 2 files changed, 3 insertions(+) diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 2cd1bd7dec..22db8c21b5 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -951,6 +951,8 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { } taosArrayDestroy(pParInfo->rowIds); pParInfo->rowIds = NULL; + pDest->info.dataLoad = 1; + blockDataUpdateTsWindow(pDest, pInfo->tsColIndex); pDest->info.id.groupId = pParInfo->groupId; pOperator->resultInfo.totalRows += pDest->info.rows; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 65bb40b195..8e81a00098 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -654,6 +654,7 @@ static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, S int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList) { setPseudoOutputColInfo(pResult, pCtx, pPseudoList); + pResult->info.dataLoad = 1; if (pSrcBlock == NULL) { for (int32_t k = 0; k < numOfOutput; ++k) { From aa18d0a4132dd44616554e1af866f46d09d3c16e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Dec 2022 19:19:12 +0800 Subject: [PATCH 13/14] other: adjust compiler flag for clang. --- cmake/cmake.define | 22 ++++++++++++++-------- cmake/cmake.platform | 2 +- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index d73c42e375..600343740d 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -124,26 +124,32 @@ ELSE () ENDIF () INCLUDE(CheckCCompilerFlag) - CHECK_C_COMPILER_FLAG("-msse4.2" GCC_SUPPORT_SSE42) - CHECK_C_COMPILER_FLAG("-mfma" GCC_SUPPORT_FMA) - CHECK_C_COMPILER_FLAG("-mavx" GCC_SUPPORT_AVX) - CHECK_C_COMPILER_FLAG("-mavx2" GCC_SUPPORT_AVX2) + IF ("${CMAKE_C_COMPILER_ID}" MATCHES "Clang") + SET(COMPILER_SUPPORT_SSE42 true) + MESSAGE(STATUS "Always enable sse4.2 for Clang") + ELSE() + CHECK_C_COMPILER_FLAG("-msse4.2" COMPILER_SUPPORT_SSE42) + ENDIF() - IF (GCC_SUPPORT_SSE42) + CHECK_C_COMPILER_FLAG("-mfma" COMPILER_SUPPORT_FMA) + CHECK_C_COMPILER_FLAG("-mavx" COMPILER_SUPPORT_AVX) + CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2) + + IF (COMPILER_SUPPORT_SSE42) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -msse4.2") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -msse4.2") ENDIF() - IF (GCC_SUPPORT_FMA) + IF (COMPILER_SUPPORT_FMA) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfma") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mfma") ENDIF() IF ("${SIMD_SUPPORT}" MATCHES "true") - IF (GCC_SUPPORT_AVX) + IF (COMPILER_SUPPORT_AVX) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx") ENDIF() - IF (GCC_SUPPORT_AVX2) + IF (COMPILER_SUPPORT_AVX2) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2") ENDIF() diff --git a/cmake/cmake.platform b/cmake/cmake.platform index 711d74fa4c..a4bfcaf609 100644 --- a/cmake/cmake.platform +++ b/cmake/cmake.platform @@ -147,7 +147,7 @@ ELSE () ENDIF () ENDIF () -MESSAGE(STATUS "platform arch:" ${PLATFORM_ARCH_STR}) +MESSAGE(STATUS "Platform arch:" ${PLATFORM_ARCH_STR}) MESSAGE("C Compiler: ${CMAKE_C_COMPILER} (${CMAKE_C_COMPILER_ID}, ${CMAKE_C_COMPILER_VERSION})") MESSAGE("CXX Compiler: ${CMAKE_CXX_COMPILER} (${CMAKE_C_COMPILER_ID}, ${CMAKE_CXX_COMPILER_VERSION})") From 8b1f45231286a3d0d565c95dccdc545c3d91a5a4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Dec 2022 22:37:29 +0800 Subject: [PATCH 14/14] fix(query): set data load flag for multi-way merge . --- cmake/cmake.define | 4 ++-- source/libs/executor/src/groupoperator.c | 1 + source/libs/executor/test/executorTests.cpp | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index 600343740d..7410c92525 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -124,9 +124,9 @@ ELSE () ENDIF () INCLUDE(CheckCCompilerFlag) - IF ("${CMAKE_C_COMPILER_ID}" MATCHES "Clang") + IF (("${CMAKE_C_COMPILER_ID}" MATCHES "Clang") OR ("${CMAKE_C_COMPILER_ID}" MATCHES "AppleClang")) SET(COMPILER_SUPPORT_SSE42 true) - MESSAGE(STATUS "Always enable sse4.2 for Clang") + MESSAGE(STATUS "Always enable sse4.2 for Clang/AppleClang") ELSE() CHECK_C_COMPILER_FLAG("-msse4.2" COMPILER_SUPPORT_SSE42) ENDIF() diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 22db8c21b5..d8a510c6ce 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -696,6 +696,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { pInfo->pageIndex += 1; releaseBufPage(pInfo->pBuf, page); + pInfo->binfo.pRes->info.dataLoad = 1; blockDataUpdateTsWindow(pInfo->binfo.pRes, 0); pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId; diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 1c42163349..2efd06f440 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -166,6 +166,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { pInfo->current += 1; + pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, 0); return pBlock; }