fix(tsdb): fix error in sma extracting and do NOT clear fileDelData when extracting data from stt files.

This commit is contained in:
Haojun Liao 2023-07-05 19:11:14 +08:00
parent 8f8dfc593b
commit a0a116b674
1 changed files with 18 additions and 49 deletions

View File

@ -104,7 +104,6 @@ typedef struct SIOCostSummary {
} SIOCostSummary; } SIOCostSummary;
typedef struct SBlockLoadSuppInfo { typedef struct SBlockLoadSuppInfo {
SArray* pColAgg; // todo remove it
TColumnDataAggArray colAggArray; TColumnDataAggArray colAggArray;
SColumnDataAgg tsColAgg; SColumnDataAgg tsColAgg;
int16_t* colId; int16_t* colId;
@ -816,12 +815,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
SBlockLoadSuppInfo* pSup = &pReader->suppInfo; SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg));
if (pSup->pColAgg == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols); setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
@ -865,6 +858,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
} }
const TBrinBlkArray* pBlkArray = NULL; const TBrinBlkArray* pBlkArray = NULL;
int32_t code = tsdbDataFileReadBrinBlk(pFileReader, &pBlkArray); int32_t code = tsdbDataFileReadBrinBlk(pFileReader, &pBlkArray);
#if 0 #if 0
@ -890,14 +884,12 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
SBrinBlk* pBrinBlk = NULL; SBrinBlk* pBrinBlk = NULL;
STableUidList* pList = &pReader->status.uidList; STableUidList* pList = &pReader->status.uidList;
bool newBlk = false;
int32_t i = 0; int32_t i = 0;
while (i < TARRAY2_SIZE(pBlkArray)) { while (i < TARRAY2_SIZE(pBlkArray)) {
pBrinBlk = &pBlkArray->data[i]; pBrinBlk = &pBlkArray->data[i];
if (pBrinBlk->maxTbid.suid < pReader->suid) { if (pBrinBlk->maxTbid.suid < pReader->suid) {
i += 1; i += 1;
newBlk = true;
continue; continue;
} }
@ -906,14 +898,12 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
} }
ASSERT(pBrinBlk->minTbid.suid <= pReader->suid && pBrinBlk->maxTbid.suid >= pReader->suid); ASSERT(pBrinBlk->minTbid.suid <= pReader->suid && pBrinBlk->maxTbid.suid >= pReader->suid);
if (pBrinBlk->maxTbid.suid == pReader->suid && pBrinBlk->maxTbid.uid < pList->tableUidList[0]) {
if (pBrinBlk->maxTbid.uid < pList->tableUidList[0]) {
i += 1; i += 1;
newBlk = true;
continue; continue;
} }
if (pBrinBlk->minTbid.uid > pList->tableUidList[numOfTables - 1]) { if (pBrinBlk->minTbid.suid == pReader->suid && pBrinBlk->minTbid.uid > pList->tableUidList[numOfTables - 1]) {
break; break;
} }
@ -2860,12 +2850,7 @@ static int32_t loadTombRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t s
} }
static int32_t loadTombRecordsFromDataFiles(STsdbReader* pReader, int32_t numOfTables) { static int32_t loadTombRecordsFromDataFiles(STsdbReader* pReader, int32_t numOfTables) {
if (pReader->status.pCurrentFileset == NULL) { if (pReader->status.pCurrentFileset == NULL || pReader->status.pCurrentFileset->farr[3] == NULL) {
return TSDB_CODE_SUCCESS;
}
STFileObj* pTombFileObj = pReader->status.pCurrentFileset->farr[3];
if (pTombFileObj == NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3418,7 +3403,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
// reset the index in last block when handing a new file // reset the index in last block when handing a new file
doCleanupTableScanInfo(pScanInfo); // doCleanupTableScanInfo(pScanInfo);
bool hasNexTable = moveToNextTable(pUidList, pStatus); bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) { if (!hasNexTable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -3428,7 +3413,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
} }
// reset the index in last block when handing a new file // reset the index in last block when handing a new file
doCleanupTableScanInfo(pScanInfo); // doCleanupTableScanInfo(pScanInfo);
bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
if (!hasDataInLastFile) { if (!hasDataInLastFile) {
@ -4864,8 +4849,6 @@ void tsdbReaderClose2(STsdbReader* pReader) {
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
TARRAY2_DESTROY(&pSupInfo->colAggArray, NULL); TARRAY2_DESTROY(&pSupInfo->colAggArray, NULL);
taosArrayDestroy(pSupInfo->pColAgg);
for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) { for (int32_t i = 0; i < pSupInfo->numOfCols; ++i) {
if (pSupInfo->buildBuf[i] != NULL) { if (pSupInfo->buildBuf[i] != NULL) {
taosMemoryFreeClear(pSupInfo->buildBuf[i]); taosMemoryFreeClear(pSupInfo->buildBuf[i]);
@ -5280,16 +5263,15 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
return code; return code;
} }
static bool doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) { static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
bool hasNullSMA = false;
// do fill all null column value SMA info // do fill all null column value SMA info
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg); int32_t size = (int32_t)TARRAY2_SIZE(&pSup->colAggArray);
taosArrayInsert(pSup->pColAgg, 0, pTsAgg); TARRAY2_INSERT_PTR(&pSup->colAggArray, 0, pTsAgg);
size++; size++;
while (j < numOfCols && i < size) { while (j < numOfCols && i < size) {
SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); SColumnDataAgg* pAgg = &pSup->colAggArray.data[i];
if (pAgg->colId == pSup->colId[j]) { if (pAgg->colId == pSup->colId[j]) {
i += 1; i += 1;
j += 1; j += 1;
@ -5298,10 +5280,9 @@ static bool doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
} else if (pSup->colId[j] < pAgg->colId) { } else if (pSup->colId[j] < pAgg->colId) {
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) { if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
taosArrayInsert(pSup->pColAgg, i, &nullColAgg); TARRAY2_INSERT_PTR(&pSup->colAggArray, i, &nullColAgg);
i += 1; i += 1;
size++; size++;
hasNullSMA = true;
} }
j += 1; j += 1;
} }
@ -5310,14 +5291,11 @@ static bool doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
while (j < numOfCols) { while (j < numOfCols) {
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) { if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
taosArrayInsert(pSup->pColAgg, i, &nullColAgg); TARRAY2_INSERT_PTR(&pSup->colAggArray, i, &nullColAgg);
i += 1; i += 1;
hasNullSMA = true;
} }
j++; j++;
} }
return hasNullSMA;
} }
int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool *hasNullSMA) { int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool *hasNullSMA) {
@ -5339,7 +5317,8 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
SBlockLoadSuppInfo* pSup = &pReader->suppInfo; SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
if (pReader->resBlockInfo.pResBlock->info.id.uid != pFBlock->uid) { SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
if (pResBlock->info.id.uid != pFBlock->uid) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -5365,29 +5344,19 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
pTsAgg->numOfNull = 0; pTsAgg->numOfNull = 0;
pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID; pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
pTsAgg->min = pReader->resBlockInfo.pResBlock->info.window.skey; pTsAgg->min = pResBlock->info.window.skey;
pTsAgg->max = pReader->resBlockInfo.pResBlock->info.window.ekey; pTsAgg->max = pResBlock->info.window.ekey;
// update the number of NULL data rows // update the number of NULL data rows
size_t numOfCols = pSup->numOfCols; size_t numOfCols = pSup->numOfCols;
// ensure capacity
if (pDataBlock->pDataBlock) {
size_t colsNum = taosArrayGetSize(pDataBlock->pDataBlock);
taosArrayEnsureCap(pSup->pColAgg, colsNum);
}
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
if (pResBlock->pBlockAgg == NULL) { if (pResBlock->pBlockAgg == NULL) {
size_t num = taosArrayGetSize(pResBlock->pDataBlock); size_t num = taosArrayGetSize(pResBlock->pDataBlock);
pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES); pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES);
} }
// do fill all null column value SMA info // do fill all null column value SMA info
if (doFillNullColSMA(pSup, pFBlock->record.numRow, numOfCols, pTsAgg)) { doFillNullColSMA(pSup, pFBlock->record.numRow, numOfCols, pTsAgg);
*hasNullSMA = true;
return TSDB_CODE_SUCCESS;
}
size_t size = pSup->colAggArray.size; size_t size = pSup->colAggArray.size;