Merge pull request #16003 from taosdata/fix/TD-18321
fix: use mem schema if pReader->pSchema is null
This commit is contained in:
commit
6058b8dff5
|
@ -13,8 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdb.h"
|
|
||||||
#include "osDef.h"
|
#include "osDef.h"
|
||||||
|
#include "tsdb.h"
|
||||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
@ -130,8 +130,8 @@ struct STsdbReader {
|
||||||
SBlockLoadSuppInfo suppInfo;
|
SBlockLoadSuppInfo suppInfo;
|
||||||
STsdbReadSnap* pReadSnap;
|
STsdbReadSnap* pReadSnap;
|
||||||
SIOCostSummary cost;
|
SIOCostSummary cost;
|
||||||
STSchema* pSchema;// the newest version schema
|
STSchema* pSchema; // the newest version schema
|
||||||
STSchema* pMemSchema;// the previous schema for in-memory data, to avoid load schema too many times
|
STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
|
||||||
SDataFReader* pFileReader;
|
SDataFReader* pFileReader;
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
|
|
||||||
|
@ -1213,17 +1213,17 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key, SFileBlockDumpInfo* pDumpInfo) {
|
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
|
||||||
|
SFileBlockDumpInfo* pDumpInfo) {
|
||||||
// opt version
|
// opt version
|
||||||
// 1. it is not a border point
|
// 1. it is not a border point
|
||||||
// 2. the direct next point is not an duplicated timestamp
|
// 2. the direct next point is not an duplicated timestamp
|
||||||
if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && pReader->order == TSDB_ORDER_ASC) ||
|
if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && pReader->order == TSDB_ORDER_ASC) ||
|
||||||
(pDumpInfo->rowIndex > 0 && pReader->order == TSDB_ORDER_DESC)) {
|
(pDumpInfo->rowIndex > 0 && pReader->order == TSDB_ORDER_DESC)) {
|
||||||
int32_t step = pReader->order == TSDB_ORDER_ASC? 1:-1;
|
int32_t step = pReader->order == TSDB_ORDER_ASC ? 1 : -1;
|
||||||
|
|
||||||
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
|
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
|
||||||
if (nextKey != key) { // merge is not needed
|
if (nextKey != key) { // merge is not needed
|
||||||
doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
|
doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
|
||||||
pDumpInfo->rowIndex += step;
|
pDumpInfo->rowIndex += step;
|
||||||
return true;
|
return true;
|
||||||
|
@ -1239,7 +1239,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
|
||||||
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1);
|
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sversion == pReader->pSchema->version) {
|
if (pReader->pSchema && sversion == pReader->pSchema->version) {
|
||||||
return pReader->pSchema;
|
return pReader->pSchema;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1265,10 +1265,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
TSDBKEY k = TSDBROW_KEY(pRow);
|
TSDBKEY k = TSDBROW_KEY(pRow);
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
SArray* pDelList = pBlockScanInfo->delSkyline;
|
SArray* pDelList = pBlockScanInfo->delSkyline;
|
||||||
bool freeTSRow = false;
|
bool freeTSRow = false;
|
||||||
uint64_t uid = pBlockScanInfo->uid;
|
uint64_t uid = pBlockScanInfo->uid;
|
||||||
|
|
||||||
// ascending order traverse
|
// ascending order traverse
|
||||||
|
@ -2153,7 +2153,7 @@ TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pRea
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
|
TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
|
||||||
TSDBKEY key = {.ts = pRow->pTSRow->ts, .version = pRow->version};
|
TSDBKEY key = {.ts = pRow->pTSRow->ts, .version = pRow->version};
|
||||||
if (outOfTimeWindow(key.ts, &pReader->window)) {
|
if (outOfTimeWindow(key.ts, &pReader->window)) {
|
||||||
pIter->hasVal = false;
|
pIter->hasVal = false;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -2186,7 +2186,6 @@ TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pRea
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
|
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
|
||||||
STsdbReader* pReader) {
|
STsdbReader* pReader) {
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -2318,9 +2317,8 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
||||||
|
|
||||||
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
|
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
|
||||||
STsdbReader* pReader, bool* freeTSRow) {
|
STsdbReader* pReader, bool* freeTSRow) {
|
||||||
|
|
||||||
TSDBROW* pNextRow = NULL;
|
TSDBROW* pNextRow = NULL;
|
||||||
TSDBROW current = *pRow;
|
TSDBROW current = *pRow;
|
||||||
|
|
||||||
{ // if the timestamp of the next valid row has a different ts, return current row directly
|
{ // if the timestamp of the next valid row has a different ts, return current row directly
|
||||||
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
|
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
|
||||||
|
@ -2350,6 +2348,10 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe
|
||||||
// get the correct schema for data in memory
|
// get the correct schema for data in memory
|
||||||
STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(¤t), pReader, uid);
|
STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(¤t), pReader, uid);
|
||||||
|
|
||||||
|
if (pReader->pSchema == NULL) {
|
||||||
|
pReader->pSchema = pTSchema;
|
||||||
|
}
|
||||||
|
|
||||||
tRowMergerInit2(&merge, pReader->pSchema, ¤t, pTSchema);
|
tRowMergerInit2(&merge, pReader->pSchema, ¤t, pTSchema);
|
||||||
|
|
||||||
STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
|
STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
|
||||||
|
@ -2390,8 +2392,8 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo
|
||||||
tRowMergerGetRow(&merge, pTSRow);
|
tRowMergerGetRow(&merge, pTSRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow,
|
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
|
||||||
int64_t endKey, bool* freeTSRow) {
|
bool* freeTSRow) {
|
||||||
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
|
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
|
||||||
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
|
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
|
||||||
SArray* pDelList = pBlockScanInfo->delSkyline;
|
SArray* pDelList = pBlockScanInfo->delSkyline;
|
||||||
|
@ -2446,7 +2448,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
|
||||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
||||||
|
|
||||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
STSchema* pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
|
STSchema* pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
|
||||||
|
|
||||||
SColVal colVal = {0};
|
SColVal colVal = {0};
|
||||||
int32_t i = 0, j = 0;
|
int32_t i = 0, j = 0;
|
||||||
|
@ -2532,7 +2534,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
|
||||||
|
|
||||||
do {
|
do {
|
||||||
STSRow* pTSRow = NULL;
|
STSRow* pTSRow = NULL;
|
||||||
bool freeTSRow = false;
|
bool freeTSRow = false;
|
||||||
tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
|
tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
|
||||||
if (pTSRow == NULL) {
|
if (pTSRow == NULL) {
|
||||||
break;
|
break;
|
||||||
|
@ -2581,9 +2583,7 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
|
||||||
return metaGetIvtIdx(pMeta);
|
return metaGetIvtIdx(pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t getReaderMaxVersion(STsdbReader *pReader) {
|
uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
|
||||||
return pReader->verRange.maxVer;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Get all suids since suid
|
* @brief Get all suids since suid
|
||||||
|
@ -2761,7 +2761,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
SIOCostSummary* pCost = &pReader->cost;
|
SIOCostSummary* pCost = &pReader->cost;
|
||||||
|
|
||||||
tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
|
tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
|
||||||
" SMA-time:%.2f ms, fileBlocks:%" PRId64 ", fileBlocks-time:%.2f ms, "
|
" SMA-time:%.2f ms, fileBlocks:%" PRId64
|
||||||
|
", fileBlocks-time:%.2f ms, "
|
||||||
"build in-memory-block-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s",
|
"build in-memory-block-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s",
|
||||||
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime,
|
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime,
|
||||||
pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock,
|
pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock,
|
||||||
|
@ -2769,7 +2770,9 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
|
|
||||||
taosMemoryFree(pReader->idStr);
|
taosMemoryFree(pReader->idStr);
|
||||||
taosMemoryFree(pReader->pSchema);
|
taosMemoryFree(pReader->pSchema);
|
||||||
taosMemoryFree(pReader->pMemSchema);
|
if (pReader->pMemSchema != pReader->pSchema) {
|
||||||
|
taosMemoryFree(pReader->pMemSchema);
|
||||||
|
}
|
||||||
taosMemoryFreeClear(pReader);
|
taosMemoryFreeClear(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue