refactor: do some internal refactor.
This commit is contained in:
parent
c11ed48937
commit
adf085ab46
|
@ -159,7 +159,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTabl
|
||||||
void tsdbReaderClose(STsdbReader *pReader);
|
void tsdbReaderClose(STsdbReader *pReader);
|
||||||
bool tsdbNextDataBlock(STsdbReader *pReader);
|
bool tsdbNextDataBlock(STsdbReader *pReader);
|
||||||
bool tsdbTableNextDataBlock(STsdbReader *pReader, int64_t uid);
|
bool tsdbTableNextDataBlock(STsdbReader *pReader, int64_t uid);
|
||||||
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
|
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow);
|
||||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
|
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
|
||||||
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
||||||
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
||||||
|
|
|
@ -3787,24 +3787,24 @@ bool tsdbTableNextDataBlock(STsdbReader* pReader, int64_t uid) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
|
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
|
||||||
ASSERT(pDataBlockInfo != NULL && pReader != NULL);
|
ASSERT(pReader != NULL);
|
||||||
pDataBlockInfo->rows = pReader->pResBlock->info.rows;
|
*rows = pReader->pResBlock->info.rows;
|
||||||
pDataBlockInfo->uid = pReader->pResBlock->info.uid;
|
*uid = pReader->pResBlock->info.uid;
|
||||||
pDataBlockInfo->window = pReader->pResBlock->info.window;
|
*pWindow = pReader->pResBlock->info.window;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
|
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
|
||||||
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
||||||
if (pReader->step == EXTERNAL_ROWS_MAIN) {
|
if (pReader->step == EXTERNAL_ROWS_MAIN) {
|
||||||
setBlockInfo(pReader, pDataBlockInfo);
|
setBlockInfo(pReader, rows, uid, pWindow);
|
||||||
} else if (pReader->step == EXTERNAL_ROWS_PREV) {
|
} else if (pReader->step == EXTERNAL_ROWS_PREV) {
|
||||||
setBlockInfo(pReader->innerReader[0], pDataBlockInfo);
|
setBlockInfo(pReader->innerReader[0], rows, uid, pWindow);
|
||||||
} else {
|
} else {
|
||||||
setBlockInfo(pReader->innerReader[1], pDataBlockInfo);
|
setBlockInfo(pReader->innerReader[1], rows, uid, pWindow);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
setBlockInfo(pReader, pDataBlockInfo);
|
setBlockInfo(pReader, rows, uid, pWindow);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -618,13 +618,14 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
|
|
||||||
SDataBlockInfo binfo = pBlock->info;
|
SDataBlockInfo* pBInfo = &pBlock->info;
|
||||||
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &binfo);
|
|
||||||
|
|
||||||
binfo.capacity = binfo.rows;
|
int32_t rows = 0;
|
||||||
blockDataEnsureCapacity(pBlock, binfo.rows);
|
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &rows, &pBInfo->uid, &pBInfo->window);
|
||||||
pBlock->info = binfo;
|
blockDataEnsureCapacity(pBlock, rows);
|
||||||
ASSERT(binfo.uid != 0);
|
pBlock->info.rows = rows;
|
||||||
|
|
||||||
|
ASSERT(pBInfo->uid != 0);
|
||||||
|
|
||||||
pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid);
|
pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid);
|
||||||
|
|
||||||
|
@ -1120,20 +1121,19 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
||||||
|
|
||||||
bool hasBlock = tsdbNextDataBlock(pReader);
|
bool hasBlock = tsdbNextDataBlock(pReader);
|
||||||
if (hasBlock) {
|
if (hasBlock) {
|
||||||
SDataBlockInfo binfo = {0};
|
SDataBlockInfo* pBInfo = &pBlock->info;
|
||||||
tsdbRetrieveDataBlockInfo(pReader, &binfo);
|
|
||||||
|
int32_t rows = 0;
|
||||||
|
tsdbRetrieveDataBlockInfo(pReader, &rows, &pBInfo->uid, &pBInfo->window);
|
||||||
|
|
||||||
SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL);
|
SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL);
|
||||||
blockDataEnsureCapacity(pBlock, binfo.rows);
|
blockDataEnsureCapacity(pBlock, pBInfo->rows);
|
||||||
|
pBInfo->rows = rows;
|
||||||
pBlock->info.window = binfo.window;
|
|
||||||
pBlock->info.uid = binfo.uid;
|
|
||||||
pBlock->info.rows = binfo.rows;
|
|
||||||
|
|
||||||
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
|
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
|
||||||
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
||||||
|
|
||||||
pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, binfo.uid);
|
pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBInfo->uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbReaderClose(pReader);
|
tsdbReaderClose(pReader);
|
||||||
|
@ -2121,7 +2121,9 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbRetrieveDataBlockInfo(pInfo->dataReader, &pBlock->info);
|
int32_t rows = 0;
|
||||||
|
tsdbRetrieveDataBlockInfo(pInfo->dataReader, &rows, &pBlock->info.uid, &pBlock->info.window);
|
||||||
|
pBlock->info.rows = rows;
|
||||||
|
|
||||||
SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
|
SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
|
||||||
pBlock->pDataBlock = pCols;
|
pBlock->pDataBlock = pCols;
|
||||||
|
@ -4561,14 +4563,11 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
SDataBlockInfo binfo = pBlock->info;
|
|
||||||
tsdbRetrieveDataBlockInfo(reader, &binfo);
|
|
||||||
|
|
||||||
blockDataEnsureCapacity(pBlock, binfo.rows);
|
int32_t rows = 0;
|
||||||
pBlock->info.type = binfo.type;
|
tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.uid, &pBlock->info.window);
|
||||||
pBlock->info.uid = binfo.uid;
|
blockDataEnsureCapacity(pBlock, rows);
|
||||||
pBlock->info.window = binfo.window;
|
pBlock->info.rows = rows;
|
||||||
pBlock->info.rows = binfo.rows;
|
|
||||||
|
|
||||||
if (tsdbIsAscendingOrder(pInfo->pReader)) {
|
if (tsdbIsAscendingOrder(pInfo->pReader)) {
|
||||||
pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
|
pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
|
||||||
|
@ -4627,14 +4626,11 @@ static SSDataBlock* getTableDataBlock2(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
SDataBlockInfo binfo = pBlock->info;
|
|
||||||
tsdbRetrieveDataBlockInfo(reader, &binfo);
|
|
||||||
|
|
||||||
blockDataEnsureCapacity(pBlock, binfo.rows);
|
int32_t rows = 0;
|
||||||
pBlock->info.type = binfo.type;
|
tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.uid, &pBlock->info.window);
|
||||||
pBlock->info.uid = binfo.uid;
|
blockDataEnsureCapacity(pBlock, rows);
|
||||||
pBlock->info.window = binfo.window;
|
pBlock->info.rows = rows;
|
||||||
pBlock->info.rows = binfo.rows;
|
|
||||||
|
|
||||||
uint32_t status = 0;
|
uint32_t status = 0;
|
||||||
int32_t code = loadDataBlockFromOneTable2(pOperator, pTableScanInfo, pBlock, &status);
|
int32_t code = loadDataBlockFromOneTable2(pOperator, pTableScanInfo, pBlock, &status);
|
||||||
|
@ -4684,14 +4680,11 @@ static SSDataBlock* getTableDataBlock(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
SDataBlockInfo binfo = pBlock->info;
|
|
||||||
tsdbRetrieveDataBlockInfo(reader, &binfo);
|
|
||||||
|
|
||||||
blockDataEnsureCapacity(pBlock, binfo.rows);
|
int32_t rows = 0;
|
||||||
pBlock->info.type = binfo.type;
|
tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.uid, &pBlock->info.window);
|
||||||
pBlock->info.uid = binfo.uid;
|
blockDataEnsureCapacity(pBlock, rows);
|
||||||
pBlock->info.window = binfo.window;
|
pBlock->info.rows = rows;
|
||||||
pBlock->info.rows = binfo.rows;
|
|
||||||
|
|
||||||
uint32_t status = 0;
|
uint32_t status = 0;
|
||||||
int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status);
|
int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status);
|
||||||
|
|
Loading…
Reference in New Issue