fix(tsdb):add attributes for ssdatablock.

This commit is contained in:
Haojun Liao 2024-03-20 15:29:05 +08:00
parent 78b0d3fa74
commit 55f7bc2c9c
2 changed files with 13 additions and 30 deletions

View File

@ -213,6 +213,7 @@ typedef struct SDataBlockInfo {
int16_t dataLoad; // denote if the data is loaded or not int16_t dataLoad; // denote if the data is loaded or not
uint8_t scanFlag; uint8_t scanFlag;
bool blankFill; bool blankFill;
SValue pks[2];
// TODO: optimize and remove following // TODO: optimize and remove following
int64_t version; // used for stream, and need serialization int64_t version; // used for stream, and need serialization

View File

@ -85,7 +85,6 @@ static int32_t pkComp(STsdbReader* pReader, TSDBROW* p1, TSDBROW* p2) {
if (pReader->pkComparFn == NULL) { if (pReader->pkComparFn == NULL) {
ASSERT(TSDBROW_TS(p1) != TSDBROW_TS(p2)); ASSERT(TSDBROW_TS(p1) != TSDBROW_TS(p2));
ASSERT(pReader->pkChecked);
return 0; return 0;
} }
@ -97,7 +96,6 @@ static int32_t pkComp(STsdbReader* pReader, TSDBROW* p1, TSDBROW* p2) {
static int32_t pkComp1(STsdbReader* pReader, STsdbRowKey* p1, TSDBROW* p2) { static int32_t pkComp1(STsdbReader* pReader, STsdbRowKey* p1, TSDBROW* p2) {
if (pReader->pkComparFn == NULL) { if (pReader->pkComparFn == NULL) {
ASSERT(p1->key.ts != TSDBROW_TS(p2)); ASSERT(p1->key.ts != TSDBROW_TS(p2));
ASSERT(pReader->pkChecked);
return 0; return 0;
} }
@ -1421,12 +1419,6 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, &rowKey); tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, &rowKey);
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey); tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey);
if (!pReader->pkChecked) {
pReader->pkComparFn = getComparFunc(rowKey.key.pks[0].type, 0);
pReader->numOfPks = rowKey.key.numOfPKs;
pReader->pkChecked = true;
}
if (rowKey.key.ts != nextRowKey.key.ts || (pkComp2(pReader, &rowKey, &nextRowKey) != 0)) { // merge is not needed if (rowKey.key.ts != nextRowKey.key.ts || (pkComp2(pReader, &rowKey, &nextRowKey) != 0)) { // merge is not needed
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
if (code) { if (code) {
@ -1771,12 +1763,6 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
// key == tsLast. ts is equal and the primary key exists // key == tsLast. ts is equal and the primary key exists
if (pSttBlockReader->numOfPks > 0) { if (pSttBlockReader->numOfPks > 0) {
if (!pReader->pkChecked) {
pReader->numOfPks = pSttBlockReader->numOfPks;
pReader->pkComparFn = pSttBlockReader->pkComparFn;
pReader->pkChecked = true;
}
int32_t res = pkComp1(pReader, pSttKey, &fRow); int32_t res = pkComp1(pReader, pSttKey, &fRow);
if (res > 0) { if (res > 0) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
@ -2137,13 +2123,6 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
if (ts == pBlockScanInfo->lastProcKey.key.ts) { // todo opt perf if (ts == pBlockScanInfo->lastProcKey.key.ts) { // todo opt perf
STsdbRowKey nextRowKey; STsdbRowKey nextRowKey;
tsdbColRowGetKey(pBlockData, rowIndex, &nextRowKey); tsdbColRowGetKey(pBlockData, rowIndex, &nextRowKey);
if (!pReader->pkChecked) {
pReader->pkComparFn = getComparFunc(pBlockScanInfo->lastProcKey.key.pks[0].type, 0);
pReader->pkChecked = true;
pReader->numOfPks = nextRowKey.key.numOfPKs;
}
if (pkComp2(pReader, &pBlockScanInfo->lastProcKey, &nextRowKey) == 0) { if (pkComp2(pReader, &pBlockScanInfo->lastProcKey, &nextRowKey) == 0) {
return false; return false;
} }
@ -2243,7 +2222,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
pScanInfo->sttKeyInfo.nextProcKey = pScanInfo->sttKeyInfo.nextProcKey =
ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey; ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
hasData = true; hasData = true;
} else { // not clean stt blocks } else { // not clean stt blocks
INIT_TIMEWINDOW(&pScanInfo->sttWindow); // reset the time window INIT_TIMEWINDOW(&pScanInfo->sttWindow); // reset the time window
pScanInfo->sttBlockReturned = false; pScanInfo->sttBlockReturned = false;
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
@ -2453,9 +2432,12 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf
pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
pResBlock->info.dataLoad = 1; pResBlock->info.dataLoad = 1;
pResBlock->info.version = pReader->info.verRange.maxVer; pResBlock->info.version = pReader->info.verRange.maxVer;
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]); blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
setComposedBlockFlag(pReader, true); setComposedBlockFlag(pReader, true);
// todo update the pk range for current return data block
pReader->cost.composedBlocks += 1; pReader->cost.composedBlocks += 1;
pReader->cost.buildComposedBlockTime += el; pReader->cost.buildComposedBlockTime += el;
} }
@ -2799,6 +2781,7 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf
pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1; pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1;
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
ASSERT(0); ASSERT(0);
pScanInfo->lastProcKey.key.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey; pScanInfo->lastProcKey.key.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
pScanInfo->sttBlockReturned = true; pScanInfo->sttBlockReturned = true;
@ -2812,7 +2795,7 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf
static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo,
SFileDataBlockInfo* pBlockInfo, int32_t blockIndex) { SFileDataBlockInfo* pBlockInfo, int32_t blockIndex) {
// whole block is required, return it directly // whole data block is required, return it directly
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
bool asc = ASCENDING_TRAVERSE(pReader->info.order); bool asc = ASCENDING_TRAVERSE(pReader->info.order);
@ -2822,6 +2805,7 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn
pInfo->dataLoad = 0; pInfo->dataLoad = 0;
pInfo->version = pReader->info.verRange.maxVer; pInfo->version = pReader->info.verRange.maxVer;
pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey}; pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey};
setComposedBlockFlag(pReader, false); setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order); setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order);
@ -2830,13 +2814,11 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn
pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey; pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey;
pKey->numOfPKs = pReader->numOfPks; pKey->numOfPKs = pReader->numOfPks;
// todo opt allocation // todo opt allocation, and handle varchar primary key
// if (IS_NUMERIC_TYPE(p)) { pKey->pks[0].val = asc ? pBlockInfo->lastPrimaryKey.val : pBlockInfo->firstPrimaryKey.val;
pKey->pks[0].val = asc? pBlockInfo->lastPrimaryKey.val:pBlockInfo->firstPrimaryKey.val;
// } else { pInfo->pks[0].val = pBlockInfo->firstPrimaryKey.val;
// int32_t len = asc? pBlockInfo->lastPKLen:pBlockInfo->firstPKLen; pInfo->pks[1].val = pBlockInfo->lastPrimaryKey.val;
// char* p = taosMemoryRealloc(pKey->pks[0].pData, len);
// }
tsdbDebug("%p uid:%" PRIu64 tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, " " clean file block retrieved from file, global index:%d, "