refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-03-21 15:08:23 +08:00
parent 0c578ab6c0
commit fd8f065c67
6 changed files with 127 additions and 145 deletions

View File

@ -125,8 +125,8 @@ int32_t tsdbRowCompare(const void *p1, const void *p2);
int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2);
int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2);
void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key);
void tsdbColRowGetKey(SBlockData *pBlock, int32_t irow, STsdbRowKey *key);
int32_t tsdbRowKeyAssign(STsdbRowKey *pDst, STsdbRowKey *pSrc);
void tColRowGetKey(SBlockData *pBlock, int32_t irow, SRowKey *key);
int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc);
// STSDBRowIter
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);

View File

@ -419,7 +419,6 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *
return code;
}
#if 0
// load stt statistics block for all stt-blocks, to decide if the data of queried table exists in current stt file
TStatisBlkArray *pStatisBlkArray = NULL;
code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pStatisBlkArray);
@ -434,7 +433,6 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *
tsdbError("failed to load stt statistics block data, code:%s, %s", tstrerror(code), idStr);
return code;
}
#endif
code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
@ -817,7 +815,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
tMergeTreeAddIter(pMTree, pIter);
// let's record the time window for current table of uid in the stt files
if (pSttDataInfo != NULL) {
if (pSttDataInfo != NULL && numOfRows > 0) {
taosArrayPush(pSttDataInfo->pTimeWindowList, &w);
pSttDataInfo->numOfRows += numOfRows;
}

View File

@ -25,6 +25,15 @@
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define getCurrentKeyInSttBlock(_r) (&((_r)->currentKey))
#define tRowGetKeyEx(_pRow, _pKey) \
do { \
if ((_pRow)->type == TSDBROW_ROW_FMT) { \
tRowGetKey((_pRow)->pTSRow, (_pKey)); \
} else { \
tColRowGetKey((_pRow)->pBlockData, (_pRow)->iRow, (_pKey)); \
} \
} while (0)
typedef struct {
bool overlapWithNeighborBlock;
bool hasDupTs;
@ -41,8 +50,8 @@ static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
STsdbRowKey* pRowKey, SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey* pCurKey, SArray* pDelList,
SRowKey* pRowKey, SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArray* pDelList,
STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
STableBlockScanInfo* pScanInfo);
@ -93,19 +102,19 @@ static int32_t pkComp(STsdbReader* pReader, TSDBROW* p1, TSDBROW* p2) {
return pReader->pkComparFn(&k1.key.pks[0].val, &k2.key.pks[0].val);
}
static int32_t pkComp1(STsdbReader* pReader, STsdbRowKey* p1, TSDBROW* p2) {
static int32_t pkComp1(STsdbReader* pReader, SRowKey* p1, TSDBROW* p2) {
if (pReader->pkComparFn == NULL) {
ASSERT(p1->key.ts != TSDBROW_TS(p2));
ASSERT(p1->ts != TSDBROW_TS(p2));
return 0;
}
STsdbRowKey k2 = {0};
tsdbRowGetKey(p2, &k2);
return pReader->pkComparFn(&p1->key.pks[0].val, &k2.key.pks[0].val);
SRowKey k2 = {0};
tRowGetKeyEx(p2, &k2);
return pReader->pkComparFn(&p1->pks[0].val, &k2.pks[0].val);
}
static int32_t pkComp2(STsdbReader* pReader, STsdbRowKey* p1, STsdbRowKey* p2) {
return pReader->pkComparFn(&p1->key.pks[0].val, &p2->key.pks[0].val);
static int32_t pkComp2(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
return comparFn(&p1->pks[0].val, &p2->pks[0].val);
}
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
@ -136,6 +145,7 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC
if (pCols[i].pk) {
pSupInfo->pk = pCols[i];
pSupInfo->pk.slotId = pSlotIdList[i];
pSupInfo->numOfPks += 1;
}
}
@ -207,6 +217,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA
pLReader->window = pReader->info.window;
pLReader->verRange = pReader->info.verRange;
pLReader->numOfPks = pReader->suppInfo.numOfPks;
pLReader->pkComparFn = pReader->pkComparFn;
pLReader->uid = 0;
tMergeTreeClose(&pLReader->mergeTree);
@ -431,13 +442,15 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
initReaderStatus(&pReader->status);
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond, pVnode->config.tsdbCfg.retentions, idstr, &level);
pReader->info.suid = pCond->suid;
pReader->info.order = pCond->order;
pReader->info.verRange = getQueryVerRange(pVnode, pCond, level);
pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
pReader->info.verRange = getQueryVerRange(pVnode, pCond, level);
pReader->type = pCond->type;
pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond);
@ -621,9 +634,9 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
// todo: here we should find the first timestamp that is greater than the lastProcKey
if (asc) {
w.skey = pScanInfo->lastProcKey.key.ts + step;
w.skey = pScanInfo->lastProcKey.ts + step;
} else {
w.ekey = pScanInfo->lastProcKey.key.ts + step;
w.ekey = pScanInfo->lastProcKey.ts + step;
}
if (isEmptyQueryTimeWindow(&w)) {
@ -954,7 +967,7 @@ static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInf
record->count = pBlockInfo->count;
}
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STsdbRowKey* pLastProcKey) {
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastProcKey) {
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
@ -1103,7 +1116,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STsdbRowKey* pLas
pResBlock->info.rows = dumpedRows;
pDumpInfo->rowIndex += step * dumpedRows;
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, pLastProcKey);
tColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, pLastProcKey);
// check if current block are all handled
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) {
@ -1426,11 +1439,11 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) {
int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1;
STsdbRowKey rowKey, nextRowKey;
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, &rowKey);
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey);
SRowKey rowKey, nextRowKey;
tColRowGetKey(pBlockData, pDumpInfo->rowIndex, &rowKey);
tColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey);
if (rowKey.key.ts != nextRowKey.key.ts || (pkComp2(pReader, &rowKey, &nextRowKey) != 0)) { // merge is not needed
if (rowKey.ts != nextRowKey.ts || (pkComp2(pReader->pkComparFn, &rowKey, &nextRowKey) != 0)) { // merge is not needed
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
if (code) {
return code;
@ -1462,27 +1475,14 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow];
int64_t ver = pRow->pBlockData->aVersion[pRow->iRow];
if (pSttBlockReader->numOfPks < 0) {// todo handle the deep copy problem
tsdbColRowGetKey(pRow->pBlockData, pRow->iRow, &pSttBlockReader->currentKey);
pSttBlockReader->numOfPks = pSttBlockReader->currentKey.key.numOfPKs;
if (pSttBlockReader->numOfPks > 0) {
pSttBlockReader->pkComparFn = getComparFunc(pSttBlockReader->currentKey.key.pks[0].type, 0);
}
pScanInfo->sttKeyInfo.nextProcKey = key;
} else {
if (pSttBlockReader->numOfPks == 0) {
pSttBlockReader->currentKey.key.ts = key;
pSttBlockReader->currentKey.version = ver;
pSttBlockReader->currentKey.ts = key;
// todo handle error
pScanInfo->sttKeyInfo.nextProcKey = key;
} else {
// todo handle the deep copy problem
tsdbColRowGetKey(pRow->pBlockData, pRow->iRow, &pSttBlockReader->currentKey);
} else { // todo handle the deep copy problem
tColRowGetKey(pRow->pBlockData, pRow->iRow, &pSttBlockReader->currentKey);
pScanInfo->sttKeyInfo.nextProcKey = key;
}
}
if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) {
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) {
@ -1500,22 +1500,22 @@ static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBl
static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); }
static int32_t pkCompEx(__compar_fn_t comparFn, STsdbRowKey* p1, STsdbRowKey* p2) {
if (p1->key.ts < p2->key.ts) {
static int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
if (p1->ts < p2->ts) {
return -1;
} else if (p1->key.ts > p2->key.ts) {
} else if (p1->ts > p2->ts) {
return 1;
}
if (p1->key.numOfPKs == 0) {
if (p1->numOfPKs == 0) {
return 0;
} else {
return comparFn(&p1->key.pks[0].val, &p2->key.pks[0].val);
return comparFn(&p1->pks[0].val, &p2->pks[0].val);
}
}
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader,
STableBlockScanInfo* pScanInfo, STsdbRowKey* pSttKey, STsdbReader* pReader,
STableBlockScanInfo* pScanInfo, SRowKey* pSttKey, STsdbReader* pReader,
bool* copied) {
int32_t code = TSDB_CODE_SUCCESS;
*copied = false;
@ -1525,10 +1525,10 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB
bool hasVal = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
doUnpinSttBlock(pSttBlockReader);
if (hasVal) {
STsdbRowKey nextKey;
SRowKey nextKey;
TSDBROW* pNextRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
tsdbRowGetKey(pNextRow, &nextKey);
tRowGetKeyEx(pNextRow, &nextKey);
if (pkCompEx(pReader->pkComparFn, pSttKey, &nextKey) != 0) {
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
@ -1586,20 +1586,20 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
}
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
SIterInfo* pIter, int64_t key, SSttBlockReader* pSttBlockReader) {
SIterInfo* pIter, SSttBlockReader* pSttBlockReader) {
SRowMerger* pMerger = &pReader->status.merger;
SRow* pTSRow = NULL;
SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
__compar_fn_t compFn = pReader->pkComparFn;
STsdbRowKey* pSttKey = NULL;
SRowKey* pSttKey = NULL;
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
}
STsdbRowKey k;
tsdbRowGetKey(pRow, &k);
SRowKey k;
tRowGetKeyEx(pRow, &k);
STSchema* pSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) {
@ -1609,9 +1609,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
STsdbRowKey* pfKey = &(STsdbRowKey){0};
SRowKey* pfKey = &(SRowKey){0};
if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey);
tColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey);
} else {
pfKey = NULL;
}
@ -1627,8 +1627,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
// int64_t minKey = 0;
STsdbRowKey minKey;
SRowKey minKey;
if (pReader->info.order == TSDB_ORDER_ASC) {
minKey = k; // chosen the minimum value
@ -1757,12 +1756,12 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
} else {
// row in both stt file blocks and data file blocks
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
STsdbRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
SRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
if (ASCENDING_TRAVERSE(pReader->info.order)) {
if (key < pSttKey->key.ts) { // asc
if (key < pSttKey->ts) { // asc
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key > pSttKey->key.ts) {
} else if (key > pSttKey->ts) {
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
}
@ -1776,9 +1775,9 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
}
}
} else { // desc
if (key > pSttKey->key.ts) {
if (key > pSttKey->ts) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key < pSttKey->key.ts) {
} else if (key < pSttKey->ts) {
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
}
@ -1854,21 +1853,21 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
STsdbRowKey* pSttKey = NULL;
SRowKey* pSttKey = NULL;
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
}
STsdbRowKey* pfKey = &(STsdbRowKey){0};
SRowKey* pfKey = &(SRowKey){0};
if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey);
tColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey);
} else {
pfKey = NULL;
}
STsdbRowKey k, ik;
tsdbRowGetKey(pRow, &k);
tsdbRowGetKey(piRow, &ik);
SRowKey k, ik;
tRowGetKeyEx(pRow, &k);
tRowGetKeyEx(piRow, &ik);
STSchema* pSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) {
@ -1895,7 +1894,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
STsdbRowKey minKey;
SRowKey minKey;
if (ASCENDING_TRAVERSE(pReader->info.order)) {
minKey = k; // let's find the minimum
@ -2070,13 +2069,13 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
if (ASCENDING_TRAVERSE(pReader->info.order)) {
startKey = (STsdbRowKey){.version = pReader->info.verRange.minVer,
.key = {
.ts = pBlockScanInfo->lastProcKey.key.ts + 1,
.ts = pBlockScanInfo->lastProcKey.ts + 1,
.numOfPKs = 0, // TODO: change here if multi-key is supported
}};
} else {
startKey = (STsdbRowKey){.version = pReader->info.verRange.maxVer,
.key = {
.ts = pBlockScanInfo->lastProcKey.key.ts - 1,
.ts = pBlockScanInfo->lastProcKey.ts - 1,
.numOfPKs = 0, // TODO: change here if multi-key is supported
}};
}
@ -2121,14 +2120,14 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
return false;
}
if ((asc && (ts < pBlockScanInfo->lastProcKey.key.ts)) || ((!asc) && (ts > pBlockScanInfo->lastProcKey.key.ts))) {
if ((asc && (ts < pBlockScanInfo->lastProcKey.ts)) || ((!asc) && (ts > pBlockScanInfo->lastProcKey.ts))) {
return false;
}
if (ts == pBlockScanInfo->lastProcKey.key.ts) { // todo opt perf
STsdbRowKey nextRowKey;
tsdbColRowGetKey(pBlockData, rowIndex, &nextRowKey);
if (pkComp2(pReader, &pBlockScanInfo->lastProcKey, &nextRowKey) == 0) {
if (ts == pBlockScanInfo->lastProcKey.ts) { // todo opt perf
SRowKey nextRowKey;
tColRowGetKey(pBlockData, rowIndex, &nextRowKey);
if (pkComp2(pReader->pkComparFn, &pBlockScanInfo->lastProcKey, &nextRowKey) == 0) {
return false;
}
}
@ -2202,7 +2201,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
initMemDataIterator(pScanInfo, pReader);
initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
if (0 /*conf.rspRows*/) {
if (conf.rspRows) {
pScanInfo->cleanSttBlocks =
isCleanSttBlock(info.pTimeWindowList, &pReader->info.window, pScanInfo, pReader->info.order);
@ -2226,7 +2225,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA;
pScanInfo->sttKeyInfo.nextProcKey =
ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
hasData = true;
hasData = (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
} else { // not clean stt blocks
INIT_TIMEWINDOW(&pScanInfo->sttWindow); // reset the time window
pScanInfo->sttBlockReturned = false;
@ -2279,10 +2278,10 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
if (copied) {
if (pReader->suppInfo.numOfPks == 0) {
pBlockScanInfo->lastProcKey.key.ts = key;
pBlockScanInfo->lastProcKey.ts = key;
} else { // todo use deep copy instead of shallow copy
int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1;
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, &pBlockScanInfo->lastProcKey);
tColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, &pBlockScanInfo->lastProcKey);
}
return TSDB_CODE_SUCCESS;
} else {
@ -2312,11 +2311,9 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
STsdbReader* pReader) {
bool copied = false;
SRow* pTSRow = NULL;
STsdbRowKey* pSttKey = NULL;//getCurrentKeyInSttBlock(pSttBlockReader);
SRowKey sttKey = {0};
STsdbRowKey newSttKey;
tsdbRowKeyAssign(&newSttKey, getCurrentKeyInSttBlock(pSttBlockReader));
pSttKey = &newSttKey;
tRowKeyAssign(&sttKey, getCurrentKeyInSttBlock(pSttBlockReader));
SRowMerger* pMerger = &pReader->status.merger;
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
@ -2325,13 +2322,13 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid,
fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr);
int32_t code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, pSttKey, pReader, &copied);
int32_t code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, &sttKey, pReader, &copied);
if (code) {
return code;
}
if (copied) {
tsdbRowKeyAssign(&pBlockScanInfo->lastProcKey, pSttKey);
tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey);
return TSDB_CODE_SUCCESS;
} else {
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
@ -2341,7 +2338,7 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr);
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, &sttKey, pMerger, &pReader->info.verRange, pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
@ -2379,12 +2376,12 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
// imem + file + stt block
if (pBlockScanInfo->iiter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pSttBlockReader);
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, pSttBlockReader);
}
// mem + file + stt block
if (pBlockScanInfo->iter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pSttBlockReader);
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, pSttBlockReader);
}
// files data blocks + stt block
@ -2789,7 +2786,7 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
ASSERT(0);
pScanInfo->lastProcKey.key.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
pScanInfo->lastProcKey.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
pScanInfo->sttBlockReturned = true;
pSttBlockReader->mergeTree.pIter = NULL;
@ -2829,7 +2826,7 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order);
// update the last key for the corresponding table
SRowKey* pKey = &pScanInfo->lastProcKey.key;
SRowKey* pKey = &pScanInfo->lastProcKey;
pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey;
pKey->numOfPKs = pReader->suppInfo.numOfPks;
@ -3041,10 +3038,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
}
// data in stt now overlaps with current active file data block, need to composed with file data block.
STsdbRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
if ((pSttKey->key.ts >= pBlockInfo->firstKey && asc) || (pSttKey->key.ts <= pBlockInfo->lastKey && (!asc))) {
SRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
if ((pSttKey->ts >= pBlockInfo->firstKey && asc) || (pSttKey->ts <= pBlockInfo->lastKey && (!asc))) {
tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader,
pSttKey->key.ts, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr);
pSttKey->ts, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr);
break;
}
}
@ -3539,7 +3536,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
}
}
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey *pCurKey, SArray* pDelList, STsdbReader* pReader) {
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey *pCurKey, SArray* pDelList, STsdbReader* pReader) {
SRowMerger* pMerger = &pReader->status.merger;
while (1) {
@ -3555,7 +3552,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey *pCurKey, S
}
// ts is not identical, quit
if (TSDBROW_TS(pRow) != pCurKey->key.ts) {
if (TSDBROW_TS(pRow) != pCurKey->ts) {
break;
}
@ -3659,10 +3656,10 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbRowKey* pRowKey,
int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pRowKey,
SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pVerRange)) {
STsdbRowKey* next1 = getCurrentKeyInSttBlock(pSttBlockReader);
SRowKey* next1 = getCurrentKeyInSttBlock(pSttBlockReader);
int32_t ret = pkCompEx(pSttBlockReader->pkComparFn, pRowKey, next1);
if (ret == 0) {
@ -3685,8 +3682,8 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
TSDBROW* pNextRow = NULL;
TSDBROW current = *pRow;
STsdbRowKey curKey = {0};
tsdbRowGetKey(&current, &curKey);
SRowKey curKey = {0};
tRowGetKeyEx(&current, &curKey);
{ // if the timestamp of the next valid row has a different ts, return current row directly
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
@ -3762,9 +3759,9 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
SRow** pTSRow) {
SRowMerger* pMerger = &pReader->status.merger;
STsdbRowKey k, ik;
tsdbRowGetKey(pRow, &k);
tsdbRowGetKey(piRow, &ik);
SRowKey k, ik;
tRowGetKeyEx(pRow, &k);
tRowGetKeyEx(piRow, &ik);
STSchema* pSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) {
@ -3934,7 +3931,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
// todo no version
TSDBROW row = {.pTSRow = pTSRow, .type = TSDBROW_ROW_FMT};
tsdbRowGetKey(&row, &pScanInfo->lastProcKey);
tRowGetKeyEx(&row, &pScanInfo->lastProcKey);
// pScanInfo->lastProcKey = pTSRow->ts;
return TSDB_CODE_SUCCESS;
}
@ -4001,7 +3998,6 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
}
if (row.type == TSDBROW_ROW_FMT) {
int64_t ts = row.pTSRow->ts;
code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);
if (freeTSRow) {
@ -4012,13 +4008,13 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
return code;
}
tsdbRowGetKey(&row, &pBlockScanInfo->lastProcKey);
tRowGetKey(row.pTSRow, &pBlockScanInfo->lastProcKey);
} else {
code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
if (code) {
return code;
}
tsdbColRowGetKey(row.pBlockData, row.iRow, &pBlockScanInfo->lastProcKey);
tColRowGetKey(row.pBlockData, row.iRow, &pBlockScanInfo->lastProcKey);
}
// no data in buffer, return immediately

View File

@ -165,18 +165,23 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) {
int64_t skey = pTsdbReader->info.window.skey;
pScanInfo->lastProcKey.key.ts = (skey > INT64_MIN) ? (skey - 1) : skey;
pScanInfo->lastProcKey.ts = (skey > INT64_MIN) ? (skey - 1) : skey;
pScanInfo->sttKeyInfo.nextProcKey = skey;
} else {
int64_t ekey = pTsdbReader->info.window.ekey;
pScanInfo->lastProcKey.key.ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pScanInfo->lastProcKey.ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pScanInfo->sttKeyInfo.nextProcKey = ekey;
}
pScanInfo->lastProcKey.numOfPKs = pTsdbReader->suppInfo.numOfPks;
if (pTsdbReader->suppInfo.numOfPks > 0 && IS_VAR_DATA_TYPE(pTsdbReader->suppInfo.pk.type)) {
pScanInfo->lastProcKey.pks[0].pData = taosMemoryCalloc(1, pTsdbReader->suppInfo.pk.bytes);
}
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
pScanInfo->lastProcKey.key.ts, pTsdbReader->idStr);
pScanInfo->lastProcKey.ts, pTsdbReader->idStr);
}
taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
@ -209,7 +214,7 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
}
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
pInfo->lastProcKey.key.ts = ts;
pInfo->lastProcKey.ts = ts;
ASSERT(0);
pInfo->sttKeyInfo.nextProcKey = ts + step;

View File

@ -87,8 +87,7 @@ typedef struct SSttKeyInfo {
// 4. not overlap with data file blocks
typedef struct STableBlockScanInfo {
uint64_t uid;
// TSKEY lastProcKey; // todo: refactor: add primary key
STsdbRowKey lastProcKey;
SRowKey lastProcKey;
SSttKeyInfo sttKeyInfo;
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
SArray* pBlockIdxList; // SArray<STableDataBlockIndx>
@ -171,7 +170,7 @@ typedef struct SSttBlockReader {
int32_t order;
uint64_t uid;
SMergeTree mergeTree;
STsdbRowKey currentKey;
SRowKey currentKey;
int32_t numOfPks;
__compar_fn_t pkComparFn;
} SSttBlockReader;

View File

@ -606,27 +606,13 @@ void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key) {
tRowGetKey(row->pTSRow, &key->key);
} else {
key->version = row->pBlockData->aVersion[row->iRow];
key->key.ts = row->pBlockData->aTSKEY[row->iRow];
key->key.numOfPKs = 0;
for (int32_t i = 0; i < row->pBlockData->nColData; i++) {
SColData *pColData = &row->pBlockData->aColData[i];
if (pColData->cflag & COL_IS_KEY) {
SColVal cv;
tColDataGetValue(pColData, row->iRow, &cv);
ASSERT(COL_VAL_IS_VALUE(&cv));
key->key.pks[key->key.numOfPKs] = cv.value;
key->key.numOfPKs++;
} else {
break;
}
}
tColRowGetKey(row->pBlockData, row->iRow, &key->key);
}
}
void tsdbColRowGetKey(SBlockData* pBlock, int32_t irow, STsdbRowKey* key) {
key->version = pBlock->aVersion[irow];
key->key.ts = pBlock->aTSKEY[irow];
key->key.numOfPKs = 0;
void tColRowGetKey(SBlockData* pBlock, int32_t irow, SRowKey* key) {
key->ts = pBlock->aTSKEY[irow];
key->numOfPKs = 0;
for (int32_t i = 0; i < pBlock->nColData; i++) {
SColData *pColData = &pBlock->aColData[i];
@ -634,25 +620,23 @@ void tsdbColRowGetKey(SBlockData* pBlock, int32_t irow, STsdbRowKey* key) {
SColVal cv;
tColDataGetValue(pColData, irow, &cv);
ASSERT(COL_VAL_IS_VALUE(&cv));
key->key.pks[key->key.numOfPKs] = cv.value;
key->key.numOfPKs++;
key->pks[key->numOfPKs] = cv.value;
key->numOfPKs++;
} else {
break;
}
}
}
int32_t tsdbRowKeyAssign(STsdbRowKey *pDst, STsdbRowKey* pSrc) {
pDst->version = pSrc->version;
if (pSrc->key.numOfPKs == 0) {
pDst->key.ts = pSrc->key.ts;
pDst->key.numOfPKs = 0;
int32_t tRowKeyAssign(SRowKey *pDst, SRowKey* pSrc) {
if (pSrc->numOfPKs == 0) {
pDst->ts = pSrc->ts;
pDst->numOfPKs = 0;
} else {
pDst->key = pSrc->key;
*pDst = *pSrc;
for (int32_t i = 0; i < pDst->key.numOfPKs; ++i) {
SValue *pVal = &pDst->key.pks[i];
for (int32_t i = 0; i < pDst->numOfPKs; ++i) {
SValue *pVal = &pDst->pks[i];
if (IS_NUMERIC_TYPE(pVal->type)) {
continue;
}