fix(query):check null ptr.
This commit is contained in:
parent
b8bc052aa4
commit
f2b83dfb4a
|
@ -118,6 +118,11 @@ typedef struct SFileBlockDumpInfo {
|
|||
int64_t lastKey;
|
||||
} SFileBlockDumpInfo;
|
||||
|
||||
typedef struct SVersionRange {
|
||||
uint64_t minVer;
|
||||
uint64_t maxVer;
|
||||
} SVersionRange;
|
||||
|
||||
typedef struct SComposedDataBlock {
|
||||
bool composed;
|
||||
int32_t rows;
|
||||
|
@ -154,8 +159,7 @@ struct STsdbReader {
|
|||
STSchema* pSchema;
|
||||
|
||||
SDataFReader* pFileReader;
|
||||
int64_t startVersion;
|
||||
int64_t endVersion;
|
||||
SVersionRange verRange;
|
||||
#if 0
|
||||
SFileBlockInfo* pDataBlockInfo;
|
||||
SDataCols* pDataCols; // in order to hold current file data block
|
||||
|
@ -413,8 +417,8 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
|||
pReader->order = pCond->order;
|
||||
pReader->capacity = 4096;
|
||||
pReader->idStr = strdup(idstr);
|
||||
pReader->startVersion= pCond->startVersion;
|
||||
pReader->endVersion = 100000;//pCond->endVersion; // todo for test purpose
|
||||
pReader->verRange.minVer= pCond->startVersion;
|
||||
pReader->verRange.maxVer = 100000;//pCond->endVersion; // todo for test purpose
|
||||
pReader->type = pCond->type;
|
||||
pReader->window = *pCond->twindows;
|
||||
|
||||
|
@ -954,6 +958,12 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void setBlockDumpCompleted(SFileBlockDumpInfo* pDumpInfo, SBlockData* pBlockData) {
|
||||
pDumpInfo->rowIndex = pBlockData->nRow;
|
||||
pDumpInfo->totalRows = pBlockData->nRow;
|
||||
pDumpInfo->lastKey = pBlockData->aTSKEY[pBlockData->nRow - 1] + 1; // todo step value
|
||||
}
|
||||
|
||||
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
|
@ -966,6 +976,19 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
|
|||
goto _error;
|
||||
}
|
||||
|
||||
SColVal cv = {0};
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pReader->pResBlock->pDataBlock); ++i) {
|
||||
SColData* pData = (SColData*)taosArrayGet(pBlockData->aColDataP, pReader->suppInfo.slotIds[i]);
|
||||
SColumnInfoData* pColData = taosArrayGet(pReader->pResBlock->pDataBlock, i);
|
||||
for(int32_t j = 0; j < pBlockData->nRow; ++j) {
|
||||
tColDataGetValue(pData, j, &cv);
|
||||
colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull);
|
||||
}
|
||||
}
|
||||
|
||||
pReader->pResBlock->info.rows = pBlockData->nRow;
|
||||
setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData);
|
||||
|
||||
/*
|
||||
int32_t ret = tsdbLoadBlockDataCols(&(pReader->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
|
||||
(int)(QH_GET_NUM_OF_COLS(pReader)), true);
|
||||
|
@ -2081,7 +2104,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool blockIteratorNext(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
|
||||
static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
|
||||
if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
|
||||
return false;
|
||||
}
|
||||
|
@ -2252,6 +2275,15 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock)
|
|||
(!ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts >= pBlock->maxKey.ts));
|
||||
}
|
||||
|
||||
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
|
||||
return (key.ts >= pBlock->minKey.ts || key.ts <= pBlock->maxKey.ts) /*&& (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer)*/;
|
||||
}
|
||||
|
||||
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) {
|
||||
return (dataBlockPartialRequired(&pReader->window, pBlock) || overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo) ||
|
||||
keyOverlapFileBlock(key, pBlock, &pReader->verRange));
|
||||
}
|
||||
|
||||
static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBKEY *key) {
|
||||
if (pBlockScanInfo->iter != NULL) {
|
||||
pBlockScanInfo->memHasVal = tsdbTbDataIterNext(pBlockScanInfo->iter);
|
||||
|
@ -2446,7 +2478,7 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
|||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
TSDBKEY startKey = {.ts = pReader->window.skey, .version = pReader->startVersion};
|
||||
TSDBKEY startKey = {.ts = pReader->window.skey, .version = pReader->verRange.minVer};
|
||||
|
||||
STbData* d = NULL;
|
||||
if (pReader->pTsdb->mem != NULL) {
|
||||
|
@ -2498,6 +2530,39 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead
|
|||
return key;
|
||||
}
|
||||
|
||||
static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
while (1) {
|
||||
bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader->order, pReader);
|
||||
if (!hasNext) { // no data files on disk
|
||||
break;
|
||||
}
|
||||
|
||||
SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx));
|
||||
int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pIndexList) > 0) {
|
||||
uint32_t numOfValidTable = 0;
|
||||
code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable, numOfBlocks);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (numOfValidTable > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// no blocks in current file, try next files
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SFileSetIter* pFIter = &pStatus->fileIter;
|
||||
|
@ -2507,35 +2572,13 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
|
|||
if (pFIter->index < pFIter->numOfFiles) {
|
||||
if (pReader->status.blockIter.index == -1) {
|
||||
int32_t numOfBlocks = 0;
|
||||
|
||||
while (1) {
|
||||
bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader->order, pReader);
|
||||
if (!hasNext) { // no data files on disk
|
||||
break;
|
||||
}
|
||||
|
||||
SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx));
|
||||
int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pIndexList) > 0) {
|
||||
uint32_t numOfValidTable = 0;
|
||||
code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable, &numOfBlocks);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (numOfValidTable > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// no blocks in current file, try next files
|
||||
int32_t code = moveToNextFile(pReader, &numOfBlocks);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
|
||||
// initialize the block iterator for a new fileset
|
||||
code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -2546,14 +2589,15 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
|
|||
|
||||
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
|
||||
|
||||
if (dataBlockPartialRequired(&pReader->window, pBlock) || overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo) /*|| points overlaps with data block*/) {
|
||||
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
|
||||
SBlockData data = {0};
|
||||
doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data);
|
||||
|
||||
// build composed data block
|
||||
buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
|
||||
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
|
||||
// data in memory that are earlier than current file block
|
||||
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->endVersion};
|
||||
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
|
||||
buildInmemDataBlock(pReader, pScanInfo, &maxKey);
|
||||
// build data block from in-memory buffer data completed.
|
||||
} else { // whole block is required, return it directly
|
||||
|
@ -2567,8 +2611,8 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
|
|||
pInfo->window.skey = pBlock->minKey.ts;
|
||||
pInfo->window.ekey = pBlock->maxKey.ts;
|
||||
setComposedBlockFlag(pReader, false);
|
||||
*exists = true;
|
||||
}
|
||||
*exists = true;
|
||||
} else {
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
|
@ -2577,14 +2621,53 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
|
|||
|
||||
// current block are exhausted, try the next file block
|
||||
if (pDumpInfo->rowIndex >= pBlock->nRow) {
|
||||
bool hasNext = blockIteratorNext(pReader, &pReader->status.blockIter);
|
||||
if (!hasNext) {
|
||||
// current file is exhausted, let's try the next file
|
||||
bool hasNext = blockIteratorNext(&pReader->status.blockIter);
|
||||
if (!hasNext) { // current file is exhausted, let's try the next file
|
||||
int32_t numOfBlocks = 0;
|
||||
int32_t code = moveToNextFile(pReader, &numOfBlocks);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// initialize the block iterator for a new fileset
|
||||
code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||
pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||
|
||||
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
|
||||
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
|
||||
SBlockData data = {0};
|
||||
doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data);
|
||||
// build composed data block
|
||||
buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
|
||||
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
|
||||
// data in memory that are earlier than current file block
|
||||
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
|
||||
buildInmemDataBlock(pReader, pScanInfo, &maxKey);
|
||||
// build data block from in-memory buffer data completed.
|
||||
} else { // whole block is required, return it directly
|
||||
// todo
|
||||
// 1. the version of all rows should be less than the endVersion
|
||||
// 2. current block should not overlap with next neighbor block
|
||||
// 3. current timestamp should not be overlap with each other
|
||||
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
|
||||
pInfo->rows = pBlock->nRow;
|
||||
pInfo->uid = pScanInfo->uid;
|
||||
pInfo->window.skey = pBlock->minKey.ts;
|
||||
pInfo->window.ekey = pBlock->maxKey.ts;
|
||||
setComposedBlockFlag(pReader, false);
|
||||
}
|
||||
*exists = true;
|
||||
|
||||
} else { // try next data block in current file
|
||||
// 1. check if ts in buffer is overlap with current file data block
|
||||
TSDBKEY key1 = getCurrentKeyInBuf(pBlockIter, pReader);
|
||||
blockIteratorNext(pBlockIter);
|
||||
|
||||
|
||||
}
|
||||
} else {
|
||||
|
@ -2646,7 +2729,7 @@ TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
if (key.version <= pReader->endVersion) {
|
||||
if (key.version <= pReader->verRange.maxVer) {
|
||||
return pRow;
|
||||
}
|
||||
|
||||
|
@ -2664,7 +2747,7 @@ TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
if (key.version <= pReader->endVersion) {
|
||||
if (key.version <= pReader->verRange.maxVer) {
|
||||
return pRow;
|
||||
}
|
||||
}
|
||||
|
@ -2699,7 +2782,7 @@ int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock*
|
|||
int32_t rowIndex = pDumpInfo->rowIndex + 1;
|
||||
|
||||
while (pBlockData->aTSKEY[rowIndex] == key) {
|
||||
if (pBlockData->aVersion[rowIndex] > pReader->endVersion) {
|
||||
if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -3217,7 +3300,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
|||
STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
|
||||
initMemIterator(pBlockScanInfo, pReader);
|
||||
|
||||
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->endVersion};
|
||||
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
|
||||
buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey);
|
||||
if (pReader->pResBlock->info.rows > 0) {
|
||||
return true;
|
||||
|
@ -3354,16 +3437,16 @@ int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg***
|
|||
}
|
||||
|
||||
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
||||
if (pReader->status.composedDataBlock) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
if (pStatus->composedDataBlock) {
|
||||
return pReader->pResBlock->pDataBlock;
|
||||
} else {
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
|
||||
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
|
||||
int32_t code = tBlockDataInit(&pReader->status.fileBlockData);
|
||||
doLoadFileBlockData(pReader, &pReader->status.blockIter, pBlockScanInfo, &pReader->status.fileBlockData);
|
||||
// TSDBROW row = tsdbRowFromBlockData(&pReader->status.fileBlockData, 0);
|
||||
// doAppendOneRow(pReader->pResBlock, pReader, row.);
|
||||
int32_t code = tBlockDataInit(&pStatus->fileBlockData);
|
||||
doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
|
||||
|
||||
return pReader->pResBlock->pDataBlock;
|
||||
}
|
||||
|
|
|
@ -187,6 +187,10 @@ int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tab
|
|||
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
|
||||
if (pTaskInfo->schemaVer.sw == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*sversion = pTaskInfo->schemaVer.sw->version;
|
||||
*tversion = pTaskInfo->schemaVer.tversion;
|
||||
if (pTaskInfo->schemaVer.dbname) {
|
||||
|
|
Loading…
Reference in New Issue