fix(tsdb): add key for lastProcessKey, and remove invalid assign for key

This commit is contained in:
Haojun Liao 2024-04-05 19:47:59 +08:00
parent 15f2882bba
commit dc954e112e
6 changed files with 180 additions and 132 deletions

View File

@ -1306,8 +1306,8 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc) {
if (IS_NUMERIC_TYPE(pVal->type)) {
pVal->val = pSrc->pks[i].val;
} else {
memcpy(pVal->pData, pSrc->pks[i].pData, pVal->nData);
pVal->nData = pSrc->pks[i].nData;
memcpy(pVal->pData, pSrc->pks[i].pData, pVal->nData);
}
}
}

View File

@ -758,9 +758,11 @@ typedef struct SBlockDataInfo {
// todo: move away
typedef struct {
SArray *pUid;
SArray *pFirstTs;
SArray *pLastTs;
SArray *pCount;
SArray *pFirstKey;
SArray *pLastKey;
SArray *pCount;
} SSttTableRowsInfo;
typedef struct SSttBlockLoadInfo {
@ -836,7 +838,7 @@ struct SLDataIter {
STimeWindow timeWindow;
SVersionRange verRange;
SSttBlockLoadInfo *pBlockLoadInfo;
SRowKey startRowKey; // current row key
SRowKey* pStartRowKey; // current row key
__compar_fn_t comparFn;
bool ignoreEarlierTs;
struct SSttFileReader *pReader;
@ -870,7 +872,7 @@ typedef struct SMergeTreeConf {
} SMergeTreeConf;
typedef struct SSttDataInfoForTable {
SArray *pTimeWindowList;
SArray *pKeyRangeList;
int64_t numOfRows;
} SSttDataInfoForTable;

View File

@ -359,18 +359,51 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
if (i < rows) {
if (pBlockLoadInfo->info.pUid == NULL) {
pBlockLoadInfo->info.pUid = taosArrayInit(rows, sizeof(int64_t));
pBlockLoadInfo->info.pFirstKey = taosArrayInit(rows, sizeof(int64_t));
pBlockLoadInfo->info.pLastKey = taosArrayInit(rows, sizeof(int64_t));
pBlockLoadInfo->info.pFirstTs = taosArrayInit(rows, sizeof(int64_t));
pBlockLoadInfo->info.pLastTs = taosArrayInit(rows, sizeof(int64_t));
pBlockLoadInfo->info.pCount = taosArrayInit(rows, sizeof(int64_t));
pBlockLoadInfo->info.pFirstKey = taosArrayInit(rows, sizeof(SValue));
pBlockLoadInfo->info.pLastKey = taosArrayInit(rows, sizeof(SValue));
}
if (pStatisBlkArray->data[k].maxTbid.suid == suid) {
taosArrayAddBatch(pBlockLoadInfo->info.pUid, tBufferGetDataAt(&block.uids, i * sizeof(int64_t)), rows - i);
taosArrayAddBatch(pBlockLoadInfo->info.pFirstKey,
taosArrayAddBatch(pBlockLoadInfo->info.pFirstTs,
tBufferGetDataAt(&block.firstKeyTimestamps, i * sizeof(int64_t)), rows - i);
taosArrayAddBatch(pBlockLoadInfo->info.pLastKey,
tBufferGetDataAt(&block.lastKeyTimestamps, i * sizeof(int64_t)), rows - i);
taosArrayAddBatch(pBlockLoadInfo->info.pLastTs, tBufferGetDataAt(&block.lastKeyTimestamps, i * sizeof(int64_t)),
rows - i);
taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)), rows - i);
SValue vFirst = {0}, vLast = {0};
for (int32_t f = i; f < rows; ++f) {
int32_t code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst);
if (code) {
break;
}
if (IS_VAR_DATA_TYPE(vFirst.type)) {
char *p = (char *)vFirst.pData;
char *pBuf = taosMemoryMalloc(vFirst.nData);
memcpy(pBuf, p, vFirst.nData);
vFirst.pData = (uint8_t *)pBuf;
}
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast);
if (code) {
break;
}
if (IS_VAR_DATA_TYPE(vLast.type)) {
char *p = (char *)vLast.pData;
char *pBuf = taosMemoryMalloc(vLast.nData);
memcpy(pBuf, p, vLast.nData);
vLast.pData = (uint8_t *)pBuf;
}
taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast);
}
} else {
STbStatisRecord record;
while (i < rows) {
@ -380,9 +413,13 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
}
taosArrayPush(pBlockLoadInfo->info.pUid, &record.uid);
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &record.firstKey.ts);
taosArrayPush(pBlockLoadInfo->info.pLastKey, &record.lastKey.ts);
taosArrayPush(pBlockLoadInfo->info.pCount, &record.count);
taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts);
taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts);
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &record.firstKey.pks[0]);
taosArrayPush(pBlockLoadInfo->info.pLastKey, &record.lastKey.pks[0]);
i += 1;
}
}
@ -452,23 +489,26 @@ static int32_t uidComparFn(const void *p1, const void *p2) {
}
}
static void setSttInfoForCurrentTable(SSttBlockLoadInfo *pLoadInfo, uint64_t uid, STimeWindow *pTimeWindow,
static void setSttInfoForCurrentTable(SSttBlockLoadInfo *pLoadInfo, uint64_t uid, SSttKeyRange *pRange,
int64_t *numOfRows) {
if (pTimeWindow == NULL || taosArrayGetSize(pLoadInfo->info.pUid) == 0) {
if (pRange == NULL || taosArrayGetSize(pLoadInfo->info.pUid) == 0) {
return;
}
int32_t index = taosArraySearchIdx(pLoadInfo->info.pUid, &uid, uidComparFn, TD_EQ);
if (index >= 0) {
pTimeWindow->skey = *(int64_t *)taosArrayGet(pLoadInfo->info.pFirstKey, index);
pTimeWindow->ekey = *(int64_t *)taosArrayGet(pLoadInfo->info.pLastKey, index);
pRange->skey.ts = *(int64_t *)taosArrayGet(pLoadInfo->info.pFirstTs, index);
pRange->ekey.ts = *(int64_t *)taosArrayGet(pLoadInfo->info.pLastTs, index);
*numOfRows += *(int64_t *)taosArrayGet(pLoadInfo->info.pCount, index);
memcpy(&pRange->skey.pks[0], taosArrayGet(pLoadInfo->info.pFirstKey, index), sizeof(SValue));
memcpy(&pRange->ekey.pks[0], taosArrayGet(pLoadInfo->info.pLastKey, index), sizeof(SValue));
}
}
int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t cid, int8_t backward,
SMergeTreeConf *pConf, SSttBlockLoadInfo *pBlockLoadInfo, STimeWindow *pTimeWindow,
SMergeTreeConf *pConf, SSttBlockLoadInfo *pBlockLoadInfo, SSttKeyRange *pKeyRange,
int64_t *numOfRows, const char *idStr) {
int32_t code = TSDB_CODE_SUCCESS;
@ -481,7 +521,7 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32
pIter->timeWindow.ekey = pConf->timewindow.ekey;
pIter->comparFn = pConf->comparFn;
tRowKeyAssign(&pIter->startRowKey, pConf->pCurRowKey);
pIter->pStartRowKey = pConf->pCurRowKey;
pIter->pReader = pSttFileReader;
pIter->pBlockLoadInfo = pBlockLoadInfo;
@ -500,7 +540,7 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32
}
}
setSttInfoForCurrentTable(pBlockLoadInfo, pConf->uid, pTimeWindow, numOfRows);
setSttInfoForCurrentTable(pBlockLoadInfo, pConf->uid, pKeyRange, numOfRows);
// find the start block, actually we could load the position to avoid repeatly searching for the start position when
// the skey is updated.
@ -629,10 +669,10 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
continue;
}
if (ts == pIter->timeWindow.skey && pIter->startRowKey.numOfPKs > 0) {
if (ts == pIter->timeWindow.skey && pIter->pStartRowKey->numOfPKs > 0) {
SRowKey key;
tColRowGetKey(pData, i, &key);
int32_t ret = pkCompEx(pIter->comparFn, &key, &pIter->startRowKey);
int32_t ret = pkCompEx(pIter->comparFn, &key, pIter->pStartRowKey);
if (ret < 0) {
continue;
}
@ -646,10 +686,10 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
continue;
}
if (ts == pIter->timeWindow.ekey && pIter->startRowKey.numOfPKs > 0) {
if (ts == pIter->timeWindow.ekey && pIter->pStartRowKey->numOfPKs > 0) {
SRowKey key;
tColRowGetKey(pData, i, &key);
int32_t ret = pkCompEx(pIter->comparFn, &key, &pIter->startRowKey);
int32_t ret = pkCompEx(pIter->comparFn, &key, pIter->pStartRowKey);
if (ret > 0) {
continue;
}
@ -825,11 +865,11 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
memset(pIter, 0, sizeof(SLDataIter));
STimeWindow w = {0};
int64_t numOfRows = 0;
int64_t cid = pSttLevel->fobjArr->data[i]->f->cid;
SSttKeyRange range = {.skey.numOfPKs = pConf->pCurRowKey->numOfPKs, .ekey.numOfPKs = pConf->pCurRowKey->numOfPKs};
int64_t numOfRows = 0;
int64_t cid = pSttLevel->fobjArr->data[i]->f->cid;
code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &w, &numOfRows,
code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &range, &numOfRows,
pMTree->idStr);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
@ -841,7 +881,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
// let's record the time window for current table of uid in the stt files
if (pSttDataInfo != NULL && numOfRows > 0) {
taosArrayPush(pSttDataInfo->pTimeWindowList, &w);
taosArrayPush(pSttDataInfo->pKeyRangeList, &range);
pSttDataInfo->numOfRows += numOfRows;
}
} else {

View File

@ -49,8 +49,8 @@ static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i
STsdbReader* pReader);
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey, STsdbReader* pReader);
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pRowKey,
SRowMerger* pMerger, int32_t pkSrcSlot, SVersionRange* pVerRange, const char* id);
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
SRowMerger* pMerger, int32_t pkSrcSlot, SVersionRange* pVerRange, const char* id);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArray* pDelList,
STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
@ -265,9 +265,11 @@ static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
// init file iterator
static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetArray, STsdbReader* pReader) {
size_t numOfFileset = TARRAY2_SIZE(pFileSetArray);
SBlockLoadSuppInfo* pInfo = &pReader->suppInfo;
size_t numOfFileset = TARRAY2_SIZE(pFileSetArray);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
pIter->index = ASCENDING_TRAVERSE(pReader->info.order) ? -1 : numOfFileset;
pIter->index = asc ? -1 : numOfFileset;
pIter->order = pReader->info.order;
pIter->pFilesetList = pFileSetArray;
pIter->numOfFiles = numOfFileset;
@ -281,15 +283,17 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA
}
}
SSttBlockReader* pLReader = pIter->pSttBlockReader;
pLReader->order = pReader->info.order;
pLReader->window = pReader->info.window;
pLReader->verRange = pReader->info.verRange;
pLReader->numOfPks = pReader->suppInfo.numOfPks;
pLReader->pkComparFn = pReader->pkComparFn;
SSttBlockReader* pSttReader = pIter->pSttBlockReader;
pSttReader->order = pReader->info.order;
pSttReader->window = pReader->info.window;
pSttReader->verRange = pReader->info.verRange;
pSttReader->numOfPks = pReader->suppInfo.numOfPks;
pSttReader->pkComparFn = pReader->pkComparFn;
pSttReader->uid = 0;
tMergeTreeClose(&pSttReader->mergeTree);
initRowKey(&pSttReader->currentKey, INT64_MIN, pInfo->numOfPks, pInfo->pk.type, pInfo->pk.bytes, asc);
pLReader->uid = 0;
tMergeTreeClose(&pLReader->mergeTree);
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
@ -1676,7 +1680,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
SRowKey* pSttKey = &(SRowKey){0};
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
tRowKeyAssign(pSttKey, getCurrentKeyInSttBlock(pSttBlockReader));
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
} else {
pSttKey = NULL;
}
@ -1753,8 +1757,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange,
pReader->idStr);
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
}
if (pkCompEx(compFn, &minKey, &k) == 0) {
@ -1851,8 +1854,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
}
// pSttKey will be changed when sttBlockReader iterates to the next row, so use pKey instead.
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pKey, pMerger, pkSrcSlot, &pReader->info.verRange,
pReader->idStr);
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
@ -1966,15 +1968,14 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
}
if (pkCompEx(compFn, &minKey, pSttKey) == 0) {
if (pkCompEx(compFn, &minKey, &pBlockScanInfo->lastProcKey) == 0) {
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange,
pReader->idStr);
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
}
if (pkCompEx(compFn, &minKey, &ik) == 0) {
@ -2155,8 +2156,9 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
}
static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
bool hasData = true;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
bool hasData = true;
int32_t order = pReader->info.order;
bool asc = ASCENDING_TRAVERSE(order);
// the stt block reader has been initialized for this table.
if (pSttBlockReader->uid == pScanInfo->uid) {
@ -2206,7 +2208,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
.rspRows = (pReader->info.execMode == READER_EXEC_ROWS),
};
SSttDataInfoForTable info = {.pTimeWindowList = taosArrayInit(4, sizeof(STimeWindow))};
SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))};
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
if (code != TSDB_CODE_SUCCESS) {
return false;
@ -2216,44 +2218,41 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
if (conf.rspRows) {
pScanInfo->cleanSttBlocks =
isCleanSttBlock(info.pTimeWindowList, &pReader->info.window, pScanInfo, pReader->info.order);
pScanInfo->cleanSttBlocks = isCleanSttBlock(info.pKeyRangeList, &pReader->info.window, pScanInfo, order);
if (pScanInfo->cleanSttBlocks) {
pScanInfo->numOfRowsInStt = info.numOfRows;
pScanInfo->sttWindow.skey = INT64_MAX;
pScanInfo->sttWindow.ekey = INT64_MIN;
// calculate the time window for data in stt files
for (int32_t i = 0; i < taosArrayGetSize(info.pTimeWindowList); ++i) {
STimeWindow* pWindow = taosArrayGet(info.pTimeWindowList, i);
if (pScanInfo->sttWindow.skey > pWindow->skey) {
pScanInfo->sttWindow.skey = pWindow->skey;
for (int32_t i = 0; i < taosArrayGetSize(info.pKeyRangeList); ++i) {
SSttKeyRange* pKeyRange = taosArrayGet(info.pKeyRangeList, i);
if (pkCompEx(pReader->pkComparFn, &pScanInfo->sttRange.skey, &pKeyRange->skey) > 0) {
tRowKeyAssign(&pScanInfo->sttRange.skey, &pKeyRange->skey);
}
if (pScanInfo->sttWindow.ekey < pWindow->ekey) {
pScanInfo->sttWindow.ekey = pWindow->ekey;
if (pkCompEx(pReader->pkComparFn, &pScanInfo->sttRange.ekey, &pKeyRange->ekey) < 0) {
tRowKeyAssign(&pScanInfo->sttRange.ekey, &pKeyRange->ekey);
}
}
pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA;
pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pKeyRangeList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA;
SRowKey* p = asc? &pScanInfo->sttRange.skey:&pScanInfo->sttRange.ekey;
tRowKeyAssign(&pScanInfo->sttKeyInfo.nextProcKey, p);
// todo set the primary key value
pScanInfo->sttKeyInfo.nextProcKey.ts = asc ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
hasData = (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
} else { // not clean stt blocks
INIT_TIMEWINDOW(&pScanInfo->sttWindow); //reset the time window
INIT_KEYRANGE(&pScanInfo->sttRange); //reset the time window
pScanInfo->sttBlockReturned = false;
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
}
} else {
pScanInfo->cleanSttBlocks = false;
INIT_TIMEWINDOW(&pScanInfo->sttWindow); // reset the time window
INIT_KEYRANGE(&pScanInfo->sttRange); // reset the time window
pScanInfo->sttBlockReturned = false;
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
}
taosArrayDestroy(info.pTimeWindowList);
taosArrayDestroy(info.pKeyRangeList);
int64_t el = taosGetTimestampUs() - st;
pReader->cost.initSttBlockReader += (el / 1000.0);
@ -2318,29 +2317,26 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
}
}
int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader) {
bool copied = false;
SRow* pTSRow = NULL;
SRowKey sttKey = {0};
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
tRowKeyAssign(&sttKey, getCurrentKeyInSttBlock(pSttBlockReader));
int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
bool copied = false;
SRow* pTSRow = NULL;
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
SRowMerger* pMerger = &pReader->status.merger;
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData};
// let's record the last processed key
tRowKeyAssign(&pScanInfo->lastProcKey, getCurrentKeyInSttBlock(pSttBlockReader));
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData};
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid,
fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr);
int32_t code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, &sttKey, pReader, &copied);
int32_t code =
tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pScanInfo, &pScanInfo->lastProcKey, pReader, &copied);
if (code) {
return code;
}
tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey);
if (copied) {
return TSDB_CODE_SUCCESS;
} else {
@ -2351,14 +2347,13 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, &sttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
doMergeRowsInSttBlock(pSttBlockReader, pScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
@ -2796,23 +2791,18 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf
pInfo->rows = pScanInfo->numOfRowsInStt;
pInfo->id.uid = pScanInfo->uid;
pInfo->dataLoad = 1;
pInfo->window = pScanInfo->sttWindow;
pInfo->window.skey = pScanInfo->sttRange.skey.ts;
pInfo->window.ekey = pScanInfo->sttRange.ekey.ts;
setComposedBlockFlag(pReader, true);
pScanInfo->sttKeyInfo.nextProcKey.ts = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1;
pScanInfo->sttKeyInfo.nextProcKey.ts = asc ? pScanInfo->sttRange.ekey.ts + 1 : pScanInfo->sttRange.skey.ts - 1;
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
pScanInfo->lastProcKey.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
if (pScanInfo->lastProcKey.numOfPKs > 0) {
ASSERT(0);
// if (IS_NUMERIC_TYPE(pKey->pks[0].type)) {
// pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val;
// } else {
// uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData;
// pKey->pks[0].nData = asc ? pBlockInfo->lastPKLen : pBlockInfo->firstPKLen;
// memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData);
// }
if (asc) {
tRowKeyAssign(&pScanInfo->lastProcKey, &pScanInfo->sttRange.ekey);
} else {
tRowKeyAssign(&pScanInfo->lastProcKey, &pScanInfo->sttRange.skey);
}
pScanInfo->sttBlockReturned = true;
@ -3014,18 +3004,18 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
(!asc && pBlockInfo->firstKey > keyInStt)) {
if (pScanInfo->cleanSttBlocks && hasDataInSttBlock(pScanInfo)) {
if (asc) { // file block is located before the stt block
ASSERT(pScanInfo->sttWindow.skey > pBlockInfo->lastKey);
ASSERT(pScanInfo->sttRange.skey.ts > pBlockInfo->lastKey);
} else { // stt block is before the file block
ASSERT(pScanInfo->sttWindow.ekey < pBlockInfo->firstKey);
ASSERT(pScanInfo->sttRange.ekey.ts < pBlockInfo->firstKey);
}
}
buildCleanBlockFromDataFiles(pReader, pScanInfo, pBlockInfo, pBlockIter->index);
} else { // clean stt block
if (asc) {
ASSERT(pScanInfo->sttWindow.ekey < pBlockInfo->firstKey);
ASSERT(pScanInfo->sttRange.ekey.ts < pBlockInfo->firstKey);
} else {
ASSERT(pScanInfo->sttWindow.skey > pBlockInfo->lastKey);
ASSERT(pScanInfo->sttRange.skey.ts > pBlockInfo->lastKey);
}
// return the stt file block
@ -3682,8 +3672,10 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
return TSDB_CODE_SUCCESS;
}
int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pRowKey,
SRowMerger* pMerger, int32_t pkSrcSlot, SVersionRange* pVerRange, const char* idStr) {
int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowMerger* pMerger,
int32_t pkSrcSlot, SVersionRange* pVerRange, const char* idStr) {
SRowKey* pRowKey = &pScanInfo->lastProcKey;
while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pkSrcSlot, pVerRange)) {
SRowKey* pNextKey = getCurrentKeyInSttBlock(pSttBlockReader);

View File

@ -130,7 +130,7 @@ STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, c
return *p;
}
static int32_t initSRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc) {
int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc) {
pKey->numOfPKs = numOfPks;
pKey->ts = ts;
@ -169,30 +169,33 @@ static int32_t initSRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t
static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) {
int32_t numOfPks = pReader->suppInfo.numOfPks;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int8_t type = pReader->suppInfo.pk.type;
int8_t bytes = pReader->suppInfo.pk.bytes;
SRowKey* pRowKey = &pScanInfo->lastProcKey;
if (asc) {
int64_t skey = pReader->info.window.skey;
int64_t ts = (skey > INT64_MIN) ? (skey - 1) : skey;
initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes, asc);
initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, skey, numOfPks, pReader->suppInfo.pk.type,
pReader->suppInfo.pk.bytes, asc);
initRowKey(pRowKey, ts, numOfPks, type, bytes, asc);
initRowKey(&pScanInfo->sttKeyInfo.nextProcKey, skey, numOfPks, type, bytes, asc);
} else {
int64_t ekey = pReader->info.window.ekey;
int64_t ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
initSRowKey(pRowKey, ts, numOfPks, pReader->suppInfo.pk.type, pReader->suppInfo.pk.bytes, asc);
initSRowKey(&pScanInfo->sttKeyInfo.nextProcKey, ekey, numOfPks, pReader->suppInfo.pk.type,
pReader->suppInfo.pk.bytes, asc);
initRowKey(pRowKey, ts, numOfPks, type, bytes, asc);
initRowKey(&pScanInfo->sttKeyInfo.nextProcKey, ekey, numOfPks, type, bytes, asc);
}
initRowKey(&pScanInfo->sttRange.skey, INT64_MAX, numOfPks, type, bytes, asc);
initRowKey(&pScanInfo->sttRange.ekey, INT64_MIN, numOfPks, type, bytes, asc);
}
int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSHashObj* pTableMap,
STsdbReader* pReader) {
pScanInfo->uid = uid;
INIT_TIMEWINDOW(&pScanInfo->sttWindow);
INIT_KEYRANGE(&pScanInfo->sttRange);
INIT_TIMEWINDOW(&pScanInfo->filesetWindow);
pScanInfo->cleanSttBlocks = false;
@ -311,7 +314,7 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
pScanInfo->cleanSttBlocks = false;
pScanInfo->numOfRowsInStt = 0;
pScanInfo->sttBlockReturned = false;
INIT_TIMEWINDOW(&pScanInfo->sttWindow);
INIT_KEYRANGE(&pScanInfo->sttRange);
INIT_TIMEWINDOW(&pScanInfo->filesetWindow);
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
}

View File

@ -32,6 +32,12 @@ extern "C" {
(_w)->ekey = INT64_MIN; \
} while (0);
#define INIT_KEYRANGE(_k) \
do { \
(_k)->skey.ts = INT64_MAX; \
(_k)->ekey.ts = INT64_MIN; \
} while (0);
typedef enum {
READER_STATUS_SUSPEND = 0x1,
READER_STATUS_NORMAL = 0x2,
@ -78,34 +84,38 @@ typedef enum ESttKeyStatus {
typedef struct SSttKeyInfo {
ESttKeyStatus status; // this value should be updated when switch to the next fileset
SRowKey nextProcKey;
// int64_t nextProcKey; // todo remove this attribute, since it is impossible to set correct nextProcKey
// value
} SSttKeyInfo;
typedef struct SSttKeyRange {
SRowKey skey;
SRowKey ekey;
} SSttKeyRange;
// clean stt file blocks:
// 1. not overlap with stt blocks in other stt files of the same fileset
// 2. not overlap with delete skyline
// 3. not overlap with in-memory data (mem/imem)
// 4. not overlap with data file blocks
typedef struct STableBlockScanInfo {
uint64_t uid;
SRowKey lastProcKey;
SSttKeyInfo sttKeyInfo;
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
SArray* pBlockIdxList; // SArray<STableDataBlockIndx>
SArray* pMemDelData; // SArray<SDelData>
SArray* pFileDelData; // SArray<SDelData> from each file set
SIterInfo iter; // mem buffer skip list iterator
SIterInfo iiter; // imem buffer skip list iterator
SArray* delSkyline; // delete info for this table
int32_t fileDelIndex; // file block delete index
int32_t sttBlockDelIndex; // delete index for last block
bool iterInit; // whether to initialize the in-memory skip list iterator or not
bool cleanSttBlocks; // stt block is clean in current fileset
bool sttBlockReturned; // result block returned alreay
int64_t numOfRowsInStt;
STimeWindow sttWindow; // timestamp window for current stt files
STimeWindow filesetWindow; // timestamp window for current file set
uint64_t uid;
SRowKey lastProcKey;
SSttKeyInfo sttKeyInfo;
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
SArray* pBlockIdxList; // SArray<STableDataBlockIndx>
SArray* pMemDelData; // SArray<SDelData>
SArray* pFileDelData; // SArray<SDelData> from each file set
SIterInfo iter; // mem buffer skip list iterator
SIterInfo iiter; // imem buffer skip list iterator
SArray* delSkyline; // delete info for this table
int32_t fileDelIndex; // file block delete index
int32_t sttBlockDelIndex; // delete index for last block
bool iterInit; // whether to initialize the in-memory skip list iterator or not
bool cleanSttBlocks; // stt block is clean in current fileset
bool sttBlockReturned; // result block returned alreay
int64_t numOfRowsInStt;
SSttKeyRange sttRange;
// STimeWindow sttWindow; // timestamp window for current stt files
STimeWindow filesetWindow; // timestamp window for current file set
} STableBlockScanInfo;
typedef struct SResultBlockInfo {
@ -336,6 +346,7 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order);
bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order);
int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2);
int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc);
typedef struct {
SArray* pTombData;