Merge branches 'feature/TD-1925' and 'feature/TD-1925' of https://github.com/taosdata/TDengine into feature/TD-1925

This commit is contained in:
Shengliang Guan 2020-12-01 14:59:50 +08:00
commit 4c9f648817
6 changed files with 186 additions and 186 deletions

View File

@ -257,7 +257,7 @@ typedef struct {
uint32_t numOfBlocks : 30; uint32_t numOfBlocks : 30;
uint64_t uid; uint64_t uid;
TSKEY maxKey; TSKEY maxKey;
} SCompIdx; } SBlockIdx;
typedef struct { typedef struct {
int64_t last : 1; int64_t last : 1;
@ -265,19 +265,19 @@ typedef struct {
int32_t algorithm : 8; int32_t algorithm : 8;
int32_t numOfRows : 24; int32_t numOfRows : 24;
int32_t len; int32_t len;
int32_t keyLen; // key column length, keyOffset = offset+sizeof(SCompData)+sizeof(SCompCol)*numOfCols int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
int16_t numOfSubBlocks; int16_t numOfSubBlocks;
int16_t numOfCols; // not including timestamp column int16_t numOfCols; // not including timestamp column
TSKEY keyFirst; TSKEY keyFirst;
TSKEY keyLast; TSKEY keyLast;
} SCompBlock; } SBlock;
typedef struct { typedef struct {
int32_t delimiter; // For recovery usage int32_t delimiter; // For recovery usage
int32_t tid; int32_t tid;
uint64_t uid; uint64_t uid;
SCompBlock blocks[]; SBlock blocks[];
} SCompInfo; } SBlockInfo;
typedef struct { typedef struct {
int16_t colId; int16_t colId;
@ -291,14 +291,14 @@ typedef struct {
int16_t minIndex; int16_t minIndex;
int16_t numOfNull; int16_t numOfNull;
char padding[2]; char padding[2];
} SCompCol; } SBlockCol;
typedef struct { typedef struct {
int32_t delimiter; // For recovery usage int32_t delimiter; // For recovery usage
int32_t numOfCols; // For recovery usage int32_t numOfCols; // For recovery usage
uint64_t uid; // For recovery usage uint64_t uid; // For recovery usage
SCompCol cols[]; SBlockCol cols[];
} SCompData; } SBlockData;
typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
@ -316,7 +316,7 @@ typedef struct {
} SHelperTable; } SHelperTable;
typedef struct { typedef struct {
SCompIdx* pIdxArray; SBlockIdx* pIdxArray;
int numOfIdx; int numOfIdx;
int curIdx; int curIdx;
} SIdxH; } SIdxH;
@ -329,14 +329,14 @@ typedef struct {
// For file set usage // For file set usage
SHelperFile files; SHelperFile files;
SIdxH idxH; SIdxH idxH;
SCompIdx curCompIdx; SBlockIdx curCompIdx;
void* pWIdx; void* pWIdx;
// For table set usage // For table set usage
SHelperTable tableInfo; SHelperTable tableInfo;
SCompInfo* pCompInfo; SBlockInfo* pCompInfo;
bool hasOldLastBlock; bool hasOldLastBlock;
// For block set usage // For block set usage
SCompData* pCompData; SBlockData* pCompData;
SDataCols* pDataCols[2]; SDataCols* pDataCols[2];
void* pBuffer; // Buffer to hold the whole data block void* pBuffer; // Buffer to hold the whole data block
void* compBuffer; // Buffer for temperary compress/decompress purpose void* compBuffer; // Buffer for temperary compress/decompress purpose
@ -355,8 +355,8 @@ typedef struct {
typedef struct { typedef struct {
SFileGroup fGroup; SFileGroup fGroup;
int numOfIdx; int numOfIdx;
SCompIdx* pCompIdx; SBlockIdx* pCompIdx;
SCompInfo* pCompInfo; SBlockInfo* pCompInfo;
void* pBuf; void* pBuf;
FILE* tLogStream; FILE* tLogStream;
} STsdbScanHandle; } STsdbScanHandle;
@ -535,10 +535,10 @@ int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup);
// ------------------ tsdbRWHelper.c // ------------------ tsdbRWHelper.c
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
#define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set #define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set
#define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded #define TSDB_HELPER_IDX_LOAD 0x2 // SBlockIdx part is loaded
#define TSDB_HELPER_TABLE_SET 0x4 // Table is set #define TSDB_HELPER_TABLE_SET 0x4 // Table is set
#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded #define TSDB_HELPER_INFO_LOAD 0x8 // SBlockInfo part is loaded
#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded #define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SBlockData part is loaded
#define helperSetState(h, s) (((h)->state) |= (s)) #define helperSetState(h, s) (((h)->state) |= (s))
#define helperClearState(h, s) ((h)->state &= (~(s))) #define helperClearState(h, s) ((h)->state &= (~(s)))
#define helperHasState(h, s) ((((h)->state) & (s)) == (s)) #define helperHasState(h, s) ((((h)->state) & (s)) == (s))
@ -568,15 +568,15 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
int tsdbWriteCompInfo(SRWHelper* pHelper); int tsdbWriteCompInfo(SRWHelper* pHelper);
int tsdbWriteCompIdx(SRWHelper* pHelper); int tsdbWriteCompIdx(SRWHelper* pHelper);
int tsdbLoadCompIdxImpl(SFile* pFile, uint32_t offset, uint32_t len, void* buffer); int tsdbLoadCompIdxImpl(SFile* pFile, uint32_t offset, uint32_t len, void* buffer);
int tsdbDecodeSCompIdxImpl(void* buffer, uint32_t len, SCompIdx** ppCompIdx, int* numOfIdx); int tsdbDecodeSBlockIdxImpl(void* buffer, uint32_t len, SBlockIdx** ppCompIdx, int* numOfIdx);
int tsdbLoadCompIdx(SRWHelper* pHelper, void* target); int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
int tsdbLoadCompInfoImpl(SFile* pFile, SCompIdx* pIdx, SCompInfo** ppCompInfo); int tsdbLoadCompInfoImpl(SFile* pFile, SBlockIdx* pIdx, SBlockInfo** ppCompInfo);
int tsdbLoadCompInfo(SRWHelper* pHelper, void* target); int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target); int tsdbLoadCompData(SRWHelper* phelper, SBlock* pcompblock, void* target);
void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols); void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo, int16_t* colIds, int tsdbLoadBlockDataCols(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo, int16_t* colIds,
int numOfColIds); int numOfColIds);
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo); int tsdbLoadBlockData(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo);
static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) { static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
if (*(TSKEY*)key1 > *(TSKEY*)key2) { if (*(TSKEY*)key1 > *(TSKEY*)key2) {
@ -608,8 +608,8 @@ int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int
STsdbScanHandle* tsdbNewScanHandle(); STsdbScanHandle* tsdbNewScanHandle();
void tsdbSetScanLogStream(STsdbScanHandle* pScanHandle, FILE* fLogStream); void tsdbSetScanLogStream(STsdbScanHandle* pScanHandle, FILE* fLogStream);
int tsdbSetAndOpenScanFile(STsdbScanHandle* pScanHandle, char* rootDir, int fid); int tsdbSetAndOpenScanFile(STsdbScanHandle* pScanHandle, char* rootDir, int fid);
int tsdbScanSCompIdx(STsdbScanHandle* pScanHandle); int tsdbScanSBlockIdx(STsdbScanHandle* pScanHandle);
int tsdbScanSCompBlock(STsdbScanHandle* pScanHandle, int idx); int tsdbScanSBlock(STsdbScanHandle* pScanHandle, int idx);
int tsdbCloseScanFile(STsdbScanHandle* pScanHandle); int tsdbCloseScanFile(STsdbScanHandle* pScanHandle);
void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle); void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle);

View File

@ -207,7 +207,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) {
newLast = TSDB_NLAST_FILE_OPENED(pHelper); newLast = TSDB_NLAST_FILE_OPENED(pHelper);
if (tsdbLoadCompIdx(pHelper, NULL) < 0) { if (tsdbLoadCompIdx(pHelper, NULL) < 0) {
tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to load SBlockIdx part since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err; goto _err;
} }
@ -243,7 +243,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) {
goto _err; goto _err;
} }
// Write the SCompBlock part // Write the SBlock part
if (tsdbWriteCompInfo(pHelper) < 0) { if (tsdbWriteCompInfo(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err; goto _err;

View File

@ -715,7 +715,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue; if (pTable == NULL) continue;
if (tsdbSetHelperTable(&rhelper, pTable, pRepo) < 0) goto _err; if (tsdbSetHelperTable(&rhelper, pTable, pRepo) < 0) goto _err;
SCompIdx *pIdx = &(rhelper.curCompIdx); SBlockIdx *pIdx = &(rhelper.curCompIdx);
if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey; if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey;
} }

View File

@ -22,19 +22,19 @@
#include "tscompression.h" #include "tscompression.h"
#include "tsdbMain.h" #include "tsdbMain.h"
#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM)) #define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SBlockData) + sizeof(SBlockCol) * (nCols) + sizeof(TSCKSUM))
#define TSDB_KEY_COL_OFFSET 0 #define TSDB_KEY_COL_OFFSET 0
#define TSDB_GET_COMPBLOCK_IDX(h, b) (POINTER_DISTANCE(b, (h)->pCompInfo->blocks)/sizeof(SCompBlock)) #define TSDB_GET_COMPBLOCK_IDX(h, b) (POINTER_DISTANCE(b, (h)->pCompInfo->blocks)/sizeof(SBlock))
#define TSDB_IS_LAST_BLOCK(pb) ((pb)->last) #define TSDB_IS_LAST_BLOCK(pb) ((pb)->last)
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper); static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SCompBlock *pCompBlock, static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SBlock *pCompBlock,
bool isLast, bool isSuperBlock); bool isLast, bool isSuperBlock);
static int compareKeyBlock(const void *arg1, const void *arg2); static int compareKeyBlock(const void *arg1, const void *arg2);
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize); static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize);
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); static int tsdbInsertSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx);
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, SMergeInfo *pMergeInfo); static int tsdbAddSubBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx, SMergeInfo *pMergeInfo);
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx);
static void tsdbResetHelperFileImpl(SRWHelper *pHelper); static void tsdbResetHelperFileImpl(SRWHelper *pHelper);
static int tsdbInitHelperFile(SRWHelper *pHelper); static int tsdbInitHelperFile(SRWHelper *pHelper);
static void tsdbDestroyHelperFile(SRWHelper *pHelper); static void tsdbDestroyHelperFile(SRWHelper *pHelper);
@ -48,21 +48,21 @@ static int tsdbInitHelperBlock(SRWHelper *pHelper);
static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type); static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type);
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows,
int maxPoints, char *buffer, int bufferSize); int maxPoints, char *buffer, int bufferSize);
static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds,
int numOfColIds); int numOfColIds);
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols); static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SBlock *pCompBlock, SDataCols *pDataCols);
static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx); static int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx); static void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey); static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey);
static void tsdbDestroyHelperBlock(SRWHelper *pHelper); static void tsdbDestroyHelperBlock(SRWHelper *pHelper);
static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBlock, SCompCol *pCompCol, static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SBlock *pCompBlock, SBlockCol *pCompCol,
SDataCol *pDataCol); SDataCol *pDataCol);
static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SCompBlock *pCompBlock); static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SBlock *pCompBlock);
static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey,
int *blkIdx); int *blkIdx);
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update); TSKEY maxKey, int maxRows, int8_t update);
static bool tsdbCheckAddSubBlockCond(SRWHelper *pHelper, SCompBlock *pCompBlock, SMergeInfo *pMergeInfo, int maxOps); static bool tsdbCheckAddSubBlockCond(SRWHelper *pHelper, SBlock *pCompBlock, SMergeInfo *pMergeInfo, int maxOps);
static int tsdbDeleteSuperBlock(SRWHelper *pHelper, int blkIdx); static int tsdbDeleteSuperBlock(SRWHelper *pHelper, int blkIdx);
// ---------------------- INTERNAL FUNCTIONS ---------------------- // ---------------------- INTERNAL FUNCTIONS ----------------------
@ -242,28 +242,28 @@ int tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
if (pHelper->idxH.numOfIdx > 0) { if (pHelper->idxH.numOfIdx > 0) {
while (true) { while (true) {
if (pHelper->idxH.curIdx >= pHelper->idxH.numOfIdx) { if (pHelper->idxH.curIdx >= pHelper->idxH.numOfIdx) {
memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); memset(&(pHelper->curCompIdx), 0, sizeof(SBlockIdx));
break; break;
} }
SCompIdx *pIdx = &(pHelper->idxH.pIdxArray[pHelper->idxH.curIdx]); SBlockIdx *pIdx = &(pHelper->idxH.pIdxArray[pHelper->idxH.curIdx]);
if (pIdx->tid == TABLE_TID(pTable)) { if (pIdx->tid == TABLE_TID(pTable)) {
if (pIdx->uid == TABLE_UID(pTable)) { if (pIdx->uid == TABLE_UID(pTable)) {
pHelper->curCompIdx = *pIdx; pHelper->curCompIdx = *pIdx;
} else { } else {
memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); memset(&(pHelper->curCompIdx), 0, sizeof(SBlockIdx));
} }
pHelper->idxH.curIdx++; pHelper->idxH.curIdx++;
break; break;
} else if (pIdx->tid > TABLE_TID(pTable)) { } else if (pIdx->tid > TABLE_TID(pTable)) {
memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); memset(&(pHelper->curCompIdx), 0, sizeof(SBlockIdx));
break; break;
} else { } else {
pHelper->idxH.curIdx++; pHelper->idxH.curIdx++;
} }
} }
} else { } else {
memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx)); memset(&(pHelper->curCompIdx), 0, sizeof(SBlockIdx));
} }
if (helperType(pHelper) == TSDB_WRITE_HELPER && pHelper->curCompIdx.hasLast) { if (helperType(pHelper) == TSDB_WRITE_HELPER && pHelper->curCompIdx.hasLast) {
@ -279,7 +279,7 @@ int tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) {
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
SCompIdx *pIdx = &(pHelper->curCompIdx); SBlockIdx *pIdx = &(pHelper->curCompIdx);
int blkIdx = 0; int blkIdx = 0;
ASSERT(pIdx->offset == 0 || pIdx->uid == TABLE_UID(pCommitIter->pTable)); ASSERT(pIdx->offset == 0 || pIdx->uid == TABLE_UID(pCommitIter->pTable));
@ -305,12 +305,12 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
STsdbCfg *pCfg = &pHelper->pRepo->config; STsdbCfg *pCfg = &pHelper->pRepo->config;
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
SCompIdx * pIdx = &(pHelper->curCompIdx); SBlockIdx * pIdx = &(pHelper->curCompIdx);
SCompBlock compBlock = {0}; SBlock compBlock = {0};
if (TSDB_NLAST_FILE_OPENED(pHelper) && (pHelper->hasOldLastBlock)) { if (TSDB_NLAST_FILE_OPENED(pHelper) && (pHelper->hasOldLastBlock)) {
if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;
SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); SBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1);
ASSERT(pCompBlock->last); ASSERT(pCompBlock->last);
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows && ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows &&
@ -360,7 +360,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
} }
int tsdbWriteCompInfo(SRWHelper *pHelper) { int tsdbWriteCompInfo(SRWHelper *pHelper) {
SCompIdx *pIdx = &(pHelper->curCompIdx); SBlockIdx *pIdx = &(pHelper->curCompIdx);
off_t offset = 0; off_t offset = 0;
SFile * pFile = helperNewHeadF(pHelper); SFile * pFile = helperNewHeadF(pHelper);
@ -371,8 +371,8 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER; pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER;
pHelper->pCompInfo->uid = pHelper->tableInfo.uid; pHelper->pCompInfo->uid = pHelper->tableInfo.uid;
pHelper->pCompInfo->tid = pHelper->tableInfo.tid; pHelper->pCompInfo->tid = pHelper->tableInfo.tid;
ASSERT(pIdx->len > sizeof(SCompInfo) + sizeof(TSCKSUM) && ASSERT(pIdx->len > sizeof(SBlockInfo) + sizeof(TSCKSUM) &&
(pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0); (pIdx->len - sizeof(SBlockInfo) - sizeof(TSCKSUM)) % sizeof(SBlock) == 0);
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);
} }
@ -396,7 +396,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
return -1; return -1;
} }
if (taosTSizeof(pHelper->pWIdx) < pFile->info.len + sizeof(SCompIdx) + 12) { if (taosTSizeof(pHelper->pWIdx) < pFile->info.len + sizeof(SBlockIdx) + 12) {
pHelper->pWIdx = taosTRealloc(pHelper->pWIdx, taosTSizeof(pHelper->pWIdx) == 0 ? 1024 : taosTSizeof(pHelper->pWIdx) * 2); pHelper->pWIdx = taosTRealloc(pHelper->pWIdx, taosTSizeof(pHelper->pWIdx) == 0 ? 1024 : taosTSizeof(pHelper->pWIdx) * 2);
if (pHelper->pWIdx == NULL) { if (pHelper->pWIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
@ -405,7 +405,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
} }
void *pBuf = POINTER_SHIFT(pHelper->pWIdx, pFile->info.len); void *pBuf = POINTER_SHIFT(pHelper->pWIdx, pFile->info.len);
pFile->info.len += tsdbEncodeSCompIdx(&pBuf, &(pHelper->curCompIdx)); pFile->info.len += tsdbEncodeSBlockIdx(&pBuf, &(pHelper->curCompIdx));
pFile->info.size += pIdx->len; pFile->info.size += pIdx->len;
// ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR)); // ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR));
@ -456,7 +456,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
} }
int tsdbLoadCompIdxImpl(SFile *pFile, uint32_t offset, uint32_t len, void *buffer) { int tsdbLoadCompIdxImpl(SFile *pFile, uint32_t offset, uint32_t len, void *buffer) {
const char *prefixMsg = "failed to load SCompIdx part"; const char *prefixMsg = "failed to load SBlockIdx part";
if (lseek(pFile->fd, offset, SEEK_SET) < 0) { if (lseek(pFile->fd, offset, SEEK_SET) < 0) {
tsdbError("%s: seek to file %s offset %u failed since %s", prefixMsg, TSDB_FILE_NAME(pFile), offset, strerror(errno)); tsdbError("%s: seek to file %s offset %u failed since %s", prefixMsg, TSDB_FILE_NAME(pFile), offset, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -479,23 +479,23 @@ int tsdbLoadCompIdxImpl(SFile *pFile, uint32_t offset, uint32_t len, void *buffe
return 0; return 0;
} }
int tsdbDecodeSCompIdxImpl(void *buffer, uint32_t len, SCompIdx **ppCompIdx, int *numOfIdx) { int tsdbDecodeSBlockIdxImpl(void *buffer, uint32_t len, SBlockIdx **ppCompIdx, int *numOfIdx) {
int nIdx = 0; int nIdx = 0;
void *pPtr = buffer; void *pPtr = buffer;
while (POINTER_DISTANCE(pPtr, buffer) < (int)(len - sizeof(TSCKSUM))) { while (POINTER_DISTANCE(pPtr, buffer) < (int)(len - sizeof(TSCKSUM))) {
size_t tlen = taosTSizeof(*ppCompIdx); size_t tlen = taosTSizeof(*ppCompIdx);
if (tlen < sizeof(SCompIdx) * (nIdx + 1)) { if (tlen < sizeof(SBlockIdx) * (nIdx + 1)) {
*ppCompIdx = (SCompIdx *)taosTRealloc(*ppCompIdx, (tlen == 0) ? 1024 : tlen * 2); *ppCompIdx = (SBlockIdx *)taosTRealloc(*ppCompIdx, (tlen == 0) ? 1024 : tlen * 2);
if (*ppCompIdx == NULL) { if (*ppCompIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
} }
pPtr = tsdbDecodeSCompIdx(pPtr, &((*ppCompIdx)[nIdx])); pPtr = tsdbDecodeSBlockIdx(pPtr, &((*ppCompIdx)[nIdx]));
if (pPtr == NULL) { if (pPtr == NULL) {
tsdbError("failed to decode SCompIdx part, idx:%d", nIdx); tsdbError("failed to decode SBlockIdx part, idx:%d", nIdx);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1; return -1;
} }
@ -522,15 +522,15 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
return -1; return -1;
} }
// Load SCompIdx binary from file // Load SBlockIdx binary from file
if (tsdbLoadCompIdxImpl(pFile, pFile->info.offset, pFile->info.len, (void *)(pHelper->pBuffer)) < 0) { if (tsdbLoadCompIdxImpl(pFile, pFile->info.offset, pFile->info.len, (void *)(pHelper->pBuffer)) < 0) {
return -1; return -1;
} }
// Decode the SCompIdx part // Decode the SBlockIdx part
if (tsdbDecodeSCompIdxImpl(pHelper->pBuffer, pFile->info.len, &(pHelper->idxH.pIdxArray), if (tsdbDecodeSBlockIdxImpl(pHelper->pBuffer, pFile->info.len, &(pHelper->idxH.pIdxArray),
&(pHelper->idxH.numOfIdx)) < 0) { &(pHelper->idxH.numOfIdx)) < 0) {
tsdbError("vgId:%d failed to decode SCompIdx part from file %s since %s", REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile), tsdbError("vgId:%d failed to decode SBlockIdx part from file %s since %s", REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile),
tstrerror(errno)); tstrerror(errno));
return -1; return -1;
} }
@ -540,13 +540,13 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
// Copy the memory for outside usage // Copy the memory for outside usage
if (target && pHelper->idxH.numOfIdx > 0) if (target && pHelper->idxH.numOfIdx > 0)
memcpy(target, pHelper->idxH.pIdxArray, sizeof(SCompIdx) * pHelper->idxH.numOfIdx); memcpy(target, pHelper->idxH.pIdxArray, sizeof(SBlockIdx) * pHelper->idxH.numOfIdx);
return 0; return 0;
} }
int tsdbLoadCompInfoImpl(SFile *pFile, SCompIdx *pIdx, SCompInfo **ppCompInfo) { int tsdbLoadCompInfoImpl(SFile *pFile, SBlockIdx *pIdx, SBlockInfo **ppCompInfo) {
const char *prefixMsg = "failed to load SCompInfo/SCompBlock part"; const char *prefixMsg = "failed to load SBlockInfo/SBlock part";
if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) { if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) {
tsdbError("%s: seek to file %s offset %u failed since %s", prefixMsg, TSDB_FILE_NAME(pFile), pIdx->offset, strerror(errno)); tsdbError("%s: seek to file %s offset %u failed since %s", prefixMsg, TSDB_FILE_NAME(pFile), pIdx->offset, strerror(errno));
@ -579,7 +579,7 @@ int tsdbLoadCompInfoImpl(SFile *pFile, SCompIdx *pIdx, SCompInfo **ppCompInfo) {
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET)); ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
SCompIdx *pIdx = &(pHelper->curCompIdx); SBlockIdx *pIdx = &(pHelper->curCompIdx);
SFile *pFile = helperHeadF(pHelper); SFile *pFile = helperHeadF(pHelper);
@ -600,7 +600,7 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
return 0; return 0;
} }
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) { int tsdbLoadCompData(SRWHelper *pHelper, SBlock *pCompBlock, void *target) {
ASSERT(pCompBlock->numOfSubBlocks <= 1); ASSERT(pCompBlock->numOfSubBlocks <= 1);
SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper);
@ -639,7 +639,7 @@ int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
} }
void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols) { void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols) {
SCompData *pCompData = pHelper->pCompData; SBlockData *pCompData = pHelper->pCompData;
for (int i = 0, j = 0; i < numOfCols;) { for (int i = 0, j = 0; i < numOfCols;) {
if (j >= pCompData->numOfCols) { if (j >= pCompData->numOfCols) {
@ -666,13 +666,13 @@ void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols)
} }
} }
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, SCompInfo *pCompInfo, int16_t *colIds, int numOfColIds) { int tsdbLoadBlockDataCols(SRWHelper *pHelper, SBlock *pCompBlock, SBlockInfo *pCompInfo, int16_t *colIds, int numOfColIds) {
ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block
SCompBlock *pTCompBlock = pCompBlock; SBlock *pTCompBlock = pCompBlock;
int numOfSubBlocks = pCompBlock->numOfSubBlocks; int numOfSubBlocks = pCompBlock->numOfSubBlocks;
if (numOfSubBlocks > 1) if (numOfSubBlocks > 1)
pTCompBlock = (SCompBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset); pTCompBlock = (SBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset);
tdResetDataCols(pHelper->pDataCols[0]); tdResetDataCols(pHelper->pDataCols[0]);
if (tsdbLoadBlockDataColsImpl(pHelper, pTCompBlock, pHelper->pDataCols[0], colIds, numOfColIds) < 0) goto _err; if (tsdbLoadBlockDataColsImpl(pHelper, pTCompBlock, pHelper->pDataCols[0], colIds, numOfColIds) < 0) goto _err;
@ -693,12 +693,12 @@ _err:
return -1; return -1;
} }
int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SCompInfo *pCompInfo) { int tsdbLoadBlockData(SRWHelper *pHelper, SBlock *pCompBlock, SBlockInfo *pCompInfo) {
SCompBlock *pTCompBlock = pCompBlock; SBlock *pTCompBlock = pCompBlock;
int numOfSubBlock = pCompBlock->numOfSubBlocks; int numOfSubBlock = pCompBlock->numOfSubBlocks;
if (numOfSubBlock > 1) if (numOfSubBlock > 1)
pTCompBlock = (SCompBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset); pTCompBlock = (SBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset);
tdResetDataCols(pHelper->pDataCols[0]); tdResetDataCols(pHelper->pDataCols[0]);
if (tsdbLoadBlockDataImpl(pHelper, pTCompBlock, pHelper->pDataCols[0]) < 0) goto _err; if (tsdbLoadBlockDataImpl(pHelper, pTCompBlock, pHelper->pDataCols[0]) < 0) goto _err;
@ -728,10 +728,10 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
return false; return false;
} }
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SCompBlock *pCompBlock, static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SBlock *pCompBlock,
bool isLast, bool isSuperBlock) { bool isLast, bool isSuperBlock) {
STsdbCfg * pCfg = &(pHelper->pRepo->config); STsdbCfg * pCfg = &(pHelper->pRepo->config);
SCompData *pCompData = (SCompData *)(pHelper->pBuffer); SBlockData *pCompData = (SBlockData *)(pHelper->pBuffer);
int64_t offset = 0; int64_t offset = 0;
int rowsToWrite = pDataCols->numOfRows; int rowsToWrite = pDataCols->numOfRows;
@ -749,7 +749,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
int nColsNotAllNull = 0; int nColsNotAllNull = 0;
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
SDataCol *pDataCol = pDataCols->cols + ncol; SDataCol *pDataCol = pDataCols->cols + ncol;
SCompCol *pCompCol = pCompData->cols + nColsNotAllNull; SBlockCol *pCompCol = pCompData->cols + nColsNotAllNull;
if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it
continue; continue;
@ -779,7 +779,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
if (ncol != 0 && tcol >= nColsNotAllNull) break; if (ncol != 0 && tcol >= nColsNotAllNull) break;
SDataCol *pDataCol = pDataCols->cols + ncol; SDataCol *pDataCol = pDataCols->cols + ncol;
SCompCol *pCompCol = pCompData->cols + tcol; SBlockCol *pCompCol = pCompData->cols + tcol;
if (ncol != 0 && (pDataCol->colId != pCompCol->colId)) continue; if (ncol != 0 && (pDataCol->colId != pCompCol->colId)) continue;
void *tptr = POINTER_SHIFT(pCompData, lsize); void *tptr = POINTER_SHIFT(pCompData, lsize);
@ -868,7 +868,7 @@ _err:
static int compareKeyBlock(const void *arg1, const void *arg2) { static int compareKeyBlock(const void *arg1, const void *arg2) {
TSKEY key = *(TSKEY *)arg1; TSKEY key = *(TSKEY *)arg1;
SCompBlock *pBlock = (SCompBlock *)arg2; SBlock *pBlock = (SBlock *)arg2;
if (key < pBlock->keyFirst) { if (key < pBlock->keyFirst) {
return -1; return -1;
@ -881,42 +881,42 @@ static int compareKeyBlock(const void *arg1, const void *arg2) {
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) { static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) {
if (taosTSizeof((void *)pHelper->pCompInfo) <= esize) { if (taosTSizeof((void *)pHelper->pCompInfo) <= esize) {
size_t tsize = esize + sizeof(SCompBlock) * 16; size_t tsize = esize + sizeof(SBlock) * 16;
pHelper->pCompInfo = (SCompInfo *)taosTRealloc(pHelper->pCompInfo, tsize); pHelper->pCompInfo = (SBlockInfo *)taosTRealloc(pHelper->pCompInfo, tsize);
if (pHelper->pCompInfo == NULL) return -1; if (pHelper->pCompInfo == NULL) return -1;
} }
return 0; return 0;
} }
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { static int tsdbInsertSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx) {
SCompIdx *pIdx = &(pHelper->curCompIdx); SBlockIdx *pIdx = &(pHelper->curCompIdx);
ASSERT(blkIdx >= 0 && blkIdx <= (int)pIdx->numOfBlocks); ASSERT(blkIdx >= 0 && blkIdx <= (int)pIdx->numOfBlocks);
ASSERT(pCompBlock->numOfSubBlocks == 1); ASSERT(pCompBlock->numOfSubBlocks == 1);
// Adjust memory if no more room // Adjust memory if no more room
if (pIdx->len == 0) pIdx->len = sizeof(SCompInfo) + sizeof(TSCKSUM); if (pIdx->len == 0) pIdx->len = sizeof(SBlockInfo) + sizeof(TSCKSUM);
if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SCompInfo)) < 0) goto _err; if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SBlockInfo)) < 0) goto _err;
// Change the offset // Change the offset
for (uint32_t i = 0; i < pIdx->numOfBlocks; i++) { for (uint32_t i = 0; i < pIdx->numOfBlocks; i++) {
SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock); if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SBlock);
} }
// Memmove if needed // Memmove if needed
int tsize = pIdx->len - (sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx); int tsize = pIdx->len - (sizeof(SBlockInfo) + sizeof(SBlock) * blkIdx);
if (tsize > 0) { if (tsize > 0) {
ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) < taosTSizeof(pHelper->pCompInfo)); ASSERT(sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1) < taosTSizeof(pHelper->pCompInfo));
ASSERT(sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1) + tsize <= taosTSizeof(pHelper->pCompInfo)); ASSERT(sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1) + tsize <= taosTSizeof(pHelper->pCompInfo));
memmove(POINTER_SHIFT(pHelper->pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * (blkIdx + 1)), memmove(POINTER_SHIFT(pHelper->pCompInfo, sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1)),
POINTER_SHIFT(pHelper->pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * blkIdx), tsize); POINTER_SHIFT(pHelper->pCompInfo, sizeof(SBlockInfo) + sizeof(SBlock) * blkIdx), tsize);
} }
pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock; pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock;
pIdx->numOfBlocks++; pIdx->numOfBlocks++;
pIdx->len += sizeof(SCompBlock); pIdx->len += sizeof(SBlock);
ASSERT(pIdx->len <= taosTSizeof(pHelper->pCompInfo)); ASSERT(pIdx->len <= taosTSizeof(pHelper->pCompInfo));
pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast;
pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last;
@ -936,47 +936,47 @@ _err:
return -1; return -1;
} }
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, SMergeInfo *pMergeInfo) { static int tsdbAddSubBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx, SMergeInfo *pMergeInfo) {
ASSERT(pCompBlock->numOfSubBlocks == 0); ASSERT(pCompBlock->numOfSubBlocks == 0);
SCompIdx *pIdx = &(pHelper->curCompIdx); SBlockIdx *pIdx = &(pHelper->curCompIdx);
ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks); ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks);
SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; SBlock *pSBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS); ASSERT(pSBlock->numOfSubBlocks >= 1 && pSBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS);
size_t spaceNeeded = size_t spaceNeeded =
(pSCompBlock->numOfSubBlocks == 1) ? pIdx->len + sizeof(SCompBlock) * 2 : pIdx->len + sizeof(SCompBlock); (pSBlock->numOfSubBlocks == 1) ? pIdx->len + sizeof(SBlock) * 2 : pIdx->len + sizeof(SBlock);
if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err; if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err;
pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; pSBlock = pHelper->pCompInfo->blocks + blkIdx;
// Add the sub-block // Add the sub-block
if (pSCompBlock->numOfSubBlocks > 1) { if (pSBlock->numOfSubBlocks > 1) {
size_t tsize = (size_t)(pIdx->len - (pSCompBlock->offset + pSCompBlock->len)); size_t tsize = (size_t)(pIdx->len - (pSBlock->offset + pSBlock->len));
if (tsize > 0) { if (tsize > 0) {
memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len + sizeof(SCompBlock)), memmove((void *)((char *)(pHelper->pCompInfo) + pSBlock->offset + pSBlock->len + sizeof(SBlock)),
(void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize); (void *)((char *)(pHelper->pCompInfo) + pSBlock->offset + pSBlock->len), tsize);
for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock); if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SBlock);
} }
} }
*(SCompBlock *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len) = *pCompBlock; *(SBlock *)((char *)(pHelper->pCompInfo) + pSBlock->offset + pSBlock->len) = *pCompBlock;
pSCompBlock->numOfSubBlocks++; pSBlock->numOfSubBlocks++;
ASSERT(pSCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); ASSERT(pSBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS);
pSCompBlock->len += sizeof(SCompBlock); pSBlock->len += sizeof(SBlock);
pSCompBlock->numOfRows = pSCompBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed; pSBlock->numOfRows = pSBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed;
pSCompBlock->keyFirst = pMergeInfo->keyFirst; pSBlock->keyFirst = pMergeInfo->keyFirst;
pSCompBlock->keyLast = pMergeInfo->keyLast; pSBlock->keyLast = pMergeInfo->keyLast;
pIdx->len += sizeof(SCompBlock); pIdx->len += sizeof(SBlock);
} else { // Need to create two sub-blocks } else { // Need to create two sub-blocks
void *ptr = NULL; void *ptr = NULL;
for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; SBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
if (pTCompBlock->numOfSubBlocks > 1) { if (pTCompBlock->numOfSubBlocks > 1) {
ptr = POINTER_SHIFT(pHelper->pCompInfo, pTCompBlock->offset); ptr = POINTER_SHIFT(pHelper->pCompInfo, pTCompBlock->offset);
break; break;
@ -987,26 +987,26 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo)); size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo));
if (tsize > 0) { if (tsize > 0) {
memmove(POINTER_SHIFT(ptr, sizeof(SCompBlock) * 2), ptr, tsize); memmove(POINTER_SHIFT(ptr, sizeof(SBlock) * 2), ptr, tsize);
for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; SBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SCompBlock) * 2); if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SBlock) * 2);
} }
} }
((SCompBlock *)ptr)[0] = *pSCompBlock; ((SBlock *)ptr)[0] = *pSBlock;
((SCompBlock *)ptr)[0].numOfSubBlocks = 0; ((SBlock *)ptr)[0].numOfSubBlocks = 0;
((SCompBlock *)ptr)[1] = *pCompBlock; ((SBlock *)ptr)[1] = *pCompBlock;
pSCompBlock->numOfSubBlocks = 2; pSBlock->numOfSubBlocks = 2;
pSCompBlock->numOfRows = pSCompBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed; pSBlock->numOfRows = pSBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed;
pSCompBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo); pSBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo);
pSCompBlock->len = sizeof(SCompBlock) * 2; pSBlock->len = sizeof(SBlock) * 2;
pSCompBlock->keyFirst = pMergeInfo->keyFirst; pSBlock->keyFirst = pMergeInfo->keyFirst;
pSCompBlock->keyLast = pMergeInfo->keyLast; pSBlock->keyLast = pMergeInfo->keyLast;
pIdx->len += (sizeof(SCompBlock) * 2); pIdx->len += (sizeof(SBlock) * 2);
} }
pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast; pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
@ -1020,34 +1020,34 @@ _err:
return -1; return -1;
} }
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx) {
ASSERT(pCompBlock->numOfSubBlocks == 1); ASSERT(pCompBlock->numOfSubBlocks == 1);
SCompIdx *pIdx = &(pHelper->curCompIdx); SBlockIdx *pIdx = &(pHelper->curCompIdx);
ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks); ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks);
SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; SBlock *pSBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(pSCompBlock->numOfSubBlocks >= 1); ASSERT(pSBlock->numOfSubBlocks >= 1);
// Delete the sub blocks it has // Delete the sub blocks it has
if (pSCompBlock->numOfSubBlocks > 1) { if (pSBlock->numOfSubBlocks > 1) {
size_t tsize = (size_t)(pIdx->len - (pSCompBlock->offset + pSCompBlock->len)); size_t tsize = (size_t)(pIdx->len - (pSBlock->offset + pSBlock->len));
if (tsize > 0) { if (tsize > 0) {
memmove(POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset), memmove(POINTER_SHIFT(pHelper->pCompInfo, pSBlock->offset),
POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset + pSCompBlock->len), tsize); POINTER_SHIFT(pHelper->pCompInfo, pSBlock->offset + pSBlock->len), tsize);
} }
for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset -= (sizeof(SCompBlock) * pSCompBlock->numOfSubBlocks); if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset -= (sizeof(SBlock) * pSBlock->numOfSubBlocks);
} }
pIdx->len -= (sizeof(SCompBlock) * pSCompBlock->numOfSubBlocks); pIdx->len -= (sizeof(SBlock) * pSBlock->numOfSubBlocks);
} }
*pSCompBlock = *pCompBlock; *pSBlock = *pCompBlock;
pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast;
pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last;
@ -1061,12 +1061,12 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
} }
static int tsdbDeleteSuperBlock(SRWHelper *pHelper, int blkIdx) { static int tsdbDeleteSuperBlock(SRWHelper *pHelper, int blkIdx) {
SCompIdx *pCompIdx = &(pHelper->curCompIdx); SBlockIdx *pCompIdx = &(pHelper->curCompIdx);
ASSERT(pCompIdx->numOfBlocks > 0 && blkIdx < pCompIdx->numOfBlocks); ASSERT(pCompIdx->numOfBlocks > 0 && blkIdx < pCompIdx->numOfBlocks);
SCompBlock *pCompBlock= blockAtIdx(pHelper, blkIdx); SBlock *pCompBlock= blockAtIdx(pHelper, blkIdx);
SCompBlock compBlock = *pCompBlock; SBlock compBlock = *pCompBlock;
ASSERT(pCompBlock->numOfSubBlocks > 0 && pCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); ASSERT(pCompBlock->numOfSubBlocks > 0 && pCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS);
if (pCompIdx->numOfBlocks == 1) { if (pCompIdx->numOfBlocks == 1) {
@ -1075,21 +1075,21 @@ static int tsdbDeleteSuperBlock(SRWHelper *pHelper, int blkIdx) {
int tsize = 0; int tsize = 0;
if (compBlock.numOfSubBlocks > 1) { if (compBlock.numOfSubBlocks > 1) {
tsize = (int)(pCompIdx->len - (compBlock.offset + sizeof(SCompBlock) * compBlock.numOfSubBlocks)); tsize = (int)(pCompIdx->len - (compBlock.offset + sizeof(SBlock) * compBlock.numOfSubBlocks));
ASSERT(tsize > 0); ASSERT(tsize > 0);
memmove(POINTER_SHIFT(pHelper->pCompInfo, compBlock.offset), memmove(POINTER_SHIFT(pHelper->pCompInfo, compBlock.offset),
POINTER_SHIFT(pHelper->pCompInfo, compBlock.offset + sizeof(SCompBlock) * compBlock.numOfSubBlocks), POINTER_SHIFT(pHelper->pCompInfo, compBlock.offset + sizeof(SBlock) * compBlock.numOfSubBlocks),
tsize); tsize);
pCompIdx->len = pCompIdx->len - sizeof(SCompBlock) * compBlock.numOfSubBlocks; pCompIdx->len = pCompIdx->len - sizeof(SBlock) * compBlock.numOfSubBlocks;
} }
tsize = (int)(pCompIdx->len - POINTER_DISTANCE(blockAtIdx(pHelper, blkIdx + 1), pHelper->pCompInfo)); tsize = (int)(pCompIdx->len - POINTER_DISTANCE(blockAtIdx(pHelper, blkIdx + 1), pHelper->pCompInfo));
ASSERT(tsize > 0); ASSERT(tsize > 0);
memmove((void *)blockAtIdx(pHelper, blkIdx), (void *)blockAtIdx(pHelper, blkIdx + 1), tsize); memmove((void *)blockAtIdx(pHelper, blkIdx), (void *)blockAtIdx(pHelper, blkIdx + 1), tsize);
pCompIdx->len -= sizeof(SCompBlock); pCompIdx->len -= sizeof(SBlock);
pCompIdx->numOfBlocks--; pCompIdx->numOfBlocks--;
pCompIdx->hasLast = (uint32_t)(blockAtIdx(pHelper, pCompIdx->numOfBlocks - 1)->last); pCompIdx->hasLast = (uint32_t)(blockAtIdx(pHelper, pCompIdx->numOfBlocks - 1)->last);
@ -1191,7 +1191,7 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t
// TODO: pMeta->maxRowBytes and pMeta->maxCols may change here causing invalid write // TODO: pMeta->maxRowBytes and pMeta->maxCols may change here causing invalid write
pHelper->pBuffer = pHelper->pBuffer =
taosTMalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pMeta->maxCols + taosTMalloc(sizeof(SBlockData) + (sizeof(SBlockCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pMeta->maxCols +
pMeta->maxRowBytes * pCfg->maxRowsPerFileBlock + sizeof(TSCKSUM)); pMeta->maxRowBytes * pCfg->maxRowsPerFileBlock + sizeof(TSCKSUM));
if (pHelper->pBuffer == NULL) { if (pHelper->pBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
@ -1239,7 +1239,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
return 0; return 0;
} }
static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBlock, SCompCol *pCompCol, static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SBlock *pCompBlock, SBlockCol *pCompCol,
SDataCol *pDataCol) { SDataCol *pDataCol) {
ASSERT(pDataCol->colId == pCompCol->colId); ASSERT(pDataCol->colId == pCompCol->colId);
int tsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES; int tsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES;
@ -1280,14 +1280,14 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl
return 0; return 0;
} }
static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) { static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) {
ASSERT(pCompBlock->numOfSubBlocks <= 1); ASSERT(pCompBlock->numOfSubBlocks <= 1);
ASSERT(colIds[0] == 0); ASSERT(colIds[0] == 0);
SFile * pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); SFile * pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper);
SCompCol compCol = {0}; SBlockCol compCol = {0};
// If only load timestamp column, no need to load SCompData part // If only load timestamp column, no need to load SBlockData part
if (numOfColIds > 1 && tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err; if (numOfColIds > 1 && tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err;
pDataCols->numOfRows = pCompBlock->numOfRows; pDataCols->numOfRows = pCompBlock->numOfRows;
@ -1297,7 +1297,7 @@ static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock,
for (int i = 0; i < numOfColIds; i++) { for (int i = 0; i < numOfColIds; i++) {
int16_t colId = colIds[i]; int16_t colId = colIds[i];
SDataCol *pDataCol = NULL; SDataCol *pDataCol = NULL;
SCompCol *pCompCol = NULL; SBlockCol *pCompCol = NULL;
while (true) { while (true) {
if (dcol >= pDataCols->numOfCols) { if (dcol >= pDataCols->numOfCols) {
@ -1357,7 +1357,7 @@ _err:
return -1; return -1;
} }
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SBlock *pCompBlock, SDataCols *pDataCols) {
ASSERT(pCompBlock->numOfSubBlocks <= 1); ASSERT(pCompBlock->numOfSubBlocks <= 1);
SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper);
@ -1368,7 +1368,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
goto _err; goto _err;
} }
SCompData *pCompData = (SCompData *)pHelper->pBuffer; SBlockData *pCompData = (SBlockData *)pHelper->pBuffer;
int fd = pFile->fd; int fd = pFile->fd;
if (lseek(fd, (off_t)pCompBlock->offset, SEEK_SET) < 0) { if (lseek(fd, (off_t)pCompBlock->offset, SEEK_SET) < 0) {
@ -1396,7 +1396,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
pDataCols->numOfRows = pCompBlock->numOfRows; pDataCols->numOfRows = pCompBlock->numOfRows;
// Recover the data // Recover the data
int ccol = 0; // loop iter for SCompCol object int ccol = 0; // loop iter for SBlockCol object
int dcol = 0; // loop iter for SDataCols object int dcol = 0; // loop iter for SDataCols object
while (dcol < pDataCols->numOfCols) { while (dcol < pDataCols->numOfCols) {
SDataCol *pDataCol = &(pDataCols->cols[dcol]); SDataCol *pDataCol = &(pDataCols->cols[dcol]);
@ -1412,7 +1412,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
int32_t tlen = pCompBlock->keyLen; int32_t tlen = pCompBlock->keyLen;
if (dcol != 0) { if (dcol != 0) {
SCompCol *pCompCol = &(pCompData->cols[ccol]); SBlockCol *pCompCol = &(pCompData->cols[ccol]);
tcolId = pCompCol->colId; tcolId = pCompCol->colId;
toffset = pCompCol->offset; toffset = pCompCol->offset;
tlen = pCompCol->len; tlen = pCompCol->len;
@ -1456,7 +1456,7 @@ _err:
return -1; return -1;
} }
static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx) { static int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeVariantI32(buf, pIdx->tid); tlen += taosEncodeVariantI32(buf, pIdx->tid);
@ -1470,7 +1470,7 @@ static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx) {
return tlen; return tlen;
} }
static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) { static void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
uint8_t hasLast = 0; uint8_t hasLast = 0;
uint32_t numOfBlocks = 0; uint32_t numOfBlocks = 0;
uint64_t value = 0; uint64_t value = 0;
@ -1493,17 +1493,17 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) {
STsdbCfg * pCfg = &(pHelper->pRepo->config); STsdbCfg * pCfg = &(pHelper->pRepo->config);
STable * pTable = pCommitIter->pTable; STable * pTable = pCommitIter->pTable;
SCompIdx * pIdx = &(pHelper->curCompIdx); SBlockIdx * pIdx = &(pHelper->curCompIdx);
TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
SCompBlock compBlock = {0}; SBlock compBlock = {0};
SMergeInfo mergeInfo = {0}; SMergeInfo mergeInfo = {0};
SMergeInfo *pMergeInfo = &mergeInfo; SMergeInfo *pMergeInfo = &mergeInfo;
ASSERT(pIdx->len <= 0 || keyFirst > pIdx->maxKey); ASSERT(pIdx->len <= 0 || keyFirst > pIdx->maxKey);
if (pIdx->hasLast) { // append to with last block if (pIdx->hasLast) { // append to with last block
ASSERT(pIdx->len > 0); ASSERT(pIdx->len > 0);
SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); SBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1);
ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock);
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, pDataCols, tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, pDataCols,
NULL, 0, pCfg->update, pMergeInfo); NULL, 0, pCfg->update, pMergeInfo);
@ -1556,21 +1556,21 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
int *blkIdx) { int *blkIdx) {
STsdbCfg * pCfg = &(pHelper->pRepo->config); STsdbCfg * pCfg = &(pHelper->pRepo->config);
STable * pTable = pCommitIter->pTable; STable * pTable = pCommitIter->pTable;
SCompIdx * pIdx = &(pHelper->curCompIdx); SBlockIdx * pIdx = &(pHelper->curCompIdx);
SCompBlock compBlock = {0}; SBlock compBlock = {0};
TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
SDataCols * pDataCols0 = pHelper->pDataCols[0]; SDataCols * pDataCols0 = pHelper->pDataCols[0];
SMergeInfo mergeInfo = {0}; SMergeInfo mergeInfo = {0};
SMergeInfo *pMergeInfo = &mergeInfo; SMergeInfo *pMergeInfo = &mergeInfo;
SCompBlock oBlock = {0}; SBlock oBlock = {0};
SSkipListIterator slIter = {0}; SSkipListIterator slIter = {0};
ASSERT(keyFirst <= pIdx->maxKey); ASSERT(keyFirst <= pIdx->maxKey);
SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx), SBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx),
pIdx->numOfBlocks - *blkIdx, sizeof(SCompBlock), compareKeyBlock, TD_GE); pIdx->numOfBlocks - *blkIdx, sizeof(SBlock), compareKeyBlock, TD_GE);
ASSERT(pCompBlock != NULL); ASSERT(pCompBlock != NULL);
int tblkIdx = (int32_t)(TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock)); int tblkIdx = (int32_t)(TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock));
oBlock = *pCompBlock; oBlock = *pCompBlock;
@ -1722,7 +1722,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
} }
} }
static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SCompBlock *pCompBlock) { static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SBlock *pCompBlock) {
STsdbCfg *pCfg = &(pHelper->pRepo->config); STsdbCfg *pCfg = &(pHelper->pRepo->config);
SFile * pFile = NULL; SFile * pFile = NULL;
bool isLast = false; bool isLast = false;
@ -1743,7 +1743,7 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols,
return 0; return 0;
} }
static bool tsdbCheckAddSubBlockCond(SRWHelper *pHelper, SCompBlock *pCompBlock, SMergeInfo *pMergeInfo, int maxOps) { static bool tsdbCheckAddSubBlockCond(SRWHelper *pHelper, SBlock *pCompBlock, SMergeInfo *pMergeInfo, int maxOps) {
STsdbCfg *pCfg = &(pHelper->pRepo->config); STsdbCfg *pCfg = &(pHelper->pRepo->config);
int mergeRows = pCompBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed; int mergeRows = pCompBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed;

View File

@ -69,7 +69,7 @@ typedef struct STableCheckInfo {
STableId tableId; STableId tableId;
TSKEY lastKey; TSKEY lastKey;
STable* pTableObj; STable* pTableObj;
SCompInfo* pCompInfo; SBlockInfo* pCompInfo;
int32_t compSize; int32_t compSize;
int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks
int8_t chosen:2; // indicate which iterator should move forward int8_t chosen:2; // indicate which iterator should move forward
@ -79,7 +79,7 @@ typedef struct STableCheckInfo {
} STableCheckInfo; } STableCheckInfo;
typedef struct STableBlockInfo { typedef struct STableBlockInfo {
SCompBlock* compBlock; SBlock* compBlock;
STableCheckInfo* pTableCheckInfo; STableCheckInfo* pTableCheckInfo;
} STableBlockInfo; } STableBlockInfo;
@ -136,7 +136,7 @@ typedef struct STableGroupSupporter {
static STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList); static STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList);
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock); static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
STsdbQueryHandle* pQueryHandle); STsdbQueryHandle* pQueryHandle);
@ -669,7 +669,7 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio
return (int32_t)fid; return (int32_t)fid;
} }
static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
int32_t firstSlot = 0; int32_t firstSlot = 0;
int32_t lastSlot = numOfBlocks - 1; int32_t lastSlot = numOfBlocks - 1;
@ -712,7 +712,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
break; break;
} }
SCompIdx* compIndex = &pQueryHandle->rhelper.curCompIdx; SBlockIdx* compIndex = &pQueryHandle->rhelper.curCompIdx;
// no data block in this file, try next file // no data block in this file, try next file
if (compIndex->len == 0 || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) { if (compIndex->len == 0 || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) {
@ -729,12 +729,12 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
break; break;
} }
pCheckInfo->pCompInfo = (SCompInfo*) t; pCheckInfo->pCompInfo = (SBlockInfo*) t;
pCheckInfo->compSize = compIndex->len; pCheckInfo->compSize = compIndex->len;
} }
tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo)); tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo));
SCompInfo* pCompInfo = pCheckInfo->pCompInfo; SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;
TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL; TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
@ -763,7 +763,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
pCheckInfo->numOfBlocks = (end - start); pCheckInfo->numOfBlocks = (end - start);
if (start > 0) { if (start > 0) {
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SCompBlock)); memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
} }
(*numOfBlocks) += pCheckInfo->numOfBlocks; (*numOfBlocks) += pCheckInfo->numOfBlocks;
@ -772,7 +772,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
return code; return code;
} }
static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) { static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
STSchema *pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj); STSchema *pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj);
@ -838,7 +838,7 @@ static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, i
static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle); static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle);
static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos); static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos);
static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo){
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
@ -921,7 +921,7 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBloc
return code; return code;
} }
static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) { static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) {
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1327,7 +1327,7 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl
// only return the qualified data to client in terms of query time window, data rows in the same block but do not // only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded // be included in the query time window will be discarded
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) { static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) {
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
@ -1626,7 +1626,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
continue; continue;
} }
SCompBlock* pBlock = pTableCheck->pCompInfo->blocks; SBlock* pBlock = pTableCheck->pCompInfo->blocks;
sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks; sup.numOfBlocksPerTable[numOfQualTables] = pTableCheck->numOfBlocks;
char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks); char* buf = calloc(1, sizeof(STableBlockInfo) * pTableCheck->numOfBlocks);
@ -2316,7 +2316,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
pBlockLoadInfo->tid == pCheckInfo->pTableObj->tableId.tid) { pBlockLoadInfo->tid == pCheckInfo->pTableObj->tableId.tid) {
return pHandle->pColumns; return pHandle->pColumns;
} else { // only load the file block } else { // only load the file block
SCompBlock* pBlock = pBlockInfo->compBlock; SBlock* pBlock = pBlockInfo->compBlock;
if (doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot) != TSDB_CODE_SUCCESS) { if (doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot) != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }

View File

@ -25,9 +25,9 @@ void tsdbSetScanLogStream(STsdbScanHandle* pScanHandle, FILE* fLogStream) {}
int tsdbSetAndOpenScanFile(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; } int tsdbSetAndOpenScanFile(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; }
int tsdbScanSCompIdx(STsdbScanHandle* pScanHandle) { return 0; } int tsdbScanSBlockIdx(STsdbScanHandle* pScanHandle) { return 0; }
int tsdbScanSCompBlock(STsdbScanHandle* pScanHandle, int idx) { return 0; } int tsdbScanSBlock(STsdbScanHandle* pScanHandle, int idx) { return 0; }
int tsdbCloseScanFile(STsdbScanHandle* pScanHandle) { return 0; } int tsdbCloseScanFile(STsdbScanHandle* pScanHandle) { return 0; }