Merge branch 'feature/TD-4034' into feature/TD-3950
This commit is contained in:
commit
4605543868
|
@ -129,7 +129,7 @@ taosd -C
|
||||||
- blocks:每个VNODE(TSDB)中有多少cache大小的内存块。因此一个VNODE的用的内存大小粗略为(cache * blocks)。单位为块,默认值:4。(可通过 alter database 修改)
|
- blocks:每个VNODE(TSDB)中有多少cache大小的内存块。因此一个VNODE的用的内存大小粗略为(cache * blocks)。单位为块,默认值:4。(可通过 alter database 修改)
|
||||||
- replica:副本个数,取值范围:1-3。单位为个,默认值:1。(可通过 alter database 修改)
|
- replica:副本个数,取值范围:1-3。单位为个,默认值:1。(可通过 alter database 修改)
|
||||||
- precision:时间戳精度标识,ms表示毫秒,us表示微秒。默认值:ms。
|
- precision:时间戳精度标识,ms表示毫秒,us表示微秒。默认值:ms。
|
||||||
- cacheLast:是否在内存中缓存子表 last_row,0:关闭;1:开启。默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数)
|
- cacheLast:是否在内存中缓存子表的最近数据,0:关闭;1:缓存子表最近一行数据;2:缓存子表每一列的最近的非NULL值,默认值:0。(可通过 alter database 修改)(从 2.0.11 版本开始支持此参数)
|
||||||
|
|
||||||
对于一个应用场景,可能有多种数据特征的数据并存,最佳的设计是将具有相同数据特征的表放在一个库里,这样一个应用有多个库,而每个库可以配置不同的存储参数,从而保证系统有最优的性能。TDengine允许应用在创建库时指定上述存储参数,如果指定,该参数就将覆盖对应的系统配置参数。举例,有下述SQL:
|
对于一个应用场景,可能有多种数据特征的数据并存,最佳的设计是将具有相同数据特征的表放在一个库里,这样一个应用有多个库,而每个库可以配置不同的存储参数,从而保证系统有最优的性能。TDengine允许应用在创建库时指定上述存储参数,如果指定,该参数就将覆盖对应的系统配置参数。举例,有下述SQL:
|
||||||
|
|
||||||
|
|
|
@ -234,6 +234,7 @@ typedef struct SDataCol {
|
||||||
int len; // column data length
|
int len; // column data length
|
||||||
VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column
|
VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column
|
||||||
void * pData; // Actual data pointer
|
void * pData; // Actual data pointer
|
||||||
|
TSKEY ts; // only used in last NULL column
|
||||||
} SDataCol;
|
} SDataCol;
|
||||||
|
|
||||||
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
|
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
|
||||||
|
|
|
@ -417,6 +417,18 @@ void setVardataNull(char* val, int32_t type) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool isVardataNull(char* val, int32_t type) {
|
||||||
|
if (type == TSDB_DATA_TYPE_BINARY) {
|
||||||
|
return *(uint8_t*) varDataVal(val) == TSDB_DATA_BINARY_NULL;
|
||||||
|
} else if (type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
return *(uint32_t*) varDataVal(val) == TSDB_DATA_NCHAR_NULL;
|
||||||
|
} else {
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
void setNull(char *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); }
|
void setNull(char *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); }
|
||||||
|
|
||||||
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
|
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
|
||||||
|
@ -492,6 +504,55 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool isNullN(char *val, int32_t type) {
|
||||||
|
switch (type) {
|
||||||
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
|
return *(uint8_t *)(val) == TSDB_DATA_BOOL_NULL;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
|
return *(uint8_t *)(val) == TSDB_DATA_TINYINT_NULL;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
|
return *(uint16_t *)(val) == TSDB_DATA_SMALLINT_NULL;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_INT:
|
||||||
|
return *(uint32_t *)(val) == TSDB_DATA_INT_NULL;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
|
return *(uint64_t *)(val) == TSDB_DATA_BIGINT_NULL;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT:
|
||||||
|
return *(uint8_t *)(val) == TSDB_DATA_UTINYINT_NULL;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT:
|
||||||
|
return *(uint16_t *)(val) == TSDB_DATA_USMALLINT_NULL;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UINT:
|
||||||
|
return *(uint32_t *)(val) == TSDB_DATA_UINT_NULL;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
|
return *(uint64_t *)(val) == TSDB_DATA_UBIGINT_NULL;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_FLOAT:
|
||||||
|
return *(uint32_t *)(val) == TSDB_DATA_FLOAT_NULL;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE:
|
||||||
|
return *(uint64_t *)(val) == TSDB_DATA_DOUBLE_NULL;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
|
case TSDB_DATA_TYPE_BINARY:
|
||||||
|
return isVardataNull(val, type);
|
||||||
|
break;
|
||||||
|
default: {
|
||||||
|
return *(uint32_t *)(val) == TSDB_DATA_INT_NULL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
static uint8_t nullBool = TSDB_DATA_BOOL_NULL;
|
static uint8_t nullBool = TSDB_DATA_BOOL_NULL;
|
||||||
static uint8_t nullTinyInt = TSDB_DATA_TINYINT_NULL;
|
static uint8_t nullTinyInt = TSDB_DATA_TINYINT_NULL;
|
||||||
static uint16_t nullSmallInt = TSDB_DATA_SMALLINT_NULL;
|
static uint16_t nullSmallInt = TSDB_DATA_SMALLINT_NULL;
|
||||||
|
|
|
@ -298,7 +298,7 @@ do { \
|
||||||
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0
|
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0
|
||||||
|
|
||||||
#define TSDB_MIN_DB_CACHE_LAST_ROW 0
|
#define TSDB_MIN_DB_CACHE_LAST_ROW 0
|
||||||
#define TSDB_MAX_DB_CACHE_LAST_ROW 1
|
#define TSDB_MAX_DB_CACHE_LAST_ROW 2
|
||||||
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
|
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
|
||||||
|
|
||||||
#define TSDB_MIN_FSYNC_PERIOD 0
|
#define TSDB_MIN_FSYNC_PERIOD 0
|
||||||
|
|
|
@ -69,9 +69,13 @@ typedef struct {
|
||||||
int8_t precision;
|
int8_t precision;
|
||||||
int8_t compression;
|
int8_t compression;
|
||||||
int8_t update;
|
int8_t update;
|
||||||
int8_t cacheLastRow;
|
int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column
|
||||||
} STsdbCfg;
|
} STsdbCfg;
|
||||||
|
|
||||||
|
#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0)
|
||||||
|
#define CACHE_LAST_ROW(c) ((c)->cacheLastRow == 1)
|
||||||
|
#define CACHE_LAST_NULL_COLUMN(c) ((c)->cacheLastRow == 2)
|
||||||
|
|
||||||
// --------- TSDB REPOSITORY USAGE STATISTICS
|
// --------- TSDB REPOSITORY USAGE STATISTICS
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t totalStorage; // total bytes occupie
|
int64_t totalStorage; // total bytes occupie
|
||||||
|
|
|
@ -178,6 +178,9 @@ void setNull(char *val, int32_t type, int32_t bytes);
|
||||||
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems);
|
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems);
|
||||||
void *getNullValue(int32_t type);
|
void *getNullValue(int32_t type);
|
||||||
|
|
||||||
|
bool isVardataNull(char* val, int32_t type);
|
||||||
|
bool isNullN(char *val, int32_t type);
|
||||||
|
|
||||||
void assignVal(char *val, const char *src, int32_t len, int32_t type);
|
void assignVal(char *val, const char *src, int32_t len, int32_t type);
|
||||||
void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf);
|
void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf);
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,10 @@ typedef struct STable {
|
||||||
char* sql;
|
char* sql;
|
||||||
void* cqhandle;
|
void* cqhandle;
|
||||||
SRWLatch latch; // TODO: implementa latch functions
|
SRWLatch latch; // TODO: implementa latch functions
|
||||||
|
|
||||||
|
SDataCol *lastCols;
|
||||||
|
int32_t lastColNum;
|
||||||
|
int32_t restoreColumnNum;
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
} STable;
|
} STable;
|
||||||
|
|
||||||
|
|
|
@ -511,8 +511,10 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
|
||||||
if (pCfg->update != 0) pCfg->update = 1;
|
if (pCfg->update != 0) pCfg->update = 1;
|
||||||
|
|
||||||
// update cacheLastRow
|
// update cacheLastRow
|
||||||
if (pCfg->cacheLastRow != 0) pCfg->cacheLastRow = 1;
|
if (pCfg->cacheLastRow != 0) {
|
||||||
|
if (pCfg->cacheLastRow > 3)
|
||||||
|
pCfg->cacheLastRow = 1;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -614,6 +616,129 @@ static void tsdbStopStream(STsdbRepo *pRepo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) {
|
||||||
|
SBlock* pBlock;
|
||||||
|
int numColumns;
|
||||||
|
int32_t blockIdx;
|
||||||
|
SDataStatis* pBlockStatis = NULL;
|
||||||
|
SDataRow row = NULL;
|
||||||
|
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
||||||
|
int err = 0;
|
||||||
|
|
||||||
|
numColumns = schemaNCols(pSchema);
|
||||||
|
if (numColumns <= pTable->restoreColumnNum) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
|
||||||
|
if (row == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
err = -1;
|
||||||
|
goto out;
|
||||||
|
}
|
||||||
|
tdInitDataRow(row, pSchema);
|
||||||
|
|
||||||
|
// first load block index info
|
||||||
|
if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
|
||||||
|
err = -1;
|
||||||
|
goto out;
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlockStatis = calloc(numColumns, sizeof(SDataStatis));
|
||||||
|
if (pBlockStatis == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
err = -1;
|
||||||
|
goto out;
|
||||||
|
}
|
||||||
|
memset(pBlockStatis, 0, numColumns * sizeof(SDataStatis));
|
||||||
|
for(int32_t i = 0; i < numColumns; ++i) {
|
||||||
|
STColumn *pCol = schemaColAt(pSchema, i);
|
||||||
|
pBlockStatis[i].colId = pCol->colId;
|
||||||
|
}
|
||||||
|
|
||||||
|
// load block from backward
|
||||||
|
SBlockIdx *pIdx = pReadh->pBlkIdx;
|
||||||
|
blockIdx = (int32_t)(pIdx->numOfBlocks - 1);
|
||||||
|
|
||||||
|
while (numColumns > pTable->restoreColumnNum && blockIdx >= 0) {
|
||||||
|
bool loadStatisData = false;
|
||||||
|
pBlock = pReadh->pBlkInfo->blocks + blockIdx;
|
||||||
|
blockIdx -= 1;
|
||||||
|
|
||||||
|
// load block data
|
||||||
|
if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) {
|
||||||
|
err = -1;
|
||||||
|
goto out;
|
||||||
|
}
|
||||||
|
|
||||||
|
// file block with sub-blocks has no statistics data
|
||||||
|
if (pBlock->numOfSubBlocks <= 1) {
|
||||||
|
tsdbLoadBlockStatis(pReadh, pBlock);
|
||||||
|
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns);
|
||||||
|
loadStatisData = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (uint32_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
|
||||||
|
STColumn *pCol = schemaColAt(pSchema, i);
|
||||||
|
|
||||||
|
if (i >= pTable->lastColNum) {
|
||||||
|
pTable->lastCols = realloc(pTable->lastCols, i + 5);
|
||||||
|
for (int m = 0; m < 5; ++m) {
|
||||||
|
pTable->lastCols[m + pTable->lastColNum].bytes = 0;
|
||||||
|
pTable->lastCols[m + pTable->lastColNum].pData = NULL;
|
||||||
|
}
|
||||||
|
pTable->lastColNum += i + 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ignore loaded columns
|
||||||
|
if (pTable->lastCols[i].bytes != 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ignore block which has no not-null colId column
|
||||||
|
if (loadStatisData && pBlockStatis[i].numOfNull == pBlock->numOfRows) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// OK,let's load row from backward to get not-null column
|
||||||
|
for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) {
|
||||||
|
SDataCol *pDataCol = pReadh->pDCols[0]->cols + i;
|
||||||
|
tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
|
||||||
|
//SDataCol *pDataCol = readh.pDCols[0]->cols + j;
|
||||||
|
void* value = tdGetRowDataOfCol(row, (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
|
||||||
|
if (isNullN(value, pCol->type)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// save not-null column
|
||||||
|
SDataCol *pLastCol = &(pTable->lastCols[i]);
|
||||||
|
pLastCol->pData = malloc(pCol->bytes);
|
||||||
|
pLastCol->bytes = pCol->bytes;
|
||||||
|
pLastCol->offset = pCol->offset;
|
||||||
|
pLastCol->colId = pCol->colId;
|
||||||
|
memcpy(pLastCol->pData, value, pCol->bytes);
|
||||||
|
|
||||||
|
// save row ts(in column 0)
|
||||||
|
pDataCol = pReadh->pDCols[0]->cols + 0;
|
||||||
|
pCol = schemaColAt(pSchema, 0);
|
||||||
|
tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
|
||||||
|
pLastCol->ts = dataRowTKey(row);
|
||||||
|
|
||||||
|
pTable->restoreColumnNum += 1;
|
||||||
|
|
||||||
|
tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %d", REPO_ID(pRepo), pTable->name->data, pCol->colId, (int32_t)pLastCol->ts);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
out:
|
||||||
|
taosTZfree(row);
|
||||||
|
tfree(pBlockStatis);
|
||||||
|
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||||
SFSIter fsiter;
|
SFSIter fsiter;
|
||||||
SReadH readh;
|
SReadH readh;
|
||||||
|
@ -628,6 +753,14 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||||
|
|
||||||
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
|
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
|
||||||
|
|
||||||
|
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
|
||||||
|
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||||
|
STable *pTable = pMeta->tables[i];
|
||||||
|
if (pTable == NULL) continue;
|
||||||
|
pTable->restoreColumnNum = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
while ((pSet = tsdbFSIterNext(&fsiter)) != NULL) {
|
while ((pSet = tsdbFSIterNext(&fsiter)) != NULL) {
|
||||||
if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
|
if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
|
||||||
tsdbDestroyReadH(&readh);
|
tsdbDestroyReadH(&readh);
|
||||||
|
@ -643,6 +776,8 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||||
STable *pTable = pMeta->tables[i];
|
STable *pTable = pMeta->tables[i];
|
||||||
if (pTable == NULL) continue;
|
if (pTable == NULL) continue;
|
||||||
|
|
||||||
|
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
|
||||||
|
|
||||||
if (tsdbSetReadTable(&readh, pTable) < 0) {
|
if (tsdbSetReadTable(&readh, pTable) < 0) {
|
||||||
tsdbDestroyReadH(&readh);
|
tsdbDestroyReadH(&readh);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -653,7 +788,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||||
if (pIdx && lastKey < pIdx->maxKey) {
|
if (pIdx && lastKey < pIdx->maxKey) {
|
||||||
pTable->lastKey = pIdx->maxKey;
|
pTable->lastKey = pIdx->maxKey;
|
||||||
|
|
||||||
if (pCfg->cacheLastRow) {
|
if (CACHE_LAST_ROW(pCfg)) {
|
||||||
if (tsdbLoadBlockInfo(&readh, NULL) < 0) {
|
if (tsdbLoadBlockInfo(&readh, NULL) < 0) {
|
||||||
tsdbDestroyReadH(&readh);
|
tsdbDestroyReadH(&readh);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -686,6 +821,13 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// restore NULL columns
|
||||||
|
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
|
||||||
|
if (restoreLastColumns(pRepo, pTable, &readh) != 0) {
|
||||||
|
tsdbDestroyReadH(&readh);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -964,6 +964,56 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
|
||||||
|
//tsdbInfo("vgId:%d updateTableLatestColumn, row version:%d", REPO_ID(pRepo), dataRowVersion(row));
|
||||||
|
|
||||||
|
if (pTable->numOfSchemas <= 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
STSchema* pSchema = pTable->schema[pTable->numOfSchemas - 1];
|
||||||
|
int i = pTable->numOfSchemas - 1;
|
||||||
|
while ((pSchema == NULL || pSchema->version != dataRowVersion(row)) && i >= 0) {
|
||||||
|
i -= 1;
|
||||||
|
pSchema = pTable->schema[i];
|
||||||
|
}
|
||||||
|
if (pSchema == NULL || pSchema->version != dataRowVersion(row)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDataCol *pLatestCols = pTable->lastCols;
|
||||||
|
|
||||||
|
for (int j = 0; j < schemaNCols(pSchema); j++) {
|
||||||
|
STColumn *pTCol = schemaColAt(pSchema, j);
|
||||||
|
|
||||||
|
if (pTCol->colId >= pTable->lastColNum) {
|
||||||
|
pTable->lastCols = realloc(pTable->lastCols, pTCol->colId + 5);
|
||||||
|
for (i = 0; i < 10; ++i) {
|
||||||
|
pTable->lastCols[i + pTable->lastColNum].bytes = 0;
|
||||||
|
pTable->lastCols[i + pTable->lastColNum].pData = NULL;
|
||||||
|
}
|
||||||
|
pTable->lastColNum += pTCol->colId + 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDataCol *pDataCol = &(pLatestCols[pTCol->colId]);
|
||||||
|
void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
|
||||||
|
if (isNullN(value, pTCol->type)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (pDataCol->pData == NULL) {
|
||||||
|
pDataCol->pData = malloc(pSchema->columns[j].bytes);
|
||||||
|
pDataCol->bytes = pSchema->columns[j].bytes;
|
||||||
|
} else if (pDataCol->bytes < pSchema->columns[j].bytes) {
|
||||||
|
pDataCol->pData = realloc(pDataCol->pData, pSchema->columns[j].bytes);
|
||||||
|
pDataCol->bytes = pSchema->columns[j].bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pDataCol->pData, value, pDataCol->bytes);
|
||||||
|
//tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData);
|
||||||
|
pDataCol->ts = dataRowTKey(row);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
|
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
|
||||||
STsdbCfg *pCfg = &pRepo->config;
|
STsdbCfg *pCfg = &pRepo->config;
|
||||||
|
|
||||||
|
@ -977,7 +1027,7 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) {
|
if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) {
|
||||||
if (pCfg->cacheLastRow || pTable->lastRow != NULL) {
|
if (CACHE_LAST_ROW(pCfg) || pTable->lastRow != NULL) {
|
||||||
SDataRow nrow = pTable->lastRow;
|
SDataRow nrow = pTable->lastRow;
|
||||||
if (taosTSizeof(nrow) < dataRowLen(row)) {
|
if (taosTSizeof(nrow) < dataRowLen(row)) {
|
||||||
SDataRow orow = nrow;
|
SDataRow orow = nrow;
|
||||||
|
@ -1002,7 +1052,10 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow
|
||||||
} else {
|
} else {
|
||||||
pTable->lastKey = dataRowKey(row);
|
pTable->lastKey = dataRowKey(row);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
|
||||||
|
updateTableLatestColumn(pRepo, pTable, row);
|
||||||
|
}
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
#include "tsdbint.h"
|
#include "tsdbint.h"
|
||||||
|
|
||||||
|
#define TSDB_LATEST_COLUMN_ARRAY_SIZE 20
|
||||||
#define TSDB_SUPER_TABLE_SL_LEVEL 5
|
#define TSDB_SUPER_TABLE_SL_LEVEL 5
|
||||||
#define DEFAULT_TAG_INDEX_COLUMN 0
|
#define DEFAULT_TAG_INDEX_COLUMN 0
|
||||||
|
|
||||||
|
@ -671,6 +672,13 @@ static STable *tsdbNewTable() {
|
||||||
}
|
}
|
||||||
|
|
||||||
pTable->lastKey = TSKEY_INITIAL_VAL;
|
pTable->lastKey = TSKEY_INITIAL_VAL;
|
||||||
|
pTable->lastCols = (SDataCol*)malloc(TSDB_LATEST_COLUMN_ARRAY_SIZE * sizeof(SDataCol));
|
||||||
|
pTable->lastColNum = TSDB_LATEST_COLUMN_ARRAY_SIZE;
|
||||||
|
for (int i = 0; i < pTable->lastColNum; ++i) {
|
||||||
|
pTable->lastCols[i].bytes = 0;
|
||||||
|
pTable->lastCols[i].pData = NULL;
|
||||||
|
}
|
||||||
|
pTable->restoreColumnNum = 0;
|
||||||
|
|
||||||
return pTable;
|
return pTable;
|
||||||
}
|
}
|
||||||
|
@ -785,8 +793,17 @@ static void tsdbFreeTable(STable *pTable) {
|
||||||
kvRowFree(pTable->tagVal);
|
kvRowFree(pTable->tagVal);
|
||||||
|
|
||||||
tSkipListDestroy(pTable->pIndex);
|
tSkipListDestroy(pTable->pIndex);
|
||||||
taosTZfree(pTable->lastRow);
|
taosTZfree(pTable->lastRow);
|
||||||
tfree(pTable->sql);
|
tfree(pTable->sql);
|
||||||
|
|
||||||
|
for (int i = 0; i < pTable->lastColNum; ++i) {
|
||||||
|
if (pTable->lastCols[i].pData == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
free(pTable->lastCols[i].pData);
|
||||||
|
}
|
||||||
|
tfree(pTable->lastCols);
|
||||||
|
|
||||||
free(pTable);
|
free(pTable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue