refactor: do internal refactor.

This commit is contained in:
Haojun Liao 2022-06-29 20:25:03 +08:00
parent 64460a95b0
commit c5951374d4
1 changed files with 138 additions and 222 deletions

View File

@ -22,12 +22,6 @@
.rows = (_block)->numOfRows, \ .rows = (_block)->numOfRows, \
.uid = (_checkInfo)->tableId}) .uid = (_checkInfo)->tableId})
enum {
TSDB_CACHED_TYPE_NONE = 0,
TSDB_CACHED_TYPE_LASTROW = 1,
TSDB_CACHED_TYPE_LAST = 2,
};
typedef struct SQueryFilePos { typedef struct SQueryFilePos {
int32_t fid; int32_t fid;
int32_t slot; int32_t slot;
@ -46,12 +40,6 @@ typedef struct SDataBlockLoadInfo {
SArray* pLoadedCols; SArray* pLoadedCols;
} SDataBlockLoadInfo; } SDataBlockLoadInfo;
enum {
CHECKINFO_CHOSEN_MEM = 0,
CHECKINFO_CHOSEN_IMEM = 1,
CHECKINFO_CHOSEN_BOTH = 2 // for update=2(merge case)
};
typedef struct STableBlockScanInfo { typedef struct STableBlockScanInfo {
uint64_t uid; uint64_t uid;
TSKEY lastKey; TSKEY lastKey;
@ -74,10 +62,10 @@ typedef struct SBlockOrderWrapper {
} SBlockOrderWrapper; } SBlockOrderWrapper;
typedef struct SBlockOrderSupporter { typedef struct SBlockOrderSupporter {
int32_t numOfTables;
SBlockOrderWrapper** pDataBlockInfo; SBlockOrderWrapper** pDataBlockInfo;
int32_t* indexPerTable; int32_t* indexPerTable;
int32_t* numOfBlocksPerTable; int32_t* numOfBlocksPerTable;
int32_t numOfTables;
} SBlockOrderSupporter; } SBlockOrderSupporter;
typedef struct SIOCostSummary { typedef struct SIOCostSummary {
@ -123,11 +111,6 @@ typedef struct SVersionRange {
uint64_t maxVer; uint64_t maxVer;
} SVersionRange; } SVersionRange;
typedef struct SComposedDataBlock {
bool composed;
int32_t rows;
} SComposedDataBlock;
typedef struct SReaderStatus { typedef struct SReaderStatus {
SQueryFilePos cur; // current position SQueryFilePos cur; // current position
bool loadFromFile; // check file stage bool loadFromFile; // check file stage
@ -289,13 +272,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
// return pNew; // return pNew;
// } // }
static bool isEmptyQueryTimeWindow(STsdbReader* pTsdbReader) { static bool isEmptyQueryTimeWindow(STimeWindow* pWindow, int32_t order) {
ASSERT(pTsdbReader != NULL); ASSERT(pWindow != NULL);
bool asc = ASCENDING_TRAVERSE(order);
STimeWindow* w = &pTsdbReader->window; return ((asc && pWindow->skey > pWindow->ekey) || (!asc && pWindow->ekey > pWindow->skey));
bool asc = ASCENDING_TRAVERSE(pTsdbReader->order);
return ((asc && w->skey > w->ekey) || (!asc && w->ekey > w->skey));
} }
// // Update the query time window according to the data time to live(TTL) information, in order to avoid to return // // Update the query time window according to the data time to live(TTL) information, in order to avoid to return
@ -334,34 +314,29 @@ static void setQueryTimewindow(STsdbReader* pReader, SQueryTableDataCond* pCond,
// } // }
} }
static void checkResultSize(const SQueryTableDataCond* pCond, STsdbReader* pReader) { static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
int32_t rowLen = 0; int32_t rowLen = 0;
for (int32_t i = 0; i < pCond->numOfCols; ++i) { for (int32_t i = 0; i < pCond->numOfCols; ++i) {
rowLen += pCond->colList[i].bytes; rowLen += pCond->colList[i].bytes;
} }
// make sure the output SSDataBlock size be less than 2MB. // make sure the output SSDataBlock size be less than 2MB.
int32_t TWOMB = 2 * 1024 * 1024; const int32_t TWOMB = 2 * 1024 * 1024;
if (pReader->capacity * rowLen > TWOMB) { if ((*capacity) * rowLen > TWOMB) {
pReader->capacity = TWOMB / rowLen; (*capacity) = TWOMB / rowLen;
} }
} }
// init file iterator // init file iterator
static int32_t initFileIterator(SFileSetIter* pIter, const STsdbFSState* pFState) { static int32_t initFileIterator(SFileSetIter* pIter, const STsdbFSState* pFState, const char* idstr) {
pIter->index = -1; pIter->index = -1;
pIter->numOfFiles = taosArrayGetSize(pFState->aDFileSet); pIter->numOfFiles = taosArrayGetSize(pFState->aDFileSet);
pIter->pFileList = taosArrayDup(pFState->aDFileSet); pIter->pFileList = taosArrayDup(pFState->aDFileSet);
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void resetDataBlockIterator(SDataBlockIter* pIter) {
pIter->numOfBlocks = -1;
pIter->index = -1;
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}
static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) { static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) {
pIter->index += 1; pIter->index += 1;
if (pIter->index >= pIter->numOfFiles) { if (pIter->index >= pIter->numOfFiles) {
@ -394,6 +369,12 @@ static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader*
return false; return false;
} }
static void resetDataBlockIterator(SDataBlockIter* pIter) {
pIter->index = -1;
pIter->numOfBlocks = -1;
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}
static void initReaderStatus(SReaderStatus* pStatus) { static void initReaderStatus(SReaderStatus* pStatus) {
pStatus->cur.fid = INT32_MIN; pStatus->cur.fid = INT32_MIN;
pStatus->cur.win = TSWINDOW_INITIALIZER; pStatus->cur.win = TSWINDOW_INITIALIZER;
@ -427,7 +408,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
setQueryTimewindow(pReader, pCond, 0); setQueryTimewindow(pReader, pCond, 0);
if (pCond->numOfCols > 0) { if (pCond->numOfCols > 0) {
checkResultSize(pCond, pReader); limitOutputBufferSize(pCond, &pReader->capacity);
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
pReader->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); pReader->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
@ -450,7 +431,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
} }
STsdbFSState* pFState = pReader->pTsdb->fs->cState; STsdbFSState* pFState = pReader->pTsdb->fs->cState;
initFileIterator(&pReader->status.fileIter, pFState); initFileIterator(&pReader->status.fileIter, pFState, pReader->idStr);
resetDataBlockIterator(&pReader->status.blockIter); resetDataBlockIterator(&pReader->status.blockIter);
// no data in files, let's try buffer in memory // no data in files, let's try buffer in memory
@ -520,93 +501,6 @@ _end:
// return res; // return res;
// } // }
// static bool initTableMemIterator(STsdbReader* pHandle, STableBlockScanInfo* pCheckInfo) {
// if (pCheckInfo->initBuf) {
// return true;
// }
// pCheckInfo->initBuf = true;
// int32_t order = pHandle->order;
// STbData* pMem = NULL;
// STbData* pIMem = NULL;
// int8_t backward = (pHandle->order == TSDB_ORDER_DESC) ? 1 : 0;
// TSKEY tLastKey = keyToTkey(pCheckInfo->lastKey);
// if (pHandle->pTsdb->mem != NULL) {
// tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pMem);
// if (pMem != NULL) {
// tsdbTbDataIterCreate(pMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, backward, &pCheckInfo->iter);
// }
// }
// if (pHandle->pTsdb->imem != NULL) {
// tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pIMem);
// if (pIMem != NULL) {
// tsdbTbDataIterCreate(pIMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, backward, &pCheckInfo->iiter);
// }
// }
// // both iterators are NULL, no data in buffer right now
// if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
// return false;
// }
// bool memEmpty =
// (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tsdbTbDataIterGet(pCheckInfo->iter, NULL));
// bool imemEmpty =
// (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tsdbTbDataIterGet(pCheckInfo->iiter, NULL));
// if (memEmpty && imemEmpty) { // buffer is empty
// return false;
// }
// if (!memEmpty) {
// TSDBROW row;
// tsdbTbDataIterGet(pCheckInfo->iter, &row);
// TSKEY key = row.pTSRow->ts; // first timestamp in buffer
// tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
// "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
// pHandle, pCheckInfo->tableId, key, order, pMem->minKey.ts, pMem->maxKey.ts, pCheckInfo->lastKey,
// pMem->sl.size, pHandle->idStr);
// if (ASCENDING_TRAVERSE(order)) {
// assert(pCheckInfo->lastKey <= key);
// } else {
// assert(pCheckInfo->lastKey >= key);
// }
// } else {
// tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
// }
// if (!imemEmpty) {
// TSDBROW row;
// tsdbTbDataIterGet(pCheckInfo->iter, &row);
// TSKEY key = row.pTSRow->ts; // first timestamp in buffer
// tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
// "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
// pHandle, pCheckInfo->tableId, key, order, pIMem->minKey.ts, pIMem->maxKey.ts, pCheckInfo->lastKey,
// pIMem->sl.size, pHandle->idStr);
// if (ASCENDING_TRAVERSE(order)) {
// assert(pCheckInfo->lastKey <= key);
// } else {
// assert(pCheckInfo->lastKey >= key);
// }
// } else {
// tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
// }
// return true;
// }
// static void destroyTableMemIterator(STableBlockScanInfo* pCheckInfo) {
// tsdbTbDataIterDestroy(pCheckInfo->iter);
// tsdbTbDataIterDestroy(pCheckInfo->iiter);
// }
// static TSKEY extractFirstTraverseKey(STableBlockScanInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT maxVer) { // static TSKEY extractFirstTraverseKey(STableBlockScanInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT maxVer) {
// TSDBROW row = {0}; // TSDBROW row = {0};
// STSRow *rmem = NULL, *rimem = NULL; // STSRow *rmem = NULL, *rimem = NULL;
@ -958,10 +852,26 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void setBlockDumpCompleted(SFileBlockDumpInfo* pDumpInfo, SBlockData* pBlockData) { static void setBlockDumpCompleted(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int32_t order) {
pDumpInfo->rowIndex = pBlockData->nRow; int32_t step = ASCENDING_TRAVERSE(order)? 1:-1;
pDumpInfo->totalRows = pBlockData->nRow;
pDumpInfo->lastKey = pBlockData->aTSKEY[pBlockData->nRow - 1] + 1; // todo step value pDumpInfo->rowIndex = pBlock->nRow;
pDumpInfo->totalRows = pBlock->nRow;
pDumpInfo->lastKey = pBlock->maxKey.ts + step;
}
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal, SBlockLoadSuppInfo* pSup) {
if (IS_VAR_DATA_TYPE(pColVal->type)) {
if (pColVal->isNull) {
colDataAppendNULL(pColInfoData, rowIndex);
} else {
varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}
} else {
colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, pColVal->isNull);
}
} }
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
@ -969,6 +879,9 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx); SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
SSDataBlock* pResBlock = pReader->pResBlock;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
uint8_t *pb = NULL, *pb1 = NULL; uint8_t *pb = NULL, *pb1 = NULL;
int32_t code = tsdbReadBlockData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pBlockData, &pb, &pb1); int32_t code = tsdbReadBlockData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pBlockData, &pb, &pb1);
@ -976,45 +889,26 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
goto _error; goto _error;
} }
for(int32_t i = 0; i < taosArrayGetSize(pReader->pResBlock->pDataBlock); ++i) { int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
SColumnInfoData* pColData = taosArrayGet(pReader->pResBlock->pDataBlock, i);
SColVal cv = {0};
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
for (int32_t j = 0; j < pBlockData->nRow; ++j) { for (int32_t j = 0; j < pBlockData->nRow; ++j) {
colDataAppend(pColData, j, (const char*)&pBlockData->aTSKEY[j], false); colDataAppend(pColData, j, (const char*)&pBlockData->aTSKEY[j], false);
} }
} else { } else {
SColVal cv = {0}; SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pSupInfo->slotIds[i] - 1);
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i] - 1);
for (int32_t j = 0; j < pBlockData->nRow; ++j) { for (int32_t j = 0; j < pBlockData->nRow; ++j) {
tColDataGetValue(pData, j, &cv); tColDataGetValue(pData, j, &cv);
colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull); doCopyColVal(pColData, j, i, &cv, pSupInfo);
} }
} }
} }
pReader->pResBlock->info.rows = pBlockData->nRow; pResBlock->info.rows = pBlockData->nRow;
setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData); setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlock, pReader->order);
/*
int32_t ret = tsdbLoadBlockDataCols(&(pReader->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
(int)(QH_GET_NUM_OF_COLS(pReader)), true);
if (ret != TSDB_CODE_SUCCESS) {
int32_t c = terrno;
assert(c != TSDB_CODE_SUCCESS);
goto _error;
}
SDataBlockLoadInfo* pBlockLoadInfo = &pReader->dataBlockLoadInfo;
pBlockLoadInfo->fileGroup = pReader->pFileGroup;
pBlockLoadInfo->slot = pReader->cur.slot;
pBlockLoadInfo->uid = pCheckInfo->tableId;
SDataCols* pCols = pReader->rhelper.pDCols[0];
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
pBlock->numOfRows = pCols->numOfRows;
*/
int64_t elapsedTime = (taosGetTimestampUs() - st); int64_t elapsedTime = (taosGetTimestampUs() - st);
pReader->cost.blockLoadTime += elapsedTime; pReader->cost.blockLoadTime += elapsedTime;
@ -1026,8 +920,10 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, %s", tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, pReader->idStr); ", rows:%d, %s",
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
pReader->idStr);
return code; return code;
} }
@ -2492,9 +2388,18 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
if (d != NULL) { if (d != NULL) {
code = tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter); code = tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s",
pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr);
return code; return code;
} else {
tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 " %s",
pReader, pBlockScanInfo->uid, pReader->window.skey, pReader->order, d->minKey, d->maxKey,
pReader->idStr);
} }
} }
} else {
tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
} }
STbData* di = NULL; STbData* di = NULL;
@ -2503,9 +2408,18 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
if (di != NULL) { if (di != NULL) {
code = tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter); code = tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s",
pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr);
return code; return code;
} else {
tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 " %s",
pReader, pBlockScanInfo->uid, pReader->window.skey, pReader->order, di->minKey, di->maxKey,
pReader->idStr);
} }
} }
} else {
tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
} }
pBlockScanInfo->iterInit = true; pBlockScanInfo->iterInit = true;
@ -2607,22 +2521,22 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SDataBlockInfo* pInfo = &pReader->pResBlock->info; SDataBlockInfo* pInfo = &pReader->pResBlock->info;
pInfo->rows = pBlock->nRow; pInfo->rows = pBlock->nRow;
pInfo->uid = pScanInfo->uid; pInfo->uid = pScanInfo->uid;
pInfo->window.skey = pBlock->minKey.ts; pInfo->window = (STimeWindow) {.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
pInfo->window.ekey = pBlock->maxKey.ts;
setComposedBlockFlag(pReader, false); setComposedBlockFlag(pReader, false);
setBlockDumpCompleted(&pStatus->fBlockDumpInfo, pBlock, pReader->order);
} }
return code; return code;
} }
static int32_t buildBlockFromBufferSeqentially(STsdbReader* pReader) { static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
while(1) { while(1) {
if (pStatus->pTableIter == NULL) { if (pStatus->pTableIter == NULL) {
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
if (pStatus->pTableIter == NULL) { if (pStatus->pTableIter == NULL) {
return false; return TSDB_CODE_SUCCESS;
} }
} }
@ -2630,20 +2544,26 @@ static int32_t buildBlockFromBufferSeqentially(STsdbReader* pReader) {
initMemIterator(pBlockScanInfo, pReader); initMemIterator(pBlockScanInfo, pReader);
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey); int32_t code = buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pReader->pResBlock->info.rows > 0) { if (pReader->pResBlock->info.rows > 0) {
return true; return TSDB_CODE_SUCCESS;
} }
// current table is exhausted, let's try the next table // current table is exhausted, let's try the next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
if (pStatus->pTableIter == NULL) { if (pStatus->pTableIter == NULL) {
return false; return TSDB_CODE_SUCCESS;
} }
} }
} }
static int32_t buildBlockFromFiles(STsdbReader* pReader) { static int32_t buildBlockFromFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SFileSetIter* pFIter = &pStatus->fileIter; SFileSetIter* pFIter = &pStatus->fileIter;
@ -2652,7 +2572,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
if (pFIter->index < pFIter->numOfFiles) { if (pFIter->index < pFIter->numOfFiles) {
if (pReader->status.blockIter.index == -1) { if (pReader->status.blockIter.index == -1) {
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
int32_t code = moveToNextFile(pReader, &numOfBlocks); code = moveToNextFile(pReader, &numOfBlocks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -2664,7 +2584,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
} }
code = doBuildDataBlock(pReader); code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else { } else {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
@ -2676,7 +2598,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
bool hasNext = blockIteratorNext(&pReader->status.blockIter); bool hasNext = blockIteratorNext(&pReader->status.blockIter);
if (!hasNext) { // current file is exhausted, let's try the next file if (!hasNext) { // current file is exhausted, let's try the next file
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
int32_t code = moveToNextFile(pReader, &numOfBlocks); code = moveToNextFile(pReader, &numOfBlocks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -2692,15 +2614,21 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
return code; return code;
} }
doBuildDataBlock(pReader); code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
} else { // try next data block in current file } else { // try next data block in current file
blockIteratorNext(pBlockIter); blockIteratorNext(pBlockIter);
doBuildDataBlock(pReader); code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
} else { } else {
buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo); code = buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
return TSDB_CODE_SUCCESS; return code;
} }
// repeat the previous procedure. // repeat the previous procedure.
@ -2708,7 +2636,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
} }
} }
return TSDB_CODE_SUCCESS; return code;
} }
// // todo not unref yet, since it is not support multi-group interpolation query // // todo not unref yet, since it is not support multi-group interpolation query
@ -2901,18 +2829,7 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow
colDataAppend(pColInfoData, numOfRows, (const char*) &pTSRow->ts, false); colDataAppend(pColInfoData, numOfRows, (const char*) &pTSRow->ts, false);
} else { } else {
tTSRowGetVal(pTSRow, pReader->pSchema, slotId, &colVal); tTSRowGetVal(pTSRow, pReader->pSchema, slotId, &colVal);
doCopyColVal(pColInfoData, i, numOfRows, &colVal, pSupInfo);
if (IS_VAR_DATA_TYPE(colVal.type)) {
if (colVal.isNull) {
colDataAppendNULL(pColInfoData, numOfRows);
} else {
varDataSetLen(pSupInfo->buildBuf[i], colVal.value.nData);
memcpy(varDataVal(pSupInfo->buildBuf[i]), colVal.value.pData, colVal.value.nData);
colDataAppend(pColInfoData, numOfRows, pSupInfo->buildBuf[i], false);
}
} else {
colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull);
}
} }
} }
@ -3198,7 +3115,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
} }
STsdbReader* pReader = *ppReader; STsdbReader* pReader = *ppReader;
if (isEmptyQueryTimeWindow(pReader)) { if (isEmptyQueryTimeWindow(&pReader->window, pReader->order)) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr); tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3259,7 +3176,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFreeClear(pReader->suppInfo.plist); taosMemoryFreeClear(pReader->suppInfo.plist);
taosMemoryFree(pReader->suppInfo.slotIds); taosMemoryFree(pReader->suppInfo.slotIds);
if (!isEmptyQueryTimeWindow(pReader)) { if (!isEmptyQueryTimeWindow(&pReader->window, pReader->order)) {
// tsdbMayUnTakeMemSnapshot(pTsdbReadHandle); // tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
} else { } else {
ASSERT(pReader->status.pTableMap == NULL); ASSERT(pReader->status.pTableMap == NULL);
@ -3291,7 +3208,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
} }
bool tsdbNextDataBlock(STsdbReader* pReader) { bool tsdbNextDataBlock(STsdbReader* pReader) {
if (isEmptyQueryTimeWindow(pReader)) { if (isEmptyQueryTimeWindow(&pReader->window, pReader->order)) {
return false; return false;
} }
@ -3313,11 +3230,11 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (pBlock->info.rows > 0) { if (pBlock->info.rows > 0) {
return true; return true;
} else { } else {
buildBlockFromBufferSeqentially(pReader); buildBlockFromBufferSequentially(pReader);
return pBlock->info.rows > 0; return pBlock->info.rows > 0;
} }
} else { // no data in files, let's try the buffer } else { // no data in files, let's try the buffer
buildBlockFromBufferSeqentially(pReader); buildBlockFromBufferSequentially(pReader);
return pBlock->info.rows > 0; return pBlock->info.rows > 0;
} }
} else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) { } else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) {
@ -3370,13 +3287,12 @@ void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockI
int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) { int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
int32_t code = 0; int32_t code = 0;
// *allHave = false; *allHave = false;
// SQueryFilePos* c = &pReader->cur; if (pReader->status.composedDataBlock) {
// if (c->mixBlock) { *pBlockStatis = NULL;
// *pBlockStatis = NULL; return TSDB_CODE_SUCCESS;
// return TSDB_CODE_SUCCESS; }
// }
// SFileBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[c->slot]; // SFileBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[c->slot];
// assert((c->slot >= 0 && c->slot < pReader->numOfBlocks) || ((c->slot == pReader->numOfBlocks) && (c->slot == 0))); // assert((c->slot >= 0 && c->slot < pReader->numOfBlocks) || ((c->slot == pReader->numOfBlocks) && (c->slot == 0)));