refactor: do some internal refactor.
This commit is contained in:
parent
57bf509dee
commit
41be3e4d04
|
@ -131,7 +131,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTabl
|
||||||
void tsdbReaderClose(STsdbReader *pReader);
|
void tsdbReaderClose(STsdbReader *pReader);
|
||||||
bool tsdbNextDataBlock(STsdbReader *pReader);
|
bool tsdbNextDataBlock(STsdbReader *pReader);
|
||||||
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
|
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
|
||||||
int32_t tsdbRetrieveDatablockSMAInfo(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
|
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
|
||||||
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
||||||
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx);
|
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx);
|
||||||
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||||
|
|
|
@ -55,6 +55,7 @@ typedef struct SIOCostSummary {
|
||||||
} SIOCostSummary;
|
} SIOCostSummary;
|
||||||
|
|
||||||
typedef struct SBlockLoadSuppInfo {
|
typedef struct SBlockLoadSuppInfo {
|
||||||
|
SArray* pColAgg;
|
||||||
SColumnDataAgg tsColAgg;
|
SColumnDataAgg tsColAgg;
|
||||||
SColumnDataAgg** plist;
|
SColumnDataAgg** plist;
|
||||||
int16_t* colIds; // column ids for loading file block data
|
int16_t* colIds; // column ids for loading file block data
|
||||||
|
@ -364,8 +365,9 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
||||||
|
|
||||||
// allocate buffer in order to load data blocks from file
|
// allocate buffer in order to load data blocks from file
|
||||||
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
||||||
|
pSup->pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
|
||||||
pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
|
pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
|
||||||
if (pSup->plist == NULL) {
|
if (pSup->pColAgg == NULL || pSup->plist == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
@ -2649,13 +2651,10 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
|
|
||||||
blockDataDestroy(pReader->pResBlock);
|
blockDataDestroy(pReader->pResBlock);
|
||||||
taosMemoryFreeClear(pReader->suppInfo.plist);
|
taosMemoryFreeClear(pReader->suppInfo.plist);
|
||||||
|
|
||||||
|
taosArrayDestroy(pReader->suppInfo.pColAgg);
|
||||||
taosMemoryFree(pReader->suppInfo.slotIds);
|
taosMemoryFree(pReader->suppInfo.slotIds);
|
||||||
|
|
||||||
if (!isEmptyQueryTimeWindow(&pReader->window)) {
|
|
||||||
// tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
|
|
||||||
} else {
|
|
||||||
ASSERT(pReader->status.pTableMap == NULL);
|
|
||||||
}
|
|
||||||
#if 0
|
#if 0
|
||||||
// if (pReader->status.pTableScanInfo != NULL) {
|
// if (pReader->status.pTableScanInfo != NULL) {
|
||||||
// pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo);
|
// pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo);
|
||||||
|
@ -2727,7 +2726,7 @@ void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockI
|
||||||
pDataBlockInfo->window = pReader->pResBlock->info.window;
|
pDataBlockInfo->window = pReader->pResBlock->info.window;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbRetrieveDatablockSMAInfo(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
|
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
*allHave = false;
|
*allHave = false;
|
||||||
|
|
||||||
|
@ -2743,12 +2742,13 @@ int32_t tsdbRetrieveDatablockSMAInfo(STsdbReader* pReader, SColumnDataAgg*** pBl
|
||||||
|
|
||||||
int64_t stime = taosGetTimestampUs();
|
int64_t stime = taosGetTimestampUs();
|
||||||
|
|
||||||
SArray* pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
|
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
||||||
|
|
||||||
if (tBlockHasSma(pBlock)) {
|
if (tBlockHasSma(pBlock)) {
|
||||||
code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pColAgg, NULL);
|
code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64", code:%s, %s", 0, pFBlock->uid,
|
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
|
||||||
tstrerror(code), pReader->idStr);
|
pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -2756,44 +2756,44 @@ int32_t tsdbRetrieveDatablockSMAInfo(STsdbReader* pReader, SColumnDataAgg*** pBl
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
*allHave = true;
|
*allHave = true;
|
||||||
|
|
||||||
// always load the first primary timestamp column data
|
// always load the first primary timestamp column data
|
||||||
SColumnDataAgg* pTsAgg = &pReader->suppInfo.tsColAgg;
|
SColumnDataAgg* pTsAgg = &pSup->tsColAgg;
|
||||||
|
|
||||||
pTsAgg->numOfNull = 0;
|
pTsAgg->numOfNull = 0;
|
||||||
pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||||
pTsAgg->min = pReader->pResBlock->info.window.skey;
|
pTsAgg->min = pReader->pResBlock->info.window.skey;
|
||||||
pTsAgg->max = pReader->pResBlock->info.window.ekey;
|
pTsAgg->max = pReader->pResBlock->info.window.ekey;
|
||||||
pReader->suppInfo.plist[0] = pTsAgg;
|
pSup->plist[0] = pTsAgg;
|
||||||
|
|
||||||
// update the number of NULL data rows
|
// update the number of NULL data rows
|
||||||
size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
|
size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
|
||||||
|
|
||||||
int32_t i = 0, j = 0;
|
int32_t i = 0, j = 0;
|
||||||
while(j < numOfCols && i < taosArrayGetSize(pColAgg)) {
|
while (j < numOfCols && i < taosArrayGetSize(pSup->pColAgg)) {
|
||||||
SColumnDataAgg* pAgg = taosArrayGet(pColAgg, i);
|
SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
|
||||||
if (pAgg->colId == pReader->suppInfo.colIds[j]) {
|
if (pAgg->colId == pSup->colIds[j]) {
|
||||||
if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
|
if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
|
||||||
pReader->suppInfo.plist[j] = pAgg;
|
pSup->plist[j] = pAgg;
|
||||||
i += 1;
|
i += 1;
|
||||||
j += 1;
|
j += 1;
|
||||||
} else {
|
} else {
|
||||||
*allHave = false;
|
*allHave = false;
|
||||||
}
|
}
|
||||||
} else if (pAgg->colId < pReader->suppInfo.colIds[j]) {
|
} else if (pAgg->colId < pSup->colIds[j]) {
|
||||||
i += 1;
|
i += 1;
|
||||||
} else if (pReader->suppInfo.colIds[j] < pAgg->colId) {
|
} else if (pSup->colIds[j] < pAgg->colId) {
|
||||||
j += 1;
|
j += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t elapsed = taosGetTimestampUs() - stime;
|
int64_t elapsed = taosGetTimestampUs() - stime;
|
||||||
pReader->cost.smaLoadTime += elapsed;
|
pReader->cost.smaLoadTime += elapsed;
|
||||||
|
|
||||||
*pBlockStatis = pReader->suppInfo.plist;
|
*pBlockStatis = pSup->plist;
|
||||||
|
|
||||||
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64", elapsed time:%"PRId64"us, %s", 0, pFBlock->uid,
|
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%" PRId64 "us, %s", 0, pFBlock->uid,
|
||||||
elapsed, pReader->idStr);
|
elapsed, pReader->idStr);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -2841,6 +2841,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_
|
||||||
memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
|
memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
|
||||||
memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
|
memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
|
||||||
|
|
||||||
|
pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||||
|
|
||||||
// todo set the correct numOfTables
|
// todo set the correct numOfTables
|
||||||
int32_t numOfTables = 1;
|
int32_t numOfTables = 1;
|
||||||
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
||||||
|
|
|
@ -1130,7 +1130,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
|
||||||
} else if ((*status) == BLK_DATA_SMA_LOAD) {
|
} else if ((*status) == BLK_DATA_SMA_LOAD) {
|
||||||
// this function never returns error?
|
// this function never returns error?
|
||||||
pCost->loadBlockStatis += 1;
|
pCost->loadBlockStatis += 1;
|
||||||
// tsdbRetrieveDatablockSMAInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
|
// tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
|
||||||
|
|
||||||
if (pBlock->pBlockAgg == NULL) { // data block statistics does not exist, load data block
|
if (pBlock->pBlockAgg == NULL) { // data block statistics does not exist, load data block
|
||||||
// pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
|
// pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
|
||||||
|
@ -1141,7 +1141,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
|
||||||
|
|
||||||
// load the data block statistics to perform further filter
|
// load the data block statistics to perform further filter
|
||||||
pCost->loadBlockStatis += 1;
|
pCost->loadBlockStatis += 1;
|
||||||
// tsdbRetrieveDatablockSMAInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
|
// tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
|
||||||
|
|
||||||
if (pQueryAttr->topBotQuery && pBlock->pBlockAgg != NULL) {
|
if (pQueryAttr->topBotQuery && pBlock->pBlockAgg != NULL) {
|
||||||
{ // set previous window
|
{ // set previous window
|
||||||
|
|
|
@ -210,7 +210,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
|
|
||||||
bool allColumnsHaveAgg = true;
|
bool allColumnsHaveAgg = true;
|
||||||
SColumnDataAgg** pColAgg = NULL;
|
SColumnDataAgg** pColAgg = NULL;
|
||||||
int32_t code = tsdbRetrieveDatablockSMAInfo(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);
|
int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -2230,7 +2230,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
|
||||||
bool allColumnsHaveAgg = true;
|
bool allColumnsHaveAgg = true;
|
||||||
SColumnDataAgg** pColAgg = NULL;
|
SColumnDataAgg** pColAgg = NULL;
|
||||||
STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
|
STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
|
||||||
tsdbRetrieveDatablockSMAInfo(reader, &pColAgg, &allColumnsHaveAgg);
|
tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg);
|
||||||
|
|
||||||
if (allColumnsHaveAgg == true) {
|
if (allColumnsHaveAgg == true) {
|
||||||
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
|
|
Loading…
Reference in New Issue