add version on some structures
This commit is contained in:
parent
80d2d94c32
commit
32ee762924
|
@ -258,6 +258,11 @@ static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator *pIter) {
|
|||
// tsdbReadImpl
|
||||
typedef struct SReadH SReadH;
|
||||
|
||||
struct TSDBKEY {
|
||||
int64_t version;
|
||||
TSKEY ts;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
uint32_t len;
|
||||
uint32_t offset;
|
||||
|
@ -265,25 +270,9 @@ typedef struct {
|
|||
uint32_t numOfBlocks : 30;
|
||||
uint64_t suid;
|
||||
uint64_t uid;
|
||||
TSKEY maxKey;
|
||||
TSDBKEY maxKey;
|
||||
} SBlockIdx;
|
||||
|
||||
#ifdef TD_REFACTOR_3
|
||||
typedef struct {
|
||||
int64_t last : 1;
|
||||
int64_t offset : 63;
|
||||
int32_t algorithm : 8;
|
||||
int32_t numOfRows : 24;
|
||||
int32_t len;
|
||||
int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
|
||||
int16_t numOfSubBlocks;
|
||||
int16_t numOfCols; // not including timestamp column
|
||||
TSKEY keyFirst;
|
||||
TSKEY keyLast;
|
||||
} SBlock;
|
||||
|
||||
#else
|
||||
|
||||
typedef enum {
|
||||
TSDB_SBLK_VER_0 = 0,
|
||||
TSDB_SBLK_VER_MAX,
|
||||
|
@ -306,17 +295,15 @@ typedef struct {
|
|||
int64_t offset;
|
||||
uint64_t aggrStat : 1;
|
||||
uint64_t aggrOffset : 63;
|
||||
TSKEY keyFirst;
|
||||
TSKEY keyLast;
|
||||
} SBlockV0;
|
||||
|
||||
#define SBlock SBlockV0 // latest SBlock definition
|
||||
TSDBKEY minKey;
|
||||
TSDBKEY maxKey;
|
||||
// TSKEY keyFirst;
|
||||
// TSKEY keyLast;
|
||||
} SBlock;
|
||||
|
||||
static FORCE_INLINE bool tsdbIsSupBlock(SBlock *pBlock) { return pBlock->numOfSubBlocks == 1; }
|
||||
static FORCE_INLINE bool tsdbIsSubBlock(SBlock *pBlock) { return pBlock->numOfSubBlocks == 0; }
|
||||
|
||||
#endif
|
||||
|
||||
typedef struct {
|
||||
int32_t delimiter; // For recovery usage
|
||||
int32_t tid;
|
||||
|
@ -879,11 +866,6 @@ struct TSDBROW {
|
|||
STSRow2 tsRow;
|
||||
};
|
||||
|
||||
struct TSDBKEY {
|
||||
int64_t version;
|
||||
TSKEY ts;
|
||||
};
|
||||
|
||||
struct TABLEID {
|
||||
tb_uid_t suid;
|
||||
tb_uid_t uid;
|
||||
|
|
|
@ -774,7 +774,7 @@ static int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA,
|
|||
|
||||
pIdx->uid = TABLE_UID(pTable);
|
||||
pIdx->hasLast = pBlock->last ? 1 : 0;
|
||||
pIdx->maxKey = pBlock->keyLast;
|
||||
pIdx->maxKey = pBlock->maxKey;
|
||||
pIdx->numOfBlocks = (uint32_t)nSupBlocks;
|
||||
pIdx->len = tlen;
|
||||
pIdx->offset = (uint32_t)offset;
|
||||
|
@ -895,7 +895,7 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
|
|||
return -1;
|
||||
}
|
||||
} else {
|
||||
if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) {
|
||||
if (tsdbCommitMemData(pCommith, pIter, pBlock->minKey.ts - 1, true) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -990,9 +990,9 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
|
|||
TSKEY key = *(TSKEY *)arg1;
|
||||
SBlock *pBlock = (SBlock *)arg2;
|
||||
|
||||
if (key < pBlock->keyFirst) {
|
||||
if (key < pBlock->minKey.ts) {
|
||||
return -1;
|
||||
} else if (key > pBlock->keyLast) {
|
||||
} else if (key > pBlock->maxKey.ts) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
|
@ -1220,8 +1220,8 @@ static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFi
|
|||
pBlock->numOfSubBlocks = isSuper ? 1 : 0;
|
||||
pBlock->numOfCols = nColsNotAllNull;
|
||||
pBlock->numOfBSma = nColsOfBlockSma;
|
||||
pBlock->keyFirst = dataColsKeyFirst(pDataCols);
|
||||
pBlock->keyLast = dataColsKeyLast(pDataCols);
|
||||
pBlock->minKey.ts = dataColsKeyFirst(pDataCols);
|
||||
pBlock->maxKey.ts = dataColsKeyLast(pDataCols);
|
||||
pBlock->aggrStat = aggrStatus;
|
||||
pBlock->blkVer = SBlockVerLatest;
|
||||
pBlock->aggrOffset = (uint64_t)offsetAggr;
|
||||
|
@ -1229,7 +1229,7 @@ static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFi
|
|||
tsdbDebug("vgId:%d, uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64
|
||||
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
|
||||
REPO_ID(pRepo), TABLE_UID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
|
||||
pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
|
||||
pBlock->numOfCols, pBlock->minKey.ts, pBlock->maxKey.ts);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -1312,7 +1312,7 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
|
|||
if (bidx == nBlocks - 1) {
|
||||
keyLimit = pCommith->maxKey;
|
||||
} else {
|
||||
keyLimit = pBlock[1].keyFirst - 1;
|
||||
keyLimit = pBlock[1].minKey.ts - 1;
|
||||
}
|
||||
|
||||
SSkipListIterator titer = *(pIter->pIter);
|
||||
|
@ -1354,8 +1354,8 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
|
|||
}
|
||||
subBlocks[pBlock->numOfSubBlocks] = block;
|
||||
supBlock = *pBlock;
|
||||
supBlock.keyFirst = mInfo.keyFirst;
|
||||
supBlock.keyLast = mInfo.keyLast;
|
||||
supBlock.minKey.ts = mInfo.keyFirst;
|
||||
supBlock.maxKey.ts = mInfo.keyLast;
|
||||
supBlock.numOfSubBlocks++;
|
||||
supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed;
|
||||
supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock);
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
typedef struct {
|
||||
SMemTable *pMemTable;
|
||||
SReadH readh;
|
||||
SArray *aBlkIdx;
|
||||
} SCommitH;
|
||||
|
||||
|
|
|
@ -20,10 +20,10 @@
|
|||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
|
||||
|
||||
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \
|
||||
((SDataBlockInfo){.window = {.skey = (_block)->keyFirst, .ekey = (_block)->keyLast}, \
|
||||
.numOfCols = (_block)->numOfCols, \
|
||||
.rows = (_block)->numOfRows, \
|
||||
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \
|
||||
((SDataBlockInfo){.window = {.skey = (_block)->minKey.ts, .ekey = (_block)->maxKey.ts}, \
|
||||
.numOfCols = (_block)->numOfCols, \
|
||||
.rows = (_block)->numOfRows, \
|
||||
.uid = (_checkInfo)->tableId})
|
||||
|
||||
enum {
|
||||
|
@ -1105,12 +1105,12 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s
|
|||
|
||||
if (numOfBlocks == 1) break;
|
||||
|
||||
if (skey > pBlock[midSlot].keyLast) {
|
||||
if (skey > pBlock[midSlot].maxKey.ts) {
|
||||
if (numOfBlocks == 2) break;
|
||||
if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].keyFirst)) break;
|
||||
if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].minKey.ts)) break;
|
||||
firstSlot = midSlot + 1;
|
||||
} else if (skey < pBlock[midSlot].keyFirst) {
|
||||
if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].keyLast)) break;
|
||||
} else if (skey < pBlock[midSlot].minKey.ts) {
|
||||
if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].maxKey.ts)) break;
|
||||
lastSlot = midSlot - 1;
|
||||
} else {
|
||||
break; // got the slot
|
||||
|
@ -1177,12 +1177,12 @@ static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, in
|
|||
int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
|
||||
int32_t end = start;
|
||||
|
||||
if (s > pCompInfo->blocks[start].keyLast) {
|
||||
if (s > pCompInfo->blocks[start].maxKey.ts) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// todo speedup the procedure of located end block
|
||||
while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
|
||||
while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].minKey.ts <= e)) {
|
||||
end += 1;
|
||||
}
|
||||
|
||||
|
@ -1275,7 +1275,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
|
|||
pBlock->numOfRows = pCols->numOfRows;
|
||||
|
||||
// Convert from TKEY to TSKEY for primary timestamp column if current block has timestamp before 1970-01-01T00:00:00Z
|
||||
if (pBlock->keyFirst < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
if (pBlock->minKey.ts < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
int64_t* src = pCols->cols[0].pData;
|
||||
for (int32_t i = 0; i < pBlock->numOfRows; ++i) {
|
||||
src[i] = tdGetKey(src[i]);
|
||||
|
@ -1287,7 +1287,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
|
|||
|
||||
tsdbDebug("%p load file block into buffer, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%" PRId64
|
||||
" us, %s",
|
||||
pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime,
|
||||
pTsdbReadHandle, slotIndex, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->numOfRows, elapsedTime,
|
||||
pTsdbReadHandle->idStr);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -1295,7 +1295,8 @@ _error:
|
|||
pBlock->numOfRows = 0;
|
||||
|
||||
tsdbError("%p error occurs in loading file block, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, %s",
|
||||
pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pTsdbReadHandle->idStr);
|
||||
pTsdbReadHandle, slotIndex, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->numOfRows,
|
||||
pTsdbReadHandle->idStr);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
@ -1423,7 +1424,7 @@ static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBloc
|
|||
|
||||
if (asc) {
|
||||
// query ended in/started from current block
|
||||
if (pTsdbReadHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
|
||||
if (pTsdbReadHandle->window.ekey < pBlock->maxKey.ts || pCheckInfo->lastKey > pBlock->minKey.ts) {
|
||||
if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
|
||||
*exists = false;
|
||||
return code;
|
||||
|
@ -1432,35 +1433,35 @@ static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBloc
|
|||
SDataCols* pTSCol = pTsdbReadHandle->rhelper.pDCols[0];
|
||||
assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
|
||||
|
||||
if (pCheckInfo->lastKey > pBlock->keyFirst) {
|
||||
if (pCheckInfo->lastKey > pBlock->minKey.ts) {
|
||||
cur->pos =
|
||||
binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
|
||||
} else {
|
||||
cur->pos = 0;
|
||||
}
|
||||
|
||||
assert(pCheckInfo->lastKey <= pBlock->keyLast);
|
||||
assert(pCheckInfo->lastKey <= pBlock->maxKey.ts);
|
||||
doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
|
||||
} else { // the whole block is loaded in to buffer
|
||||
cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
|
||||
code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
|
||||
}
|
||||
} else { // desc order, query ended in current block
|
||||
if (pTsdbReadHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast) {
|
||||
if (pTsdbReadHandle->window.ekey > pBlock->minKey.ts || pCheckInfo->lastKey < pBlock->maxKey.ts) {
|
||||
if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
|
||||
*exists = false;
|
||||
return code;
|
||||
}
|
||||
|
||||
SDataCols* pTsCol = pTsdbReadHandle->rhelper.pDCols[0];
|
||||
if (pCheckInfo->lastKey < pBlock->keyLast) {
|
||||
if (pCheckInfo->lastKey < pBlock->maxKey.ts) {
|
||||
cur->pos =
|
||||
binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
|
||||
} else {
|
||||
cur->pos = pBlock->numOfRows - 1;
|
||||
}
|
||||
|
||||
assert(pCheckInfo->lastKey >= pBlock->keyFirst);
|
||||
assert(pCheckInfo->lastKey >= pBlock->minKey.ts);
|
||||
doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
|
||||
} else {
|
||||
cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
|
||||
|
@ -1981,8 +1982,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
|||
cur->pos >= 0 && cur->pos < pBlock->numOfRows);
|
||||
// Even Multi-Version supported, the records with duplicated TSKEY would be merged inside of tsdbLoadData interface.
|
||||
TSKEY* tsArray = pCols->cols[0].pData;
|
||||
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst &&
|
||||
tsArray[pBlock->numOfRows - 1] == pBlock->keyLast);
|
||||
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->minKey.ts &&
|
||||
tsArray[pBlock->numOfRows - 1] == pBlock->maxKey.ts);
|
||||
|
||||
bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
|
||||
int32_t step = ascScan ? 1 : -1;
|
||||
|
@ -3576,8 +3577,8 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDat
|
|||
assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||
|
||||
pPrimaryColStatis->numOfNull = 0;
|
||||
pPrimaryColStatis->min = pBlockInfo->compBlock->keyFirst;
|
||||
pPrimaryColStatis->max = pBlockInfo->compBlock->keyLast;
|
||||
pPrimaryColStatis->min = pBlockInfo->compBlock->minKey.ts;
|
||||
pPrimaryColStatis->max = pBlockInfo->compBlock->maxKey.ts;
|
||||
pHandle->suppInfo.plist[0] = &pHandle->suppInfo.pstatis[0];
|
||||
|
||||
// update the number of NULL data rows
|
||||
|
|
|
@ -339,8 +339,8 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
|
|||
}
|
||||
|
||||
ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows);
|
||||
ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->keyFirst);
|
||||
ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->keyLast);
|
||||
ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->minKey.ts);
|
||||
ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->maxKey.ts);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -457,8 +457,8 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
|
|||
}
|
||||
|
||||
ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows);
|
||||
ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->keyFirst);
|
||||
ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->keyLast);
|
||||
ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->minKey.ts);
|
||||
ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->maxKey.ts);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -559,7 +559,7 @@ int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) {
|
|||
tlen += taosEncodeFixedU8(buf, pIdx->hasLast);
|
||||
tlen += taosEncodeVariantU32(buf, pIdx->numOfBlocks);
|
||||
tlen += taosEncodeFixedU64(buf, pIdx->uid);
|
||||
tlen += taosEncodeFixedU64(buf, pIdx->maxKey);
|
||||
tlen += taosEncodeFixedU64(buf, pIdx->maxKey.ts);
|
||||
|
||||
return tlen;
|
||||
}
|
||||
|
@ -579,7 +579,7 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
|
|||
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
|
||||
pIdx->uid = (int64_t)value;
|
||||
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
|
||||
pIdx->maxKey = (TSKEY)value;
|
||||
pIdx->maxKey.ts = (TSKEY)value;
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue