fix(tsdb): set the pk info when converting fileblockinfo to binrecord.

This commit is contained in:
Haojun Liao 2024-04-26 11:10:20 +08:00
parent ae4e2bf671
commit 815489d20a
1 changed files with 43 additions and 26 deletions

View File

@ -1054,14 +1054,30 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
}
}
static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInfo) {
static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInfo, SBlockLoadSuppInfo* pSupp) {
record->uid = pBlockInfo->uid;
record->firstKey = (STsdbRowKey){
.key = {.ts = pBlockInfo->firstKey, .numOfPKs = 0},
};
record->lastKey = (STsdbRowKey){
.key = {.ts = pBlockInfo->lastKey, .numOfPKs = 0},
};
record->firstKey = (STsdbRowKey){.key = {.ts = pBlockInfo->firstKey, .numOfPKs = pSupp->numOfPks}};
record->lastKey = (STsdbRowKey){.key = {.ts = pBlockInfo->lastKey, .numOfPKs = pSupp->numOfPks}};
if (pSupp->numOfPks > 0) {
SValue* pFirst = &record->firstKey.key.pks[0];
SValue* pLast = &record->lastKey.key.pks[0];
pFirst->type = pSupp->pk.type;
pLast->type = pSupp->pk.type;
if (IS_VAR_DATA_TYPE(pFirst->type)) {
pFirst->pData = (uint8_t*) varDataVal(pBlockInfo->firstPk.pData);
pFirst->nData = varDataLen(pBlockInfo->firstPk.pData);
pLast->pData = (uint8_t*) varDataVal(pBlockInfo->lastPk.pData);
pLast->nData = varDataLen(pBlockInfo->lastPk.pData);
} else {
pFirst->val = pBlockInfo->firstPk.val;
pLast->val = pBlockInfo->lastPk.val;
}
}
record->minVer = pBlockInfo->minVer;
record->maxVer = pBlockInfo->maxVer;
record->blockOffset = pBlockInfo->blockOffset;
@ -1091,7 +1107,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
int32_t step = asc ? 1 : -1;
SBrinRecord tmp;
blockInfoToRecord(&tmp, pBlockInfo);
blockInfoToRecord(&tmp, pBlockInfo, pSupInfo);
SBrinRecord* pRecord = &tmp;
// no data exists, return directly.
@ -1290,7 +1306,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBrinRecord tmp;
blockInfoToRecord(&tmp, pBlockInfo);
blockInfoToRecord(&tmp, pBlockInfo, pSup);
SBrinRecord* pRecord = &tmp;
code = tsdbDataFileReadBlockDataByColumn(pReader->pFileReader, pRecord, pBlockData, pSchema, &pSup->colId[1],
pSup->numOfCols - 1);
@ -1327,7 +1343,7 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
static bool getNeighborBlockOfTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo,
STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order,
SBrinRecord* pRecord) {
SBrinRecord* pRecord, SBlockLoadSuppInfo* pSupInfo) {
bool asc = ASCENDING_TRAVERSE(order);
int32_t step = asc ? 1 : -1;
@ -1341,7 +1357,7 @@ static bool getNeighborBlockOfTable(SDataBlockIter* pBlockIter, SFileDataBlockIn
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
blockInfoToRecord(pRecord, p);
blockInfoToRecord(pRecord, p, pSupInfo);
*nextIndex = pBlockInfo->tbBlockIdx + step;
return true;
@ -1464,7 +1480,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
bool hasNeighbor =
getNeighborBlockOfTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, order, &rec);
getNeighborBlockOfTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, order, &rec, pSupInfo);
// overlap with neighbor
if (hasNeighbor) {
@ -1473,7 +1489,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
}
SBrinRecord pRecord;
blockInfoToRecord(&pRecord, pBlockInfo);
blockInfoToRecord(&pRecord, pBlockInfo, pSupInfo);
// has duplicated ts of different version in this block
pInfo->hasDupTs = (pBlockInfo->numRow > pBlockInfo->count) || (pBlockInfo->count <= 0);
@ -2414,12 +2430,13 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
int32_t code = TSDB_CODE_SUCCESS;
int32_t order = pReader->info.order;
SDataBlockIter* pIter = &pReader->status.blockIter;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
int32_t nextIndex = -1;
SBrinRecord rec = {0};
*loadNeighbor = false;
bool hasNeighbor = getNeighborBlockOfTable(pIter, pBlockInfo, pBlockScanInfo, &nextIndex, order, &rec);
bool hasNeighbor = getNeighborBlockOfTable(pIter, pBlockInfo, pBlockScanInfo, &nextIndex, order, &rec, pSupInfo);
if (!hasNeighbor) { // do nothing
return code;
}
@ -4868,11 +4885,11 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
return TSDB_CODE_SUCCESS;
}
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
if (pResBlock->info.id.uid != pFBlock->uid) {
if (pResBlock->info.id.uid != pBlockInfo->uid) {
return TSDB_CODE_SUCCESS;
}
@ -4880,10 +4897,10 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
TARRAY2_CLEAR(&pSup->colAggArray, 0);
SBrinRecord pRecord;
blockInfoToRecord(&pRecord, pFBlock);
blockInfoToRecord(&pRecord, pBlockInfo, pSup);
code = tsdbDataFileReadBlockSma(pReader->pFileReader, &pRecord, &pSup->colAggArray);
if (code != TSDB_CODE_SUCCESS) {
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pBlockInfo->uid, tstrerror(code),
pReader->idStr);
return code;
}
@ -4912,7 +4929,7 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
}
// do fill all null column value SMA info
doFillNullColSMA(pSup, pFBlock->numRow, numOfCols, pTsAgg);
doFillNullColSMA(pSup, pBlockInfo->numRow, numOfCols, pTsAgg);
size_t size = pSup->colAggArray.size;
@ -4938,7 +4955,7 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
// double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
pReader->cost.smaLoadTime += 0; // elapsedTime;
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pBlockInfo->uid, pReader->idStr);
return code;
}