diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 532c35165b..e8830e32f8 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -87,7 +87,8 @@ int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData); int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow); void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); void tRowDestroy(SRow *pRow); -int32_t tRowMergeSort(SArray *aRowP, STSchema *pTSchema, int8_t flag); +void tRowSort(SArray *aRowP); +int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag); // SRowIter ================================ int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index dbf616f1c6..ccba25873d 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -527,7 +527,7 @@ static int32_t tRowPCmprFn(const void *p1, const void *p2) { return 0; } static void tRowPDestroy(SRow **ppRow) { tRowDestroy(*ppRow); } -static int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int32_t iStart, int32_t iEnd, int8_t flag) { +static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart, int32_t iEnd, int8_t flag) { int32_t code = 0; int32_t nRow = iEnd - iStart; @@ -591,12 +591,14 @@ _exit: if (code) tRowDestroy(pRow); return code; } -int32_t tRowMergeSort(SArray *aRowP, STSchema *pTSchema, int8_t flag) { - if (aRowP->size <= 1) return 0; - - int32_t code = 0; +void tRowSort(SArray *aRowP) { + if (TARRAY_SIZE(aRowP) <= 1) return; taosArraySort(aRowP, tRowPCmprFn); +} + +int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag) { + int32_t code = 0; int32_t iStart = 0; while (iStart < aRowP->size) { @@ -612,7 +614,7 @@ int32_t tRowMergeSort(SArray *aRowP, STSchema *pTSchema, int8_t flag) { } if (iEnd - iStart > 1) { - code = tRowMerge(aRowP, pTSchema, iStart, iEnd, flag); + code = tRowMergeImpl(aRowP, pTSchema, iStart, iEnd, flag); } // the array is also changing, so the iStart just ++ instead of iEnd @@ -2186,7 +2188,6 @@ int32_t tGetColData(uint8_t *pBuf, SColData *pColData) { n += pColData->nData; } else { pColData->nData = TYPE_BYTES[pColData->type] * pColData->nVal; - pColData->nData = pBuf + n; n += pColData->nData; } } diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index c58cf8f630..ed71891783 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -115,13 +115,13 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2); void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); SColVal *tsdbRowIterNext(STSDBRowIter *pIter); // SRowMerger -int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema); -int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); +int32_t tsdbRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema); +int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); -int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); -void tRowMergerClear(SRowMerger *pMerger); -int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow); -int32_t tRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow); +int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); +void tsdbRowMergerClear(SRowMerger *pMerger); +int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow); +int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow); // TABLEID int32_t tTABLEIDCmprFn(const void *p1, const void *p2); // TSDBKEY diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index fda7618dd1..76b45ffec7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -261,12 +261,12 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, SR SRowMerger merger = {0}; STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); - tRowMergerInit(&merger, &tsdbRowFromTSRow(0, cacheRow), pTSchema); + tsdbRowMergerInit(&merger, &tsdbRowFromTSRow(0, cacheRow), pTSchema); - tRowMerge(&merger, &tsdbRowFromTSRow(1, row)); + tsdbRowMerge(&merger, &tsdbRowFromTSRow(1, row)); - tRowMergerGetRow(&merger, &mergedRow); - tRowMergerClear(&merger); + tsdbRowMergerGetRow(&merger, &mergedRow); + tsdbRowMergerClear(&merger); taosMemoryFreeClear(pTSchema); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index f2515c6d48..d56851478b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1777,7 +1777,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (pReader->order == TSDB_ORDER_ASC) { if (minKey == key) { init = true; - int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1787,10 +1787,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { - tRowMerge(&merge, &fRow1); + tsdbRowMerge(&merge, &fRow1); } else { init = true; - int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); + int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1800,11 +1800,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == k.ts) { if (init) { - tRowMerge(&merge, pRow); + tsdbRowMerge(&merge, pRow); } else { init = true; STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - int32_t code = tRowMergerInit(&merge, pRow, pSchema); + int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1818,7 +1818,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == k.ts) { init = true; STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - int32_t code = tRowMergerInit(&merge, pRow, pSchema); + int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1832,10 +1832,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { - tRowMerge(&merge, &fRow1); + tsdbRowMerge(&merge, &fRow1); } else { init = true; - int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); + int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1845,10 +1845,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == key) { if (init) { - tRowMerge(&merge, &fRow); + tsdbRowMerge(&merge, &fRow); } else { init = true; - int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1857,7 +1857,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } - int32_t code = tRowMergerGetRow(&merge, &pTSRow); + int32_t code = tsdbRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1865,7 +1865,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tRowMergerClear(&merge); + tsdbRowMergerClear(&merge); return TSDB_CODE_SUCCESS; } @@ -1886,16 +1886,16 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, pBlockScanInfo->lastKey = tsLastBlock; return TSDB_CODE_SUCCESS; } else { - int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - tRowMerge(&merge, &fRow1); + tsdbRowMerge(&merge, &fRow1); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange); - code = tRowMergerGetRow(&merge, &pTSRow); + code = tsdbRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1903,10 +1903,10 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tRowMergerClear(&merge); + tsdbRowMergerClear(&merge); } } else { // not merge block data - int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1919,7 +1919,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); } - code = tRowMergerGetRow(&merge, &pTSRow); + code = tsdbRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1927,7 +1927,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tRowMergerClear(&merge); + tsdbRowMergerClear(&merge); } return TSDB_CODE_SUCCESS; @@ -1955,7 +1955,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader SRow* pTSRow = NULL; SRowMerger merge = {0}; - int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1963,11 +1963,11 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - tRowMerge(&merge, &fRow1); + tsdbRowMerge(&merge, &fRow1); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange); - code = tRowMergerGetRow(&merge, &pTSRow); + code = tsdbRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1975,7 +1975,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tRowMergerClear(&merge); + tsdbRowMergerClear(&merge); return code; } else { ASSERT(0); @@ -2056,7 +2056,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == key) { init = true; TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2067,10 +2067,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { - tRowMerge(&merge, &fRow1); + tsdbRowMerge(&merge, &fRow1); } else { init = true; - code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); + code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2081,7 +2081,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == ik.ts) { if (init) { - tRowMerge(&merge, piRow); + tsdbRowMerge(&merge, piRow); } else { init = true; STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); @@ -2089,7 +2089,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - code = tRowMergerInit(&merge, piRow, pSchema); + code = tsdbRowMergerInit(&merge, piRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2108,10 +2108,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - tRowMerge(&merge, pRow); + tsdbRowMerge(&merge, pRow); } else { STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - code = tRowMergerInit(&merge, pRow, pSchema); + code = tsdbRowMergerInit(&merge, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2126,7 +2126,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == k.ts) { init = true; STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - code = tRowMergerInit(&merge, pRow, pSchema); + code = tsdbRowMergerInit(&merge, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2140,11 +2140,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == ik.ts) { if (init) { - tRowMerge(&merge, piRow); + tsdbRowMerge(&merge, piRow); } else { init = true; STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); - code = tRowMergerInit(&merge, piRow, pSchema); + code = tsdbRowMergerInit(&merge, piRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2159,10 +2159,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { - tRowMerge(&merge, &fRow1); + tsdbRowMerge(&merge, &fRow1); } else { init = true; - code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); + code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2173,7 +2173,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == key) { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); if (!init) { - code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2181,7 +2181,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (merge.pTSchema == NULL) { return code; } - tRowMerge(&merge, &fRow); + tsdbRowMerge(&merge, &fRow); } doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); } @@ -2191,7 +2191,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - code = tRowMergerGetRow(&merge, &pTSRow); + code = tsdbRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2199,7 +2199,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tRowMergerClear(&merge); + tsdbRowMergerClear(&merge); return code; } @@ -2356,13 +2356,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc SRow* pTSRow = NULL; SRowMerger merge = {0}; - int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - code = tRowMergerGetRow(&merge, &pTSRow); + code = tsdbRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2370,7 +2370,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); taosMemoryFree(pTSRow); - tRowMergerClear(&merge); + tsdbRowMergerClear(&merge); return TSDB_CODE_SUCCESS; } } @@ -3232,7 +3232,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe return terrno; } - tRowMergerAdd(pMerger, pRow, pTSchema); + tsdbRowMergerAdd(pMerger, pRow, pTSchema); } return TSDB_CODE_SUCCESS; @@ -3247,7 +3247,7 @@ static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowInd } TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex); - tRowMerge(pMerger, &fRow); + tsdbRowMerge(pMerger, &fRow); rowIndex += step; } @@ -3344,7 +3344,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); if (next1 == ts) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - tRowMerge(pMerger, &fRow1); + tsdbRowMerge(pMerger, &fRow1); } else { break; } @@ -3394,7 +3394,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, pReader->pSchema = pTSchema; } - int32_t code = tRowMergerInit2(&merge, pReader->pSchema, ¤t, pTSchema); + int32_t code = tsdbRowMergerInit2(&merge, pReader->pSchema, ¤t, pTSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3404,19 +3404,19 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, return terrno; } - tRowMergerAdd(&merge, pNextRow, pTSchema1); + tsdbRowMergerAdd(&merge, pNextRow, pTSchema1); code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } - code = tRowMergerGetRow(&merge, pTSRow); + code = tsdbRowMergerGetRow(&merge, pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; } - tRowMergerClear(&merge); + tsdbRowMergerClear(&merge); *freeTSRow = true; return TSDB_CODE_SUCCESS; } @@ -3431,7 +3431,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - int32_t code = tRowMergerInit(&merge, piRow, pSchema); + int32_t code = tsdbRowMergerInit(&merge, piRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3442,7 +3442,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } - tRowMerge(&merge, pRow); + tsdbRowMerge(&merge, pRow); code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); if (code != TSDB_CODE_SUCCESS) { @@ -3452,7 +3452,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p } else { STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - int32_t code = tRowMergerInit(&merge, pRow, pSchema); + int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema); if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) { return code; } @@ -3463,7 +3463,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } - tRowMerge(&merge, piRow); + tsdbRowMerge(&merge, piRow); code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); if (code != TSDB_CODE_SUCCESS) { @@ -3471,7 +3471,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p } } - int32_t code = tRowMergerGetRow(&merge, pTSRow); + int32_t code = tsdbRowMergerGetRow(&merge, pTSRow); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 8770981022..b019bf3165 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -642,7 +642,7 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { // SRowMerger ====================================================== -int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema) { +int32_t tsdbRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema) { int32_t code = 0; TSDBKEY key = TSDBROW_KEY(pRow); SColVal *pColVal = &(SColVal){0}; @@ -697,7 +697,7 @@ _exit: return code; } -int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { +int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { int32_t code = 0; TSDBKEY key = TSDBROW_KEY(pRow); SColVal *pColVal = &(SColVal){0}; @@ -736,7 +736,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { return code; } -int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { +int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { int32_t code = 0; TSDBKEY key = TSDBROW_KEY(pRow); SColVal *pColVal = &(SColVal){0}; @@ -775,9 +775,9 @@ _exit: return code; } -void tRowMergerClear(SRowMerger *pMerger) { taosArrayDestroy(pMerger->pArray); } +void tsdbRowMergerClear(SRowMerger *pMerger) { taosArrayDestroy(pMerger->pArray); } -int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { +int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { int32_t code = 0; TSDBKEY key = TSDBROW_KEY(pRow); SColVal *pColVal = &(SColVal){0}; @@ -807,7 +807,7 @@ _exit: return code; } -int32_t tRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) { +int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) { return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow); } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index a271165d5d..b8158ea7ed 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -1197,8 +1197,9 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { while (TSDB_CODE_SUCCESS == code && NULL != p) { STableDataCxt* pTableCxt = *(STableDataCxt**)p; if (pTableCxt->ordered) { - code = tRowMergeSort(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0); + tRowSort(pTableCxt->pData->aRowP); } + code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0); if (TSDB_CODE_SUCCESS == code) { SVgroupDataCxt* pVgCxt = NULL; int32_t vgId = pTableCxt->pMeta->vgId;