remove tid from tsdb
This commit is contained in:
parent
f136211bad
commit
eafc0f88bc
|
@ -17,12 +17,12 @@
|
||||||
#define _TD_TSDB_READ_IMPL_H_
|
#define _TD_TSDB_READ_IMPL_H_
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "tcommon.h"
|
||||||
#include "tfs.h"
|
#include "tfs.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "tsdbFile.h"
|
#include "tsdbFile.h"
|
||||||
#include "tskiplist.h"
|
|
||||||
#include "tsdbMemory.h"
|
#include "tsdbMemory.h"
|
||||||
#include "tcommon.h"
|
#include "tskiplist.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -31,7 +31,6 @@ extern "C" {
|
||||||
typedef struct SReadH SReadH;
|
typedef struct SReadH SReadH;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t tid;
|
|
||||||
uint32_t len;
|
uint32_t len;
|
||||||
uint32_t offset;
|
uint32_t offset;
|
||||||
uint32_t hasLast : 2;
|
uint32_t hasLast : 2;
|
||||||
|
@ -81,7 +80,7 @@ typedef struct {
|
||||||
TSKEY keyLast;
|
TSKEY keyLast;
|
||||||
} SBlockV0;
|
} SBlockV0;
|
||||||
|
|
||||||
#define SBlock SBlockV0 // latest SBlock definition
|
#define SBlock SBlockV0 // latest SBlock definition
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -165,19 +164,19 @@ typedef struct {
|
||||||
typedef void SAggrBlkData; // SBlockCol cols[];
|
typedef void SAggrBlkData; // SBlockCol cols[];
|
||||||
|
|
||||||
struct SReadH {
|
struct SReadH {
|
||||||
STsdb * pRepo;
|
STsdb *pRepo;
|
||||||
SDFileSet rSet; // FSET to read
|
SDFileSet rSet; // FSET to read
|
||||||
SArray * aBlkIdx; // SBlockIdx array
|
SArray *aBlkIdx; // SBlockIdx array
|
||||||
STable * pTable; // table to read
|
STable *pTable; // table to read
|
||||||
SBlockIdx * pBlkIdx; // current reading table SBlockIdx
|
SBlockIdx *pBlkIdx; // current reading table SBlockIdx
|
||||||
int cidx;
|
int cidx;
|
||||||
SBlockInfo * pBlkInfo;
|
SBlockInfo *pBlkInfo;
|
||||||
SBlockData * pBlkData; // Block info
|
SBlockData *pBlkData; // Block info
|
||||||
SAggrBlkData *pAggrBlkData; // Aggregate Block info
|
SAggrBlkData *pAggrBlkData; // Aggregate Block info
|
||||||
SDataCols * pDCols[2];
|
SDataCols *pDCols[2];
|
||||||
void * pBuf; // buffer
|
void *pBuf; // buffer
|
||||||
void * pCBuf; // compression buffer
|
void *pCBuf; // compression buffer
|
||||||
void * pExBuf; // extra buffer
|
void *pExBuf; // extra buffer
|
||||||
};
|
};
|
||||||
|
|
||||||
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
|
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
|
||||||
|
@ -222,14 +221,15 @@ int tsdbLoadBlockIdx(SReadH *pReadh);
|
||||||
int tsdbSetReadTable(SReadH *pReadh, STable *pTable);
|
int tsdbSetReadTable(SReadH *pReadh, STable *pTable);
|
||||||
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget);
|
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget);
|
||||||
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo);
|
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo);
|
||||||
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds);
|
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds,
|
||||||
|
int numOfColsIds);
|
||||||
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
|
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
|
||||||
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
|
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
|
||||||
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
|
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
|
||||||
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock);
|
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock);
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
|
static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
|
||||||
void * pBuf = *ppBuf;
|
void *pBuf = *ppBuf;
|
||||||
size_t tsize = taosTSizeof(pBuf);
|
size_t tsize = taosTSizeof(pBuf);
|
||||||
|
|
||||||
if (tsize < size) {
|
if (tsize < size) {
|
||||||
|
|
|
@ -701,7 +701,6 @@ int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray
|
||||||
// Set pIdx
|
// Set pIdx
|
||||||
pBlock = taosArrayGetLast(pSupA);
|
pBlock = taosArrayGetLast(pSupA);
|
||||||
|
|
||||||
pIdx->tid = TABLE_TID(pTable);
|
|
||||||
pIdx->uid = TABLE_UID(pTable);
|
pIdx->uid = TABLE_UID(pTable);
|
||||||
pIdx->hasLast = pBlock->last ? 1 : 0;
|
pIdx->hasLast = pBlock->last ? 1 : 0;
|
||||||
pIdx->maxKey = pBlock->keyLast;
|
pIdx->maxKey = pBlock->keyLast;
|
||||||
|
|
|
@ -98,7 +98,7 @@ int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet) {
|
||||||
void tsdbCloseAndUnsetFSet(SReadH *pReadh) { tsdbResetReadFile(pReadh); }
|
void tsdbCloseAndUnsetFSet(SReadH *pReadh) { tsdbResetReadFile(pReadh); }
|
||||||
|
|
||||||
int tsdbLoadBlockIdx(SReadH *pReadh) {
|
int tsdbLoadBlockIdx(SReadH *pReadh) {
|
||||||
SDFile * pHeadf = TSDB_READ_HEAD_FILE(pReadh);
|
SDFile *pHeadf = TSDB_READ_HEAD_FILE(pReadh);
|
||||||
SBlockIdx blkIdx;
|
SBlockIdx blkIdx;
|
||||||
|
|
||||||
ASSERT(taosArrayGetSize(pReadh->aBlkIdx) == 0);
|
ASSERT(taosArrayGetSize(pReadh->aBlkIdx) == 0);
|
||||||
|
@ -149,8 +149,8 @@ int tsdbLoadBlockIdx(SReadH *pReadh) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tsize++;
|
tsize++;
|
||||||
ASSERT(tsize == 1 || ((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 2))->tid <
|
// ASSERT(tsize == 1 || ((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 2))->tid <
|
||||||
((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 1))->tid);
|
// ((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 1))->tid);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -180,7 +180,7 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx);
|
SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx);
|
||||||
if (pBlkIdx->tid == TABLE_TID(pTable)) {
|
if (pBlkIdx->uid == TABLE_TID(pTable)) {
|
||||||
if (pBlkIdx->uid == TABLE_UID(pTable)) {
|
if (pBlkIdx->uid == TABLE_UID(pTable)) {
|
||||||
pReadh->pBlkIdx = pBlkIdx;
|
pReadh->pBlkIdx = pBlkIdx;
|
||||||
} else {
|
} else {
|
||||||
|
@ -188,7 +188,7 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
|
||||||
}
|
}
|
||||||
pReadh->cidx++;
|
pReadh->cidx++;
|
||||||
break;
|
break;
|
||||||
} else if (pBlkIdx->tid > TABLE_TID(pTable)) {
|
} else if (pBlkIdx->uid > TABLE_TID(pTable)) {
|
||||||
pReadh->pBlkIdx = NULL;
|
pReadh->pBlkIdx = NULL;
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -205,7 +205,7 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
|
||||||
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
|
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
|
||||||
ASSERT(pReadh->pBlkIdx != NULL);
|
ASSERT(pReadh->pBlkIdx != NULL);
|
||||||
|
|
||||||
SDFile * pHeadf = TSDB_READ_HEAD_FILE(pReadh);
|
SDFile *pHeadf = TSDB_READ_HEAD_FILE(pReadh);
|
||||||
SBlockIdx *pBlkIdx = pReadh->pBlkIdx;
|
SBlockIdx *pBlkIdx = pReadh->pBlkIdx;
|
||||||
|
|
||||||
if (tsdbSeekDFile(pHeadf, pBlkIdx->offset, SEEK_SET) < 0) {
|
if (tsdbSeekDFile(pHeadf, pBlkIdx->offset, SEEK_SET) < 0) {
|
||||||
|
@ -237,7 +237,7 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pBlkIdx->tid == pReadh->pBlkInfo->tid && pBlkIdx->uid == pReadh->pBlkInfo->uid);
|
// ASSERT(pBlkIdx->tid == pReadh->pBlkInfo->tid && pBlkIdx->uid == pReadh->pBlkInfo->uid);
|
||||||
|
|
||||||
if (pTarget) {
|
if (pTarget) {
|
||||||
memcpy(pTarget, (void *)(pReadh->pBlkInfo), pBlkIdx->len);
|
memcpy(pTarget, (void *)(pReadh->pBlkInfo), pBlkIdx->len);
|
||||||
|
@ -275,7 +275,8 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds) {
|
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds,
|
||||||
|
int numOfColsIds) {
|
||||||
ASSERT(pBlock->numOfSubBlocks > 0);
|
ASSERT(pBlock->numOfSubBlocks > 0);
|
||||||
int8_t update = pReadh->pRepo->config.update;
|
int8_t update = pReadh->pRepo->config.update;
|
||||||
|
|
||||||
|
@ -388,7 +389,7 @@ static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) {
|
||||||
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) {
|
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) {
|
||||||
int tlen = 0;
|
int tlen = 0;
|
||||||
|
|
||||||
tlen += taosEncodeVariantI32(buf, pIdx->tid);
|
// tlen += taosEncodeVariantI32(buf, pIdx->tid);
|
||||||
tlen += taosEncodeVariantU32(buf, pIdx->len);
|
tlen += taosEncodeVariantU32(buf, pIdx->len);
|
||||||
tlen += taosEncodeVariantU32(buf, pIdx->offset);
|
tlen += taosEncodeVariantU32(buf, pIdx->offset);
|
||||||
tlen += taosEncodeFixedU8(buf, pIdx->hasLast);
|
tlen += taosEncodeFixedU8(buf, pIdx->hasLast);
|
||||||
|
@ -404,7 +405,7 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
|
||||||
uint32_t numOfBlocks = 0;
|
uint32_t numOfBlocks = 0;
|
||||||
uint64_t value = 0;
|
uint64_t value = 0;
|
||||||
|
|
||||||
if ((buf = taosDecodeVariantI32(buf, &(pIdx->tid))) == NULL) return NULL;
|
// if ((buf = taosDecodeVariantI32(buf, &(pIdx->tid))) == NULL) return NULL;
|
||||||
if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL;
|
if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL;
|
||||||
if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL;
|
if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL;
|
||||||
if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL;
|
if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL;
|
||||||
|
@ -538,9 +539,9 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
|
||||||
pDataCols->numOfRows = pBlock->numOfRows;
|
pDataCols->numOfRows = pBlock->numOfRows;
|
||||||
|
|
||||||
// Recover the data
|
// Recover the data
|
||||||
int ccol = 0; // loop iter for SBlockCol object
|
int ccol = 0; // loop iter for SBlockCol object
|
||||||
int dcol = 0; // loop iter for SDataCols object
|
int dcol = 0; // loop iter for SDataCols object
|
||||||
int nBitmaps = (int)TD_BITMAP_BYTES(pBlock->numOfRows);
|
int nBitmaps = (int)TD_BITMAP_BYTES(pBlock->numOfRows);
|
||||||
SBlockCol *pBlockCol = NULL;
|
SBlockCol *pBlockCol = NULL;
|
||||||
while (dcol < pDataCols->numOfCols) {
|
while (dcol < pDataCols->numOfCols) {
|
||||||
SDataCol *pDataCol = &(pDataCols->cols[dcol]);
|
SDataCol *pDataCol = &(pDataCols->cols[dcol]);
|
||||||
|
@ -686,7 +687,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
|
||||||
ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
|
ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
|
||||||
ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID);
|
ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||||
|
|
||||||
SDFile * pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
|
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
|
||||||
SBlockCol blockCol = {0};
|
SBlockCol blockCol = {0};
|
||||||
|
|
||||||
tdResetDataCols(pDataCols);
|
tdResetDataCols(pDataCols);
|
||||||
|
@ -700,7 +701,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
|
||||||
int ccol = 0;
|
int ccol = 0;
|
||||||
for (int i = 0; i < numOfColIds; i++) {
|
for (int i = 0; i < numOfColIds; i++) {
|
||||||
int16_t colId = colIds[i];
|
int16_t colId = colIds[i];
|
||||||
SDataCol * pDataCol = NULL;
|
SDataCol *pDataCol = NULL;
|
||||||
SBlockCol *pBlockCol = NULL;
|
SBlockCol *pBlockCol = NULL;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
Loading…
Reference in New Issue