Merge branch 'enh/tsdb_optimize' of https://github.com/taosdata/TDengine into enh/tsdb_optimize
This commit is contained in:
commit
ce71b8bbe7
|
@ -710,9 +710,8 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct SSttBlockLoadInfo {
|
typedef struct SSttBlockLoadInfo {
|
||||||
SBlockData blockData[2];
|
SBlockData blockData[2];
|
||||||
void *pBlockArray;
|
void *pSttStatisBlkArray;
|
||||||
SArray *aSttBlk;
|
SArray *aSttBlk;
|
||||||
SArray *pTombBlockArray; // tomb block array list
|
|
||||||
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
|
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
|
||||||
int32_t currentLoadBlockIndex;
|
int32_t currentLoadBlockIndex;
|
||||||
int32_t loadBlocks;
|
int32_t loadBlocks;
|
||||||
|
@ -720,10 +719,9 @@ typedef struct SSttBlockLoadInfo {
|
||||||
STSchema *pSchema;
|
STSchema *pSchema;
|
||||||
int16_t *colIds;
|
int16_t *colIds;
|
||||||
int32_t numOfCols;
|
int32_t numOfCols;
|
||||||
bool checkRemainingRow;
|
bool checkRemainingRow; // todo: no assign value?
|
||||||
bool isLast;
|
bool isLast;
|
||||||
bool sttBlockLoaded;
|
bool sttBlockLoaded;
|
||||||
int32_t numOfStt;
|
|
||||||
|
|
||||||
// keep the last access position, this position may be used to reduce the binary times for
|
// keep the last access position, this position may be used to reduce the binary times for
|
||||||
// starting last block data for a new table
|
// starting last block data for a new table
|
||||||
|
|
|
@ -1799,10 +1799,6 @@ static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileRea
|
||||||
|
|
||||||
SCacheRowsReader *pReader = (SCacheRowsReader *)pTsdbReader;
|
SCacheRowsReader *pReader = (SCacheRowsReader *)pTsdbReader;
|
||||||
|
|
||||||
if (pLoadInfo->pTombBlockArray == NULL) {
|
|
||||||
pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES);
|
|
||||||
}
|
|
||||||
|
|
||||||
const TTombBlkArray *pBlkArray = NULL;
|
const TTombBlkArray *pBlkArray = NULL;
|
||||||
code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray);
|
code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -29,8 +29,6 @@ SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList,
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pLoadInfo->numOfStt = numOfSttTrigger;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfSttTrigger; ++i) {
|
for (int32_t i = 0; i < numOfSttTrigger; ++i) {
|
||||||
pLoadInfo[i].blockIndex[0] = -1;
|
pLoadInfo[i].blockIndex[0] = -1;
|
||||||
pLoadInfo[i].blockIndex[1] = -1;
|
pLoadInfo[i].blockIndex[1] = -1;
|
||||||
|
@ -61,7 +59,6 @@ SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colLi
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pLoadInfo->numOfStt = 1;
|
|
||||||
|
|
||||||
pLoadInfo->blockIndex[0] = -1;
|
pLoadInfo->blockIndex[0] = -1;
|
||||||
pLoadInfo->blockIndex[1] = -1;
|
pLoadInfo->blockIndex[1] = -1;
|
||||||
|
@ -78,7 +75,6 @@ SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colLi
|
||||||
}
|
}
|
||||||
|
|
||||||
pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
|
pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
|
||||||
pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES);
|
|
||||||
pLoadInfo->pSchema = pSchema;
|
pLoadInfo->pSchema = pSchema;
|
||||||
pLoadInfo->colIds = colList;
|
pLoadInfo->colIds = colList;
|
||||||
pLoadInfo->numOfCols = numOfCols;
|
pLoadInfo->numOfCols = numOfCols;
|
||||||
|
@ -87,7 +83,7 @@ SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colLi
|
||||||
}
|
}
|
||||||
|
|
||||||
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
||||||
for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) {
|
for (int32_t i = 0; i < 1; ++i) {
|
||||||
pLoadInfo[i].currentLoadBlockIndex = 1;
|
pLoadInfo[i].currentLoadBlockIndex = 1;
|
||||||
pLoadInfo[i].blockIndex[0] = -1;
|
pLoadInfo[i].blockIndex[0] = -1;
|
||||||
pLoadInfo[i].blockIndex[1] = -1;
|
pLoadInfo[i].blockIndex[1] = -1;
|
||||||
|
@ -101,7 +97,7 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el) {
|
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el) {
|
||||||
for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) {
|
for (int32_t i = 0; i < 1; ++i) {
|
||||||
*el += pLoadInfo[i].elapsedTime;
|
*el += pLoadInfo[i].elapsedTime;
|
||||||
*blocks += pLoadInfo[i].loadBlocks;
|
*blocks += pLoadInfo[i].loadBlocks;
|
||||||
}
|
}
|
||||||
|
@ -118,7 +114,7 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) {
|
for (int32_t i = 0; i < 1; ++i) {
|
||||||
pLoadInfo[i].currentLoadBlockIndex = 1;
|
pLoadInfo[i].currentLoadBlockIndex = 1;
|
||||||
pLoadInfo[i].blockIndex[0] = -1;
|
pLoadInfo[i].blockIndex[0] = -1;
|
||||||
pLoadInfo[i].blockIndex[1] = -1;
|
pLoadInfo[i].blockIndex[1] = -1;
|
||||||
|
@ -129,8 +125,6 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
||||||
taosArrayDestroy(pLoadInfo[i].aSttBlk);
|
taosArrayDestroy(pLoadInfo[i].aSttBlk);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroyEx(pLoadInfo->pTombBlockArray, freeTombBlock);
|
|
||||||
|
|
||||||
taosMemoryFree(pLoadInfo);
|
taosMemoryFree(pLoadInfo);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -317,8 +311,8 @@ int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t loadSttBlockInfo(SLDataIter *pIter, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid) {
|
static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||||
TSttBlkArray *pArray = pBlockLoadInfo->pBlockArray;
|
uint64_t suid) {
|
||||||
if (TARRAY2_SIZE(pArray) <= 0) {
|
if (TARRAY2_SIZE(pArray) <= 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -358,40 +352,49 @@ static int32_t loadSttBlockInfo(SLDataIter *pIter, SSttBlockLoadInfo *pBlockLoad
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t loadSttTombBlockData(SSttFileReader *pSttFileReader, uint64_t suid, SSttBlockLoadInfo *pLoadInfo) {
|
static int32_t uidComparFn(const void* p1, const void* p2) {
|
||||||
if (pLoadInfo->pTombBlockArray == NULL) {
|
const uint64_t* uid1 = p1;
|
||||||
pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES);
|
const uint64_t* uid2 = p2;
|
||||||
|
return (*uid1) - (*uid2);
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint64_t suid, uint64_t uid,
|
||||||
|
SSttFileReader *pReader) {
|
||||||
|
if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
const TTombBlkArray *pBlkArray = NULL;
|
int32_t i = 0;
|
||||||
int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray);
|
for (i = 0; i < TARRAY2_SIZE(pStatisBlkArray); ++i) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
SStatisBlk *p = &pStatisBlkArray->data[i];
|
||||||
return code;
|
if (p->minTbid.suid == suid) {
|
||||||
}
|
break;
|
||||||
|
|
||||||
for (int32_t j = 0; j < pBlkArray->size; ++j) {
|
|
||||||
STombBlk *pTombBlk = &pBlkArray->data[j];
|
|
||||||
if (pTombBlk->maxTbid.suid < suid) {
|
|
||||||
continue; // todo use binary search instead
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (pTombBlk->minTbid.suid > suid) {
|
for (; i < TARRAY2_SIZE(pStatisBlkArray); ++i) {
|
||||||
|
SStatisBlk *p = &pStatisBlkArray->data[i];
|
||||||
|
if (p->minTbid.uid <= uid && p->maxTbid.uid >= uid) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
STombBlock *pTombBlock = taosMemoryCalloc(1, sizeof(STombBlock));
|
if (p->maxTbid.uid < uid) {
|
||||||
code = tsdbSttFileReadTombBlock(pSttFileReader, pTombBlk, pTombBlock);
|
break;
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
// todo handle error
|
|
||||||
}
|
|
||||||
|
|
||||||
void *p = taosArrayPush(pLoadInfo->pTombBlockArray, &pTombBlock);
|
|
||||||
if (p == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStatisBlk *p = &pStatisBlkArray->data[i];
|
||||||
|
STbStatisBlock block = {0};
|
||||||
|
tsdbSttFileReadStatisBlock(pReader, p, &block);
|
||||||
|
|
||||||
|
int32_t index = tarray2SearchIdx(block.uid, &uid, sizeof(int64_t), uidComparFn, TD_EQ);
|
||||||
|
tStatisBlockDestroy(&block);
|
||||||
|
|
||||||
|
return (index != -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
|
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
|
||||||
|
@ -412,27 +415,45 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader
|
||||||
|
|
||||||
if (!pBlockLoadInfo->sttBlockLoaded) {
|
if (!pBlockLoadInfo->sttBlockLoaded) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
|
const TSttBlkArray*pSttBlkArray = NULL;
|
||||||
pBlockLoadInfo->sttBlockLoaded = true;
|
pBlockLoadInfo->sttBlockLoaded = true;
|
||||||
|
|
||||||
code = tsdbSttFileReadSttBlk(pIter->pReader, (const TSttBlkArray **)&pBlockLoadInfo->pBlockArray);
|
// load the stt block info for each stt-block
|
||||||
|
code = tsdbSttFileReadSttBlk(pIter->pReader, &pSttBlkArray);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbError("load stt blk, code:%s, %s", tstrerror(code), idStr);
|
tsdbError("load stt blk failed, code:%s, %s", tstrerror(code), idStr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = loadSttBlockInfo(pIter, pBlockLoadInfo, suid);
|
code = extractSttBlockInfo(pIter, pSttBlkArray, pBlockLoadInfo, suid);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr);
|
tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = loadTombFn(pReader1, pIter->pReader, pBlockLoadInfo);
|
// load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
|
||||||
|
code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pBlockLoadInfo->pSttStatisBlkArray);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), idStr);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
|
tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
// find the start block
|
bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader);
|
||||||
|
if (!exists) {
|
||||||
|
pIter->iSttBlk = -1;
|
||||||
|
pIter->pSttBlk = NULL;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
// find the start block, actually we could load the position to avoid repeatly searching for the start position when
|
||||||
|
// the skey is updated.
|
||||||
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
|
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
|
||||||
pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
|
pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
|
||||||
if (pIter->iSttBlk != -1) {
|
if (pIter->iSttBlk != -1) {
|
||||||
|
@ -744,6 +765,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add the list/iter placeholder
|
||||||
while (taosArrayGetSize(pConf->pSttFileBlockIterArray) < size) {
|
while (taosArrayGetSize(pConf->pSttFileBlockIterArray) < size) {
|
||||||
SArray *pList = taosArrayInit(4, POINTER_BYTES);
|
SArray *pList = taosArrayInit(4, POINTER_BYTES);
|
||||||
taosArrayPush(pConf->pSttFileBlockIterArray, &pList);
|
taosArrayPush(pConf->pSttFileBlockIterArray, &pList);
|
||||||
|
|
|
@ -590,10 +590,6 @@ int32_t loadDataFileTombDataForAll(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo) {
|
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo) {
|
||||||
if (pLoadInfo->pTombBlockArray == NULL) {
|
|
||||||
pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES);
|
|
||||||
}
|
|
||||||
|
|
||||||
const TTombBlkArray* pBlkArray = NULL;
|
const TTombBlkArray* pBlkArray = NULL;
|
||||||
int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray);
|
int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
Loading…
Reference in New Issue