Merge remote-tracking branch 'origin/enh/tsdb_optimize' into enh/tsdb_optimize
This commit is contained in:
commit
6a29bce44e
|
@ -191,16 +191,16 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_MAX_COLUMNS 4096
|
||||
#define TSDB_MIN_COLUMNS 2 // PRIMARY COLUMN(timestamp) + other columns
|
||||
|
||||
#define TSDB_NODE_NAME_LEN 64
|
||||
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
|
||||
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
|
||||
#define TSDB_CGROUP_LEN 193 // it is a null-terminated string
|
||||
#define TSDB_OFFSET_LEN 64 // it is a null-terminated string
|
||||
#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
|
||||
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string
|
||||
#define TSDB_DB_NAME_LEN 65
|
||||
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||
#define TSDB_PRIVILEDGE_CONDITION_LEN 200
|
||||
#define TSDB_NODE_NAME_LEN 64
|
||||
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
|
||||
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
|
||||
#define TSDB_CGROUP_LEN 193 // it is a null-terminated string
|
||||
#define TSDB_OFFSET_LEN 64 // it is a null-terminated string
|
||||
#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
|
||||
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string
|
||||
#define TSDB_DB_NAME_LEN 65
|
||||
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||
#define TSDB_PRIVILEDGE_CONDITION_LEN 200
|
||||
|
||||
#define TSDB_FUNC_NAME_LEN 65
|
||||
#define TSDB_FUNC_COMMENT_LEN 1024 * 1024
|
||||
|
@ -249,15 +249,15 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_LABEL_LEN 8
|
||||
#define TSDB_JOB_STATUS_LEN 32
|
||||
|
||||
#define TSDB_CLUSTER_ID_LEN 40
|
||||
#define TSDB_FQDN_LEN 128
|
||||
#define TSDB_EP_LEN (TSDB_FQDN_LEN + 6)
|
||||
#define TSDB_IPv4ADDR_LEN 16
|
||||
#define TSDB_FILENAME_LEN 128
|
||||
#define TSDB_SHOW_SQL_LEN 2048
|
||||
#define TSDB_CLUSTER_ID_LEN 40
|
||||
#define TSDB_FQDN_LEN 128
|
||||
#define TSDB_EP_LEN (TSDB_FQDN_LEN + 6)
|
||||
#define TSDB_IPv4ADDR_LEN 16
|
||||
#define TSDB_FILENAME_LEN 128
|
||||
#define TSDB_SHOW_SQL_LEN 2048
|
||||
#define TSDB_SHOW_SCHEMA_JSON_LEN TSDB_MAX_COLUMNS * 256
|
||||
#define TSDB_SLOW_QUERY_SQL_LEN 512
|
||||
#define TSDB_SHOW_SUBQUERY_LEN 1000
|
||||
#define TSDB_SLOW_QUERY_SQL_LEN 512
|
||||
#define TSDB_SHOW_SUBQUERY_LEN 1000
|
||||
|
||||
#define TSDB_TRANS_STAGE_LEN 12
|
||||
#define TSDB_TRANS_TYPE_LEN 16
|
||||
|
@ -370,7 +370,7 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF
|
||||
#define TSDB_MIN_STT_TRIGGER 1
|
||||
#define TSDB_MAX_STT_TRIGGER 16
|
||||
#define TSDB_DEFAULT_SST_TRIGGER 1
|
||||
#define TSDB_DEFAULT_SST_TRIGGER 2
|
||||
#define TSDB_MIN_HASH_PREFIX (2 - TSDB_TABLE_NAME_LEN)
|
||||
#define TSDB_MAX_HASH_PREFIX (TSDB_TABLE_NAME_LEN - 2)
|
||||
#define TSDB_DEFAULT_HASH_PREFIX 0
|
||||
|
@ -410,10 +410,10 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_EXPLAIN_RESULT_ROW_SIZE (16 * 1024)
|
||||
#define TSDB_EXPLAIN_RESULT_COLUMN_NAME "QUERY_PLAN"
|
||||
|
||||
#define TSDB_MAX_FIELD_LEN 65519 // 16384:65519
|
||||
#define TSDB_MAX_BINARY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519
|
||||
#define TSDB_MAX_NCHAR_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519
|
||||
#define TSDB_MAX_GEOMETRY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519
|
||||
#define TSDB_MAX_FIELD_LEN 65519 // 16384:65519
|
||||
#define TSDB_MAX_BINARY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519
|
||||
#define TSDB_MAX_NCHAR_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519
|
||||
#define TSDB_MAX_GEOMETRY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519
|
||||
|
||||
#define PRIMARYKEY_TIMESTAMP_COL_ID 1
|
||||
#define COL_REACH_END(colId, maxColId) ((colId) > (maxColId))
|
||||
|
|
|
@ -2245,15 +2245,18 @@ static int32_t tColDataUpdateValue72(SColData *pColData, uint8_t *pData, uint32_
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
static FORCE_INLINE int32_t tColDataUpdateNothing(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
|
||||
return 0;
|
||||
}
|
||||
static int32_t (*tColDataUpdateValueImpl[8][3])(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) = {
|
||||
{NULL, NULL, NULL}, // 0
|
||||
{tColDataUpdateValue10, NULL, tColDataUpdateValue12}, // HAS_NONE
|
||||
{tColDataUpdateValue20, NULL, NULL}, // HAS_NULL
|
||||
{tColDataUpdateValue30, NULL, tColDataUpdateValue32}, // HAS_NULL|HAS_NONE
|
||||
{tColDataUpdateValue40, NULL, tColDataUpdateValue42}, // HAS_VALUE
|
||||
{tColDataUpdateValue50, NULL, tColDataUpdateValue52}, // HAS_VALUE|HAS_NONE
|
||||
{tColDataUpdateValue60, NULL, tColDataUpdateValue62}, // HAS_VALUE|HAS_NULL
|
||||
{tColDataUpdateValue70, NULL, tColDataUpdateValue72}, // HAS_VALUE|HAS_NULL|HAS_NONE
|
||||
{NULL, NULL, NULL}, // 0
|
||||
{tColDataUpdateValue10, tColDataUpdateNothing, tColDataUpdateValue12}, // HAS_NONE
|
||||
{tColDataUpdateValue20, tColDataUpdateNothing, tColDataUpdateNothing}, // HAS_NULL
|
||||
{tColDataUpdateValue30, tColDataUpdateNothing, tColDataUpdateValue32}, // HAS_NULL|HAS_NONE
|
||||
{tColDataUpdateValue40, tColDataUpdateNothing, tColDataUpdateValue42}, // HAS_VALUE
|
||||
{tColDataUpdateValue50, tColDataUpdateNothing, tColDataUpdateValue52}, // HAS_VALUE|HAS_NONE
|
||||
{tColDataUpdateValue60, tColDataUpdateNothing, tColDataUpdateValue62}, // HAS_VALUE|HAS_NULL
|
||||
{tColDataUpdateValue70, tColDataUpdateNothing, tColDataUpdateValue72}, // HAS_VALUE|HAS_NULL|HAS_NONE
|
||||
|
||||
// VALUE NONE NULL
|
||||
};
|
||||
|
|
|
@ -202,7 +202,7 @@ int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItem
|
|||
// other
|
||||
int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision);
|
||||
void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey);
|
||||
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now);
|
||||
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t nowSec);
|
||||
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline);
|
||||
int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
|
||||
int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
|
||||
|
@ -707,7 +707,7 @@ typedef struct SSttBlockLoadInfo {
|
|||
|
||||
SArray *aSttBlk;
|
||||
SArray *pTombBlockArray; // tomb block array list
|
||||
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
|
||||
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
|
||||
int32_t currentLoadBlockIndex;
|
||||
int32_t loadBlocks;
|
||||
double elapsedTime;
|
||||
|
|
|
@ -15,15 +15,9 @@
|
|||
|
||||
#include "tsdbDataFileRW.h"
|
||||
|
||||
typedef struct {
|
||||
SFDataPtr brinBlkPtr[1];
|
||||
SFDataPtr rsrvd[2];
|
||||
} SHeadFooter;
|
||||
|
||||
typedef struct {
|
||||
SFDataPtr tombBlkPtr[1];
|
||||
SFDataPtr rsrvd[2];
|
||||
} STombFooter;
|
||||
extern int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
|
||||
TTombBlkArray *tombBlkArray, uint8_t **bufArr);
|
||||
extern int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize);
|
||||
|
||||
// SDataFileReader =============================================
|
||||
struct SDataFileReader {
|
||||
|
@ -644,81 +638,89 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
|
||||
if (BRIN_BLOCK_SIZE(writer->brinBlock) == 0) return 0;
|
||||
int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize,
|
||||
TBrinBlkArray *brinBlkArray, uint8_t **bufArr) {
|
||||
if (BRIN_BLOCK_SIZE(brinBlock) == 0) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
int32_t code;
|
||||
|
||||
// get SBrinBlk
|
||||
SBrinBlk brinBlk[1] = {
|
||||
{
|
||||
.dp[0] =
|
||||
{
|
||||
.offset = writer->files[TSDB_FTYPE_HEAD].size,
|
||||
.offset = *fileSize,
|
||||
.size = 0,
|
||||
},
|
||||
.minTbid =
|
||||
{
|
||||
.suid = TARRAY2_FIRST(writer->brinBlock->suid),
|
||||
.uid = TARRAY2_FIRST(writer->brinBlock->uid),
|
||||
.suid = TARRAY2_FIRST(brinBlock->suid),
|
||||
.uid = TARRAY2_FIRST(brinBlock->uid),
|
||||
},
|
||||
.maxTbid =
|
||||
{
|
||||
.suid = TARRAY2_LAST(writer->brinBlock->suid),
|
||||
.uid = TARRAY2_LAST(writer->brinBlock->uid),
|
||||
.suid = TARRAY2_LAST(brinBlock->suid),
|
||||
.uid = TARRAY2_LAST(brinBlock->uid),
|
||||
},
|
||||
.minVer = TARRAY2_FIRST(writer->brinBlock->minVer),
|
||||
.maxVer = TARRAY2_FIRST(writer->brinBlock->minVer),
|
||||
.numRec = BRIN_BLOCK_SIZE(writer->brinBlock),
|
||||
.cmprAlg = writer->config->cmprAlg,
|
||||
.minVer = TARRAY2_FIRST(brinBlock->minVer),
|
||||
.maxVer = TARRAY2_FIRST(brinBlock->minVer),
|
||||
.numRec = BRIN_BLOCK_SIZE(brinBlock),
|
||||
.cmprAlg = cmprAlg,
|
||||
},
|
||||
};
|
||||
|
||||
for (int32_t i = 1; i < BRIN_BLOCK_SIZE(writer->brinBlock); i++) {
|
||||
if (brinBlk->minVer > TARRAY2_GET(writer->brinBlock->minVer, i)) {
|
||||
brinBlk->minVer = TARRAY2_GET(writer->brinBlock->minVer, i);
|
||||
for (int32_t i = 1; i < BRIN_BLOCK_SIZE(brinBlock); i++) {
|
||||
if (brinBlk->minVer > TARRAY2_GET(brinBlock->minVer, i)) {
|
||||
brinBlk->minVer = TARRAY2_GET(brinBlock->minVer, i);
|
||||
}
|
||||
if (brinBlk->maxVer < TARRAY2_GET(writer->brinBlock->maxVer, i)) {
|
||||
brinBlk->maxVer = TARRAY2_GET(writer->brinBlock->maxVer, i);
|
||||
if (brinBlk->maxVer < TARRAY2_GET(brinBlock->maxVer, i)) {
|
||||
brinBlk->maxVer = TARRAY2_GET(brinBlock->maxVer, i);
|
||||
}
|
||||
}
|
||||
|
||||
// write to file
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(writer->brinBlock->dataArr1); i++) {
|
||||
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->brinBlock->dataArr1 + i),
|
||||
TARRAY2_DATA_LEN(writer->brinBlock->dataArr1 + i), TSDB_DATA_TYPE_BIGINT, brinBlk->cmprAlg,
|
||||
&writer->config->bufArr[0], 0, &brinBlk->size[i], &writer->config->bufArr[1]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) {
|
||||
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(brinBlock->dataArr1 + i), TARRAY2_DATA_LEN(brinBlock->dataArr1 + i),
|
||||
TSDB_DATA_TYPE_BIGINT, brinBlk->cmprAlg, &bufArr[0], 0, &brinBlk->size[i], &bufArr[1]);
|
||||
if (code) return code;
|
||||
|
||||
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->files[TSDB_FTYPE_HEAD].size, writer->config->bufArr[0],
|
||||
brinBlk->size[i]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
code = tsdbWriteFile(fd, *fileSize, bufArr[0], brinBlk->size[i]);
|
||||
if (code) return code;
|
||||
|
||||
brinBlk->dp->size += brinBlk->size[i];
|
||||
writer->files[TSDB_FTYPE_HEAD].size += brinBlk->size[i];
|
||||
*fileSize += brinBlk->size[i];
|
||||
}
|
||||
|
||||
for (int32_t i = 0, j = ARRAY_SIZE(writer->brinBlock->dataArr1); i < ARRAY_SIZE(writer->brinBlock->dataArr2);
|
||||
i++, j++) {
|
||||
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->brinBlock->dataArr2 + i),
|
||||
TARRAY2_DATA_LEN(writer->brinBlock->dataArr2 + i), TSDB_DATA_TYPE_INT, brinBlk->cmprAlg,
|
||||
&writer->config->bufArr[0], 0, &brinBlk->size[j], &writer->config->bufArr[1]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
for (int32_t i = 0, j = ARRAY_SIZE(brinBlock->dataArr1); i < ARRAY_SIZE(brinBlock->dataArr2); i++, j++) {
|
||||
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(brinBlock->dataArr2 + i), TARRAY2_DATA_LEN(brinBlock->dataArr2 + i),
|
||||
TSDB_DATA_TYPE_INT, brinBlk->cmprAlg, &bufArr[0], 0, &brinBlk->size[j], &bufArr[1]);
|
||||
if (code) return code;
|
||||
|
||||
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_HEAD], writer->files[TSDB_FTYPE_HEAD].size, writer->config->bufArr[0],
|
||||
brinBlk->size[j]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
code = tsdbWriteFile(fd, *fileSize, bufArr[0], brinBlk->size[j]);
|
||||
if (code) return code;
|
||||
|
||||
brinBlk->dp->size += brinBlk->size[j];
|
||||
writer->files[TSDB_FTYPE_HEAD].size += brinBlk->size[j];
|
||||
*fileSize += brinBlk->size[j];
|
||||
}
|
||||
|
||||
// append to brinBlkArray
|
||||
code = TARRAY2_APPEND_PTR(writer->brinBlkArray, brinBlk);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
code = TARRAY2_APPEND_PTR(brinBlkArray, brinBlk);
|
||||
if (code) return code;
|
||||
|
||||
tBrinBlockClear(writer->brinBlock);
|
||||
tBrinBlockClear(brinBlock);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
|
||||
if (BRIN_BLOCK_SIZE(writer->brinBlock) == 0) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
code = tsdbFileWriteBrinBlock(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlock, writer->config->cmprAlg,
|
||||
&writer->files[TSDB_FTYPE_HEAD].size, writer->brinBlkArray, writer->config->bufArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -1131,15 +1133,19 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer) {
|
||||
int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer));
|
||||
if (code) return code;
|
||||
*fileSize += sizeof(*footer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbDataFileWriteHeadFooter(SDataFileWriter *writer) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
int32_t ftype = TSDB_FTYPE_HEAD;
|
||||
code = tsdbWriteFile(writer->fd[ftype], writer->files[ftype].size, (const uint8_t *)writer->headFooter,
|
||||
sizeof(SHeadFooter));
|
||||
code = tsdbFileWriteHeadFooter(writer->fd[TSDB_FTYPE_HEAD], &writer->files[TSDB_FTYPE_HEAD].size, writer->headFooter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
writer->files[ftype].size += sizeof(SHeadFooter);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -1154,53 +1160,10 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
|
|||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STombBlk tombBlk[1] = {{
|
||||
.numRec = TOMB_BLOCK_SIZE(writer->tombBlock),
|
||||
.minTbid =
|
||||
{
|
||||
.suid = TARRAY2_FIRST(writer->tombBlock->suid),
|
||||
.uid = TARRAY2_FIRST(writer->tombBlock->uid),
|
||||
},
|
||||
.maxTbid =
|
||||
{
|
||||
.suid = TARRAY2_LAST(writer->tombBlock->suid),
|
||||
.uid = TARRAY2_LAST(writer->tombBlock->uid),
|
||||
},
|
||||
.minVer = TARRAY2_FIRST(writer->tombBlock->version),
|
||||
.maxVer = TARRAY2_FIRST(writer->tombBlock->version),
|
||||
.dp[0] =
|
||||
{
|
||||
.offset = writer->files[TSDB_FTYPE_TOMB].size,
|
||||
.size = 0,
|
||||
},
|
||||
}};
|
||||
|
||||
for (int32_t i = 1; i < TOMB_BLOCK_SIZE(writer->tombBlock); i++) {
|
||||
tombBlk->minVer = TMIN(tombBlk->minVer, TARRAY2_GET(writer->tombBlock->version, i));
|
||||
tombBlk->maxVer = TMAX(tombBlk->maxVer, TARRAY2_GET(writer->tombBlock->version, i));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(writer->tombBlock->dataArr); i++) {
|
||||
int32_t size;
|
||||
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tombBlock->dataArr[i]),
|
||||
TARRAY2_DATA_LEN(&writer->tombBlock->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP,
|
||||
&writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_TOMB], writer->files[TSDB_FTYPE_TOMB].size, writer->config->bufArr[0],
|
||||
size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
tombBlk->size[i] = size;
|
||||
tombBlk->dp[0].size += size;
|
||||
writer->files[TSDB_FTYPE_TOMB].size += size;
|
||||
}
|
||||
|
||||
code = TARRAY2_APPEND_PTR(writer->tombBlkArray, tombBlk);
|
||||
code = tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg,
|
||||
&writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
tTombBlockClear(writer->tombBlock);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
|
||||
|
@ -1214,14 +1177,9 @@ static int32_t tsdbDataFileDoWriteTombBlk(SDataFileWriter *writer) {
|
|||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
int32_t ftype = TSDB_FTYPE_TOMB;
|
||||
writer->tombFooter->tombBlkPtr->offset = writer->files[ftype].size;
|
||||
writer->tombFooter->tombBlkPtr->size = TARRAY2_DATA_LEN(writer->tombBlkArray);
|
||||
|
||||
code = tsdbWriteFile(writer->fd[ftype], writer->tombFooter->tombBlkPtr->offset,
|
||||
(const uint8_t *)TARRAY2_DATA(writer->tombBlkArray), writer->tombFooter->tombBlkPtr->size);
|
||||
code = tsdbFileWriteTombBlk(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlkArray, writer->tombFooter->tombBlkPtr,
|
||||
&writer->files[TSDB_FTYPE_TOMB].size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
writer->files[ftype].size += writer->tombFooter->tombBlkPtr->size;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -1230,14 +1188,19 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize) {
|
||||
int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer));
|
||||
if (code) return code;
|
||||
*fileSize += sizeof(*footer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbDataFileWriteTombFooter(SDataFileWriter *writer) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_TOMB], writer->files[TSDB_FTYPE_TOMB].size,
|
||||
(const uint8_t *)writer->tombFooter, sizeof(STombFooter));
|
||||
code = tsdbFileWriteTombFooter(writer->fd[TSDB_FTYPE_TOMB], writer->tombFooter, &writer->files[TSDB_FTYPE_TOMB].size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
writer->files[TSDB_FTYPE_TOMB].size += sizeof(STombFooter);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -1306,20 +1269,25 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbDataFileWriteBrinBlk(SDataFileWriter *writer) {
|
||||
ASSERT(TARRAY2_SIZE(writer->brinBlkArray) > 0);
|
||||
int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize) {
|
||||
ASSERT(TARRAY2_SIZE(brinBlkArray) > 0);
|
||||
ptr->offset = *fileSize;
|
||||
ptr->size = TARRAY2_DATA_LEN(brinBlkArray);
|
||||
|
||||
int32_t code = tsdbWriteFile(fd, ptr->offset, (uint8_t *)TARRAY2_DATA(brinBlkArray), ptr->size);
|
||||
if (code) return code;
|
||||
|
||||
*fileSize += ptr->size;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbDataFileWriteBrinBlk(SDataFileWriter *writer) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
int32_t ftype = TSDB_FTYPE_HEAD;
|
||||
writer->headFooter->brinBlkPtr->offset = writer->files[ftype].size;
|
||||
writer->headFooter->brinBlkPtr->size = TARRAY2_DATA_LEN(writer->brinBlkArray);
|
||||
|
||||
code = tsdbWriteFile(writer->fd[ftype], writer->headFooter->brinBlkPtr->offset,
|
||||
(uint8_t *)TARRAY2_DATA(writer->brinBlkArray), writer->headFooter->brinBlkPtr->size);
|
||||
code = tsdbFileWriteBrinBlk(writer->fd[TSDB_FTYPE_HEAD], writer->brinBlkArray, writer->headFooter->brinBlkPtr,
|
||||
&writer->files[TSDB_FTYPE_HEAD].size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
writer->files[ftype].size += writer->headFooter->brinBlkPtr->size;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
|
|
@ -29,6 +29,16 @@ typedef TARRAY2(SBlockIdx) TBlockIdxArray;
|
|||
typedef TARRAY2(SDataBlk) TDataBlkArray;
|
||||
typedef TARRAY2(SColumnDataAgg) TColumnDataAggArray;
|
||||
|
||||
typedef struct {
|
||||
SFDataPtr brinBlkPtr[1];
|
||||
SFDataPtr rsrvd[2];
|
||||
} SHeadFooter;
|
||||
|
||||
typedef struct {
|
||||
SFDataPtr tombBlkPtr[1];
|
||||
SFDataPtr rsrvd[2];
|
||||
} STombFooter;
|
||||
|
||||
// SDataFileReader =============================================
|
||||
typedef struct SDataFileReader SDataFileReader;
|
||||
typedef struct SDataFileReaderConfig {
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
#include "tsdbFSet2.h"
|
||||
|
||||
static int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) {
|
||||
int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) {
|
||||
if (!(lvl[0] = taosMemoryMalloc(sizeof(SSttLvl)))) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
lvl[0]->level = level;
|
||||
TARRAY2_INIT(lvl[0]->fobjArr);
|
||||
|
|
|
@ -629,6 +629,12 @@ static int32_t tsdbIterSkipTableData(STsdbIter *iter, const TABLEID *tbid) {
|
|||
return tsdbDataIterNext(iter, tbid);
|
||||
case TSDB_ITER_TYPE_MEMT:
|
||||
return tsdbMemTableIterNext(iter, tbid);
|
||||
case TSDB_ITER_TYPE_STT_TOMB:
|
||||
return tsdbSttTombIterNext(iter, tbid);
|
||||
case TSDB_ITER_TYPE_DATA_TOMB:
|
||||
return tsdbDataTombIterNext(iter, tbid);
|
||||
case TSDB_ITER_TYPE_MEMT_TOMB:
|
||||
return tsdbMemTombIterNext(iter, tbid);
|
||||
default:
|
||||
ASSERT(false);
|
||||
}
|
||||
|
|
|
@ -52,7 +52,7 @@ typedef struct {
|
|||
} SMerger;
|
||||
|
||||
static int32_t tsdbMergerOpen(SMerger *merger) {
|
||||
merger->ctx->now = taosGetTimestampMs();
|
||||
merger->ctx->now = taosGetTimestampSec();
|
||||
merger->maxRow = merger->tsdb->pVnode->config.tsdbCfg.maxRows;
|
||||
merger->minRow = merger->tsdb->pVnode->config.tsdbCfg.minRows;
|
||||
merger->szPage = merger->tsdb->pVnode->config.tsdbPageSize;
|
||||
|
|
|
@ -23,7 +23,7 @@ typedef struct {
|
|||
int64_t cid;
|
||||
|
||||
TFileSetArray *fsetArr;
|
||||
TFileOpArray *fopArr;
|
||||
TFileOpArray fopArr[1];
|
||||
|
||||
struct {
|
||||
int32_t fsetArrIdx;
|
||||
|
|
|
@ -766,7 +766,7 @@ static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) {
|
|||
writer->ctx->fid = fid;
|
||||
writer->ctx->fset = TARRAY2_SEARCH_EX(writer->fsetArr, &fset, tsdbTFileSetCmprFn, TD_EQ);
|
||||
|
||||
int32_t level = tsdbFidLevel(fid, &writer->tsdb->keepCfg, writer->now);
|
||||
int32_t level = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec());
|
||||
if (tfsAllocDisk(writer->tsdb->pVnode->pTfs, level, &writer->ctx->did)) {
|
||||
code = TSDB_CODE_NO_AVAIL_DISK;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
|
|
@ -15,13 +15,6 @@
|
|||
|
||||
#include "tsdbSttFileRW.h"
|
||||
|
||||
typedef struct {
|
||||
SFDataPtr sttBlkPtr[1];
|
||||
SFDataPtr statisBlkPtr[1];
|
||||
SFDataPtr tombBlkPtr[1];
|
||||
SFDataPtr rsrvd[2];
|
||||
} SSttFooter;
|
||||
|
||||
// SSttFReader ============================================================
|
||||
struct SSttFileReader {
|
||||
SSttFileReaderConfig config[1];
|
||||
|
@ -402,51 +395,64 @@ struct SSttFileWriter {
|
|||
uint8_t *bufArr[5];
|
||||
};
|
||||
|
||||
int32_t tsdbFileDoWriteBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmprAlg, int64_t *fileSize,
|
||||
TSttBlkArray *sttBlkArray, uint8_t **bufArr) {
|
||||
if (blockData->nRow == 0) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
SSttBlk sttBlk[1] = {{
|
||||
.suid = blockData->suid,
|
||||
.minUid = blockData->uid ? blockData->uid : blockData->aUid[0],
|
||||
.maxUid = blockData->uid ? blockData->uid : blockData->aUid[blockData->nRow - 1],
|
||||
.minKey = blockData->aTSKEY[0],
|
||||
.maxKey = blockData->aTSKEY[0],
|
||||
.minVer = blockData->aVersion[0],
|
||||
.maxVer = blockData->aVersion[0],
|
||||
.nRow = blockData->nRow,
|
||||
}};
|
||||
|
||||
for (int32_t iRow = 1; iRow < blockData->nRow; iRow++) {
|
||||
if (sttBlk->minKey > blockData->aTSKEY[iRow]) sttBlk->minKey = blockData->aTSKEY[iRow];
|
||||
if (sttBlk->maxKey < blockData->aTSKEY[iRow]) sttBlk->maxKey = blockData->aTSKEY[iRow];
|
||||
if (sttBlk->minVer > blockData->aVersion[iRow]) sttBlk->minVer = blockData->aVersion[iRow];
|
||||
if (sttBlk->maxVer < blockData->aVersion[iRow]) sttBlk->maxVer = blockData->aVersion[iRow];
|
||||
}
|
||||
|
||||
int32_t sizeArr[5] = {0};
|
||||
code = tCmprBlockData(blockData, cmprAlg, NULL, NULL, bufArr, sizeArr);
|
||||
if (code) return code;
|
||||
|
||||
sttBlk->bInfo.offset = *fileSize;
|
||||
sttBlk->bInfo.szKey = sizeArr[2] + sizeArr[3];
|
||||
sttBlk->bInfo.szBlock = sizeArr[0] + sizeArr[1] + sttBlk->bInfo.szKey;
|
||||
|
||||
for (int32_t i = 3; i >= 0; i--) {
|
||||
if (sizeArr[i]) {
|
||||
code = tsdbWriteFile(fd, *fileSize, bufArr[i], sizeArr[i]);
|
||||
if (code) return code;
|
||||
*fileSize += sizeArr[i];
|
||||
}
|
||||
}
|
||||
|
||||
code = TARRAY2_APPEND_PTR(sttBlkArray, sttBlk);
|
||||
if (code) return code;
|
||||
|
||||
tBlockDataClear(blockData);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbSttFileDoWriteBlockData(SSttFileWriter *writer) {
|
||||
if (writer->blockData->nRow == 0) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
SSttBlk sttBlk[1] = {{
|
||||
.suid = writer->blockData->suid,
|
||||
.minUid = writer->blockData->uid ? writer->blockData->uid : writer->blockData->aUid[0],
|
||||
.maxUid = writer->blockData->uid ? writer->blockData->uid : writer->blockData->aUid[writer->blockData->nRow - 1],
|
||||
.minKey = writer->blockData->aTSKEY[0],
|
||||
.maxKey = writer->blockData->aTSKEY[0],
|
||||
.minVer = writer->blockData->aVersion[0],
|
||||
.maxVer = writer->blockData->aVersion[0],
|
||||
.nRow = writer->blockData->nRow,
|
||||
}};
|
||||
|
||||
for (int32_t iRow = 1; iRow < writer->blockData->nRow; iRow++) {
|
||||
if (sttBlk->minKey > writer->blockData->aTSKEY[iRow]) sttBlk->minKey = writer->blockData->aTSKEY[iRow];
|
||||
if (sttBlk->maxKey < writer->blockData->aTSKEY[iRow]) sttBlk->maxKey = writer->blockData->aTSKEY[iRow];
|
||||
if (sttBlk->minVer > writer->blockData->aVersion[iRow]) sttBlk->minVer = writer->blockData->aVersion[iRow];
|
||||
if (sttBlk->maxVer < writer->blockData->aVersion[iRow]) sttBlk->maxVer = writer->blockData->aVersion[iRow];
|
||||
}
|
||||
|
||||
int32_t sizeArr[5] = {0};
|
||||
code = tCmprBlockData(writer->blockData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, sizeArr);
|
||||
code = tsdbFileDoWriteBlockData(writer->fd, writer->blockData, writer->config->cmprAlg, &writer->file->size,
|
||||
writer->sttBlkArray, writer->config->bufArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
sttBlk->bInfo.offset = writer->file->size;
|
||||
sttBlk->bInfo.szKey = sizeArr[2] + sizeArr[3];
|
||||
sttBlk->bInfo.szBlock = sizeArr[0] + sizeArr[1] + sttBlk->bInfo.szKey;
|
||||
|
||||
for (int32_t i = 3; i >= 0; i--) {
|
||||
if (sizeArr[i]) {
|
||||
code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[i], sizeArr[i]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
writer->file->size += sizeArr[i];
|
||||
}
|
||||
}
|
||||
|
||||
code = TARRAY2_APPEND_PTR(writer->sttBlkArray, sttBlk);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
tBlockDataClear(writer->blockData);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
|
||||
|
@ -516,61 +522,72 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
|
||||
TTombBlkArray *tombBlkArray, uint8_t **bufArr) {
|
||||
int32_t code;
|
||||
|
||||
if (TOMB_BLOCK_SIZE(tombBlock) == 0) return 0;
|
||||
|
||||
STombBlk tombBlk[1] = {{
|
||||
.dp[0] =
|
||||
{
|
||||
.offset = *fileSize,
|
||||
.size = 0,
|
||||
},
|
||||
.minTbid =
|
||||
{
|
||||
.suid = TARRAY2_FIRST(tombBlock->suid),
|
||||
.uid = TARRAY2_FIRST(tombBlock->uid),
|
||||
},
|
||||
.maxTbid =
|
||||
{
|
||||
.suid = TARRAY2_LAST(tombBlock->suid),
|
||||
.uid = TARRAY2_LAST(tombBlock->uid),
|
||||
},
|
||||
.minVer = TARRAY2_FIRST(tombBlock->version),
|
||||
.maxVer = TARRAY2_FIRST(tombBlock->version),
|
||||
.numRec = TOMB_BLOCK_SIZE(tombBlock),
|
||||
.cmprAlg = cmprAlg,
|
||||
}};
|
||||
|
||||
for (int32_t i = 1; i < TOMB_BLOCK_SIZE(tombBlock); i++) {
|
||||
if (tombBlk->minVer > TARRAY2_GET(tombBlock->version, i)) {
|
||||
tombBlk->minVer = TARRAY2_GET(tombBlock->version, i);
|
||||
}
|
||||
if (tombBlk->maxVer < TARRAY2_GET(tombBlock->version, i)) {
|
||||
tombBlk->maxVer = TARRAY2_GET(tombBlock->version, i);
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->dataArr); i++) {
|
||||
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&tombBlock->dataArr[i]), TARRAY2_DATA_LEN(&tombBlock->dataArr[i]),
|
||||
TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg, &bufArr[0], 0, &tombBlk->size[i], &bufArr[1]);
|
||||
if (code) return code;
|
||||
|
||||
code = tsdbWriteFile(fd, *fileSize, bufArr[0], tombBlk->size[i]);
|
||||
if (code) return code;
|
||||
|
||||
tombBlk->dp->size += tombBlk->size[i];
|
||||
*fileSize += tombBlk->size[i];
|
||||
}
|
||||
|
||||
code = TARRAY2_APPEND_PTR(tombBlkArray, tombBlk);
|
||||
if (code) return code;
|
||||
|
||||
tTombBlockClear(tombBlock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) {
|
||||
if (TOMB_BLOCK_SIZE(writer->tombBlock) == 0) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STombBlk tombBlk[1] = {{
|
||||
.dp[0] =
|
||||
{
|
||||
.offset = writer->file->size,
|
||||
.size = 0,
|
||||
},
|
||||
.minTbid =
|
||||
{
|
||||
.suid = TARRAY2_FIRST(writer->tombBlock->suid),
|
||||
.uid = TARRAY2_FIRST(writer->tombBlock->uid),
|
||||
},
|
||||
.maxTbid =
|
||||
{
|
||||
.suid = TARRAY2_LAST(writer->tombBlock->suid),
|
||||
.uid = TARRAY2_LAST(writer->tombBlock->uid),
|
||||
},
|
||||
.minVer = TARRAY2_FIRST(writer->tombBlock->version),
|
||||
.maxVer = TARRAY2_FIRST(writer->tombBlock->version),
|
||||
.numRec = TOMB_BLOCK_SIZE(writer->tombBlock),
|
||||
.cmprAlg = writer->config->cmprAlg,
|
||||
}};
|
||||
|
||||
for (int32_t i = 1; i < TOMB_BLOCK_SIZE(writer->tombBlock); i++) {
|
||||
if (tombBlk->minVer > TARRAY2_GET(writer->tombBlock->version, i)) {
|
||||
tombBlk->minVer = TARRAY2_GET(writer->tombBlock->version, i);
|
||||
}
|
||||
if (tombBlk->maxVer < TARRAY2_GET(writer->tombBlock->version, i)) {
|
||||
tombBlk->maxVer = TARRAY2_GET(writer->tombBlock->version, i);
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(writer->tombBlock->dataArr); i++) {
|
||||
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tombBlock->dataArr[i]),
|
||||
TARRAY2_DATA_LEN(&writer->tombBlock->dataArr[i]), TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg,
|
||||
&writer->config->bufArr[0], 0, &tombBlk->size[i], &writer->config->bufArr[1]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[0], tombBlk->size[i]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
tombBlk->dp->size += tombBlk->size[i];
|
||||
writer->file->size += tombBlk->size[i];
|
||||
}
|
||||
|
||||
code = TARRAY2_APPEND_PTR(writer->tombBlkArray, tombBlk);
|
||||
code = tsdbFileWriteTombBlock(writer->fd, writer->tombBlock, writer->config->cmprAlg, &writer->file->size,
|
||||
writer->tombBlkArray, writer->config->bufArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
tTombBlockClear(writer->tombBlock);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
|
||||
|
@ -578,18 +595,27 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFileWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize) {
|
||||
ptr->size = TARRAY2_DATA_LEN(sttBlkArray);
|
||||
if (ptr->size > 0) {
|
||||
ptr->offset = *fileSize;
|
||||
|
||||
int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)TARRAY2_DATA(sttBlkArray), ptr->size);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
*fileSize += ptr->size;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
writer->footer->sttBlkPtr->size = TARRAY2_DATA_LEN(writer->sttBlkArray);
|
||||
if (writer->footer->sttBlkPtr->size) {
|
||||
writer->footer->sttBlkPtr->offset = writer->file->size;
|
||||
code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->sttBlkArray),
|
||||
writer->footer->sttBlkPtr->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
writer->file->size += writer->footer->sttBlkPtr->size;
|
||||
}
|
||||
code = tsdbFileWriteSttBlk(writer->fd, writer->sttBlkArray, writer->footer->sttBlkPtr, &writer->file->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -618,18 +644,27 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize) {
|
||||
ptr->size = TARRAY2_DATA_LEN(tombBlkArray);
|
||||
if (ptr->size > 0) {
|
||||
ptr->offset = *fileSize;
|
||||
|
||||
int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)TARRAY2_DATA(tombBlkArray), ptr->size);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
*fileSize += ptr->size;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbSttFileDoWriteTombBlk(SSttFileWriter *writer) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
writer->footer->tombBlkPtr->size = TARRAY2_DATA_LEN(writer->tombBlkArray);
|
||||
if (writer->footer->tombBlkPtr->size) {
|
||||
writer->footer->tombBlkPtr->offset = writer->file->size;
|
||||
code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->tombBlkArray),
|
||||
writer->footer->tombBlkPtr->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
writer->file->size += writer->footer->tombBlkPtr->size;
|
||||
}
|
||||
code = tsdbFileWriteTombBlk(writer->fd, writer->tombBlkArray, writer->footer->tombBlkPtr, &writer->file->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -638,13 +673,17 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) {
|
||||
int32_t code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->footer, sizeof(writer->footer));
|
||||
int32_t tsdbFileWriteSttFooter(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize) {
|
||||
int32_t code = tsdbWriteFile(fd, *fileSize, (const uint8_t *)footer, sizeof(*footer));
|
||||
if (code) return code;
|
||||
writer->file->size += sizeof(writer->footer);
|
||||
*fileSize += sizeof(*footer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) {
|
||||
return tsdbFileWriteSttFooter(writer->fd, writer->footer, &writer->file->size);
|
||||
}
|
||||
|
||||
static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
|
|
@ -26,6 +26,13 @@ extern "C" {
|
|||
typedef TARRAY2(SSttBlk) TSttBlkArray;
|
||||
typedef TARRAY2(SStatisBlk) TStatisBlkArray;
|
||||
|
||||
typedef struct {
|
||||
SFDataPtr sttBlkPtr[1];
|
||||
SFDataPtr statisBlkPtr[1];
|
||||
SFDataPtr tombBlkPtr[1];
|
||||
SFDataPtr rsrvd[2];
|
||||
} SSttFooter;
|
||||
|
||||
// SSttFileReader ==========================================
|
||||
typedef struct SSttFileReader SSttFileReader;
|
||||
typedef struct SSttFileReaderConfig SSttFileReaderConfig;
|
||||
|
|
|
@ -16,106 +16,404 @@
|
|||
#include "tsdbUpgrade.h"
|
||||
|
||||
// old
|
||||
extern void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t);
|
||||
extern void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t);
|
||||
extern int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData);
|
||||
|
||||
// new
|
||||
extern int32_t save_fs(const TFileSetArray *arr, const char *fname);
|
||||
extern int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype);
|
||||
extern int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize,
|
||||
TBrinBlkArray *brinBlkArray, uint8_t **bufArr);
|
||||
extern int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize);
|
||||
extern int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer);
|
||||
extern int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl);
|
||||
extern int32_t tsdbFileWriteSttBlk(STsdbFD *fd, const TSttBlkArray *sttBlkArray, SFDataPtr *ptr, int64_t *fileSize);
|
||||
extern int32_t tsdbFileWriteSttFooter(STsdbFD *fd, const SSttFooter *footer, int64_t *fileSize);
|
||||
extern int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
|
||||
TTombBlkArray *tombBlkArray, uint8_t **bufArr);
|
||||
extern int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize);
|
||||
extern int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize);
|
||||
|
||||
static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
struct {
|
||||
// config
|
||||
int32_t maxRow;
|
||||
int8_t cmprAlg;
|
||||
int32_t szPage;
|
||||
uint8_t *bufArr[8];
|
||||
// reader
|
||||
SArray *aBlockIdx;
|
||||
SMapData mDataBlk[1];
|
||||
SBlockData blockData[1];
|
||||
// writer
|
||||
STsdbFD *fd;
|
||||
SBrinBlock brinBlock[1];
|
||||
TBrinBlkArray brinBlkArray[1];
|
||||
SHeadFooter footer[1];
|
||||
} ctx[1] = {{
|
||||
.maxRow = tsdb->pVnode->config.tsdbCfg.maxRows,
|
||||
.cmprAlg = tsdb->pVnode->config.tsdbCfg.compression,
|
||||
.szPage = tsdb->pVnode->config.tsdbPageSize,
|
||||
}};
|
||||
|
||||
if ((ctx->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbReadBlockIdx(reader, ctx->aBlockIdx);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (taosArrayGetSize(ctx->aBlockIdx) == 0) {
|
||||
goto _exit;
|
||||
} else {
|
||||
STFile file = {
|
||||
.type = TSDB_FTYPE_HEAD,
|
||||
.did = pDFileSet->diskId,
|
||||
.fid = fset->fid,
|
||||
.cid = pDFileSet->pHeadF->commitID,
|
||||
.size = pDFileSet->pHeadF->size,
|
||||
};
|
||||
|
||||
code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_HEAD]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// open fd
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
tsdbTFileName(tsdb, &file, fname);
|
||||
|
||||
code = tsdbOpenFile(fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(ctx->aBlockIdx); ++iBlockIdx) {
|
||||
SBlockIdx *pBlockIdx = taosArrayGet(ctx->aBlockIdx, iBlockIdx);
|
||||
|
||||
code = tsdbReadDataBlk(reader, pBlockIdx, ctx->mDataBlk);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
for (int32_t iDataBlk = 0; iDataBlk < ctx->mDataBlk->nItem; ++iDataBlk) {
|
||||
SDataBlk dataBlk[1];
|
||||
tMapDataGetItemByIdx(ctx->mDataBlk, iDataBlk, dataBlk, tGetDataBlk);
|
||||
|
||||
SBrinRecord record = {
|
||||
.suid = pBlockIdx->suid,
|
||||
.uid = pBlockIdx->uid,
|
||||
.firstKey = dataBlk->minKey.ts,
|
||||
.firstKeyVer = dataBlk->minKey.version,
|
||||
.lastKey = dataBlk->maxKey.ts,
|
||||
.lastKeyVer = dataBlk->maxKey.version,
|
||||
.minVer = dataBlk->minVer,
|
||||
.maxVer = dataBlk->maxVer,
|
||||
.blockOffset = dataBlk->aSubBlock->offset,
|
||||
.smaOffset = dataBlk->smaInfo.offset,
|
||||
.blockSize = dataBlk->aSubBlock->szBlock,
|
||||
.blockKeySize = dataBlk->aSubBlock->szKey,
|
||||
.smaSize = dataBlk->smaInfo.size,
|
||||
.numRow = dataBlk->nRow,
|
||||
.count = dataBlk->nRow,
|
||||
};
|
||||
|
||||
if (dataBlk->hasDup) {
|
||||
code = tsdbReadDataBlockEx(reader, dataBlk, ctx->blockData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
record.count = 1;
|
||||
for (int32_t i = 1; i < ctx->blockData->nRow; ++i) {
|
||||
if (ctx->blockData->aTSKEY[i] != ctx->blockData->aTSKEY[i - 1]) {
|
||||
record.count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
code = tBrinBlockPut(ctx->brinBlock, &record);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (BRIN_BLOCK_SIZE(ctx->brinBlock) >= ctx->maxRow) {
|
||||
code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
|
||||
ctx->brinBlkArray, ctx->bufArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (BRIN_BLOCK_SIZE(ctx->brinBlock) > 0) {
|
||||
code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
|
||||
ctx->brinBlkArray, ctx->bufArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code =
|
||||
tsdbFileWriteBrinBlk(ctx->fd, ctx->brinBlkArray, ctx->footer->brinBlkPtr, &fset->farr[TSDB_FTYPE_HEAD]->f->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbFileWriteHeadFooter(ctx->fd, &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->footer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbFsyncFile(ctx->fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
tsdbCloseFile(&ctx->fd);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
}
|
||||
TARRAY2_DESTROY(ctx->brinBlkArray, NULL);
|
||||
tBrinBlockDestroy(ctx->brinBlock);
|
||||
tBlockDataDestroy(ctx->blockData);
|
||||
tMapDataClear(ctx->mDataBlk);
|
||||
taosArrayDestroy(ctx->aBlockIdx);
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(ctx->bufArr); ++i) {
|
||||
tFree(ctx->bufArr[i]);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbUpgradeData(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (fset->farr[TSDB_FTYPE_HEAD] == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
STFile file = {
|
||||
.type = TSDB_FTYPE_DATA,
|
||||
.did = pDFileSet->diskId,
|
||||
.fid = fset->fid,
|
||||
.cid = pDFileSet->pDataF->commitID,
|
||||
.size = pDFileSet->pDataF->size,
|
||||
};
|
||||
|
||||
code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_DATA]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbUpgradeSma(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (fset->farr[TSDB_FTYPE_HEAD] == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
STFile file = {
|
||||
.type = TSDB_FTYPE_SMA,
|
||||
.did = pDFileSet->diskId,
|
||||
.fid = fset->fid,
|
||||
.cid = pDFileSet->pSmaF->commitID,
|
||||
.size = pDFileSet->pSmaF->size,
|
||||
};
|
||||
|
||||
code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_SMA]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbUpgradeSttFile(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset,
|
||||
int32_t iStt, SSttLvl *lvl) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
SArray *aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
|
||||
if (aSttBlk == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbReadSttBlk(reader, iStt, aSttBlk);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (taosArrayGetSize(aSttBlk) > 0) {
|
||||
SSttFile *pSttF = pDFileSet->aSttF[iStt];
|
||||
STFileObj *fobj;
|
||||
struct {
|
||||
int32_t szPage;
|
||||
// writer
|
||||
STsdbFD *fd;
|
||||
TSttBlkArray sttBlkArray[1];
|
||||
SSttFooter footer[1];
|
||||
} ctx[1] = {{
|
||||
.szPage = tsdb->pVnode->config.tsdbPageSize,
|
||||
}};
|
||||
|
||||
STFile file = {
|
||||
.type = TSDB_FTYPE_STT,
|
||||
.did = pDFileSet->diskId,
|
||||
.fid = fset->fid,
|
||||
.cid = pSttF->commitID,
|
||||
.size = pSttF->size,
|
||||
};
|
||||
code = tsdbTFileObjInit(tsdb, &file, &fobj);
|
||||
TSDB_CHECK_CODE(code, lino, _exit1);
|
||||
|
||||
code = tsdbOpenFile(fobj->fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit1);
|
||||
|
||||
for (int32_t iSttBlk = 0; iSttBlk < taosArrayGetSize(aSttBlk); iSttBlk++) {
|
||||
code = TARRAY2_APPEND_PTR(ctx->sttBlkArray, (SSttBlk *)taosArrayGet(aSttBlk, iSttBlk));
|
||||
TSDB_CHECK_CODE(code, lino, _exit1);
|
||||
}
|
||||
|
||||
code = tsdbFileWriteSttBlk(ctx->fd, ctx->sttBlkArray, ctx->footer->sttBlkPtr, &fobj->f->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit1);
|
||||
|
||||
code = tsdbFileWriteSttFooter(ctx->fd, ctx->footer, &fobj->f->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit1);
|
||||
|
||||
code = tsdbFsyncFile(ctx->fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit1);
|
||||
|
||||
tsdbCloseFile(&ctx->fd);
|
||||
|
||||
code = TARRAY2_APPEND(lvl->fobjArr, fobj);
|
||||
TSDB_CHECK_CODE(code, lino, _exit1);
|
||||
|
||||
_exit1:
|
||||
TARRAY2_DESTROY(ctx->sttBlkArray, NULL);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
}
|
||||
taosArrayDestroy(aSttBlk);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbUpgradeStt(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (pDFileSet->nSttF == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSttLvl *lvl;
|
||||
code = tsdbSttLvlInit(0, &lvl);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
for (int32_t iStt = 0; iStt < pDFileSet->nSttF; ++iStt) {
|
||||
code = tsdbUpgradeSttFile(tsdb, pDFileSet, reader, fset, iStt, lvl);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = TARRAY2_APPEND(fset->lvlArr, lvl);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbUpgradeFileSet(STsdb *tsdb, SDFileSet *pDFileSet, TFileSetArray *fileSetArray) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
SDataFReader *reader;
|
||||
STFileSet *fset;
|
||||
|
||||
code = tsdbTFileSetInit(pDFileSet->fid, &fset);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbDataFReaderOpen(&reader, tsdb, pDFileSet);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// .head
|
||||
{
|
||||
SArray *aBlockIdx = NULL;
|
||||
SMapData mDataBlk[1] = {0};
|
||||
SBrinBlock brinBlock[1] = {0};
|
||||
TBrinBlkArray brinBlkArray[1] = {0};
|
||||
|
||||
if ((aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbReadBlockIdx(reader, aBlockIdx);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(aBlockIdx); ++i) {
|
||||
SBlockIdx *pBlockIdx = taosArrayGet(aBlockIdx, i);
|
||||
|
||||
code = tsdbReadDataBlk(reader, pBlockIdx, mDataBlk);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
for (int32_t j = 0; j < mDataBlk->nItem; ++j) {
|
||||
SDataBlk dataBlk[1];
|
||||
|
||||
tMapDataGetItemByIdx(mDataBlk, j, dataBlk, tGetDataBlk);
|
||||
|
||||
SBrinRecord record = {
|
||||
.suid = pBlockIdx->suid,
|
||||
.uid = pBlockIdx->uid,
|
||||
.firstKey = dataBlk->minKey.ts,
|
||||
.firstKeyVer = dataBlk->minKey.version,
|
||||
.lastKey = dataBlk->maxKey.ts,
|
||||
.lastKeyVer = dataBlk->maxKey.version,
|
||||
.minVer = dataBlk->minVer,
|
||||
.maxVer = dataBlk->maxVer,
|
||||
.blockOffset = dataBlk->aSubBlock->offset,
|
||||
.smaOffset = dataBlk->smaInfo.offset,
|
||||
.blockSize = dataBlk->aSubBlock->szBlock,
|
||||
.blockKeySize = dataBlk->aSubBlock->szKey,
|
||||
.smaSize = dataBlk->smaInfo.size,
|
||||
.numRow = dataBlk->nRow,
|
||||
.count = dataBlk->nRow,
|
||||
};
|
||||
|
||||
if (dataBlk->hasDup) {
|
||||
ASSERT(0);
|
||||
// TODO: need to get count
|
||||
// record.count = 0;
|
||||
}
|
||||
|
||||
code = tBrinBlockPut(brinBlock, &record);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (BRIN_BLOCK_SIZE(brinBlock) >= tsdb->pVnode->config.tsdbCfg.maxRows) {
|
||||
// TODO
|
||||
tBrinBlockClear(brinBlock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (BRIN_BLOCK_SIZE(brinBlock) > 0) {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
|
||||
TARRAY2_DESTROY(brinBlkArray, NULL);
|
||||
tBrinBlockDestroy(brinBlock);
|
||||
taosArrayDestroy(aBlockIdx);
|
||||
tMapDataClear(mDataBlk);
|
||||
}
|
||||
code = tsdbUpgradeHead(tsdb, pDFileSet, reader, fset);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// .data
|
||||
code = tsdbUpgradeData(tsdb, pDFileSet, reader, fset);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// .sma
|
||||
code = tsdbUpgradeSma(tsdb, pDFileSet, reader, fset);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// .stt
|
||||
for (int32_t i = 0; i < pDFileSet->nSttF; ++i) {
|
||||
// TODO
|
||||
if (pDFileSet->nSttF > 0) {
|
||||
code = tsdbUpgradeStt(tsdb, pDFileSet, reader, fset);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
tsdbDataFReaderClose(&reader);
|
||||
|
||||
code = TARRAY2_APPEND(fileSetArray, fset);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbUpgradeOpenTombFile(STsdb *tsdb, STFileSet *fset, STsdbFD **fd, STFileObj **fobj, bool *toStt) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (TARRAY2_SIZE(fset->lvlArr) == 0) { // to .tomb file
|
||||
*toStt = false;
|
||||
|
||||
STFile file = {
|
||||
.type = TSDB_FTYPE_TOMB,
|
||||
.did = fset->farr[TSDB_FTYPE_HEAD]->f->did,
|
||||
.fid = fset->fid,
|
||||
.cid = 0,
|
||||
.size = 0,
|
||||
};
|
||||
|
||||
code = tsdbTFileObjInit(tsdb, &file, fobj);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
fset->farr[TSDB_FTYPE_TOMB] = *fobj;
|
||||
} else { // to .stt file
|
||||
*toStt = true;
|
||||
SSttLvl *lvl = TARRAY2_GET(fset->lvlArr, 0);
|
||||
|
||||
STFile file = {
|
||||
.type = TSDB_FTYPE_STT,
|
||||
.did = TARRAY2_GET(lvl->fobjArr, 0)->f->did,
|
||||
.fid = fset->fid,
|
||||
.cid = 0,
|
||||
.size = 0,
|
||||
};
|
||||
|
||||
code = tsdbTFileObjInit(tsdb, &file, fobj);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = TARRAY2_APPEND(lvl->fobjArr, fobj[0]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
char fname[TSDB_FILENAME_LEN] = {0};
|
||||
code = tsdbOpenFile(fobj[0]->fname, tsdb->pVnode->config.tsdbPageSize,
|
||||
TD_FILE_READ | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_CREATE, fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
|
||||
code = tsdbWriteFile(fd[0], 0, hdr, TSDB_FHDR_SIZE);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
fobj[0]->f->size += TSDB_FHDR_SIZE;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
|
@ -127,31 +425,43 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *
|
|||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
SArray *aDelData = NULL;
|
||||
int64_t minKey, maxKey;
|
||||
STombBlock tombBlock[1] = {0};
|
||||
TTombBlkArray tombBlkArray[1] = {0};
|
||||
STsdbFD *fd = NULL;
|
||||
struct {
|
||||
// context
|
||||
bool toStt;
|
||||
int8_t cmprAlg;
|
||||
int32_t maxRow;
|
||||
int64_t minKey;
|
||||
int64_t maxKey;
|
||||
uint8_t *bufArr[8];
|
||||
// reader
|
||||
SArray *aDelData;
|
||||
// writer
|
||||
STsdbFD *fd;
|
||||
STFileObj *fobj;
|
||||
STombBlock tombBlock[1];
|
||||
TTombBlkArray tombBlkArray[1];
|
||||
STombFooter tombFooter[1];
|
||||
SSttFooter sttFooter[1];
|
||||
} ctx[1] = {{
|
||||
.maxRow = tsdb->pVnode->config.tsdbCfg.maxRows,
|
||||
.cmprAlg = tsdb->pVnode->config.tsdbCfg.compression,
|
||||
}};
|
||||
|
||||
tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey);
|
||||
tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &ctx->minKey, &ctx->maxKey);
|
||||
|
||||
if ((aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
|
||||
if ((ctx->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(aDelIdx); ++i) {
|
||||
SDelIdx *pDelIdx = taosArrayGet(aDelIdx, i);
|
||||
for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
|
||||
SDelIdx *pDelIdx = (SDelIdx *)taosArrayGet(aDelIdx, iDelIdx);
|
||||
|
||||
code = tsdbReadDelData(reader, pDelIdx, aDelData);
|
||||
code = tsdbReadDelData(reader, pDelIdx, ctx->aDelData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
for (int32_t j = 0; j < taosArrayGetSize(aDelData); ++j) {
|
||||
SDelData *pDelData = taosArrayGet(aDelData, j);
|
||||
|
||||
if (pDelData->sKey > maxKey || pDelData->eKey < minKey) {
|
||||
continue;
|
||||
}
|
||||
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(ctx->aDelData); iDelData++) {
|
||||
SDelData *pDelData = (SDelData *)taosArrayGet(ctx->aDelData, iDelData);
|
||||
|
||||
STombRecord record = {
|
||||
.suid = pDelIdx->suid,
|
||||
|
@ -161,60 +471,62 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *
|
|||
.ekey = pDelData->eKey,
|
||||
};
|
||||
|
||||
code = tTombBlockPut(tombBlock, &record);
|
||||
code = tTombBlockPut(ctx->tombBlock, &record);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (TOMB_BLOCK_SIZE(tombBlock) >= tsdb->pVnode->config.tsdbCfg.maxRows) {
|
||||
if (fd == NULL) {
|
||||
STFile file = {
|
||||
.type = TSDB_FTYPE_TOMB,
|
||||
.did = {0}, // TODO
|
||||
.fid = fset->fid,
|
||||
.cid = 0, // TODO
|
||||
};
|
||||
|
||||
code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_TOMB]);
|
||||
if (TOMB_BLOCK_SIZE(ctx->tombBlock) > ctx->maxRow) {
|
||||
if (ctx->fd == NULL) {
|
||||
code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbOpenFile(fset->farr[TSDB_FTYPE_TOMB]->fname, tsdb->pVnode->config.tsdbPageSize,
|
||||
TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC, &fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
|
||||
code = tsdbWriteFile(fd, 0, hdr, TSDB_FHDR_SIZE);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
fset->farr[TSDB_FTYPE_TOMB]->f->size += sizeof(hdr);
|
||||
}
|
||||
|
||||
// TODO
|
||||
tTombBlockClear(tombBlock);
|
||||
code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray,
|
||||
ctx->bufArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (TOMB_BLOCK_SIZE(tombBlock) > 0) {
|
||||
// TODO
|
||||
tTombBlockClear(tombBlock);
|
||||
if (TOMB_BLOCK_SIZE(ctx->tombBlock) > 0) {
|
||||
if (ctx->fd == NULL) {
|
||||
code = tsdbUpgradeOpenTombFile(tsdb, fset, &ctx->fd, &ctx->fobj, &ctx->toStt);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray,
|
||||
ctx->bufArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (TARRAY2_SIZE(tombBlkArray) > 0) {
|
||||
// TODO
|
||||
}
|
||||
if (ctx->fd != NULL) {
|
||||
if (ctx->toStt) {
|
||||
code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->sttFooter->tombBlkPtr, &ctx->fobj->f->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (fd) {
|
||||
code = tsdbFsyncFile(fd);
|
||||
code = tsdbFileWriteSttFooter(ctx->fd, ctx->sttFooter, &ctx->fobj->f->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->tombFooter->tombBlkPtr, &ctx->fobj->f->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbFileWriteTombFooter(ctx->fd, ctx->tombFooter, &ctx->fobj->f->size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbFsyncFile(ctx->fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
tsdbCloseFile(&fd);
|
||||
tsdbCloseFile(&ctx->fd);
|
||||
}
|
||||
TARRAY2_DESTROY(tombBlkArray, NULL);
|
||||
tTombBlockDestroy(tombBlock);
|
||||
taosArrayDestroy(aDelData);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
}
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(ctx->bufArr); i++) {
|
||||
tFree(ctx->bufArr[i]);
|
||||
}
|
||||
TARRAY2_DESTROY(ctx->tombBlkArray, NULL);
|
||||
tTombBlockDestroy(ctx->tombBlock);
|
||||
taosArrayDestroy(ctx->aDelData);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -244,8 +556,30 @@ static int32_t tsdbUpgradeTombFile(STsdb *tsdb, SDelFile *pDelFile, TFileSetArra
|
|||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
}
|
||||
tsdbDelFReaderClose(&reader);
|
||||
taosArrayDestroy(aDelIdx);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, TFileSetArray *fileSetArray) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
// upgrade each file set
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tsdb->fs.aDFileSet); i++) {
|
||||
code = tsdbUpgradeFileSet(tsdb, taosArrayGet(tsdb->fs.aDFileSet, i), fileSetArray);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// upgrade tomb file
|
||||
if (tsdb->fs.pDelFile != NULL) {
|
||||
code = tsdbUpgradeTombFile(tsdb, tsdb->fs.pDelFile, fileSetArray);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -254,44 +588,34 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
|
||||
static int32_t tsdbUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
TFileSetArray fileSetArray[1] = {0};
|
||||
|
||||
// load old file system and convert
|
||||
// open old file system
|
||||
code = tsdbFSOpen(tsdb, rollback);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tsdb->fs.aDFileSet); i++) {
|
||||
SDFileSet *pDFileSet = taosArrayGet(tsdb->fs.aDFileSet, i);
|
||||
|
||||
code = tsdbUpgradeFileSet(tsdb, pDFileSet, fileSetArray);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (tsdb->fs.pDelFile != NULL) {
|
||||
code = tsdbUpgradeTombFile(tsdb, tsdb->fs.pDelFile, fileSetArray);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
code = tsdbDoUpgradeFileSystem(tsdb, fileSetArray);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// close file system
|
||||
code = tsdbFSClose(tsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// save new file system
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
current_fname(tsdb, fname, TSDB_FCURRENT);
|
||||
|
||||
code = save_fs(fileSetArray, fname);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
TARRAY2_DESTROY(fileSetArray, tsdbTFileSetClear);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
}
|
||||
TARRAY2_DESTROY(fileSetArray, tsdbTFileSetClear);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -301,7 +625,7 @@ int32_t tsdbCheckAndUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
|
|||
tsdbGetCurrentFName(tsdb, fname, NULL);
|
||||
if (!taosCheckExistFile(fname)) return 0;
|
||||
|
||||
int32_t code = tsdbDoUpgradeFileSystem(tsdb, rollback);
|
||||
int32_t code = tsdbUpgradeFileSystem(tsdb, rollback);
|
||||
if (code) return code;
|
||||
|
||||
taosRemoveFile(fname);
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "tsdb.h"
|
||||
#include "tsdbDataFileRW.h"
|
||||
#include "tsdbDef.h"
|
||||
#include "tsdbFS2.h"
|
||||
#include "tsdbUtil2.h"
|
||||
|
|
|
@ -49,7 +49,7 @@ const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
|
|||
.hashBegin = 0,
|
||||
.hashEnd = 0,
|
||||
.hashMethod = 0,
|
||||
.sttTrigger = TSDB_DEFAULT_STT_FILE,
|
||||
.sttTrigger = TSDB_DEFAULT_SST_TRIGGER,
|
||||
.tsdbPageSize = TSDB_DEFAULT_PAGE_SIZE};
|
||||
|
||||
int vnodeCheckCfg(const SVnodeCfg *pCfg) {
|
||||
|
@ -57,7 +57,7 @@ int vnodeCheckCfg(const SVnodeCfg *pCfg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
const char* vnodeRoleToStr(ESyncRole role) {
|
||||
const char *vnodeRoleToStr(ESyncRole role) {
|
||||
switch (role) {
|
||||
case TAOS_SYNC_ROLE_VOTER:
|
||||
return "true";
|
||||
|
@ -68,11 +68,11 @@ const char* vnodeRoleToStr(ESyncRole role) {
|
|||
}
|
||||
}
|
||||
|
||||
const ESyncRole vnodeStrToRole(char* str) {
|
||||
if(strcmp(str, "true") == 0){
|
||||
const ESyncRole vnodeStrToRole(char *str) {
|
||||
if (strcmp(str, "true") == 0) {
|
||||
return TAOS_SYNC_ROLE_VOTER;
|
||||
}
|
||||
if(strcmp(str, "false") == 0){
|
||||
if (strcmp(str, "false") == 0) {
|
||||
return TAOS_SYNC_ROLE_LEARNER;
|
||||
}
|
||||
|
||||
|
@ -295,10 +295,9 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
|||
char role[10] = {0};
|
||||
code = tjsonGetStringValue(info, "isReplica", role);
|
||||
if (code < 0) return -1;
|
||||
if(strlen(role) != 0){
|
||||
if (strlen(role) != 0) {
|
||||
pNode->nodeRole = vnodeStrToRole(role);
|
||||
}
|
||||
else{
|
||||
} else {
|
||||
pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||
}
|
||||
vDebug("vgId:%d, decode config, replica:%d ep:%s:%u dnode:%d", pCfg->vgId, i, pNode->nodeFqdn, pNode->nodePort,
|
||||
|
|
Loading…
Reference in New Issue