fix(query): initialize the blockdata before load last block.
This commit is contained in:
parent
4bfdfe89a5
commit
cbe91fd4c3
|
@ -633,6 +633,9 @@ typedef struct SSttBlockLoadInfo {
|
||||||
int32_t currentLoadBlockIndex;
|
int32_t currentLoadBlockIndex;
|
||||||
int32_t loadBlocks;
|
int32_t loadBlocks;
|
||||||
double elapsedTime;
|
double elapsedTime;
|
||||||
|
STSchema *pSchema;
|
||||||
|
int16_t *colIds;
|
||||||
|
int32_t numOfCols;
|
||||||
} SSttBlockLoadInfo;
|
} SSttBlockLoadInfo;
|
||||||
|
|
||||||
typedef struct SMergeTree {
|
typedef struct SMergeTree {
|
||||||
|
@ -652,13 +655,14 @@ typedef struct {
|
||||||
} SSkmInfo;
|
} SSkmInfo;
|
||||||
|
|
||||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo, const char *idStr);
|
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo, STSchema* pSchema,
|
||||||
|
int16_t* pCols, int32_t numOfCols, const char* idStr);
|
||||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
||||||
bool tMergeTreeNext(SMergeTree *pMTree);
|
bool tMergeTreeNext(SMergeTree *pMTree);
|
||||||
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
||||||
void tMergeTreeClose(SMergeTree *pMTree);
|
void tMergeTreeClose(SMergeTree *pMTree);
|
||||||
|
|
||||||
SSttBlockLoadInfo *tCreateLastBlockLoadInfo();
|
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema* pSchema, int16_t* colList, int32_t numOfCols);
|
||||||
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
||||||
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el);
|
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el);
|
||||||
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
||||||
|
|
|
@ -457,7 +457,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
|
||||||
|
|
||||||
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
|
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
|
||||||
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
|
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
|
||||||
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL, NULL);
|
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL, NULL, NULL, 0, NULL);
|
||||||
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
||||||
if (!hasVal) {
|
if (!hasVal) {
|
||||||
state->state = SFSLASTNEXTROW_FILESET;
|
state->state = SFSLASTNEXTROW_FILESET;
|
||||||
|
|
|
@ -28,11 +28,10 @@ struct SLDataIter {
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
STimeWindow timeWindow;
|
STimeWindow timeWindow;
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
|
|
||||||
SSttBlockLoadInfo* pBlockLoadInfo;
|
SSttBlockLoadInfo* pBlockLoadInfo;
|
||||||
};
|
};
|
||||||
|
|
||||||
SSttBlockLoadInfo* tCreateLastBlockLoadInfo() {
|
SSttBlockLoadInfo* tCreateLastBlockLoadInfo(STSchema* pSchema, int16_t* colList, int32_t numOfCols) {
|
||||||
SSttBlockLoadInfo* pLoadInfo = taosMemoryCalloc(TSDB_DEFAULT_STT_FILE, sizeof(SSttBlockLoadInfo));
|
SSttBlockLoadInfo* pLoadInfo = taosMemoryCalloc(TSDB_DEFAULT_STT_FILE, sizeof(SSttBlockLoadInfo));
|
||||||
if (pLoadInfo == NULL) {
|
if (pLoadInfo == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -57,6 +56,9 @@ SSttBlockLoadInfo* tCreateLastBlockLoadInfo() {
|
||||||
pLoadInfo[i].aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
|
pLoadInfo[i].aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pLoadInfo->pSchema = pSchema;
|
||||||
|
pLoadInfo->colIds = colList;
|
||||||
|
pLoadInfo->numOfCols = numOfCols;
|
||||||
return pLoadInfo;
|
return pLoadInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,7 +113,13 @@ static SBlockData* loadLastBlock(SLDataIter *pIter, const char* idStr) {
|
||||||
pInfo->currentLoadBlockIndex ^= 1;
|
pInfo->currentLoadBlockIndex ^= 1;
|
||||||
if (pIter->pSttBlk != NULL) { // current block not loaded yet
|
if (pIter->pSttBlk != NULL) { // current block not loaded yet
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, &pInfo->blockData[pInfo->currentLoadBlockIndex]);
|
|
||||||
|
SBlockData* pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];
|
||||||
|
TABLEID id = {.suid = pIter->pSttBlk->suid, .uid = 0};
|
||||||
|
|
||||||
|
tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
|
||||||
|
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock);
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st)/ 1000.0;
|
double el = (taosGetTimestampUs() - st)/ 1000.0;
|
||||||
pInfo->elapsedTime += el;
|
pInfo->elapsedTime += el;
|
||||||
pInfo->loadBlocks += 1;
|
pInfo->loadBlocks += 1;
|
||||||
|
@ -460,7 +468,8 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo, const char* idStr) {
|
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo, STSchema* pSchema,
|
||||||
|
int16_t* pCols, int32_t numOfCols, const char* idStr) {
|
||||||
pMTree->backward = backward;
|
pMTree->backward = backward;
|
||||||
pMTree->pIter = NULL;
|
pMTree->pIter = NULL;
|
||||||
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
|
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
@ -475,9 +484,10 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
|
||||||
|
|
||||||
SSttBlockLoadInfo* pLoadInfo = NULL;
|
SSttBlockLoadInfo* pLoadInfo = NULL;
|
||||||
if (pBlockLoadInfo == NULL) {
|
if (pBlockLoadInfo == NULL) {
|
||||||
|
ASSERT(0);
|
||||||
if (pMTree->pLoadInfo == NULL) {
|
if (pMTree->pLoadInfo == NULL) {
|
||||||
pMTree->destroyLoadInfo = true;
|
pMTree->destroyLoadInfo = true;
|
||||||
pMTree->pLoadInfo = tCreateLastBlockLoadInfo();
|
pMTree->pLoadInfo = tCreateLastBlockLoadInfo(pSchema, pCols, numOfCols);
|
||||||
}
|
}
|
||||||
|
|
||||||
pLoadInfo = pMTree->pLoadInfo;
|
pLoadInfo = pMTree->pLoadInfo;
|
||||||
|
|
|
@ -79,6 +79,7 @@ typedef struct SBlockLoadSuppInfo {
|
||||||
SColumnDataAgg tsColAgg;
|
SColumnDataAgg tsColAgg;
|
||||||
SColumnDataAgg** plist;
|
SColumnDataAgg** plist;
|
||||||
int16_t* colIds; // column ids for loading file block data
|
int16_t* colIds; // column ids for loading file block data
|
||||||
|
int32_t numOfCols;
|
||||||
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
|
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
|
||||||
} SBlockLoadSuppInfo;
|
} SBlockLoadSuppInfo;
|
||||||
|
|
||||||
|
@ -203,6 +204,7 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
|
||||||
|
|
||||||
size_t numOfCols = blockDataGetNumOfCols(pBlock);
|
size_t numOfCols = blockDataGetNumOfCols(pBlock);
|
||||||
|
|
||||||
|
pSupInfo->numOfCols = numOfCols;
|
||||||
pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
|
pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
|
||||||
pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||||
if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
|
if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
|
||||||
|
@ -352,7 +354,8 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb
|
||||||
tMergeTreeClose(&pLReader->mergeTree);
|
tMergeTreeClose(&pLReader->mergeTree);
|
||||||
|
|
||||||
if (pLReader->pInfo == NULL) {
|
if (pLReader->pInfo == NULL) {
|
||||||
pLReader->pInfo = tCreateLastBlockLoadInfo();
|
// here we ignore the first column, which is always be the primary timestamp column
|
||||||
|
pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
|
||||||
if (pLReader->pInfo == NULL) {
|
if (pLReader->pInfo == NULL) {
|
||||||
tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
|
tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -1995,7 +1998,8 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
|
||||||
|
|
||||||
int32_t code =
|
int32_t code =
|
||||||
tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader,
|
tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader,
|
||||||
pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, pReader->idStr);
|
pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo,
|
||||||
|
pReader->pSchema, pReader->suppInfo.colIds, pReader->suppInfo.numOfCols, pReader->idStr);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue