fix(tsdb): extract tomb data in stt files.
This commit is contained in:
parent
f49f96049e
commit
cf02ac7229
|
@ -706,6 +706,7 @@ typedef struct SSttBlockLoadInfo {
|
|||
void *pBlockArray;
|
||||
|
||||
SArray *aSttBlk;
|
||||
SArray *pTombBlockArray; // tomb block array list
|
||||
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
|
||||
int32_t currentLoadBlockIndex;
|
||||
int32_t loadBlocks;
|
||||
|
|
|
@ -269,6 +269,83 @@ int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t loadSttBlockInfo(SLDataIter *pIter, SSttBlockLoadInfo* pBlockLoadInfo, uint64_t suid) {
|
||||
TSttBlkArray* pArray = pBlockLoadInfo->pBlockArray;
|
||||
if (TARRAY2_SIZE(pArray) <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSttBlk *pStart = &pArray->data[0];
|
||||
SSttBlk *pEnd = &pArray->data[TARRAY2_SIZE(pArray) - 1];
|
||||
|
||||
// all identical
|
||||
if (pStart->suid == pEnd->suid) {
|
||||
if (pStart->suid != suid) { // no qualified stt block existed
|
||||
taosArrayClear(pBlockLoadInfo->aSttBlk);
|
||||
pIter->iSttBlk = -1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else { // all blocks are qualified
|
||||
taosArrayClear(pBlockLoadInfo->aSttBlk);
|
||||
taosArrayAddBatch(pBlockLoadInfo->aSttBlk, pArray->data, pArray->size);
|
||||
}
|
||||
} else {
|
||||
SArray *pTmp = taosArrayInit(TARRAY2_SIZE(pArray), sizeof(SSttBlk));
|
||||
for (int32_t i = 0; i < TARRAY2_SIZE(pArray); ++i) {
|
||||
SSttBlk* p = &pArray->data[i];
|
||||
if (p->suid < suid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (p->suid == suid) {
|
||||
taosArrayPush(pTmp, p);
|
||||
} else if (p->suid > suid) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(pBlockLoadInfo->aSttBlk);
|
||||
pBlockLoadInfo->aSttBlk = pTmp;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t loadSttTombBlockData(SSttFileReader* pSttFileReader, uint64_t suid, SSttBlockLoadInfo* pLoadInfo) {
|
||||
if (pLoadInfo->pTombBlockArray == NULL) {
|
||||
pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES);
|
||||
}
|
||||
|
||||
const TTombBlkArray* pBlkArray = NULL;
|
||||
int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
for(int32_t j = 0; j < pBlkArray->size; ++j) {
|
||||
STombBlk* pTombBlk = &pBlkArray->data[j];
|
||||
if (pTombBlk->maxTbid.suid < suid) {
|
||||
continue; // todo use binary search instead
|
||||
}
|
||||
|
||||
if (pTombBlk->minTbid.suid > suid) {
|
||||
break;
|
||||
}
|
||||
|
||||
STombBlock* pTombBlock = taosMemoryCalloc(1, sizeof(STombBlock));
|
||||
code = tsdbSttFileReadTombBlock(pSttFileReader, pTombBlk, pTombBlock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// todo handle error
|
||||
}
|
||||
|
||||
void* p = taosArrayPush(pLoadInfo->pTombBlockArray, &pTombBlock);
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
|
||||
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||
const char *idStr, bool strictTimeRange) {
|
||||
|
@ -290,51 +367,15 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32
|
|||
pBlockLoadInfo->sttBlockLoaded = true;
|
||||
|
||||
code = tsdbSttFileReadSttBlk(pIter->pReader, (const TSttBlkArray **)&pBlockLoadInfo->pBlockArray);
|
||||
if (code) {
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// only apply to the child tables, ordinary tables will not incur this filter procedure.
|
||||
TSttBlkArray* pArray = pBlockLoadInfo->pBlockArray;
|
||||
size_t size = pArray->size;
|
||||
code = loadSttBlockInfo(pIter, pBlockLoadInfo, suid);
|
||||
|
||||
if (size >= 1) {
|
||||
SSttBlk *pStart = &pArray->data[0];
|
||||
SSttBlk *pEnd = &pArray->data[size - 1];
|
||||
|
||||
// all identical
|
||||
if (pStart->suid == pEnd->suid) {
|
||||
if (pStart->suid != suid) {
|
||||
// no qualified stt block existed
|
||||
taosArrayClear(pBlockLoadInfo->aSttBlk);
|
||||
|
||||
pIter->iSttBlk = -1;
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
|
||||
return code;
|
||||
} else { // all blocks are qualified
|
||||
taosArrayClear(pBlockLoadInfo->aSttBlk);
|
||||
taosArrayAddBatch(pBlockLoadInfo->aSttBlk, pArray->data, pArray->size);
|
||||
}
|
||||
} else {
|
||||
SArray *pTmp = taosArrayInit(size, sizeof(SSttBlk));
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SSttBlk* p = &pArray->data[i];
|
||||
uint64_t s = p->suid;
|
||||
if (s < suid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (s == suid) {
|
||||
taosArrayPush(pTmp, p);
|
||||
} else if (s > suid) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(pBlockLoadInfo->aSttBlk);
|
||||
pBlockLoadInfo->aSttBlk = pTmp;
|
||||
}
|
||||
code = loadSttTombBlockData(pReader, suid, pBlockLoadInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
|
|
@ -2766,6 +2766,51 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
|
|||
return true;
|
||||
}
|
||||
|
||||
static int32_t loadTomRecordInfoFromSttFiles(SSttBlockLoadInfo* pBlockLoadInfo, uint64_t suid,
|
||||
STableBlockScanInfo* pBlockScanInfo, uint64_t maxVer) {
|
||||
int32_t size = taosArrayGetSize(pBlockLoadInfo->pTombBlockArray);
|
||||
if (size <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
uint64_t uid = pBlockScanInfo->uid;
|
||||
if (pBlockScanInfo->pDelData == NULL) {
|
||||
pBlockScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
STombBlock* pBlock = taosArrayGetP(pBlockLoadInfo->pTombBlockArray, i);
|
||||
|
||||
STombRecord record = {0};
|
||||
for(int32_t j = 0; j < pBlock->suid->size; ++j) {
|
||||
int32_t code = tTombBlockGet(pBlock, j, &record);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// todo handle error
|
||||
}
|
||||
|
||||
if (record.suid < suid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// todo use binary search instead here
|
||||
if (record.uid < uid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (record.uid > uid) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (record.version <= maxVer) {
|
||||
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
|
||||
taosArrayPush(pBlockScanInfo->pDelData, &delData);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
||||
// the last block reader has been initialized for this table.
|
||||
if (pLBlockReader->uid == pScanInfo->uid) {
|
||||
|
@ -2776,7 +2821,6 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
|
|||
tMergeTreeClose(&pLBlockReader->mergeTree);
|
||||
}
|
||||
|
||||
initMemDataIterator(pScanInfo, pReader);
|
||||
pLBlockReader->uid = pScanInfo->uid;
|
||||
|
||||
STimeWindow w = pLBlockReader->window;
|
||||
|
@ -2788,14 +2832,20 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
|
|||
|
||||
tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey,
|
||||
pScanInfo->uid, pReader->idStr);
|
||||
int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
|
||||
pReader->pTsdb, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange,
|
||||
pLBlockReader->pInfo, false, pReader->idStr, false, pReader->status.pLDataIter,
|
||||
pReader->status.pCurrentFileset);
|
||||
int32_t code =
|
||||
tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pTsdb,
|
||||
pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, false,
|
||||
pReader->idStr, false, pReader->status.pLDataIter, pReader->status.pCurrentFileset);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
code = loadTomRecordInfoFromSttFiles(pLBlockReader->pInfo, pReader->suid, pScanInfo, pReader->verRange.maxVer);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
initMemDataIterator(pScanInfo, pReader);
|
||||
return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
|
||||
}
|
||||
|
||||
|
@ -3212,8 +3262,8 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
|
|||
STFileObj* pTombFileObj = pReader->status.pCurrentFileset->farr[3];
|
||||
if (pTombFileObj!= NULL) {
|
||||
const TTombBlkArray* pBlkArray = NULL;
|
||||
|
||||
int32_t code = tsdbDataFileReadTombBlk(pReader->pFileReader, &pBlkArray);
|
||||
|
||||
int32_t i = 0, j = 0;
|
||||
|
||||
// todo find the correct start position.
|
||||
|
|
|
@ -27,7 +27,7 @@ class TDTestCase:
|
|||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor())
|
||||
tdSql.init(conn.cursor(), True)
|
||||
self.dbname = 'db_test'
|
||||
self.ns_dbname = 'ns_test'
|
||||
self.us_dbname = 'us_test'
|
||||
|
|
Loading…
Reference in New Issue