[TD-4034]restore last not NULL column
This commit is contained in:
parent
f18355367c
commit
cad6e7ec51
|
@ -39,6 +39,7 @@ typedef struct STable {
|
||||||
|
|
||||||
SDataCol *lastCols;
|
SDataCol *lastCols;
|
||||||
int32_t lastColNum;
|
int32_t lastColNum;
|
||||||
|
int32_t restoreColumnNum;
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
} STable;
|
} STable;
|
||||||
|
|
||||||
|
|
|
@ -616,6 +616,118 @@ static void tsdbStopStream(STsdbRepo *pRepo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) {
|
||||||
|
SBlock* pBlock;
|
||||||
|
int numColumns;
|
||||||
|
int32_t blockIdx;
|
||||||
|
SDataStatis* pBlockStatis = NULL;
|
||||||
|
SDataRow row = NULL;
|
||||||
|
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
||||||
|
int err = 0;
|
||||||
|
|
||||||
|
numColumns = schemaNCols(pSchema);
|
||||||
|
if (numColumns <= pTable->restoreColumnNum) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
|
||||||
|
if (row == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
err = -1;
|
||||||
|
goto out;
|
||||||
|
}
|
||||||
|
tdInitDataRow(row, pSchema);
|
||||||
|
|
||||||
|
// first load block index info
|
||||||
|
if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
|
||||||
|
err = -1;
|
||||||
|
goto out;
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlockStatis = calloc(numColumns, sizeof(SDataStatis));
|
||||||
|
if (pBlockStatis == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
err = -1;
|
||||||
|
goto out;
|
||||||
|
}
|
||||||
|
memset(pBlockStatis, 0, numColumns * sizeof(SDataStatis));
|
||||||
|
for(int32_t i = 0; i < numColumns; ++i) {
|
||||||
|
pBlockStatis[i].colId = i;
|
||||||
|
}
|
||||||
|
|
||||||
|
// load block from backward
|
||||||
|
SBlockIdx *pIdx = pReadh->pBlkIdx;
|
||||||
|
blockIdx = (int32_t)(pIdx->numOfBlocks - 1);
|
||||||
|
|
||||||
|
while (numColumns > pTable->restoreColumnNum && blockIdx >= 0) {
|
||||||
|
bool loadStatisData = false;
|
||||||
|
pBlock = pReadh->pBlkInfo->blocks + blockIdx;
|
||||||
|
blockIdx -= 1;
|
||||||
|
|
||||||
|
// load block data
|
||||||
|
if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) {
|
||||||
|
err = -1;
|
||||||
|
goto out;
|
||||||
|
}
|
||||||
|
|
||||||
|
// file block with sub-blocks has no statistics data
|
||||||
|
if (pBlock->numOfSubBlocks <= 1) {
|
||||||
|
tsdbLoadBlockStatis(pReadh, pBlock);
|
||||||
|
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns);
|
||||||
|
loadStatisData = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (uint32_t colId = 0; colId < numColumns && numColumns > pTable->restoreColumnNum; ++colId) {
|
||||||
|
// ignore loaded columns
|
||||||
|
if (pTable->lastCols[colId].bytes != 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ignore block which has no not-null colId column
|
||||||
|
if (loadStatisData && pBlockStatis[colId].numOfNull == pBlock->numOfRows) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// OK,let's load row from backward to get not-null column
|
||||||
|
STColumn *pCol = schemaColAt(pSchema, colId);
|
||||||
|
for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) {
|
||||||
|
SDataCol *pDataCol = pReadh->pDCols[0]->cols + colId;
|
||||||
|
tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
|
||||||
|
//SDataCol *pDataCol = readh.pDCols[0]->cols + j;
|
||||||
|
void* value = tdGetRowDataOfCol(row, (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
|
||||||
|
if (isNullN(value, pCol->type)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// save not-null column
|
||||||
|
SDataCol *pLastCol = &(pTable->lastCols[colId]);
|
||||||
|
pLastCol->pData = malloc(pCol->bytes);
|
||||||
|
pLastCol->bytes = pCol->bytes;
|
||||||
|
pLastCol->offset = pCol->offset;
|
||||||
|
pLastCol->colId = pCol->colId;
|
||||||
|
memcpy(pLastCol->pData, value, pCol->bytes);
|
||||||
|
|
||||||
|
// save row ts(in column 0)
|
||||||
|
pDataCol = pReadh->pDCols[0]->cols + 0;
|
||||||
|
pCol = schemaColAt(pSchema, 0);
|
||||||
|
tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
|
||||||
|
pLastCol->ts = dataRowTKey(row);
|
||||||
|
|
||||||
|
pTable->restoreColumnNum += 1;
|
||||||
|
|
||||||
|
tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %ld", REPO_ID(pRepo), pTable->name->data, colId, pLastCol->ts);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
out:
|
||||||
|
taosTZfree(row);
|
||||||
|
tfree(pBlockStatis);
|
||||||
|
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||||
SFSIter fsiter;
|
SFSIter fsiter;
|
||||||
SReadH readh;
|
SReadH readh;
|
||||||
|
@ -630,6 +742,14 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||||
|
|
||||||
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
|
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
|
||||||
|
|
||||||
|
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
|
||||||
|
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||||
|
STable *pTable = pMeta->tables[i];
|
||||||
|
if (pTable == NULL) continue;
|
||||||
|
pTable->restoreColumnNum = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
while ((pSet = tsdbFSIterNext(&fsiter)) != NULL) {
|
while ((pSet = tsdbFSIterNext(&fsiter)) != NULL) {
|
||||||
if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
|
if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
|
||||||
tsdbDestroyReadH(&readh);
|
tsdbDestroyReadH(&readh);
|
||||||
|
@ -688,71 +808,15 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||||
pCol->offset);
|
pCol->offset);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// restore NULL columns
|
|
||||||
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
|
|
||||||
if (tsdbLoadBlockInfo(&readh, NULL) < 0) {
|
|
||||||
tsdbDestroyReadH(&readh);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pBlock = readh.pBlkInfo->blocks + pIdx->numOfBlocks - 1;
|
|
||||||
|
|
||||||
if (tsdbLoadBlockData(&readh, pBlock, NULL) < 0) {
|
|
||||||
tsdbDestroyReadH(&readh);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
|
||||||
int numColumns = schemaNCols(pSchema);
|
|
||||||
pTable->lastCols = (SDataCol*)malloc(numColumns * sizeof(SDataCol));
|
|
||||||
if (pTable->lastCols == NULL) {
|
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
|
||||||
tsdbDestroyReadH(&readh);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
pTable->lastColNum = numColumns;
|
|
||||||
|
|
||||||
SDataRow row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
|
|
||||||
if (row == NULL) {
|
|
||||||
tfree(pTable->lastCols);
|
|
||||||
pTable->lastColNum = 0;
|
|
||||||
tsdbDestroyReadH(&readh);
|
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tdInitDataRow(row, pSchema);
|
|
||||||
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
|
|
||||||
STColumn *pCol = schemaColAt(pSchema, icol);
|
|
||||||
SDataCol *pDataCol = readh.pDCols[0]->cols + icol;
|
|
||||||
tdAppendColVal(row, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
|
|
||||||
pCol->offset);
|
|
||||||
}
|
|
||||||
|
|
||||||
SDataCol *pLatestCols = pTable->lastCols;
|
|
||||||
for (i = 0; i < pTable->lastColNum; ++i) {
|
|
||||||
STColumn *pTCol = schemaColAt(pSchema, i);
|
|
||||||
|
|
||||||
SDataCol *pDataCol = &(pLatestCols[pTCol->colId]);
|
|
||||||
pDataCol->pData = malloc(pTCol->bytes);
|
|
||||||
pDataCol->bytes = pTCol->bytes;
|
|
||||||
|
|
||||||
void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pTCol->offset);
|
|
||||||
if (isNullN(value, pTCol->type)) {
|
|
||||||
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s cache column %d NULL", REPO_ID(pRepo), pTable->name->data, pTCol->colId);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(pDataCol->pData, value, pDataCol->bytes);
|
|
||||||
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s cache column %d for %d,%s", REPO_ID(pRepo), pTable->name->data, pTCol->colId, pDataCol->bytes, (char*)pDataCol->pData);
|
|
||||||
pDataCol->ts = dataRowTKey(row);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosTZfree(row);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// restore NULL columns
|
||||||
|
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
|
||||||
|
if (restoreLastColumns(pRepo, pTable, &readh) != 0) {
|
||||||
|
tsdbDestroyReadH(&readh);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -678,6 +678,7 @@ static STable *tsdbNewTable() {
|
||||||
pTable->lastCols[i].bytes = 0;
|
pTable->lastCols[i].bytes = 0;
|
||||||
pTable->lastCols[i].pData = NULL;
|
pTable->lastCols[i].pData = NULL;
|
||||||
}
|
}
|
||||||
|
pTable->restoreColumnNum = 0;
|
||||||
|
|
||||||
return pTable;
|
return pTable;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue