refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-03-25 17:57:59 +08:00
parent a5ba546bd4
commit 5c729cc836
5 changed files with 117 additions and 40 deletions

View File

@ -836,6 +836,8 @@ struct SLDataIter {
STimeWindow timeWindow;
SVersionRange verRange;
SSttBlockLoadInfo *pBlockLoadInfo;
SRowKey startRowKey; // current row key
__compar_fn_t comparFn;
bool ignoreEarlierTs;
struct SSttFileReader *pReader;
};
@ -846,7 +848,7 @@ struct SSttFileReader;
typedef int32_t (*_load_tomb_fn)(STsdbReader *pReader, struct SSttFileReader *pSttFileReader,
SSttBlockLoadInfo *pLoadInfo);
typedef struct {
typedef struct SMergeTreeConf {
int8_t backward;
STsdb *pTsdb;
uint64_t suid;
@ -859,7 +861,9 @@ typedef struct {
STSchema *pSchema;
int16_t *pCols;
int32_t numOfCols;
SRowKey *pCurRowKey;
_load_tomb_fn loadTombFn;
__compar_fn_t comparFn;
void *pReader;
void *idstr;
bool rspRows; // response the rows in stt-file, if possible

View File

@ -479,6 +479,9 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32
pIter->verRange.maxVer = pConf->verRange.maxVer;
pIter->timeWindow.skey = pConf->timewindow.skey;
pIter->timeWindow.ekey = pConf->timewindow.ekey;
pIter->comparFn = pConf->comparFn;
tRowKeyAssign(&pIter->startRowKey, pConf->pCurRowKey);
pIter->pReader = pSttFileReader;
pIter->pBlockLoadInfo = pBlockLoadInfo;
@ -618,17 +621,39 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
}
int64_t ts = pData->aTSKEY[i];
if (!pIter->backward) { // asc
if (!pIter->backward) { // asc
if (ts > pIter->timeWindow.ekey) { // no more data
break;
} else if (ts < pIter->timeWindow.skey) {
continue;
} else {
if (ts < pIter->timeWindow.skey) {
continue;
}
if (ts == pIter->timeWindow.skey && pIter->startRowKey.numOfPKs > 0) {
SRowKey key;
tColRowGetKey(pData, i, &key);
int32_t ret = pkCompEx(pIter->comparFn, &key, &pIter->startRowKey);
if (ret < 0) {
continue;
}
}
}
} else {
if (ts < pIter->timeWindow.skey) {
break;
} else if (ts > pIter->timeWindow.ekey) {
continue;
} else {
if (ts > pIter->timeWindow.ekey) {
continue;
}
if (ts == pIter->timeWindow.ekey && pIter->startRowKey.numOfPKs > 0) {
SRowKey key;
tColRowGetKey(pData, i, &key);
int32_t ret = pkCompEx(pIter->comparFn, &key, &pIter->startRowKey);
if (ret > 0) {
continue;
}
}
}
}
@ -802,8 +827,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
STimeWindow w = {0};
int64_t numOfRows = 0;
int64_t cid = pSttLevel->fobjArr->data[i]->f->cid;
int64_t cid = pSttLevel->fobjArr->data[i]->f->cid;
code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &w, &numOfRows,
pMTree->idStr);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -89,7 +89,7 @@ static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWi
static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus);
static int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
if (p2 == NULL) {
return 1;
}
@ -118,12 +118,13 @@ static void tColRowGetKeyDeepCopy(SBlockData* pBlock, int32_t irow, int32_t slot
return;
}
pKey->numOfPKs = 1;
SColData* pColData = &pBlock->aColData[slotId];
SColVal cv;
tColDataGetValue(pColData, irow, &cv);
pKey->numOfPKs = 1;
pKey->pks[0].type = cv.value.type;
if (IS_NUMERIC_TYPE(cv.value.type)) {
pKey->pks[0].val = cv.value.val;
} else {
@ -1394,7 +1395,7 @@ static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, STableBlockScanI
int64_t key = 0;
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey.ts;
key = ascScan ? TMIN(pBlock->firstKey, keyInStt) : TMAX(pBlock->lastKey, keyInStt);
} else {
key = ascScan ? pBlock->firstKey : pBlock->lastKey;
@ -1437,9 +1438,10 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
pInfo->hasDupTs = (pBlockInfo->numRow > pBlockInfo->count) || (pBlockInfo->count <= 0);
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pRecord, pReader->info.order);
// todo handle the primary key overlap case
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey;
int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey.ts;
pInfo->overlapWithSttBlock = !(pBlockInfo->lastKey < nextProcKeyInStt || pBlockInfo->firstKey > nextProcKeyInStt);
}
@ -1536,8 +1538,9 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int32_t pkSrcSlot,
SVersionRange* pVerRange) {
int32_t order = pSttBlockReader->order;
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
int32_t order = pSttBlockReader->order;
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
SRowKey* pNextProc = &pScanInfo->sttKeyInfo.nextProcKey;
while (1) {
bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree);
@ -1545,7 +1548,14 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
// next file, the timestamps in the next file must be greater than those in current
pScanInfo->sttKeyInfo.nextProcKey += step;
pNextProc->ts += step;
if (pSttBlockReader->numOfPks > 0) {
if (IS_NUMERIC_TYPE(pNextProc->pks[0].type)) {
pNextProc->pks[0].val = INT64_MIN;
} else {
memset(pNextProc->pks[0].pData, 0, pNextProc->pks[0].nData);
}
}
return false;
}
@ -1559,7 +1569,8 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, &pSttBlockReader->currentKey);
}
pScanInfo->sttKeyInfo.nextProcKey = key;
tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, pNextProc);
if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) {
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) {
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
@ -2042,7 +2053,9 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
STbData* d = NULL;
STbData* di = NULL;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
bool forward = true;
STsdbReadSnap* pSnap = pReader->pReadSnap;
STimeWindow* pWindow = &pReader->info.window;
if (pBlockScanInfo->iterInit) {
return TSDB_CODE_SUCCESS;
@ -2051,6 +2064,10 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
STsdbRowKey startKey = {0};
tRowKeyAssign(&startKey.key, &pBlockScanInfo->lastProcKey);
startKey.version = asc ? pReader->info.verRange.minVer : pReader->info.verRange.maxVer;
if ((asc && (startKey.key.ts < pWindow->skey)) || ((!asc) && startKey.key.ts > pWindow->ekey)) {
startKey.key.ts = asc? pWindow->skey:pWindow->ekey;
forward = false;
}
int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pSnap->pMem, &pBlockScanInfo->iter, "mem");
if (code != TSDB_CODE_SUCCESS) {
@ -2063,7 +2080,10 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
}
loadMemTombData(&pBlockScanInfo->pMemDelData, d, di, pReader->info.verRange.maxVer);
forwardDataIter(&startKey.key, pBlockScanInfo, pReader);
if (forward) {
forwardDataIter(&startKey.key, pBlockScanInfo, pReader);
}
pBlockScanInfo->iterInit = true;
return TSDB_CODE_SUCCESS;
@ -2115,6 +2135,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
bool hasData = true;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
// the stt block reader has been initialized for this table.
if (pSttBlockReader->uid == pScanInfo->uid) {
@ -2133,10 +2154,10 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
}
STimeWindow w = pSttBlockReader->window;
if (ASCENDING_TRAVERSE(pSttBlockReader->order)) {
w.skey = pScanInfo->sttKeyInfo.nextProcKey;
if (asc) {
w.skey = pScanInfo->sttKeyInfo.nextProcKey.ts;
} else {
w.ekey = pScanInfo->sttKeyInfo.nextProcKey;
w.ekey = pScanInfo->sttKeyInfo.nextProcKey.ts;
}
int64_t st = taosGetTimestampUs();
@ -2157,6 +2178,8 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
.pCols = pReader->suppInfo.colId,
.numOfCols = pReader->suppInfo.numOfCols,
.loadTombFn = loadSttTombDataForAll,
.pCurRowKey = &pScanInfo->sttKeyInfo.nextProcKey,
.comparFn = pReader->pkComparFn,
.pReader = pReader,
.idstr = pReader->idStr,
.rspRows = (pReader->info.execMode == READER_EXEC_ROWS),
@ -2193,8 +2216,9 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
}
pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA;
pScanInfo->sttKeyInfo.nextProcKey =
ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
// todo set the primary key value
pScanInfo->sttKeyInfo.nextProcKey.ts = asc ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
hasData = (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
} else { // not clean stt blocks
INIT_TIMEWINDOW(&pScanInfo->sttWindow); //reset the time window
@ -2755,7 +2779,7 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf
setComposedBlockFlag(pReader, true);
pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1;
pScanInfo->sttKeyInfo.nextProcKey.ts = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1;
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
pScanInfo->lastProcKey.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
@ -2915,7 +2939,7 @@ static bool notOverlapWithFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanI
if ((!hasDataInSttBlock(pScanInfo)) || (pScanInfo->cleanSttBlocks == true)) {
return true;
} else {
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey.ts;
return (asc && pBlockInfo->lastKey < keyInStt) || (!asc && pBlockInfo->firstKey > keyInStt);
}
}
@ -2963,7 +2987,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
} else {
if (notOverlapWithFiles(pBlockInfo, pScanInfo, asc)) {
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey.ts;
if ((!hasDataInSttBlock(pScanInfo)) || (asc && pBlockInfo->lastKey < keyInStt) ||
(!asc && pBlockInfo->firstKey > keyInStt)) {
@ -3649,7 +3673,7 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI
} else {
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline),
pScanInfo->sttKeyInfo.nextProcKey, idStr);
pScanInfo->sttKeyInfo.nextProcKey.ts, idStr);
break;
}
}

View File

@ -130,25 +130,46 @@ STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, c
return *p;
}
static int32_t initSRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len) {
pKey->numOfPKs = numOfPks;
pKey->ts = ts;
if (numOfPks > 0) {
pKey->pks[0].type = type;
if (IS_NUMERIC_TYPE(pKey->pks[0].type)) {
pKey->pks[0].val = INT64_MIN;
} else {
pKey->pks[0].pData = taosMemoryCalloc(1, len);
pKey->pks[0].nData = 0;
if (pKey->pks[0].pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
}
}
return TSDB_CODE_SUCCESS;
}
static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) {
int32_t numOfPks = pReader->suppInfo.numOfPks;
SRowKey* pRowKey = &pScanInfo->lastProcKey;
if (ASCENDING_TRAVERSE(pReader->info.order)) {
int64_t skey = pReader->info.window.skey;
pRowKey->ts = (skey > INT64_MIN) ? (skey - 1) : skey;
pScanInfo->sttKeyInfo.nextProcKey = skey;
int64_t ts = (skey > INT64_MIN) ? (skey - 1) : skey;
initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes);
initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, skey, numOfPks, pReader->suppInfo.pk.type,
pReader->suppInfo.pk.bytes);
} else {
int64_t ekey = pReader->info.window.ekey;
pRowKey->ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pScanInfo->sttKeyInfo.nextProcKey = ekey;
}
int64_t ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
// only handle the first primary key.
pRowKey->numOfPKs = pReader->suppInfo.numOfPks;
if (pReader->suppInfo.numOfPks > 0) {
if (IS_VAR_DATA_TYPE(pReader->suppInfo.pk.type)) {
pRowKey->pks[0].pData = taosMemoryCalloc(1, pReader->suppInfo.pk.bytes);
}
pRowKey->pks[0].type = pReader->suppInfo.pk.type;
initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes);
initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, ekey, numOfPks, pReader->suppInfo.pk.type,
pReader->suppInfo.pk.bytes);
}
}
@ -230,7 +251,7 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
pInfo->lastProcKey.ts = ts;
// todo check the nextProcKey info
pInfo->sttKeyInfo.nextProcKey = ts + step;
pInfo->sttKeyInfo.nextProcKey.ts = ts + step;
}
}

View File

@ -77,7 +77,9 @@ typedef enum ESttKeyStatus {
typedef struct SSttKeyInfo {
ESttKeyStatus status; // this value should be updated when switch to the next fileset
int64_t nextProcKey; // todo remove this attribute, since it is impossible to set correct nextProcKey value
SRowKey nextProcKey;
// int64_t nextProcKey; // todo remove this attribute, since it is impossible to set correct nextProcKey
// value
} SSttKeyInfo;
// clean stt file blocks:
@ -333,6 +335,7 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
const char* pstr);
bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order);
bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order);
int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2);
typedef struct {
SArray* pTombData;