[TD-4034]refactor code
This commit is contained in:
parent
6213438f2e
commit
d66b6e2d85
|
@ -417,18 +417,6 @@ void setVardataNull(char* val, int32_t type) {
|
|||
}
|
||||
}
|
||||
|
||||
bool isVardataNull(char* val, int32_t type) {
|
||||
if (type == TSDB_DATA_TYPE_BINARY) {
|
||||
return *(uint8_t*) varDataVal(val) == TSDB_DATA_BINARY_NULL;
|
||||
} else if (type == TSDB_DATA_TYPE_NCHAR) {
|
||||
return *(uint32_t*) varDataVal(val) == TSDB_DATA_NCHAR_NULL;
|
||||
} else {
|
||||
assert(0);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void setNull(char *val, int32_t type, int32_t bytes) { setNullN(val, type, bytes, 1); }
|
||||
|
||||
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
|
||||
|
@ -504,55 +492,6 @@ void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems) {
|
|||
}
|
||||
}
|
||||
|
||||
bool isNullN(char *val, int32_t type) {
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
return *(uint8_t *)(val) == TSDB_DATA_BOOL_NULL;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
return *(uint8_t *)(val) == TSDB_DATA_TINYINT_NULL;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
return *(uint16_t *)(val) == TSDB_DATA_SMALLINT_NULL;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
return *(uint32_t *)(val) == TSDB_DATA_INT_NULL;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
return *(uint64_t *)(val) == TSDB_DATA_BIGINT_NULL;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
return *(uint8_t *)(val) == TSDB_DATA_UTINYINT_NULL;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
return *(uint16_t *)(val) == TSDB_DATA_USMALLINT_NULL;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
return *(uint32_t *)(val) == TSDB_DATA_UINT_NULL;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
return *(uint64_t *)(val) == TSDB_DATA_UBIGINT_NULL;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
return *(uint32_t *)(val) == TSDB_DATA_FLOAT_NULL;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
return *(uint64_t *)(val) == TSDB_DATA_DOUBLE_NULL;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
return isVardataNull(val, type);
|
||||
break;
|
||||
default: {
|
||||
return *(uint32_t *)(val) == TSDB_DATA_INT_NULL;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static uint8_t nullBool = TSDB_DATA_BOOL_NULL;
|
||||
static uint8_t nullTinyInt = TSDB_DATA_TINYINT_NULL;
|
||||
static uint16_t nullSmallInt = TSDB_DATA_SMALLINT_NULL;
|
||||
|
|
|
@ -69,7 +69,7 @@ typedef struct {
|
|||
int8_t precision;
|
||||
int8_t compression;
|
||||
int8_t update;
|
||||
int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column
|
||||
int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2
|
||||
} STsdbCfg;
|
||||
|
||||
#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0)
|
||||
|
|
|
@ -178,9 +178,6 @@ void setNull(char *val, int32_t type, int32_t bytes);
|
|||
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems);
|
||||
void *getNullValue(int32_t type);
|
||||
|
||||
bool isVardataNull(char* val, int32_t type);
|
||||
bool isNullN(char *val, int32_t type);
|
||||
|
||||
void assignVal(char *val, const char *src, int32_t len, int32_t type);
|
||||
void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf);
|
||||
|
||||
|
|
|
@ -45,9 +45,6 @@ typedef struct STable {
|
|||
T_REF_DECLARE()
|
||||
} STable;
|
||||
|
||||
#define TSDB_LATEST_COLUMN_ARRAY_SIZE 20
|
||||
#define TSDB_LATEST_COLUMN_ARRAY_ADD_SIZE 5
|
||||
|
||||
typedef struct {
|
||||
pthread_rwlock_t rwLock;
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo);
|
|||
static void tsdbStartStream(STsdbRepo *pRepo);
|
||||
static void tsdbStopStream(STsdbRepo *pRepo);
|
||||
static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh);
|
||||
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx);
|
||||
|
||||
// Function declaration
|
||||
int32_t tsdbCreateRepo(int repoid) {
|
||||
|
@ -270,9 +271,7 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
|
|||
pthread_mutex_unlock(&repo->save_mutex);
|
||||
|
||||
// schedule a commit msg then the new config will be applied immediatly
|
||||
if (tsdbLockRepo(repo) < 0) return -1;
|
||||
tsdbScheduleCommit(repo);
|
||||
if (tsdbUnlockRepo(repo) < 0) return -1;
|
||||
tsdbAsyncCommit(repo);
|
||||
|
||||
return 0;
|
||||
#if 0
|
||||
|
@ -645,6 +644,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
|
|||
|
||||
numColumns = schemaNCols(pSchema);
|
||||
if (numColumns <= pTable->restoreColumnNum) {
|
||||
pTable->hasRestoreLastColumn = true;
|
||||
return 0;
|
||||
}
|
||||
if (pTable->lastColSVersion != schemaVersion(pSchema)) {
|
||||
|
@ -719,7 +719,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
|
|||
tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
|
||||
//SDataCol *pDataCol = readh.pDCols[0]->cols + j;
|
||||
void* value = tdGetRowDataOfCol(row, (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
|
||||
if (isNullN(value, pCol->type)) {
|
||||
if (isNull(value, pCol->type)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -753,16 +753,51 @@ out:
|
|||
taosTZfree(row);
|
||||
tfree(pBlockStatis);
|
||||
|
||||
if (err == 0 && numColumns <= pTable->restoreColumnNum) {
|
||||
pTable->hasRestoreLastColumn = true;
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) {
|
||||
ASSERT(pTable->lastRow == NULL);
|
||||
if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SBlock* pBlock = pReadh->pBlkInfo->blocks + pIdx->numOfBlocks - 1;
|
||||
|
||||
if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Get the data in row
|
||||
|
||||
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
||||
pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
|
||||
if (pTable->lastRow == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
tdInitDataRow(pTable->lastRow, pSchema);
|
||||
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
|
||||
STColumn *pCol = schemaColAt(pSchema, icol);
|
||||
SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol;
|
||||
tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
|
||||
pCol->offset);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||
SFSIter fsiter;
|
||||
SReadH readh;
|
||||
SDFileSet *pSet;
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||
SBlock * pBlock;
|
||||
|
||||
if (tsdbInitReadH(&readh, pRepo) < 0) {
|
||||
return -1;
|
||||
|
@ -805,41 +840,14 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
|||
if (pIdx && lastKey < pIdx->maxKey) {
|
||||
pTable->lastKey = pIdx->maxKey;
|
||||
|
||||
if (CACHE_LAST_ROW(pCfg)) {
|
||||
if (tsdbLoadBlockInfo(&readh, NULL) < 0) {
|
||||
if (CACHE_LAST_ROW(pCfg) && tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1;
|
||||
|
||||
if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Get the data in row
|
||||
ASSERT(pTable->lastRow == NULL);
|
||||
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
||||
pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
|
||||
if (pTable->lastRow == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tdInitDataRow(pTable->lastRow, pSchema);
|
||||
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
|
||||
STColumn *pCol = schemaColAt(pSchema, icol);
|
||||
SDataCol *pDataCol = readh.pDCols[0]->cols + icol;
|
||||
tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
|
||||
pCol->offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// restore NULL columns
|
||||
if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg)) {
|
||||
if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg) && !pTable->hasRestoreLastColumn) {
|
||||
if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
|
@ -865,8 +873,6 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
|
|||
SReadH readh;
|
||||
SDFileSet *pSet;
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
//STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||
SBlock * pBlock;
|
||||
int tableNum = 0;
|
||||
int maxTableIdx = 0;
|
||||
int cacheLastRowTableNum = 0;
|
||||
|
@ -955,35 +961,10 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
|
|||
if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) {
|
||||
pTable->lastKey = pIdx->maxKey;
|
||||
|
||||
if (tsdbLoadBlockInfo(&readh, NULL) < 0) {
|
||||
if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1;
|
||||
|
||||
if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Get the data in row
|
||||
ASSERT(pTable->lastRow == NULL);
|
||||
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
||||
pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
|
||||
if (pTable->lastRow == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tdInitDataRow(pTable->lastRow, pSchema);
|
||||
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
|
||||
STColumn *pCol = schemaColAt(pSchema, icol);
|
||||
SDataCol *pDataCol = readh.pDCols[0]->cols + icol;
|
||||
tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
|
||||
pCol->offset);
|
||||
}
|
||||
cacheLastRowTableNum -= 1;
|
||||
}
|
||||
|
||||
|
|
|
@ -274,7 +274,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
|||
int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
||||
tsem_wait(&(pRepo->readyToCommit));
|
||||
|
||||
ASSERT(pRepo->imem == NULL);
|
||||
//ASSERT(pRepo->imem == NULL);
|
||||
if (pRepo->mem == NULL) {
|
||||
tsem_post(&(pRepo->readyToCommit));
|
||||
return 0;
|
||||
|
@ -965,7 +965,7 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
|
|||
}
|
||||
|
||||
static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
|
||||
tsdbInfo("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, dataRowVersion(row));
|
||||
tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, dataRowVersion(row));
|
||||
|
||||
STSchema* pSchema = tsdbGetTableLatestSchema(pTable);
|
||||
if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) {
|
||||
|
@ -988,7 +988,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r
|
|||
}
|
||||
|
||||
void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
|
||||
if (isNullN(value, pTCol->type)) {
|
||||
if (isNull(value, pTCol->type)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue