From 6d9e229a72496cd6e975d98fe71116e3c7f8e1c1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Dec 2022 11:40:03 +0800 Subject: [PATCH] refactor: fix the in-memory row merge. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 44 ++++++++++++------- .../libs/function/src/detail/tavgfunction.c | 21 +++------ 2 files changed, 33 insertions(+), 32 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 472211b536..ebacda0677 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -194,7 +194,7 @@ static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange); -static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, SRow** pTSRow, +static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pTSRow, STsdbReader* pReader, bool* freeTSRow); static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow); @@ -3358,7 +3358,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc return TSDB_CODE_SUCCESS; } -int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, SRow** pTSRow, +int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow) { TSDBROW* pNextRow = NULL; TSDBROW current = *pRow; @@ -3367,25 +3367,27 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, pIter->hasVal = tsdbTbDataIterNext(pIter->iter); if (!pIter->hasVal) { - *pTSRow = current.pTSRow; + *pResRow = *pRow; *freeTSRow = false; return TSDB_CODE_SUCCESS; } else { // has next point in mem/imem pNextRow = getValidMemRow(pIter, pDelList, pReader); if (pNextRow == NULL) { - *pTSRow = current.pTSRow; + *pResRow = *pRow; *freeTSRow = false; return TSDB_CODE_SUCCESS; } if (TSDBROW_TS(¤t) != TSDBROW_TS(pNextRow)) { - *pTSRow = current.pTSRow; + *pResRow = *pRow; *freeTSRow = false; return TSDB_CODE_SUCCESS; } } } +#if 0 + // todo handle the data block merge SRowMerger merge = {0}; // get the correct schema for data in memory @@ -3423,6 +3425,8 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, tsdbRowMergerClear(&merge); *freeTSRow = true; +#endif + return TSDB_CODE_SUCCESS; } @@ -3480,7 +3484,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } -int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow, int64_t endKey, +int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, int64_t endKey, bool* freeTSRow) { TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); @@ -3510,13 +3514,14 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR int32_t code = TSDB_CODE_SUCCESS; if (ik.ts != k.ts) { if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts - code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); + code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow); } else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) { - code = doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); + code = doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow); } } else { // ik.ts == k.ts *freeTSRow = true; - code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); + pResRow->type = TSDBROW_ROW_FMT; + code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pResRow->pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3526,12 +3531,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR } if (pBlockScanInfo->iter.hasVal && pRow != NULL) { - return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, + return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow); } if (pBlockScanInfo->iiter.hasVal && piRow != NULL) { - return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); + return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow); } return TSDB_CODE_SUCCESS; @@ -3636,17 +3641,22 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e SSDataBlock* pBlock = pReader->pResBlock; do { - SRow* pTSRow = NULL; +// SRow* pTSRow = NULL; + TSDBROW row = {.type = -1}; bool freeTSRow = false; - tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow); - if (pTSRow == NULL) { + tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow); + if (row.type == -1) { break; } - doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo); + if (row.type == TSDBROW_ROW_FMT) { + doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo); - if (freeTSRow) { - taosMemoryFree(pTSRow); + if (freeTSRow) { + taosMemoryFree(row.pTSRow); + } + } else { + doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow); } // no data in buffer, return immediately diff --git a/source/libs/function/src/detail/tavgfunction.c b/source/libs/function/src/detail/tavgfunction.c index f06bafafe3..40698322a0 100644 --- a/source/libs/function/src/detail/tavgfunction.c +++ b/source/libs/function/src/detail/tavgfunction.c @@ -270,7 +270,7 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p #if __AVX2__ // find the start position that are aligned to 32bytes address in memory - int32_t width = (bitWidth>>3u) / sizeof(int64_t); + int32_t width = (bitWidth >> 3u) / sizeof(int64_t); int32_t remainder = numOfRows % width; int32_t rounds = numOfRows / width; @@ -286,20 +286,11 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p } // let sum up the final results - if (type == TSDB_DATA_TYPE_BIGINT) { - const int64_t* q = (const int64_t*)∑ - pRes->sum.isum += q[0] + q[1] + q[2] + q[3]; + const int64_t* q = (const int64_t*)∑ + pRes->sum.isum += q[0] + q[1] + q[2] + q[3]; - for (int32_t j = 0; j < remainder; ++j) { - pRes->sum.isum += plist[j + rounds * width]; - } - } else { - const uint64_t* q = (const uint64_t*)∑ - pRes->sum.usum += q[0] + q[1] + q[2] + q[3]; - - for (int32_t j = 0; j < remainder; ++j) { - pRes->sum.usum += (uint64_t)plist[j + rounds * width]; - } + for (int32_t j = 0; j < remainder; ++j) { + pRes->sum.isum += plist[j + rounds * width]; } #endif @@ -588,7 +579,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { const int64_t* plist = (const int64_t*) pCol->pData; // 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop - if (simdAvailable) { + if (simdAvailable && type == TSDB_DATA_TYPE_BIGINT) { i64VectorSumAVX2(plist, numOfRows, pAvgRes); } else { for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {