refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-03-22 10:56:04 +08:00
parent ecd09059f5
commit 8c5ec3205c
1 changed files with 68 additions and 86 deletions

View File

@ -61,10 +61,10 @@ static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order,
SVersionRange* pVerRange);
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader, SRow** pTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRowKey* piRowKey,
STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow);
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SRowKey* pKey,
STsdbReader* pReader);
static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo,
@ -89,32 +89,26 @@ static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWi
static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus);
static int32_t pkComp(STsdbReader* pReader, TSDBROW* p1, TSDBROW* p2) {
STsdbRowKey k1 = {0}, k2 = {0};
if (pReader->pkComparFn == NULL) {
ASSERT(TSDBROW_TS(p1) != TSDBROW_TS(p2));
return 0;
static int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
if (p2 == NULL) {
return 1;
}
tsdbRowGetKey(p1, &k1);
tsdbRowGetKey(p2, &k2);
return pReader->pkComparFn(&k1.key.pks[0].val, &k2.key.pks[0].val);
}
static int32_t pkComp1(STsdbReader* pReader, SRowKey* p1, TSDBROW* p2) {
if (pReader->pkComparFn == NULL) {
ASSERT(p1->ts != TSDBROW_TS(p2));
return 0;
if (p1 == NULL) {
return -1;
}
SRowKey k2 = {0};
tRowGetKeyEx(p2, &k2);
return pReader->pkComparFn(&p1->pks[0].val, &k2.pks[0].val);
}
if (p1->ts < p2->ts) {
return -1;
} else if (p1->ts > p2->ts) {
return 1;
}
static int32_t pkComp2(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
return comparFn(&p1->pks[0].val, &p2->pks[0].val);
if (p1->numOfPKs == 0) {
return 0;
} else {
return comparFn(&p1->pks[0].val, &p2->pks[0].val);
}
}
static void tColRowGetKeyDeepCopy(SBlockData* pBlock, int32_t irow, int32_t slotId, SRowKey* pKey) {
@ -1519,7 +1513,7 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
SRowKey nextRowKey;
tColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey);
if (pKey->ts != nextRowKey.ts || (pkComp2(pReader->pkComparFn, pKey, &nextRowKey) != 0)) { // merge is not needed
if (pkCompEx(pReader->pkComparFn, pKey, &nextRowKey) != 0) { // merge is not needed
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
if (code) {
return code;
@ -1576,28 +1570,6 @@ static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBl
static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); }
static int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
if (p2 == NULL) {
return 1;
}
if (p1 == NULL) {
return -1;
}
if (p1->ts < p2->ts) {
return -1;
} else if (p1->ts > p2->ts) {
return 1;
}
if (p1->numOfPKs == 0) {
return 0;
} else {
return comparFn(&p1->pks[0].val, &p2->pks[0].val);
}
}
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader,
STableBlockScanInfo* pScanInfo, SRowKey* pSttKey, STsdbReader* pReader,
bool* copied) {
@ -2191,9 +2163,9 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
}
if (ts == pBlockScanInfo->lastProcKey.ts) { // todo opt perf
SRowKey nextRowKey;
SRowKey nextRowKey; // lazy eval
tColRowGetKey(pBlockData, rowIndex, &nextRowKey);
if (pkComp2(pReader->pkComparFn, &pBlockScanInfo->lastProcKey, &nextRowKey) == 0) {
if (pkCompEx(pReader->pkComparFn, &pBlockScanInfo->lastProcKey, &nextRowKey) == 0) {
return false;
}
}
@ -3615,7 +3587,9 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey *pCurKey, SArra
break;
}
if (pkComp1(pReader, pCurKey, pRow) != 0) {
SRowKey nextKey = {0};
tRowGetKeyEx(pRow, &nextKey);
if (pkCompEx(pReader->pkComparFn, pCurKey, &nextKey) != 0) {
break;
}
@ -3738,14 +3712,11 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI
return TSDB_CODE_SUCCESS;
}
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow,
STsdbReader* pReader, bool* freeTSRow) {
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow) {
TSDBROW* pNextRow = NULL;
TSDBROW current = *pRow;
SRowKey curKey = {0};
tRowGetKeyEx(&current, &curKey);
{ // if the timestamp of the next valid row has a different ts, return current row directly
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
@ -3761,7 +3732,15 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return TSDB_CODE_SUCCESS;
}
if (TSDBROW_TS(&current) != TSDBROW_TS(pNextRow) || (pkComp1(pReader, &curKey, pNextRow) != 0)) {
if (TSDBROW_TS(&current) != TSDBROW_TS(pNextRow)) {
*pResRow = current;
*freeTSRow = false;
return TSDB_CODE_SUCCESS;
}
SRowKey nextRowKey = {0};
tRowGetKeyEx(pNextRow, &nextRowKey);
if (pkCompEx(pReader->pkComparFn, pKey, &nextRowKey) != 0) {
*pResRow = current;
*freeTSRow = false;
return TSDB_CODE_SUCCESS;
@ -3799,7 +3778,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return code;
}
code = doMergeRowsInBuf(pIter, uid, &curKey, pDelList, pReader);
code = doMergeRowsInBuf(pIter, uid, pKey, pDelList, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -3816,14 +3795,10 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return TSDB_CODE_SUCCESS;
}
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
SRow** pTSRow) {
int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRowKey* piRowKey,
STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow) {
SRowMerger* pMerger = &pReader->status.merger;
SRowKey k, ik;
tRowGetKeyEx(pRow, &k);
tRowGetKeyEx(piRow, &ik);
STSchema* pSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) {
pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
@ -3846,13 +3821,13 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return code;
}
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, piRowKey, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, pRowKey, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -3863,13 +3838,13 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return code;
}
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, pRowKey, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, piRowKey, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -3882,42 +3857,47 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow,
int64_t endKey, bool* freeTSRow) {
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
SArray* pDelList = pBlockScanInfo->delSkyline;
uint64_t uid = pBlockScanInfo->uid;
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
SArray* pDelList = pBlockScanInfo->delSkyline;
uint64_t uid = pBlockScanInfo->uid;
SIterInfo* piter = &pBlockScanInfo->iter;
SIterInfo* piiter = &pBlockScanInfo->iiter;
SRowKey rowKey = {0}, irowKey = {0};
// todo refactor
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
if (pBlockScanInfo->iter.hasVal) {
if (piter->hasVal) {
TSDBKEY k = TSDBROW_KEY(pRow);
if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
pRow = NULL;
}
}
if (pBlockScanInfo->iiter.hasVal) {
if (piiter->hasVal) {
TSDBKEY k = TSDBROW_KEY(piRow);
if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
piRow = NULL;
}
}
if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal && pRow != NULL && piRow != NULL) {
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
if (piter->hasVal && piiter->hasVal && pRow != NULL && piRow != NULL) {
tRowGetKeyEx(pRow, &rowKey);
tRowGetKeyEx(piRow, &irowKey);
int32_t code = TSDB_CODE_SUCCESS;
if (ik.ts != k.ts || (pkComp(pReader, pRow, piRow) != 0)) {
if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts
code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow);
} else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) {
code = doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow);
int32_t ret = pkCompEx(pReader->pkComparFn, &rowKey, &irowKey);
if (ret != 0) {
if ((ret > 0 && asc) || (ret < 0 && (!asc))) { // ik.ts < k.ts
code = doMergeMemTableMultiRows(piRow, &irowKey, uid, piiter, pDelList, pResRow, pReader, freeTSRow);
} else if ((ret < 0 && asc) || (ret > 0 && (!asc))) {
code = doMergeMemTableMultiRows(pRow, &rowKey, uid, piter, pDelList, pResRow, pReader, freeTSRow);
}
} else { // ik.ts == k.ts
*freeTSRow = true;
pResRow->type = TSDBROW_ROW_FMT;
code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pResRow->pTSRow);
code = doMergeMemIMemRows(pRow, &rowKey, piRow, &irowKey, pBlockScanInfo, pReader, &pResRow->pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -3926,12 +3906,14 @@ static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbRea
return code;
}
if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow);
if (piter->hasVal && pRow != NULL) {
tRowGetKeyEx(pRow, &rowKey);
return doMergeMemTableMultiRows(pRow, &rowKey, uid, piter, pDelList, pResRow, pReader, freeTSRow);
}
if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow);
if (piiter->hasVal && piRow != NULL) {
tRowGetKeyEx(piRow, &irowKey);
return doMergeMemTableMultiRows(piRow, &irowKey, uid, piiter, pDelList, pResRow, pReader, freeTSRow);
}
return TSDB_CODE_SUCCESS;