tsdb/row merger: remove tsdbRowMerge

This commit is contained in:
Minglei Jin 2023-04-07 13:31:33 +08:00
parent 360113115f
commit 55f7e41f3b
3 changed files with 79 additions and 78 deletions

View File

@ -128,7 +128,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
void tsdbRowMergerClear(SRowMerger *pMerger); void tsdbRowMergerClear(SRowMerger *pMerger);
int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow); // int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow); int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow);
// TABLEID // TABLEID
int32_t tTABLEIDCmprFn(const void *p1, const void *p2); int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
@ -224,7 +224,7 @@ int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward,
void *tsdbTbDataIterDestroy(STbDataIter *pIter); void *tsdbTbDataIterDestroy(STbDataIter *pIter);
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter); void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter);
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum); void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj *pTableMap, int64_t *rowsNum);
// STbData // STbData
int32_t tsdbGetNRowsInTbData(STbData *pTbData); int32_t tsdbGetNRowsInTbData(STbData *pTbData);

View File

@ -959,14 +959,15 @@ static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int
} }
static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal, static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
SBlockLoadSuppInfo* pSup) { SBlockLoadSuppInfo* pSup) {
if (IS_VAR_DATA_TYPE(pColVal->type)) { if (IS_VAR_DATA_TYPE(pColVal->type)) {
if (!COL_VAL_IS_VALUE(pColVal)) { if (!COL_VAL_IS_VALUE(pColVal)) {
colDataSetNULL(pColInfoData, rowIndex); colDataSetNULL(pColInfoData, rowIndex);
} else { } else {
varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData); varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
if (pColVal->value.nData > pColInfoData->info.bytes) { if (pColVal->value.nData > pColInfoData->info.bytes) {
tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData, pColInfoData->info.bytes); tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData,
pColInfoData->info.bytes);
return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER; return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
} }
if (pColVal->value.nData > 0) { // pData may be null, if nData is 0 if (pColVal->value.nData > 0) { // pData may be null, if nData is 0
@ -1794,7 +1795,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
} }
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key, static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
SFileBlockDumpInfo* pDumpInfo, bool *copied) { SFileBlockDumpInfo* pDumpInfo, bool* copied) {
// opt version // opt version
// 1. it is not a border point // 1. it is not a border point
// 2. the direct next point is not an duplicated timestamp // 2. the direct next point is not an duplicated timestamp
@ -1843,7 +1844,8 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
} }
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader, static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader, bool *copied) { STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader,
bool* copied) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
*copied = false; *copied = false;
@ -1856,7 +1858,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
if (code) { if (code) {
return code; return code;
} }
*copied = true; *copied = true;
return code; return code;
} }
@ -1865,7 +1867,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
if (code) { if (code) {
return code; return code;
} }
*copied = true; *copied = true;
return code; return code;
} }
@ -1977,7 +1979,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tsdbRowMerge(&merge, &fRow1); tsdbRowMergerAdd(&merge, &fRow1, NULL);
} else { } else {
init = true; init = true;
int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
@ -2025,7 +2027,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tsdbRowMerge(&merge, &fRow1); tsdbRowMergerAdd(&merge, &fRow1, NULL);
} else { } else {
init = true; init = true;
int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
@ -2038,7 +2040,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) { if (minKey == key) {
if (init) { if (init) {
tsdbRowMerge(&merge, &fRow); tsdbRowMergerAdd(&merge, &fRow, NULL);
} else { } else {
init = true; init = true;
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
@ -2068,11 +2070,11 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
bool mergeBlockData) { bool mergeBlockData) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
bool copied = false; bool copied = false;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRow* pTSRow = NULL; SRow* pTSRow = NULL;
SRowMerger merge = {0}; SRowMerger merge = {0};
TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr); tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);
// only last block exists // only last block exists
@ -2081,7 +2083,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
if (code) { if (code) {
return code; return code;
} }
if (copied) { if (copied) {
pBlockScanInfo->lastKey = tsLastBlock; pBlockScanInfo->lastKey = tsLastBlock;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -2092,7 +2094,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
} }
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMerge(&merge, &fRow1); tsdbRowMergerAdd(&merge, &fRow1, NULL);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);
code = tsdbRowMergerGetRow(&merge, &pTSRow); code = tsdbRowMergerGetRow(&merge, &pTSRow);
@ -2108,7 +2110,6 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
} else { // not merge block data } else { // not merge block data
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
@ -2171,7 +2172,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMerge(&merge, &fRow1); tsdbRowMergerAdd(&merge, &fRow1, NULL);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange, pReader->idStr); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange, pReader->idStr);
@ -2273,7 +2274,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tsdbRowMerge(&merge, &fRow1); tsdbRowMergerAdd(&merge, &fRow1, NULL);
} else { } else {
init = true; init = true;
code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
@ -2287,7 +2288,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == ik.ts) { if (minKey == ik.ts) {
if (init) { if (init) {
tsdbRowMerge(&merge, piRow); tsdbRowMergerAdd(&merge, piRow, NULL);
} else { } else {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
@ -2314,7 +2315,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
tsdbRowMerge(&merge, pRow); tsdbRowMergerAdd(&merge, pRow, NULL);
} else { } else {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
code = tsdbRowMergerInit(&merge, pRow, pSchema); code = tsdbRowMergerInit(&merge, pRow, pSchema);
@ -2346,7 +2347,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == ik.ts) { if (minKey == ik.ts) {
if (init) { if (init) {
tsdbRowMerge(&merge, piRow); tsdbRowMergerAdd(&merge, piRow, NULL);
} else { } else {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
@ -2365,7 +2366,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tsdbRowMerge(&merge, &fRow1); tsdbRowMergerAdd(&merge, &fRow1, NULL);
} else { } else {
init = true; init = true;
code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
@ -2387,7 +2388,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (merge.pTSchema == NULL) { if (merge.pTSchema == NULL) {
return code; return code;
} }
tsdbRowMerge(&merge, &fRow); tsdbRowMergerAdd(&merge, &fRow, NULL);
} }
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
} }
@ -2557,12 +2558,12 @@ bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo*
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
STsdbReader* pReader) { STsdbReader* pReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
bool copied = false; bool copied = false;
int32_t code = tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo, &copied); int32_t code = tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo, &copied);
if (code) { if (code) {
return code; return code;
} }
if (copied) { if (copied) {
pBlockScanInfo->lastKey = key; pBlockScanInfo->lastKey = key;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -2758,7 +2759,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
if (code) { if (code) {
goto _end; goto _end;
} }
// currently loaded file data block is consumed // currently loaded file data block is consumed
if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) { if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
@ -2776,8 +2777,8 @@ _end:
updateComposedBlockInfo(pReader, el, pBlockScanInfo); updateComposedBlockInfo(pReader, el, pBlockScanInfo);
if (pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%" PRId64
" rows:%" PRId64 ", elapsed time:%.2f ms %s", ", elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr); pResBlock->info.rows, el, pReader->idStr);
} }
@ -3018,7 +3019,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
if (code) { if (code) {
return code; return code;
} }
if (pResBlock->info.rows >= pReader->capacity) { if (pResBlock->info.rows >= pReader->capacity) {
break; break;
} }
@ -3028,8 +3029,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
updateComposedBlockInfo(pReader, el, pScanInfo); updateComposedBlockInfo(pReader, el, pScanInfo);
if (pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%" PRId64
" rows:%" PRId64 ", elapsed time:%.2f ms %s", ", elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr); pResBlock->info.rows, el, pReader->idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -3102,7 +3103,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
if (code) { if (code) {
return code; return code;
} }
if (pResBlock->info.rows >= pReader->capacity) { if (pResBlock->info.rows >= pReader->capacity) {
break; break;
} }
@ -3112,8 +3113,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
updateComposedBlockInfo(pReader, el, pScanInfo); updateComposedBlockInfo(pReader, el, pScanInfo);
if (pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%" PRId64
" rows:%" PRId64 ", elapsed time:%.2f ms %s", ", elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr); pResBlock->info.rows, el, pReader->idStr);
} }
@ -3139,7 +3140,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return code; return code;
} }
static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) { static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
LRUHandle* handle = NULL; LRUHandle* handle = NULL;
@ -3157,8 +3157,8 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SBlockIdx* pBlockIdx = NULL; SBlockIdx* pBlockIdx = NULL;
int32_t i = 0; int32_t i = 0;
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i); pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
if (pBlockIdx->suid != pReader->suid) { if (pBlockIdx->suid != pReader->suid) {
@ -3170,7 +3170,7 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
continue; continue;
} }
STableBlockScanInfo *pScanInfo = *p; STableBlockScanInfo* pScanInfo = *p;
tMapDataReset(&pScanInfo->mapData); tMapDataReset(&pScanInfo->mapData);
tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
@ -3186,15 +3186,14 @@ _end:
return code; return code;
} }
static int32_t doSumSttBlockRows(STsdbReader* pReader) { static int32_t doSumSttBlockRows(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
SSttBlockLoadInfo* pBlockLoadInfo = NULL; SSttBlockLoadInfo* pBlockLoadInfo = NULL;
for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file
pBlockLoadInfo = &pLastBlockReader->pInfo[i]; pBlockLoadInfo = &pLastBlockReader->pInfo[i];
code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk); code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk);
if (code) { if (code) {
return code; return code;
@ -3202,9 +3201,9 @@ static int32_t doSumSttBlockRows(STsdbReader* pReader) {
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
if (size >= 1) { if (size >= 1) {
SSttBlk *pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0); SSttBlk* pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0);
SSttBlk *pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1); SSttBlk* pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1);
// all identical // all identical
if (pStart->suid == pEnd->suid) { if (pStart->suid == pEnd->suid) {
if (pStart->suid != pReader->suid) { if (pStart->suid != pReader->suid) {
@ -3213,17 +3212,17 @@ static int32_t doSumSttBlockRows(STsdbReader* pReader) {
continue; continue;
} }
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
pReader->rowsNum += p->nRow; pReader->rowsNum += p->nRow;
} }
} else { } else {
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
uint64_t s = p->suid; uint64_t s = p->suid;
if (s < pReader->suid) { if (s < pReader->suid) {
continue; continue;
} }
if (s == pReader->suid) { if (s == pReader->suid) {
pReader->rowsNum += p->nRow; pReader->rowsNum += p->nRow;
} else if (s > pReader->suid) { } else if (s > pReader->suid) {
@ -3238,7 +3237,7 @@ static int32_t doSumSttBlockRows(STsdbReader* pReader) {
} }
static int32_t readRowsCountFromFiles(STsdbReader* pReader) { static int32_t readRowsCountFromFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
while (1) { while (1) {
bool hasNext = false; bool hasNext = false;
@ -3259,7 +3258,7 @@ static int32_t readRowsCountFromFiles(STsdbReader* pReader) {
code = doSumSttBlockRows(pReader); code = doSumSttBlockRows(pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
pReader->status.loadFromFile = false; pReader->status.loadFromFile = false;
@ -3268,8 +3267,8 @@ static int32_t readRowsCountFromFiles(STsdbReader* pReader) {
} }
static int32_t readRowsCountFromMem(STsdbReader* pReader) { static int32_t readRowsCountFromMem(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int64_t memNum = 0, imemNum = 0; int64_t memNum = 0, imemNum = 0;
if (pReader->pReadSnap->pMem != NULL) { if (pReader->pReadSnap->pMem != NULL) {
tsdbMemTableCountRows(pReader->pReadSnap->pMem, pReader->status.pTableMap, &memNum); tsdbMemTableCountRows(pReader->pReadSnap->pMem, pReader->status.pTableMap, &memNum);
} }
@ -3283,7 +3282,6 @@ static int32_t readRowsCountFromMem(STsdbReader* pReader) {
return code; return code;
} }
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
STableUidList* pUidList = &pStatus->uidList; STableUidList* pUidList = &pStatus->uidList;
@ -3696,7 +3694,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe
tsdbRowMergerAdd(pMerger, pRow, pTSchema); tsdbRowMergerAdd(pMerger, pRow, pTSchema);
} else { // column format } else { // column format
tsdbRowMerge(pMerger, pRow); tsdbRowMergerAdd(pMerger, pRow, NULL);
} }
} }
@ -3712,7 +3710,7 @@ static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowInd
} }
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
tsdbRowMerge(pMerger, &fRow); tsdbRowMergerAdd(pMerger, &fRow, NULL);
rowIndex += step; rowIndex += step;
} }
@ -3790,7 +3788,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 == ts) { if (next1 == ts) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMerge(pMerger, &fRow1); tsdbRowMergerAdd(pMerger, &fRow1, NULL);
} else { } else {
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid, tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt, pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
@ -3863,7 +3861,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return code; return code;
} }
tsdbRowMerge(&merge, pNextRow); tsdbRowMergerAdd(&merge, pNextRow, NULL);
} }
code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(&current), pDelList, &merge, pReader); code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(&current), pDelList, &merge, pReader);
@ -3926,7 +3924,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return code; return code;
} }
tsdbRowMerge(&merge, piRow); tsdbRowMergerAdd(&merge, piRow, NULL);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
pReader); pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -4000,7 +3998,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) { int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) {
int32_t outputRowIndex = pBlock->info.rows; int32_t outputRowIndex = pBlock->info.rows;
int64_t uid = pScanInfo->uid; int64_t uid = pScanInfo->uid;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
@ -4106,7 +4104,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader) { STsdbReader* pReader) {
SSDataBlock* pBlock = pReader->pResBlock; SSDataBlock* pBlock = pReader->pResBlock;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
do { do {
// SRow* pTSRow = NULL; // SRow* pTSRow = NULL;
@ -4342,7 +4340,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
if (countOnly) { if (countOnly) {
pReader->readMode = READ_MODE_COUNT_ONLY; pReader->readMode = READ_MODE_COUNT_ONLY;
} }
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code; return code;
@ -4644,7 +4642,7 @@ _err:
} }
static bool tsdbReadRowsCountOnly(STsdbReader* pReader) { static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pBlock = pReader->pResBlock; SSDataBlock* pBlock = pReader->pResBlock;
if (pReader->status.loadFromFile == false) { if (pReader->status.loadFromFile == false) {
@ -4664,15 +4662,15 @@ static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
pBlock->info.rows = pReader->rowsNum; pBlock->info.rows = pReader->rowsNum;
pBlock->info.id.uid = 0; pBlock->info.id.uid = 0;
pBlock->info.dataLoad = 0; pBlock->info.dataLoad = 0;
pReader->rowsNum = 0; pReader->rowsNum = 0;
return pBlock->info.rows > 0; return pBlock->info.rows > 0;
} }
static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) { static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
// cleanup the data that belongs to the previous data block // cleanup the data that belongs to the previous data block
SSDataBlock* pBlock = pReader->pResBlock; SSDataBlock* pBlock = pReader->pResBlock;
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
@ -4707,11 +4705,11 @@ static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
return code; return code;
} }
int32_t tsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) { int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
*hasNext = false; *hasNext = false;
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) { if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) {
return code; return code;
} }
@ -4731,7 +4729,7 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
tsdbReleaseReader(pReader); tsdbReleaseReader(pReader);
return code; return code;
} }
pReader->step = EXTERNAL_ROWS_PREV; pReader->step = EXTERNAL_ROWS_PREV;
if (*hasNext) { if (*hasNext) {
pStatus = &pReader->innerReader[0]->status; pStatus = &pReader->innerReader[0]->status;
@ -4762,7 +4760,7 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
tsdbReleaseReader(pReader); tsdbReleaseReader(pReader);
return code; return code;
} }
if (*hasNext) { if (*hasNext) {
if (pStatus->composedDataBlock) { if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader); qTrace("tsdb/read: %p, unlock read mutex", pReader);
@ -4786,7 +4784,7 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
tsdbReleaseReader(pReader); tsdbReleaseReader(pReader);
return code; return code;
} }
pReader->step = EXTERNAL_ROWS_NEXT; pReader->step = EXTERNAL_ROWS_NEXT;
if (*hasNext) { if (*hasNext) {
pStatus = &pReader->innerReader[1]->status; pStatus = &pReader->innerReader[1]->status;

View File

@ -712,6 +712,9 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
STColumn *pTColumn; STColumn *pTColumn;
int32_t iCol, jCol = 1; int32_t iCol, jCol = 1;
if (NULL == pTSchema) {
pTSchema = pMerger->pTSchema;
}
ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts); ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts);
for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) { for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) {
@ -833,7 +836,7 @@ void tsdbRowMergerClear(SRowMerger *pMerger) {
taosArrayDestroy(pMerger->pArray); taosArrayDestroy(pMerger->pArray);
} }
/*
int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
int32_t code = 0; int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow); TSDBKEY key = TSDBROW_KEY(pRow);
@ -898,7 +901,7 @@ int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
_exit: _exit:
return code; return code;
} }
*/
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) { int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) {
return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow); return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow);
} }