Merge branch 'feature/TD-4034' into feature/TD-3950
This commit is contained in:
commit
a9edf4bdd5
|
@ -89,6 +89,7 @@ void tsdbOrgMeta(STsdbRepo* pRepo);
|
|||
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema);
|
||||
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId);
|
||||
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema);
|
||||
STSchema* tsdbGetTableLatestSchema(STable *pTable);
|
||||
|
||||
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
||||
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
|
||||
|
|
|
@ -26,6 +26,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
|
|||
static void tsdbFreeRepo(STsdbRepo *pRepo);
|
||||
static void tsdbStartStream(STsdbRepo *pRepo);
|
||||
static void tsdbStopStream(STsdbRepo *pRepo);
|
||||
static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh);
|
||||
|
||||
// Function declaration
|
||||
int32_t tsdbCreateRepo(int repoid) {
|
||||
|
@ -616,24 +617,12 @@ static void tsdbStopStream(STsdbRepo *pRepo) {
|
|||
}
|
||||
}
|
||||
|
||||
static STSchema* getTableLatestSchema(STable *pTable) {
|
||||
if (pTable->numOfSchemas > 0) {
|
||||
return pTable->schema[pTable->numOfSchemas - 1];
|
||||
}
|
||||
static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) {
|
||||
//tsdbInfo("tsdbRestoreLastColumns of table %s", pTable->name->data);
|
||||
|
||||
if (pTable->type == TSDB_CHILD_TABLE) {
|
||||
if (pTable->pSuper && pTable->pSuper->numOfSchemas) {
|
||||
tsdbDebug("getTableLatestSchema of table %s from super table %s", pTable->name->data, pTable->pSuper->name->data);
|
||||
return pTable->pSuper->schema[pTable->pSuper->numOfSchemas - 1];
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) {
|
||||
STSchema *pSchema = getTableLatestSchema(pTable);
|
||||
STSchema *pSchema = tsdbGetTableLatestSchema(pTable);
|
||||
if (pSchema == NULL) {
|
||||
tsdbError("getTableLatestSchema of table %s fail", pTable->name->data);
|
||||
tsdbError("tsdbGetTableLatestSchema of table %s fail", pTable->name->data);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -728,7 +717,7 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh)
|
|||
|
||||
int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId);
|
||||
if (idx == -1) {
|
||||
tsdbError("restoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo), pTable->name->data, pCol->colId);
|
||||
tsdbError("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo), pTable->name->data, pCol->colId);
|
||||
continue;
|
||||
}
|
||||
// save not-null column
|
||||
|
@ -746,7 +735,7 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh)
|
|||
|
||||
pTable->maxColumnNum += 1;
|
||||
|
||||
tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts);
|
||||
tsdbInfo("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -843,7 +832,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
|||
|
||||
// restore NULL columns
|
||||
if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg)) {
|
||||
if (restoreLastColumns(pRepo, pTable, &readh) != 0) {
|
||||
if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -965,23 +965,15 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
|
|||
}
|
||||
|
||||
static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
|
||||
//tsdbInfo("vgId:%d updateTableLatestColumn, row version:%d", REPO_ID(pRepo), dataRowVersion(row));
|
||||
tsdbInfo("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, dataRowVersion(row));
|
||||
|
||||
if (pTable->numOfSchemas <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
STSchema* pSchema = pTable->schema[pTable->numOfSchemas - 1];
|
||||
STSchema* pSchema = tsdbGetTableLatestSchema(pTable);
|
||||
if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
int16_t i = pTable->numOfSchemas - 1;
|
||||
while ((pSchema == NULL || pSchema->version != dataRowVersion(row)) && i >= 0) {
|
||||
i -= 1;
|
||||
pSchema = pTable->schema[i];
|
||||
}
|
||||
if (pSchema == NULL || pSchema->version != dataRowVersion(row)) {
|
||||
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
|
||||
if (pSchema == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -645,6 +645,10 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
STSchema* tsdbGetTableLatestSchema(STable *pTable) {
|
||||
return tsdbGetTableSchemaByVersion(pTable, -1);
|
||||
}
|
||||
|
||||
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) {
|
||||
if (pTable->lastColSVersion == schemaVersion(pNewSchema)) {
|
||||
return 0;
|
||||
|
|
Loading…
Reference in New Issue