Merge pull request #20372 from taosdata/fix/main_bugfix_wxy
enh: optimize last/last_row cost when the cache was first loaded
This commit is contained in:
commit
c70c9546f2
|
@ -780,6 +780,7 @@ typedef struct SCacheRowsReader {
|
|||
SDataFReader *pDataFReader;
|
||||
SDataFReader *pDataFReaderLast;
|
||||
const char *idstr;
|
||||
int64_t lastTs;
|
||||
} SCacheRowsReader;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -722,6 +722,7 @@ typedef struct SFSNextRowIter {
|
|||
int32_t iRow;
|
||||
TSDBROW row;
|
||||
SSttBlockLoadInfo *pLoadInfo;
|
||||
int64_t lastTs;
|
||||
} SFSNextRowIter;
|
||||
|
||||
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
||||
|
@ -816,12 +817,12 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
|||
SDataBlk block = {0};
|
||||
|
||||
tDataBlkReset(&block);
|
||||
// tBlockDataReset(&state->blockData);
|
||||
tBlockDataReset(state->pBlockData);
|
||||
|
||||
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk);
|
||||
/* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL);
|
||||
*/
|
||||
if (block.maxKey.ts <= state->lastTs) {
|
||||
goto _next_fileset;
|
||||
}
|
||||
tBlockDataReset(state->pBlockData);
|
||||
TABLEID tid = {.suid = state->suid, .uid = state->uid};
|
||||
code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, NULL, 0);
|
||||
|
@ -1070,7 +1071,7 @@ typedef struct {
|
|||
|
||||
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
|
||||
SSttBlockLoadInfo *pLoadInfo, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader,
|
||||
SDataFReader **pDataFReaderLast) {
|
||||
SDataFReader **pDataFReaderLast, int64_t lastTs) {
|
||||
int code = 0;
|
||||
|
||||
STbData *pMem = NULL;
|
||||
|
@ -1141,6 +1142,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
|||
pIter->fsState.uid = uid;
|
||||
pIter->fsState.pLoadInfo = pLoadInfo;
|
||||
pIter->fsState.pDataFReader = pDataFReader;
|
||||
pIter->fsState.lastTs = lastTs;
|
||||
|
||||
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL};
|
||||
pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL};
|
||||
|
@ -1315,7 +1317,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
|
|||
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
|
||||
&pr->pDataFReaderLast);
|
||||
&pr->pDataFReaderLast, pr->lastTs);
|
||||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
|
@ -1453,7 +1455,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
|
|||
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
|
||||
&pr->pDataFReaderLast);
|
||||
&pr->pDataFReaderLast, pr->lastTs);
|
||||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
|
|
|
@ -199,6 +199,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
|
|||
p->idstr = taosStrdup(idstr);
|
||||
taosThreadMutexInit(&p->readerMutex, NULL);
|
||||
|
||||
p->lastTs = INT64_MIN;
|
||||
|
||||
*pReader = p;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -347,6 +349,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
}
|
||||
|
||||
{
|
||||
bool hasNotNullRow = true;
|
||||
int64_t minTs = INT64_MAX;
|
||||
for (int32_t k = 0; k < pr->numOfCols; ++k) {
|
||||
int32_t slotId = slotIds[k];
|
||||
|
||||
|
@ -357,6 +361,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
hasRes = true;
|
||||
p->ts = pCol->ts;
|
||||
p->colVal = pCol->colVal;
|
||||
minTs = pCol->ts;
|
||||
|
||||
// only set value for last row query
|
||||
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
|
||||
|
@ -373,11 +378,17 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
|
||||
if (pColVal->ts > p->ts) {
|
||||
if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
if (!COL_VAL_IS_VALUE(&p->colVal)) {
|
||||
hasNotNullRow = false;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
hasRes = true;
|
||||
p->ts = pColVal->ts;
|
||||
if (pColVal->ts < minTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
minTs = pColVal->ts;
|
||||
}
|
||||
|
||||
if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
p->colVal = pColVal->colVal;
|
||||
|
@ -394,6 +405,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (hasNotNullRow) {
|
||||
pr->lastTs = minTs;
|
||||
}
|
||||
}
|
||||
|
||||
tsdbCacheRelease(lruCache, h);
|
||||
|
|
Loading…
Reference in New Issue