[TD-4034]fix super table bug
This commit is contained in:
parent
fcfd4d29d1
commit
4beb0b4d07
|
@ -89,6 +89,7 @@ void tsdbOrgMeta(STsdbRepo* pRepo);
|
||||||
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema);
|
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema);
|
||||||
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId);
|
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId);
|
||||||
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema);
|
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema);
|
||||||
|
STSchema* tsdbGetTableLatestSchema(STable *pTable);
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
||||||
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
|
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
|
||||||
|
|
|
@ -26,6 +26,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
|
||||||
static void tsdbFreeRepo(STsdbRepo *pRepo);
|
static void tsdbFreeRepo(STsdbRepo *pRepo);
|
||||||
static void tsdbStartStream(STsdbRepo *pRepo);
|
static void tsdbStartStream(STsdbRepo *pRepo);
|
||||||
static void tsdbStopStream(STsdbRepo *pRepo);
|
static void tsdbStopStream(STsdbRepo *pRepo);
|
||||||
|
static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh);
|
||||||
|
|
||||||
// Function declaration
|
// Function declaration
|
||||||
int32_t tsdbCreateRepo(int repoid) {
|
int32_t tsdbCreateRepo(int repoid) {
|
||||||
|
@ -616,24 +617,12 @@ static void tsdbStopStream(STsdbRepo *pRepo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static STSchema* getTableLatestSchema(STable *pTable) {
|
static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) {
|
||||||
if (pTable->numOfSchemas > 0) {
|
//tsdbInfo("tsdbRestoreLastColumns of table %s", pTable->name->data);
|
||||||
return pTable->schema[pTable->numOfSchemas - 1];
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTable->type == TSDB_CHILD_TABLE) {
|
STSchema *pSchema = tsdbGetTableLatestSchema(pTable);
|
||||||
if (pTable->pSuper && pTable->pSuper->numOfSchemas) {
|
|
||||||
tsdbDebug("getTableLatestSchema of table %s from super table %s", pTable->name->data, pTable->pSuper->name->data);
|
|
||||||
return pTable->pSuper->schema[pTable->pSuper->numOfSchemas - 1];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) {
|
|
||||||
STSchema *pSchema = getTableLatestSchema(pTable);
|
|
||||||
if (pSchema == NULL) {
|
if (pSchema == NULL) {
|
||||||
tsdbError("getTableLatestSchema of table %s fail", pTable->name->data);
|
tsdbError("tsdbGetTableLatestSchema of table %s fail", pTable->name->data);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -728,7 +717,7 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh)
|
||||||
|
|
||||||
int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId);
|
int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId);
|
||||||
if (idx == -1) {
|
if (idx == -1) {
|
||||||
tsdbError("restoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo), pTable->name->data, pCol->colId);
|
tsdbError("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo), pTable->name->data, pCol->colId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// save not-null column
|
// save not-null column
|
||||||
|
@ -746,7 +735,7 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh)
|
||||||
|
|
||||||
pTable->maxColumnNum += 1;
|
pTable->maxColumnNum += 1;
|
||||||
|
|
||||||
tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts);
|
tsdbInfo("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -843,7 +832,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||||
|
|
||||||
// restore NULL columns
|
// restore NULL columns
|
||||||
if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg)) {
|
if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg)) {
|
||||||
if (restoreLastColumns(pRepo, pTable, &readh) != 0) {
|
if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) {
|
||||||
tsdbDestroyReadH(&readh);
|
tsdbDestroyReadH(&readh);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -965,23 +965,15 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
|
static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
|
||||||
//tsdbInfo("vgId:%d updateTableLatestColumn, row version:%d", REPO_ID(pRepo), dataRowVersion(row));
|
tsdbInfo("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, dataRowVersion(row));
|
||||||
|
|
||||||
if (pTable->numOfSchemas <= 0) {
|
STSchema* pSchema = tsdbGetTableLatestSchema(pTable);
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
STSchema* pSchema = pTable->schema[pTable->numOfSchemas - 1];
|
|
||||||
if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) {
|
if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int16_t i = pTable->numOfSchemas - 1;
|
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
|
||||||
while ((pSchema == NULL || pSchema->version != dataRowVersion(row)) && i >= 0) {
|
if (pSchema == NULL) {
|
||||||
i -= 1;
|
|
||||||
pSchema = pTable->schema[i];
|
|
||||||
}
|
|
||||||
if (pSchema == NULL || pSchema->version != dataRowVersion(row)) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -645,6 +645,10 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STSchema* tsdbGetTableLatestSchema(STable *pTable) {
|
||||||
|
return tsdbGetTableSchemaByVersion(pTable, -1);
|
||||||
|
}
|
||||||
|
|
||||||
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) {
|
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) {
|
||||||
if (pTable->lastColSVersion == schemaVersion(pNewSchema)) {
|
if (pTable->lastColSVersion == schemaVersion(pNewSchema)) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in New Issue