[TD-4034]restore last not NULL column
This commit is contained in:
parent
3b2d5f74ed
commit
08848116e5
|
@ -645,6 +645,8 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
|||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable == NULL) continue;
|
||||
|
||||
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
|
||||
|
||||
if (tsdbSetReadTable(&readh, pTable) < 0) {
|
||||
tsdbDestroyReadH(&readh);
|
||||
return -1;
|
||||
|
@ -686,6 +688,49 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
|||
pCol->offset);
|
||||
}
|
||||
}
|
||||
|
||||
// restore NULL columns
|
||||
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
|
||||
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
||||
int numColumns = schemaNCols(pSchema);
|
||||
pTable->lastCols = (SDataCol*)malloc(numColumns * sizeof(SDataCol));
|
||||
if (pTable->lastCols == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pTable->lastColNum = numColumns;
|
||||
|
||||
SDataRow row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
|
||||
if (row == NULL) {
|
||||
tfree(pTable->lastCols);
|
||||
pTable->lastColNum = 0;
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
tdInitDataRow(row, pSchema);
|
||||
|
||||
SDataCol *pLatestCols = pTable->lastCols;
|
||||
for (i = 0; i < pTable->lastColNum; ++i) {
|
||||
STColumn *pTCol = schemaColAt(pSchema, i);
|
||||
|
||||
SDataCol *pDataCol = &(pLatestCols[pTCol->colId]);
|
||||
pDataCol->pData = malloc(pTCol->bytes);
|
||||
pDataCol->bytes = pTCol->bytes;
|
||||
|
||||
void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pTCol->offset);
|
||||
if (isNullN(value, pTCol->type)) {
|
||||
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s cache column %d NULL", REPO_ID(pRepo), pTable->name->data, pTCol->colId);
|
||||
continue;
|
||||
}
|
||||
|
||||
memcpy(pDataCol->pData, value, pDataCol->bytes);
|
||||
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s cache column %d for %d,%s", REPO_ID(pRepo), pTable->name->data, pTCol->colId, pDataCol->bytes, (char*)pDataCol->pData);
|
||||
pDataCol->ts = dataRowTKey(row);
|
||||
}
|
||||
|
||||
taosTZfree(row);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -965,11 +965,12 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
|
|||
}
|
||||
|
||||
static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
|
||||
//tsdbDebug("vgId:%d updateTableLatestColumn, row version:%d", REPO_ID(pRepo), dataRowVersion(row));
|
||||
//tsdbInfo("vgId:%d updateTableLatestColumn, row version:%d", REPO_ID(pRepo), dataRowVersion(row));
|
||||
|
||||
if (pTable->numOfSchemas <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
STSchema* pSchema = pTable->schema[pTable->numOfSchemas - 1];
|
||||
int i = pTable->numOfSchemas - 1;
|
||||
while ((pSchema == NULL || pSchema->version != dataRowVersion(row)) && i >= 0) {
|
||||
|
@ -983,21 +984,18 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r
|
|||
SDataCol *pLatestCols = pTable->lastCols;
|
||||
|
||||
for (int j = 0; j < schemaNCols(pSchema); j++) {
|
||||
if (j >= pTable->lastColNum) {
|
||||
pTable->lastCols = realloc(pTable->lastCols, pTable->lastColNum + 10);
|
||||
STColumn *pTCol = schemaColAt(pSchema, j);
|
||||
|
||||
if (pTCol->colId >= pTable->lastColNum) {
|
||||
pTable->lastCols = realloc(pTable->lastCols, pTCol->colId + 5);
|
||||
for (i = 0; i < 10; ++i) {
|
||||
pTable->lastCols[i + pTable->lastColNum].bytes = 0;
|
||||
pTable->lastCols[i + pTable->lastColNum].pData = NULL;
|
||||
}
|
||||
pTable->lastColNum += 10;
|
||||
pTable->lastColNum += pTCol->colId + 5;
|
||||
}
|
||||
|
||||
STColumn *pTCol = schemaColAt(pSchema, j);
|
||||
if (pTCol == NULL) {
|
||||
// since schema maybe changed, check if STColumn NULL then ignore
|
||||
continue;
|
||||
}
|
||||
SDataCol *pDataCol = &(pLatestCols[j]);
|
||||
|
||||
SDataCol *pDataCol = &(pLatestCols[pTCol->colId]);
|
||||
void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
|
||||
if (isNullN(value, pTCol->type)) {
|
||||
continue;
|
||||
|
@ -1010,9 +1008,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r
|
|||
pDataCol->bytes = pSchema->columns[j].bytes;
|
||||
}
|
||||
|
||||
//tsdbDebug("vgId:%d cache column %d for %d,%p", REPO_ID(pRepo), j, pDataCol->bytes, pDataCol->pData);
|
||||
|
||||
memcpy(pDataCol->pData, value, pDataCol->bytes);
|
||||
//tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData);
|
||||
pDataCol->ts = dataRowTKey(row);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue