diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 0c4ada2cb1..0dd767990e 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -128,7 +128,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); void tsdbRowMergerClear(SRowMerger *pMerger); -int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow); +// int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow); int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow); // TABLEID int32_t tTABLEIDCmprFn(const void *p1, const void *p2); @@ -224,7 +224,7 @@ int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, void *tsdbTbDataIterDestroy(STbDataIter *pIter); void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter); -void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum); +void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj *pTableMap, int64_t *rowsNum); // STbData int32_t tsdbGetNRowsInTbData(STbData *pTbData); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 1ac327de7f..338231057c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -959,14 +959,15 @@ static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int } static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal, - SBlockLoadSuppInfo* pSup) { + SBlockLoadSuppInfo* pSup) { if (IS_VAR_DATA_TYPE(pColVal->type)) { if (!COL_VAL_IS_VALUE(pColVal)) { colDataSetNULL(pColInfoData, rowIndex); } else { varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData); if (pColVal->value.nData > pColInfoData->info.bytes) { - tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData, pColInfoData->info.bytes); + tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData, + pColInfoData->info.bytes); return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER; } if (pColVal->value.nData > 0) { // pData may be null, if nData is 0 @@ -1794,7 +1795,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* } static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key, - SFileBlockDumpInfo* pDumpInfo, bool *copied) { + SFileBlockDumpInfo* pDumpInfo, bool* copied) { // opt version // 1. it is not a border point // 2. the direct next point is not an duplicated timestamp @@ -1843,7 +1844,8 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc } static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader, - STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader, bool *copied) { + STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader, + bool* copied) { int32_t code = TSDB_CODE_SUCCESS; *copied = false; @@ -1856,7 +1858,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas if (code) { return code; } - + *copied = true; return code; } @@ -1865,7 +1867,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas if (code) { return code; } - + *copied = true; return code; } @@ -1977,7 +1979,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { - tsdbRowMerge(&merge, &fRow1); + tsdbRowMergerAdd(&merge, &fRow1, NULL); } else { init = true; int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); @@ -2025,7 +2027,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { - tsdbRowMerge(&merge, &fRow1); + tsdbRowMergerAdd(&merge, &fRow1, NULL); } else { init = true; int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); @@ -2038,7 +2040,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == key) { if (init) { - tsdbRowMerge(&merge, &fRow); + tsdbRowMergerAdd(&merge, &fRow, NULL); } else { init = true; int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); @@ -2068,11 +2070,11 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, bool mergeBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); - bool copied = false; - int32_t code = TSDB_CODE_SUCCESS; - SRow* pTSRow = NULL; - SRowMerger merge = {0}; - TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + bool copied = false; + int32_t code = TSDB_CODE_SUCCESS; + SRow* pTSRow = NULL; + SRowMerger merge = {0}; + TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr); // only last block exists @@ -2081,7 +2083,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, if (code) { return code; } - + if (copied) { pBlockScanInfo->lastKey = tsLastBlock; return TSDB_CODE_SUCCESS; @@ -2092,7 +2094,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, } TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - tsdbRowMerge(&merge, &fRow1); + tsdbRowMergerAdd(&merge, &fRow1, NULL); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr); code = tsdbRowMergerGetRow(&merge, &pTSRow); @@ -2108,7 +2110,6 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, if (code != TSDB_CODE_SUCCESS) { return code; } - } } else { // not merge block data int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); @@ -2171,7 +2172,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - tsdbRowMerge(&merge, &fRow1); + tsdbRowMergerAdd(&merge, &fRow1, NULL); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange, pReader->idStr); @@ -2273,7 +2274,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { - tsdbRowMerge(&merge, &fRow1); + tsdbRowMergerAdd(&merge, &fRow1, NULL); } else { init = true; code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); @@ -2287,7 +2288,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == ik.ts) { if (init) { - tsdbRowMerge(&merge, piRow); + tsdbRowMergerAdd(&merge, piRow, NULL); } else { init = true; STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); @@ -2314,7 +2315,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - tsdbRowMerge(&merge, pRow); + tsdbRowMergerAdd(&merge, pRow, NULL); } else { STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); code = tsdbRowMergerInit(&merge, pRow, pSchema); @@ -2346,7 +2347,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == ik.ts) { if (init) { - tsdbRowMerge(&merge, piRow); + tsdbRowMergerAdd(&merge, piRow, NULL); } else { init = true; STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); @@ -2365,7 +2366,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); if (init) { - tsdbRowMerge(&merge, &fRow1); + tsdbRowMergerAdd(&merge, &fRow1, NULL); } else { init = true; code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); @@ -2387,7 +2388,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (merge.pTSchema == NULL) { return code; } - tsdbRowMerge(&merge, &fRow); + tsdbRowMergerAdd(&merge, &fRow, NULL); } doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); } @@ -2557,12 +2558,12 @@ bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - bool copied = false; - int32_t code = tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo, &copied); + bool copied = false; + int32_t code = tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo, &copied); if (code) { return code; } - + if (copied) { pBlockScanInfo->lastKey = key; return TSDB_CODE_SUCCESS; @@ -2758,7 +2759,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { if (code) { goto _end; } - + // currently loaded file data block is consumed if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) { SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); @@ -2776,8 +2777,8 @@ _end: updateComposedBlockInfo(pReader, el, pBlockScanInfo); if (pResBlock->info.rows > 0) { - tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 - " rows:%" PRId64 ", elapsed time:%.2f ms %s", + tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%" PRId64 + ", elapsed time:%.2f ms %s", pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } @@ -3018,7 +3019,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { if (code) { return code; } - + if (pResBlock->info.rows >= pReader->capacity) { break; } @@ -3028,8 +3029,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { updateComposedBlockInfo(pReader, el, pScanInfo); if (pResBlock->info.rows > 0) { - tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 - " rows:%" PRId64 ", elapsed time:%.2f ms %s", + tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%" PRId64 + ", elapsed time:%.2f ms %s", pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); return TSDB_CODE_SUCCESS; @@ -3102,7 +3103,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { if (code) { return code; } - + if (pResBlock->info.rows >= pReader->capacity) { break; } @@ -3112,8 +3113,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { updateComposedBlockInfo(pReader, el, pScanInfo); if (pResBlock->info.rows > 0) { - tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 - " rows:%" PRId64 ", elapsed time:%.2f ms %s", + tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%" PRId64 + ", elapsed time:%.2f ms %s", pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } @@ -3139,7 +3140,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return code; } - static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) { int64_t st = taosGetTimestampUs(); LRUHandle* handle = NULL; @@ -3157,8 +3157,8 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade return TSDB_CODE_SUCCESS; } - SBlockIdx* pBlockIdx = NULL; - int32_t i = 0; + SBlockIdx* pBlockIdx = NULL; + int32_t i = 0; for (int32_t i = 0; i < num; ++i) { pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i); if (pBlockIdx->suid != pReader->suid) { @@ -3170,7 +3170,7 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade continue; } - STableBlockScanInfo *pScanInfo = *p; + STableBlockScanInfo* pScanInfo = *p; tMapDataReset(&pScanInfo->mapData); tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); @@ -3186,15 +3186,14 @@ _end: return code; } - static int32_t doSumSttBlockRows(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; - SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; - SSttBlockLoadInfo* pBlockLoadInfo = NULL; + int32_t code = TSDB_CODE_SUCCESS; + SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; + SSttBlockLoadInfo* pBlockLoadInfo = NULL; for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file pBlockLoadInfo = &pLastBlockReader->pInfo[i]; - + code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk); if (code) { return code; @@ -3202,9 +3201,9 @@ static int32_t doSumSttBlockRows(STsdbReader* pReader) { size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); if (size >= 1) { - SSttBlk *pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0); - SSttBlk *pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1); - + SSttBlk* pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0); + SSttBlk* pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1); + // all identical if (pStart->suid == pEnd->suid) { if (pStart->suid != pReader->suid) { @@ -3213,17 +3212,17 @@ static int32_t doSumSttBlockRows(STsdbReader* pReader) { continue; } for (int32_t i = 0; i < size; ++i) { - SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); + SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); pReader->rowsNum += p->nRow; } } else { for (int32_t i = 0; i < size; ++i) { - SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); + SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, i); uint64_t s = p->suid; if (s < pReader->suid) { continue; } - + if (s == pReader->suid) { pReader->rowsNum += p->nRow; } else if (s > pReader->suid) { @@ -3238,7 +3237,7 @@ static int32_t doSumSttBlockRows(STsdbReader* pReader) { } static int32_t readRowsCountFromFiles(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; while (1) { bool hasNext = false; @@ -3259,7 +3258,7 @@ static int32_t readRowsCountFromFiles(STsdbReader* pReader) { code = doSumSttBlockRows(pReader); if (code != TSDB_CODE_SUCCESS) { return code; - } + } } pReader->status.loadFromFile = false; @@ -3268,8 +3267,8 @@ static int32_t readRowsCountFromFiles(STsdbReader* pReader) { } static int32_t readRowsCountFromMem(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; - int64_t memNum = 0, imemNum = 0; + int32_t code = TSDB_CODE_SUCCESS; + int64_t memNum = 0, imemNum = 0; if (pReader->pReadSnap->pMem != NULL) { tsdbMemTableCountRows(pReader->pReadSnap->pMem, pReader->status.pTableMap, &memNum); } @@ -3283,7 +3282,6 @@ static int32_t readRowsCountFromMem(STsdbReader* pReader) { return code; } - static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; STableUidList* pUidList = &pStatus->uidList; @@ -3696,7 +3694,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe tsdbRowMergerAdd(pMerger, pRow, pTSchema); } else { // column format - tsdbRowMerge(pMerger, pRow); + tsdbRowMergerAdd(pMerger, pRow, NULL); } } @@ -3712,7 +3710,7 @@ static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowInd } TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex); - tsdbRowMerge(pMerger, &fRow); + tsdbRowMergerAdd(pMerger, &fRow, NULL); rowIndex += step; } @@ -3790,7 +3788,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); if (next1 == ts) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - tsdbRowMerge(pMerger, &fRow1); + tsdbRowMergerAdd(pMerger, &fRow1, NULL); } else { tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid, pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt, @@ -3863,7 +3861,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, return code; } - tsdbRowMerge(&merge, pNextRow); + tsdbRowMergerAdd(&merge, pNextRow, NULL); } code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(¤t), pDelList, &merge, pReader); @@ -3926,7 +3924,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } - tsdbRowMerge(&merge, piRow); + tsdbRowMergerAdd(&merge, piRow, NULL); code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); if (code != TSDB_CODE_SUCCESS) { @@ -4000,7 +3998,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) { int32_t outputRowIndex = pBlock->info.rows; int64_t uid = pScanInfo->uid; - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); @@ -4106,7 +4104,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader) { SSDataBlock* pBlock = pReader->pResBlock; - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; do { // SRow* pTSRow = NULL; @@ -4342,7 +4340,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL if (countOnly) { pReader->readMode = READ_MODE_COUNT_ONLY; } - + tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); return code; @@ -4644,7 +4642,7 @@ _err: } static bool tsdbReadRowsCountOnly(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SSDataBlock* pBlock = pReader->pResBlock; if (pReader->status.loadFromFile == false) { @@ -4664,15 +4662,15 @@ static bool tsdbReadRowsCountOnly(STsdbReader* pReader) { pBlock->info.rows = pReader->rowsNum; pBlock->info.id.uid = 0; pBlock->info.dataLoad = 0; - + pReader->rowsNum = 0; - + return pBlock->info.rows > 0; } -static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) { +static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) { int32_t code = TSDB_CODE_SUCCESS; - + // cleanup the data that belongs to the previous data block SSDataBlock* pBlock = pReader->pResBlock; blockDataCleanup(pBlock); @@ -4707,11 +4705,11 @@ static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) { return code; } -int32_t tsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) { +int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) { int32_t code = TSDB_CODE_SUCCESS; *hasNext = false; - + if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) { return code; } @@ -4731,7 +4729,7 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) { tsdbReleaseReader(pReader); return code; } - + pReader->step = EXTERNAL_ROWS_PREV; if (*hasNext) { pStatus = &pReader->innerReader[0]->status; @@ -4762,7 +4760,7 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) { tsdbReleaseReader(pReader); return code; } - + if (*hasNext) { if (pStatus->composedDataBlock) { qTrace("tsdb/read: %p, unlock read mutex", pReader); @@ -4786,7 +4784,7 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) { tsdbReleaseReader(pReader); return code; } - + pReader->step = EXTERNAL_ROWS_NEXT; if (*hasNext) { pStatus = &pReader->innerReader[1]->status; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index e4592d2758..9aeea22b99 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -712,6 +712,9 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) STColumn *pTColumn; int32_t iCol, jCol = 1; + if (NULL == pTSchema) { + pTSchema = pMerger->pTSchema; + } ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts); for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) { @@ -833,7 +836,7 @@ void tsdbRowMergerClear(SRowMerger *pMerger) { taosArrayDestroy(pMerger->pArray); } - +/* int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { int32_t code = 0; TSDBKEY key = TSDBROW_KEY(pRow); @@ -898,7 +901,7 @@ int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { _exit: return code; } - +*/ int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) { return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow); }