fix(tsdb): support composite primary key.
This commit is contained in:
parent
34c849bdbe
commit
5ed0283494
|
@ -125,6 +125,9 @@ int32_t tsdbRowCompare(const void *p1, const void *p2);
|
|||
int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2);
|
||||
int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2);
|
||||
void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key);
|
||||
void tsdbColRowGetKey(SBlockData *pBlock, int32_t irow, STsdbRowKey *key);
|
||||
int32_t tsdbRowKeyAssign(STsdbRowKey *pDst, STsdbRowKey *pSrc);
|
||||
|
||||
// STSDBRowIter
|
||||
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
||||
void tsdbRowClose(STSDBRowIter *pIter);
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
#include "tsimplehash.h"
|
||||
|
||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||
#define getCurrentKeyInSttBlock(_r) ((_r)->currentKey)
|
||||
#define getCurrentKeyInSttBlock(_r) (&((_r)->currentKey))
|
||||
|
||||
typedef struct {
|
||||
bool overlapWithNeighborBlock;
|
||||
|
@ -40,9 +40,9 @@ static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i
|
|||
STsdbReader* pReader);
|
||||
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
|
||||
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
||||
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
|
||||
SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
|
||||
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey* pKey, SArray* pDelList,
|
||||
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
|
||||
STsdbRowKey* pRowKey, SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
|
||||
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey* pCurKey, SArray* pDelList,
|
||||
STsdbReader* pReader);
|
||||
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
|
||||
STableBlockScanInfo* pScanInfo);
|
||||
|
@ -106,6 +106,10 @@ static int32_t pkComp1(STsdbReader* pReader, STsdbRowKey* p1, TSDBROW* p2) {
|
|||
return pReader->pkComparFn(&p1->key.pks[0].val, &k2.key.pks[0].val);
|
||||
}
|
||||
|
||||
static int32_t pkComp2(STsdbReader* pReader, STsdbRowKey* p1, STsdbRowKey* p2) {
|
||||
return pReader->pkComparFn(&p1->key.pks[0].val, &p2->key.pks[0].val);
|
||||
}
|
||||
|
||||
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
|
||||
int32_t numOfCols) {
|
||||
pSupInfo->smaValid = true;
|
||||
|
@ -195,6 +199,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA
|
|||
pLReader->order = pReader->info.order;
|
||||
pLReader->window = pReader->info.window;
|
||||
pLReader->verRange = pReader->info.verRange;
|
||||
pLReader->numOfPks = pReader->numOfPks;
|
||||
|
||||
pLReader->uid = 0;
|
||||
tMergeTreeClose(&pLReader->mergeTree);
|
||||
|
@ -427,6 +432,8 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
|
|||
pReader->type = pCond->type;
|
||||
pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
|
||||
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
|
||||
pReader->numOfPks = -1;
|
||||
pReader->pkChecked = false;
|
||||
|
||||
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -565,7 +572,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
|||
|
||||
int32_t k = 0;
|
||||
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
||||
int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1;
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||
int32_t step = asc ? 1 : -1;
|
||||
STimeWindow w = pReader->info.window;
|
||||
SBrinRecord* pRecord = NULL;
|
||||
|
||||
|
@ -601,10 +609,12 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
|||
ASSERT(pRecord->suid == pReader->info.suid && uid == pRecord->uid);
|
||||
|
||||
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
w.skey = pScanInfo->lastProcKey + step;
|
||||
|
||||
// todo: here we should find the first timestamp that is greater than the lastProcKey
|
||||
if (asc) {
|
||||
w.skey = pScanInfo->lastProcKey.key.ts + step;
|
||||
} else {
|
||||
w.ekey = pScanInfo->lastProcKey + step;
|
||||
w.ekey = pScanInfo->lastProcKey.key.ts + step;
|
||||
}
|
||||
|
||||
if (isEmptyQueryTimeWindow(&w)) {
|
||||
|
@ -672,10 +682,12 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// todo keep the the last returned key
|
||||
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
|
||||
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
|
||||
// int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
|
||||
pDumpInfo->allDumped = true;
|
||||
pDumpInfo->lastKey = maxKey + step;
|
||||
// ASSERT(0);
|
||||
// pDumpInfo->lastKey.key.ts = maxKey + step;
|
||||
}
|
||||
|
||||
static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
|
||||
|
@ -933,7 +945,7 @@ static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInf
|
|||
record->count = pBlockInfo->count;
|
||||
}
|
||||
|
||||
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
|
||||
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STsdbRowKey* pLastProcKey) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||
|
@ -1021,7 +1033,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
|
|||
int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
|
||||
if (dumpedRows > pReader->resBlockInfo.capacity) { // output buffer check
|
||||
dumpedRows = pReader->resBlockInfo.capacity;
|
||||
} else if (dumpedRows <= 0) { // no qualified rows in current data block, abort directly.
|
||||
} else if (dumpedRows <= 0) { // no qualified rows in current data block, quit directly.
|
||||
setBlockAllDumped(pDumpInfo, pReader->info.window.ekey, pReader->info.order);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1082,11 +1094,14 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
|
|||
pResBlock->info.rows = dumpedRows;
|
||||
pDumpInfo->rowIndex += step * dumpedRows;
|
||||
|
||||
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, pLastProcKey);
|
||||
|
||||
// check if current block are all handled
|
||||
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) {
|
||||
int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
|
||||
|
||||
// the remain data has out of query time window, ignore current block
|
||||
if (outOfTimeWindow(ts, &pReader->info.window)) {
|
||||
// the remain data has out of query time window, ignore current block
|
||||
setBlockAllDumped(pDumpInfo, ts, pReader->info.order);
|
||||
}
|
||||
} else {
|
||||
|
@ -1316,6 +1331,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
|
|||
|
||||
SBrinRecord pRecord;
|
||||
blockInfoToRecord(&pRecord, pBlockInfo);
|
||||
|
||||
// has duplicated ts of different version in this block
|
||||
pInfo->hasDupTs = (pBlockInfo->numRow > pBlockInfo->count) || (pBlockInfo->count <= 0);
|
||||
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pRecord, pReader->info.order);
|
||||
|
@ -1399,10 +1415,19 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
|
|||
*copied = false;
|
||||
bool asc = (pReader->info.order == TSDB_ORDER_ASC);
|
||||
if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) {
|
||||
int32_t step = pReader->info.order == TSDB_ORDER_ASC ? 1 : -1;
|
||||
int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1;
|
||||
|
||||
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
|
||||
if (nextKey != key) { // merge is not needed
|
||||
STsdbRowKey rowKey, nextRowKey;
|
||||
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, &rowKey);
|
||||
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey);
|
||||
|
||||
if (!pReader->pkChecked) {
|
||||
pReader->pkComparFn = getComparFunc(rowKey.key.pks[0].type, 0);
|
||||
pReader->numOfPks = rowKey.key.numOfPKs;
|
||||
pReader->pkChecked = true;
|
||||
}
|
||||
|
||||
if (rowKey.key.ts != nextRowKey.key.ts || (pkComp2(pReader, &rowKey, &nextRowKey) != 0)) { // merge is not needed
|
||||
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
|
||||
if (code) {
|
||||
return code;
|
||||
|
@ -1424,6 +1449,8 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
|
|||
bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree);
|
||||
if (!hasVal) { // the next value will be the accessed key in stt
|
||||
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
||||
|
||||
// next file, the timestamps in the next file must be greater than those in current
|
||||
pScanInfo->sttKeyInfo.nextProcKey += step;
|
||||
return false;
|
||||
}
|
||||
|
@ -1432,8 +1459,27 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
|
|||
int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow];
|
||||
int64_t ver = pRow->pBlockData->aVersion[pRow->iRow];
|
||||
|
||||
pSttBlockReader->currentKey = key;
|
||||
pScanInfo->sttKeyInfo.nextProcKey = key;
|
||||
if (pSttBlockReader->numOfPks < 0) {// todo handle the deep copy problem
|
||||
tsdbColRowGetKey(pRow->pBlockData, pRow->iRow, &pSttBlockReader->currentKey);
|
||||
pSttBlockReader->numOfPks = pSttBlockReader->currentKey.key.numOfPKs;
|
||||
if (pSttBlockReader->numOfPks > 0) {
|
||||
pSttBlockReader->pkComparFn = getComparFunc(pSttBlockReader->currentKey.key.pks[0].type, 0);
|
||||
}
|
||||
|
||||
pScanInfo->sttKeyInfo.nextProcKey = key;
|
||||
} else {
|
||||
if (pSttBlockReader->numOfPks == 0) {
|
||||
pSttBlockReader->currentKey.key.ts = key;
|
||||
pSttBlockReader->currentKey.version = ver;
|
||||
|
||||
// todo handle error
|
||||
pScanInfo->sttKeyInfo.nextProcKey = key;
|
||||
} else {
|
||||
// todo handle the deep copy problem
|
||||
tsdbColRowGetKey(pRow->pBlockData, pRow->iRow, &pSttBlockReader->currentKey);
|
||||
pScanInfo->sttKeyInfo.nextProcKey = key;
|
||||
}
|
||||
}
|
||||
|
||||
if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) {
|
||||
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) {
|
||||
|
@ -1451,8 +1497,22 @@ static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBl
|
|||
|
||||
static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); }
|
||||
|
||||
static int32_t pkCompEx(__compar_fn_t comparFn, STsdbRowKey* p1, STsdbRowKey* p2) {
|
||||
if (p1->key.ts < p2->key.ts) {
|
||||
return -1;
|
||||
} else if (p1->key.ts > p2->key.ts) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (p1->key.numOfPKs == 0) {
|
||||
return 0;
|
||||
} else {
|
||||
return comparFn(p1->key.pks[0].pData, p2->key.pks[0].pData);
|
||||
}
|
||||
}
|
||||
|
||||
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader,
|
||||
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader,
|
||||
STableBlockScanInfo* pScanInfo, STsdbRowKey* pSttKey, STsdbReader* pReader,
|
||||
bool* copied) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
*copied = false;
|
||||
|
@ -1462,18 +1522,18 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB
|
|||
bool hasVal = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
|
||||
doUnpinSttBlock(pSttBlockReader);
|
||||
if (hasVal) {
|
||||
STsdbRowKey key, nextKey;
|
||||
tsdbRowGetKey(fRow, &key);
|
||||
STsdbRowKey nextKey;
|
||||
|
||||
TSDBROW* pNextRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||
tsdbRowGetKey(pNextRow, &nextKey);
|
||||
|
||||
if (!pReader->pkChecked) {
|
||||
pReader->pkComparFn = getComparFunc(key.key.pks[0].type, 0);
|
||||
pReader->pkComparFn = getComparFunc(pSttKey->key.pks[0].type, 0);
|
||||
pReader->pkChecked = true;
|
||||
pReader->numOfPks = pSttKey->key.numOfPKs;
|
||||
}
|
||||
|
||||
if (nextKey.key.ts != ts || (pkComp(pReader, fRow, pNextRow) != 0)) {
|
||||
if (pkCompEx(pReader->pkComparFn, pSttKey, &nextKey) != 0) {
|
||||
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
||||
if (code) {
|
||||
return code;
|
||||
|
@ -1534,15 +1594,31 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
SRow* pTSRow = NULL;
|
||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
__compar_fn_t compFn = pReader->pkComparFn;
|
||||
|
||||
int64_t tsLast = INT64_MIN;
|
||||
STsdbRowKey* pSttKey = NULL;
|
||||
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
|
||||
tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
}
|
||||
|
||||
STsdbRowKey k;
|
||||
tsdbRowGetKey(pRow, &k);
|
||||
|
||||
STSchema* pSchema = NULL;
|
||||
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||
pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||
if (pSchema == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
STsdbRowKey* pfKey = &(STsdbRowKey){0};
|
||||
if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
||||
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey);
|
||||
} else {
|
||||
pfKey = NULL;
|
||||
}
|
||||
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
|
||||
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
|
||||
|
@ -1554,65 +1630,53 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
}
|
||||
}
|
||||
|
||||
int64_t minKey = 0;
|
||||
// int64_t minKey = 0;
|
||||
STsdbRowKey minKey;
|
||||
if (pReader->info.order == TSDB_ORDER_ASC) {
|
||||
minKey = INT64_MAX; // chosen the minimum value
|
||||
if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
|
||||
minKey = tsLast;
|
||||
minKey = k; // chosen the minimum value
|
||||
|
||||
if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) < 0)) {
|
||||
minKey = *pfKey;
|
||||
}
|
||||
|
||||
if (minKey > k.key.ts) {
|
||||
minKey = k.key.ts;
|
||||
}
|
||||
|
||||
if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
||||
minKey = key;
|
||||
if (pSttKey != NULL && (pkCompEx(compFn, pSttKey, &minKey) < 0)) {
|
||||
minKey = *pSttKey;
|
||||
}
|
||||
} else {
|
||||
minKey = INT64_MIN;
|
||||
if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
|
||||
minKey = tsLast;
|
||||
minKey = k;
|
||||
|
||||
if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) > 0)) {
|
||||
minKey = *pfKey;
|
||||
}
|
||||
|
||||
if (minKey < k.key.ts) {
|
||||
minKey = k.key.ts;
|
||||
}
|
||||
|
||||
if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
||||
minKey = key;
|
||||
if (pSttKey != NULL && (pkCompEx(compFn, pSttKey, &minKey) > 0)) {
|
||||
minKey = *pSttKey;
|
||||
}
|
||||
}
|
||||
|
||||
// ASC: file block ---> last block -----> imem -----> mem
|
||||
// DESC: mem -----> imem -----> last block -----> file block
|
||||
if (pReader->info.order == TSDB_ORDER_ASC) {
|
||||
if (minKey == key) {
|
||||
if (pkCompEx(compFn, &minKey, pfKey) == 0) {
|
||||
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||
}
|
||||
|
||||
if (minKey == tsLast) {
|
||||
if (pkCompEx(compFn, &minKey, pSttKey) == 0) {
|
||||
TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
}
|
||||
|
||||
if (minKey == k.key.ts) {
|
||||
STSchema* pTSchema = NULL;
|
||||
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||
pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||
if (pTSchema == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pTSchema);
|
||||
if (pkCompEx(compFn, &minKey, &k) == 0) {
|
||||
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -1623,16 +1687,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
}
|
||||
}
|
||||
} else {
|
||||
if (minKey == k.key.ts) {
|
||||
STSchema* pTSchema = NULL;
|
||||
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||
pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||
if (pTSchema == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pTSchema);
|
||||
if (pkCompEx(compFn, &minKey, &k) == 0) {
|
||||
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -1643,16 +1699,16 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
}
|
||||
}
|
||||
|
||||
if (minKey == tsLast) {
|
||||
if (pkCompEx(compFn, &minKey, pSttKey) == 0) {
|
||||
TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
}
|
||||
|
||||
if (minKey == key) {
|
||||
if (pkCompEx(compFn, &minKey, pfKey) == 0) {
|
||||
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -1691,11 +1747,12 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
|
|||
|
||||
bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo);
|
||||
bool dataInSttFile = hasDataInSttBlock(pBlockScanInfo);
|
||||
|
||||
if (dataInDataFile && (!dataInSttFile)) {
|
||||
// no stt file block available, only data block exists
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||
} else if ((!dataInDataFile) && dataInSttFile) {
|
||||
// no data ile block exists
|
||||
// no data in data file exists
|
||||
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
|
||||
} else if (pBlockScanInfo->cleanSttBlocks && pReader->info.execMode == READER_EXEC_ROWS) {
|
||||
// opt model for count data in stt file, which is not overlap with data blocks in files.
|
||||
|
@ -1703,22 +1760,49 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
|
|||
} else {
|
||||
// row in both stt file blocks and data file blocks
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
int64_t tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
STsdbRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
if (key < tsLast) { // asc
|
||||
if (key < pSttKey->key.ts) { // asc
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||
} else if (key > tsLast) {
|
||||
} else if (key > pSttKey->key.ts) {
|
||||
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
|
||||
}
|
||||
|
||||
// key == tsLast. ts is equal and the primary key exists
|
||||
if (pSttBlockReader->numOfPks > 0) {
|
||||
if (!pReader->pkChecked) {
|
||||
pReader->numOfPks = pSttBlockReader->numOfPks;
|
||||
pReader->pkComparFn = pSttBlockReader->pkComparFn;
|
||||
pReader->pkChecked = true;
|
||||
}
|
||||
|
||||
int32_t res = pkComp1(pReader, pSttKey, &fRow);
|
||||
if (res > 0) {
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||
} else if (res < 0) {
|
||||
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
|
||||
}
|
||||
}
|
||||
} else { // desc
|
||||
if (key > tsLast) {
|
||||
if (key > pSttKey->key.ts) {
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||
} else if (key < tsLast) {
|
||||
} else if (key < pSttKey->key.ts) {
|
||||
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
|
||||
}
|
||||
|
||||
// key == tsLast. ts is equal and the primary key exists
|
||||
if (pReader->numOfPks > 0) {
|
||||
int32_t res = pkComp1(pReader, pSttKey, &fRow);
|
||||
if (res < 0) {
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||
} else if (res > 0) {
|
||||
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// the following for key == tsLast
|
||||
// the following for key == sttKey->key.ts
|
||||
// ASC: file block ------> stt block
|
||||
// DESC: stt block ------> file block
|
||||
SRow* pTSRow = NULL;
|
||||
|
@ -1736,7 +1820,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
|
|||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
} else {
|
||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||
|
@ -1744,7 +1828,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
|
|||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
|
||||
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1774,16 +1858,22 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
SArray* pDelList = pBlockScanInfo->delSkyline;
|
||||
__compar_fn_t compFn = pReader->pkComparFn;
|
||||
|
||||
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
|
||||
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
||||
|
||||
int64_t tsLast = INT64_MIN;
|
||||
STsdbRowKey* pSttKey = NULL;
|
||||
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
|
||||
tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
}
|
||||
|
||||
int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
|
||||
STsdbRowKey* pfKey = &(STsdbRowKey){0};
|
||||
if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
||||
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey);
|
||||
} else {
|
||||
pfKey = NULL;
|
||||
}
|
||||
|
||||
STsdbRowKey k, ik;
|
||||
tsdbRowGetKey(pRow, &k);
|
||||
|
@ -1814,47 +1904,40 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
}
|
||||
}
|
||||
|
||||
int64_t minKey = 0;
|
||||
STsdbRowKey minKey;
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
minKey = INT64_MAX; // let's find the minimum
|
||||
if (minKey > k.key.ts) {
|
||||
minKey = k.key.ts;
|
||||
minKey = k; // let's find the minimum
|
||||
|
||||
if (pkCompEx(compFn, &ik, &minKey) < 0) {//minKey > ik.key.ts) {
|
||||
minKey = ik;
|
||||
}
|
||||
|
||||
if (minKey > ik.key.ts) {
|
||||
minKey = ik.key.ts;
|
||||
if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) < 0)) {
|
||||
minKey = *pfKey;
|
||||
}
|
||||
|
||||
if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
||||
minKey = key;
|
||||
}
|
||||
|
||||
if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
|
||||
minKey = tsLast;
|
||||
if (pSttKey != NULL && (pkCompEx(compFn, pSttKey, &minKey) < 0)) {
|
||||
minKey = *pSttKey;
|
||||
}
|
||||
} else {
|
||||
minKey = INT64_MIN; // let find the maximum ts value
|
||||
if (minKey < k.key.ts) {
|
||||
minKey = k.key.ts;
|
||||
minKey = k; // let find the maximum ts value
|
||||
if (pkCompEx(compFn, &ik, &minKey) > 0) {
|
||||
minKey = ik;
|
||||
}
|
||||
|
||||
if (minKey < ik.key.ts) {
|
||||
minKey = ik.key.ts;
|
||||
if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) > 0)) {
|
||||
minKey = *pfKey;
|
||||
}
|
||||
|
||||
if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
||||
minKey = key;
|
||||
}
|
||||
|
||||
if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
|
||||
minKey = tsLast;
|
||||
if (pSttKey != NULL && (pkCompEx(compFn, pSttKey, &minKey) > 0)) {
|
||||
minKey = *pSttKey;
|
||||
}
|
||||
}
|
||||
|
||||
// ASC: file block -----> stt block -----> imem -----> mem
|
||||
// DESC: mem -----> imem -----> stt block -----> file block
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
if (minKey == key) {
|
||||
if (pkCompEx(compFn, &minKey, pfKey) == 0) {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1864,17 +1947,17 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||
}
|
||||
|
||||
if (minKey == tsLast) {
|
||||
if (pkCompEx(compFn, &minKey, pSttKey) == 0) {
|
||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
}
|
||||
|
||||
if (minKey == ik.key.ts) {
|
||||
if (pkCompEx(compFn, &minKey, &ik) == 0) {
|
||||
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -1886,7 +1969,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
}
|
||||
}
|
||||
|
||||
if (minKey == k.key.ts) {
|
||||
if (pkCompEx(compFn, &minKey, &k) == 0) {
|
||||
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -1898,7 +1981,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
}
|
||||
}
|
||||
} else {
|
||||
if (minKey == k.key.ts) {
|
||||
if (pkCompEx(compFn, &minKey, &k) == 0) {
|
||||
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -1910,7 +1993,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
}
|
||||
}
|
||||
|
||||
if (minKey == ik.key.ts) {
|
||||
if (pkCompEx(compFn, &minKey, &ik) == 0) {
|
||||
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -1922,17 +2005,17 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
}
|
||||
}
|
||||
|
||||
if (minKey == tsLast) {
|
||||
if (pkCompEx(compFn, &minKey, pSttKey) == 0) {
|
||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
}
|
||||
|
||||
if (minKey == key) {
|
||||
if (pkCompEx(compFn, &minKey, pfKey) == 0) {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1996,13 +2079,13 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
|
|||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
startKey = (STsdbRowKey){.version = pReader->info.verRange.minVer,
|
||||
.key = {
|
||||
.ts = pBlockScanInfo->lastProcKey + 1,
|
||||
.ts = pBlockScanInfo->lastProcKey.key.ts + 1,
|
||||
.numOfPKs = 0, // TODO: change here if multi-key is supported
|
||||
}};
|
||||
} else {
|
||||
startKey = (STsdbRowKey){.version = pReader->info.verRange.maxVer,
|
||||
.key = {
|
||||
.ts = pBlockScanInfo->lastProcKey - 1,
|
||||
.ts = pBlockScanInfo->lastProcKey.key.ts - 1,
|
||||
.numOfPKs = 0, // TODO: change here if multi-key is supported
|
||||
}};
|
||||
}
|
||||
|
@ -2027,7 +2110,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
|
|||
}
|
||||
|
||||
static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STableBlockScanInfo* pBlockScanInfo, bool asc,
|
||||
STsdbReaderInfo* pInfo) {
|
||||
STsdbReaderInfo* pInfo, STsdbReader* pReader) {
|
||||
// it is an multi-table data block
|
||||
if (pBlockData->aUid != NULL) {
|
||||
uint64_t uid = pBlockData->aUid[rowIndex];
|
||||
|
@ -2047,10 +2130,25 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
|
|||
return false;
|
||||
}
|
||||
|
||||
if ((asc && (ts <= pBlockScanInfo->lastProcKey)) || ((!asc) && (ts >= pBlockScanInfo->lastProcKey))) {
|
||||
if ((asc && (ts < pBlockScanInfo->lastProcKey.key.ts)) || ((!asc) && (ts > pBlockScanInfo->lastProcKey.key.ts))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (ts == pBlockScanInfo->lastProcKey.key.ts) { // todo opt perf
|
||||
STsdbRowKey nextRowKey;
|
||||
tsdbColRowGetKey(pBlockData, rowIndex, &nextRowKey);
|
||||
|
||||
if (!pReader->pkChecked) {
|
||||
pReader->pkComparFn = getComparFunc(pBlockScanInfo->lastProcKey.key.pks[0].type, 0);
|
||||
pReader->pkChecked = true;
|
||||
pReader->numOfPks = nextRowKey.key.numOfPKs;
|
||||
}
|
||||
|
||||
if (pkComp2(pReader, &pBlockScanInfo->lastProcKey, &nextRowKey) == 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (pBlockScanInfo->delSkyline != NULL && TARRAY_SIZE(pBlockScanInfo->delSkyline) > 0) {
|
||||
bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pInfo->order,
|
||||
&pInfo->verRange);
|
||||
|
@ -2196,7 +2294,12 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
|||
}
|
||||
|
||||
if (copied) {
|
||||
pBlockScanInfo->lastProcKey = key;
|
||||
if (pReader->numOfPks == 0) {
|
||||
pBlockScanInfo->lastProcKey.key.ts = key;
|
||||
} else { // todo use deep copy instead of shallow copy
|
||||
int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1;
|
||||
tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, &pBlockScanInfo->lastProcKey);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
|
@ -2223,9 +2326,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
|||
|
||||
int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo,
|
||||
STsdbReader* pReader) {
|
||||
bool copied = false;
|
||||
SRow* pTSRow = NULL;
|
||||
int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
bool copied = false;
|
||||
SRow* pTSRow = NULL;
|
||||
STsdbRowKey* pSttKey = NULL;//getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
|
||||
STsdbRowKey newSttKey;
|
||||
tsdbRowKeyAssign(&newSttKey, getCurrentKeyInSttBlock(pSttBlockReader));
|
||||
pSttKey = &newSttKey;
|
||||
|
||||
SRowMerger* pMerger = &pReader->status.merger;
|
||||
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||
|
@ -2234,13 +2341,13 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
|
|||
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid,
|
||||
fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr);
|
||||
|
||||
int32_t code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied);
|
||||
int32_t code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, pSttKey, pReader, &copied);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (copied) {
|
||||
pBlockScanInfo->lastProcKey = tsLastBlock;
|
||||
tsdbRowKeyAssign(&pBlockScanInfo->lastProcKey, pSttKey);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||
|
@ -2250,8 +2357,7 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
|
|||
|
||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
|
||||
pReader->idStr);
|
||||
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||
|
||||
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2365,7 +2471,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
|||
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
|
||||
STableBlockScanInfo* pBlockScanInfo = NULL;
|
||||
if (pBlockInfo == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -2375,7 +2480,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
|||
return code;
|
||||
}
|
||||
|
||||
pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
|
||||
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
|
||||
if (pBlockScanInfo == NULL) {
|
||||
goto _end;
|
||||
}
|
||||
|
@ -2384,18 +2489,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
|||
|
||||
// it is a clean block, load it directly
|
||||
int64_t cap = pReader->resBlockInfo.capacity;
|
||||
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) && (pBlockInfo->numRow <= cap)) {
|
||||
if (((asc && (pBlockInfo->firstKey < keyInBuf.ts)) || (!asc && (pBlockInfo->lastKey > keyInBuf.ts))) &&
|
||||
(pBlockScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA)) {
|
||||
code = copyBlockDataToSDataBlock(pReader);
|
||||
if (code) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
// record the last key value
|
||||
pBlockScanInfo->lastProcKey = asc ? pBlockInfo->lastKey : pBlockInfo->firstKey;
|
||||
goto _end;
|
||||
}
|
||||
bool directCopy = isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) &&
|
||||
(pBlockInfo->numRow <= cap) && (pBlockScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) &&
|
||||
((asc && ((pBlockInfo->lastKey < keyInBuf.ts) || (keyInBuf.ts == INT64_MIN))) ||
|
||||
(!asc && (pBlockInfo->lastKey > keyInBuf.ts)));
|
||||
if (directCopy) {
|
||||
code = copyBlockDataToSDataBlock(pReader, &pBlockScanInfo->lastProcKey);
|
||||
goto _end;
|
||||
}
|
||||
|
||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||
|
@ -2406,7 +2506,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
|||
{
|
||||
while (pBlockData->nRow > 0 && pBlockData->uid == pBlockScanInfo->uid) {
|
||||
// find the first qualified row in data block
|
||||
if (isValidFileBlockRow(pBlockData, pDumpInfo->rowIndex, pBlockScanInfo, asc, &pReader->info)) {
|
||||
if (isValidFileBlockRow(pBlockData, pDumpInfo->rowIndex, pBlockScanInfo, asc, &pReader->info, pReader)) {
|
||||
hasBlockData = true;
|
||||
break;
|
||||
}
|
||||
|
@ -2699,7 +2799,8 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf
|
|||
|
||||
pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1;
|
||||
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
||||
pScanInfo->lastProcKey = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
|
||||
ASSERT(0);
|
||||
pScanInfo->lastProcKey.key.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
|
||||
pScanInfo->sttBlockReturned = true;
|
||||
|
||||
pSttBlockReader->mergeTree.pIter = NULL;
|
||||
|
@ -2725,7 +2826,18 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn
|
|||
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order);
|
||||
|
||||
// update the last key for the corresponding table
|
||||
pScanInfo->lastProcKey = asc ? pInfo->window.ekey : pInfo->window.skey;
|
||||
SRowKey* pKey = &pScanInfo->lastProcKey.key;
|
||||
pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey;
|
||||
pKey->numOfPKs = pReader->numOfPks;
|
||||
|
||||
// todo opt allocation
|
||||
// if (IS_NUMERIC_TYPE(p)) {
|
||||
pKey->pks[0].val = asc? pBlockInfo->lastPrimaryKey.val:pBlockInfo->firstPrimaryKey.val;
|
||||
// } else {
|
||||
// int32_t len = asc? pBlockInfo->lastPKLen:pBlockInfo->firstPKLen;
|
||||
// char* p = taosMemoryRealloc(pKey->pks[0].pData, len);
|
||||
// }
|
||||
|
||||
tsdbDebug("%p uid:%" PRIu64
|
||||
" clean file block retrieved from file, global index:%d, "
|
||||
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
|
||||
|
@ -2931,10 +3043,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
// data in stt now overlaps with current active file data block, need to composed with file data block.
|
||||
int64_t lastKeyInStt = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
if ((lastKeyInStt >= pBlockInfo->firstKey && asc) || (lastKeyInStt <= pBlockInfo->lastKey && (!asc))) {
|
||||
STsdbRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
if ((pSttKey->key.ts >= pBlockInfo->firstKey && asc) || (pSttKey->key.ts <= pBlockInfo->lastKey && (!asc))) {
|
||||
tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader,
|
||||
lastKeyInStt, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr);
|
||||
pSttKey->key.ts, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -3046,20 +3158,22 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
|
|||
SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
|
||||
|
||||
if (pBlockInfo) {
|
||||
STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||
if (pScanInfo) {
|
||||
lastKey = pScanInfo->lastProcKey;
|
||||
}
|
||||
// STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||
// if (pScanInfo) {
|
||||
// tsdbRowKeyAssign(&pDumpInfo->lastKey, &pScanInfo->lastProcKey);
|
||||
// lastKey = pScanInfo->lastProcKey;
|
||||
// }
|
||||
|
||||
pDumpInfo->totalRows = pBlockInfo->numRow;
|
||||
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->numRow - 1;
|
||||
} else {
|
||||
pDumpInfo->totalRows = 0;
|
||||
pDumpInfo->rowIndex = 0;
|
||||
// pDumpInfo->lastKey.key.ts = lastKey;
|
||||
}
|
||||
|
||||
pDumpInfo->allDumped = false;
|
||||
pDumpInfo->lastKey = lastKey;
|
||||
// pDumpInfo->lastKey = lastKey;
|
||||
}
|
||||
|
||||
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
|
||||
|
@ -3173,9 +3287,10 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
|||
}
|
||||
}
|
||||
|
||||
while (1) {
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||
|
||||
while (1) {
|
||||
if (fileBlockPartiallyRead(pDumpInfo, asc)) { // file data block is partially loaded
|
||||
code = buildComposedDataBlock(pReader);
|
||||
} else {
|
||||
|
@ -3188,7 +3303,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
|||
} else {
|
||||
// all data blocks in files are checked, let's check the data in last files.
|
||||
// data blocks in current file are exhausted, let's try the next file now
|
||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||
if (pBlockData->uid != 0) {
|
||||
tBlockDataClear(pBlockData);
|
||||
}
|
||||
|
@ -3553,14 +3667,19 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
|
||||
|
||||
|
||||
int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbRowKey* pRowKey,
|
||||
SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
|
||||
while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pVerRange)) {
|
||||
int64_t next1 = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
if (next1 == ts) {
|
||||
STsdbRowKey* next1 = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
|
||||
int32_t ret = pkCompEx(pSttBlockReader->pkComparFn, pRowKey, next1);
|
||||
if (ret == 0) {
|
||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||
} else {
|
||||
ASSERT(ret < 0);
|
||||
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
|
||||
pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline),
|
||||
pScanInfo->sttKeyInfo.nextProcKey, idStr);
|
||||
|
@ -3822,7 +3941,11 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
|
|||
|
||||
pBlock->info.dataLoad = 1;
|
||||
pBlock->info.rows += 1;
|
||||
pScanInfo->lastProcKey = pTSRow->ts;
|
||||
|
||||
// todo no version
|
||||
TSDBROW row = {.pTSRow = pTSRow, .type = TSDBROW_ROW_FMT};
|
||||
tsdbRowGetKey(&row, &pScanInfo->lastProcKey);
|
||||
// pScanInfo->lastProcKey = pTSRow->ts;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -3899,13 +4022,13 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
|
|||
return code;
|
||||
}
|
||||
|
||||
pBlockScanInfo->lastProcKey = ts;
|
||||
tsdbRowGetKey(&row, &pBlockScanInfo->lastProcKey);
|
||||
} else {
|
||||
code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
|
||||
if (code) {
|
||||
break;
|
||||
return code;
|
||||
}
|
||||
pBlockScanInfo->lastProcKey = row.pBlockData->aTSKEY[row.iRow];
|
||||
tsdbColRowGetKey(row.pBlockData, row.iRow, &pBlockScanInfo->lastProcKey);
|
||||
}
|
||||
|
||||
// no data in buffer, return immediately
|
||||
|
@ -3959,11 +4082,13 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t
|
|||
// todo extract method
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
int64_t skey = pReader->info.window.skey;
|
||||
pInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||
ASSERT(0);
|
||||
// pInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||
pInfo->sttKeyInfo.nextProcKey = skey;
|
||||
} else {
|
||||
int64_t ekey = pReader->info.window.ekey;
|
||||
pInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||
ASSERT(0);
|
||||
// pInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||
pInfo->sttKeyInfo.nextProcKey = ekey;
|
||||
}
|
||||
|
||||
|
@ -4279,7 +4404,8 @@ static int32_t doSuspendCurrentReader(STsdbReader* pCurrentReader) {
|
|||
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
|
||||
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
|
||||
clearBlockScanInfo(pInfo);
|
||||
pInfo->sttKeyInfo.nextProcKey = pInfo->lastProcKey + step;
|
||||
ASSERT(0);
|
||||
// pInfo->sttKeyInfo.nextProcKey = pInfo->lastProcKey + step;
|
||||
}
|
||||
|
||||
pStatus->uidList.currentIndex = 0;
|
||||
|
@ -4798,7 +4924,7 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
code = copyBlockDataToSDataBlock(pReader);
|
||||
code = copyBlockDataToSDataBlock(pReader, &pBlockScanInfo->lastProcKey);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tBlockDataReset(&pStatus->fileBlockData);
|
||||
terrno = code;
|
||||
|
|
|
@ -165,18 +165,18 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
|
|||
|
||||
if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) {
|
||||
int64_t skey = pTsdbReader->info.window.skey;
|
||||
pScanInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||
pScanInfo->lastProcKey.key.ts = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||
pScanInfo->sttKeyInfo.nextProcKey = skey;
|
||||
} else {
|
||||
int64_t ekey = pTsdbReader->info.window.ekey;
|
||||
pScanInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||
pScanInfo->lastProcKey.key.ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||
pScanInfo->sttKeyInfo.nextProcKey = ekey;
|
||||
}
|
||||
|
||||
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
|
||||
tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
|
||||
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
|
||||
pScanInfo->lastProcKey, pTsdbReader->idStr);
|
||||
pScanInfo->lastProcKey.key.ts, pTsdbReader->idStr);
|
||||
}
|
||||
|
||||
taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
|
||||
|
@ -209,7 +209,9 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
|
|||
}
|
||||
|
||||
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
|
||||
pInfo->lastProcKey = ts;
|
||||
pInfo->lastProcKey.key.ts = ts;
|
||||
ASSERT(0);
|
||||
|
||||
pInfo->sttKeyInfo.nextProcKey = ts + step;
|
||||
}
|
||||
}
|
||||
|
@ -355,7 +357,7 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
|
|||
return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
|
||||
}
|
||||
|
||||
static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) {
|
||||
static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record, STsdbReader* pReader) {
|
||||
pBlockInfo->uid = record->uid;
|
||||
pBlockInfo->firstKey = record->firstKey.key.ts;
|
||||
pBlockInfo->lastKey = record->lastKey.key.ts;
|
||||
|
@ -368,6 +370,31 @@ static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* recor
|
|||
pBlockInfo->smaSize = record->smaSize;
|
||||
pBlockInfo->numRow = record->numRow;
|
||||
pBlockInfo->count = record->count;
|
||||
|
||||
SRowKey* pFirstKey = &record->firstKey.key;
|
||||
if (!pReader->pkChecked) {
|
||||
pReader->pkChecked = true;
|
||||
pReader->numOfPks = pFirstKey->numOfPKs;
|
||||
pReader->pkComparFn = getComparFunc(pFirstKey->pks[0].type, 0);
|
||||
}
|
||||
|
||||
if (pFirstKey->numOfPKs > 0) {
|
||||
if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) {
|
||||
pBlockInfo->firstPrimaryKey.val = pFirstKey->pks[0].val;
|
||||
pBlockInfo->lastPrimaryKey.val = record->lastKey.key.pks[0].val;
|
||||
|
||||
pBlockInfo->firstPKLen = 0;
|
||||
pBlockInfo->lastPKLen = 0;
|
||||
} else { // todo handle memory alloc error, opt memory alloc perf
|
||||
pBlockInfo->firstPKLen = pFirstKey->pks[0].nData;
|
||||
pBlockInfo->firstPrimaryKey.pData = taosMemoryCalloc(1, pBlockInfo->firstPKLen);
|
||||
memcpy(pBlockInfo->firstPrimaryKey.pData, pFirstKey->pks[0].pData, pBlockInfo->firstPKLen);
|
||||
|
||||
pBlockInfo->lastPKLen = record->lastKey.key.pks[0].nData;
|
||||
pBlockInfo->lastPrimaryKey.pData = taosMemoryCalloc(1, pBlockInfo->lastPKLen);
|
||||
memcpy(pBlockInfo->lastPrimaryKey.pData, record->lastKey.key.pks[0].pData, pBlockInfo->lastPKLen);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) {
|
||||
|
@ -392,7 +419,6 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
|||
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i);
|
||||
// ASSERT(pTableScanInfo->pBlockList != NULL && taosArrayGetSize(pTableScanInfo->pBlockList) > 0);
|
||||
|
||||
size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
|
||||
sup.numOfBlocksPerTable[sup.numOfTables] = num;
|
||||
|
@ -426,15 +452,17 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
|||
if (pTableScanInfo->pBlockIdxList == NULL) {
|
||||
pTableScanInfo->pBlockIdxList = taosArrayInit(numOfBlocks, sizeof(STableDataBlockIdx));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||
SFileDataBlockInfo blockInfo = {.tbBlockIdx = i};
|
||||
SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i);
|
||||
recordToBlockInfo(&blockInfo, record);
|
||||
recordToBlockInfo(&blockInfo, record, pReader);
|
||||
|
||||
taosArrayPush(pBlockIter->blockList, &blockInfo);
|
||||
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i};
|
||||
taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
|
||||
}
|
||||
|
||||
pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList);
|
||||
|
||||
int64_t et = taosGetTimestampUs();
|
||||
|
@ -464,7 +492,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
|||
|
||||
SFileDataBlockInfo blockInfo = {.tbBlockIdx = index};
|
||||
SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);
|
||||
recordToBlockInfo(&blockInfo, record);
|
||||
recordToBlockInfo(&blockInfo, record, pReader);
|
||||
|
||||
taosArrayPush(pBlockIter->blockList, &blockInfo);
|
||||
STableBlockScanInfo* pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo;
|
||||
|
|
|
@ -77,7 +77,7 @@ typedef enum ESttKeyStatus {
|
|||
|
||||
typedef struct SSttKeyInfo {
|
||||
ESttKeyStatus status; // this value should be updated when switch to the next fileset
|
||||
int64_t nextProcKey;
|
||||
int64_t nextProcKey; // todo remove this attribute, since it is impossible to set correct nextProcKey value
|
||||
} SSttKeyInfo;
|
||||
|
||||
// clean stt file blocks:
|
||||
|
@ -87,7 +87,8 @@ typedef struct SSttKeyInfo {
|
|||
// 4. not overlap with data file blocks
|
||||
typedef struct STableBlockScanInfo {
|
||||
uint64_t uid;
|
||||
TSKEY lastProcKey; // todo: refactor: add primary key
|
||||
// TSKEY lastProcKey; // todo: refactor: add primary key
|
||||
STsdbRowKey lastProcKey;
|
||||
SSttKeyInfo sttKeyInfo;
|
||||
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
|
||||
SArray* pBlockIdxList; // SArray<STableDataBlockIndx>
|
||||
|
@ -168,7 +169,9 @@ typedef struct SSttBlockReader {
|
|||
int32_t order;
|
||||
uint64_t uid;
|
||||
SMergeTree mergeTree;
|
||||
int64_t currentKey;
|
||||
STsdbRowKey currentKey;
|
||||
int32_t numOfPks;
|
||||
__compar_fn_t pkComparFn;
|
||||
} SSttBlockReader;
|
||||
|
||||
typedef struct SFilesetIter {
|
||||
|
@ -181,12 +184,21 @@ typedef struct SFilesetIter {
|
|||
|
||||
typedef struct SFileDataBlockInfo {
|
||||
// index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
|
||||
// int64_t suid;
|
||||
int64_t uid;
|
||||
int64_t firstKey;
|
||||
// int64_t firstKeyVer;
|
||||
union {
|
||||
int64_t val;
|
||||
uint8_t* pData;
|
||||
} firstPrimaryKey;
|
||||
|
||||
int64_t lastKey;
|
||||
// int64_t lastKeyVer;
|
||||
union {
|
||||
int64_t val;
|
||||
uint8_t* pData;
|
||||
} lastPrimaryKey;
|
||||
|
||||
int32_t firstPKLen;
|
||||
int32_t lastPKLen;
|
||||
int64_t minVer;
|
||||
int64_t maxVer;
|
||||
int64_t blockOffset;
|
||||
|
@ -211,7 +223,8 @@ typedef struct SDataBlockIter {
|
|||
typedef struct SFileBlockDumpInfo {
|
||||
int32_t totalRows;
|
||||
int32_t rowIndex;
|
||||
int64_t lastKey;
|
||||
// int64_t lastKey;
|
||||
// STsdbRowKey lastKey; // this key should be removed
|
||||
bool allDumped;
|
||||
} SFileBlockDumpInfo;
|
||||
|
||||
|
@ -249,7 +262,6 @@ struct STsdbReader {
|
|||
TdThreadMutex readerMutex;
|
||||
EReaderStatus flag;
|
||||
int32_t code;
|
||||
uint64_t rowsNum;
|
||||
SResultBlockInfo resBlockInfo;
|
||||
SReaderStatus status;
|
||||
char* idStr; // query info handle, for debug purpose
|
||||
|
@ -268,6 +280,7 @@ struct STsdbReader {
|
|||
TsdReaderNotifyCbFn notifyFn;
|
||||
void* notifyParam;
|
||||
__compar_fn_t pkComparFn;
|
||||
int32_t numOfPks;
|
||||
bool pkChecked;
|
||||
};
|
||||
|
||||
|
|
|
@ -623,6 +623,54 @@ void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key) {
|
|||
}
|
||||
}
|
||||
|
||||
void tsdbColRowGetKey(SBlockData* pBlock, int32_t irow, STsdbRowKey* key) {
|
||||
key->version = pBlock->aVersion[irow];
|
||||
key->key.ts = pBlock->aTSKEY[irow];
|
||||
key->key.numOfPKs = 0;
|
||||
|
||||
for (int32_t i = 0; i < pBlock->nColData; i++) {
|
||||
SColData *pColData = &pBlock->aColData[i];
|
||||
if (pColData->cflag & COL_IS_KEY) {
|
||||
SColVal cv;
|
||||
tColDataGetValue(pColData, irow, &cv);
|
||||
ASSERT(COL_VAL_IS_VALUE(&cv));
|
||||
key->key.pks[key->key.numOfPKs] = cv.value;
|
||||
key->key.numOfPKs++;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tsdbRowKeyAssign(STsdbRowKey *pDst, STsdbRowKey* pSrc) {
|
||||
pDst->version = pSrc->version;
|
||||
|
||||
if (pSrc->key.numOfPKs == 0) {
|
||||
pDst->key.ts = pSrc->key.ts;
|
||||
pDst->key.numOfPKs = 0;
|
||||
} else {
|
||||
pDst->key = pSrc->key;
|
||||
|
||||
for (int32_t i = 0; i < pDst->key.numOfPKs; ++i) {
|
||||
SValue *pVal = &pDst->key.pks[i];
|
||||
if (IS_NUMERIC_TYPE(pVal->type)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
uint8_t *p = taosMemoryMalloc(pVal->nData);
|
||||
if (p == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
memcpy(p, pVal->pData, pVal->nData);
|
||||
pVal->pData = p;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2) {
|
||||
int32_t c = tRowKeyCompare(&key1->key, &key2->key);
|
||||
|
||||
|
@ -726,7 +774,6 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
|
|||
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
// goto _exit;
|
||||
}
|
||||
|
||||
// other
|
||||
|
|
Loading…
Reference in New Issue