tsdbCache: new dup parameter to insert last row
This commit is contained in:
parent
deb81301e8
commit
e5944b08fb
|
@ -252,7 +252,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf);
|
|||
int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||
void tsdbCloseCache(SLRUCache *pCache);
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
|
||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
|
||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row, bool dup);
|
||||
|
||||
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
|
||||
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
|
||||
|
|
|
@ -89,7 +89,7 @@ static int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey)
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
|
||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row, bool dup) {
|
||||
int32_t code = 0;
|
||||
STSRow *cacheRow = NULL;
|
||||
char key[32] = {0};
|
||||
|
@ -102,16 +102,23 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
|
|||
if (row->ts >= cacheRow->ts) {
|
||||
if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) {
|
||||
tdRowCpy(cacheRow, row);
|
||||
if (!dup) {
|
||||
taosMemoryFree(row);
|
||||
}
|
||||
|
||||
taosLRUCacheRelease(pCache, h, false);
|
||||
} else {
|
||||
taosLRUCacheRelease(pCache, h, true);
|
||||
// tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX);
|
||||
tsdbCacheInsertLastrow(pCache, uid, row);
|
||||
tsdbCacheInsertLastrow(pCache, uid, row, dup);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (dup) {
|
||||
cacheRow = tdRowDup(row);
|
||||
} else {
|
||||
cacheRow = row;
|
||||
}
|
||||
|
||||
_taos_lru_deleter_t deleter = deleteTableCacheLastrow;
|
||||
LRUStatus status =
|
||||
|
@ -345,7 +352,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
|||
/* tBlockIdxReset(state->blockIdx); */
|
||||
/* } */
|
||||
/* tBlockIdxReset(state->blockIdx); */
|
||||
/* code = tMapDataSearch(&state->blockIdxMap, state->pBlockIdxExp, tGetBlockIdx, tCmprBlockIdx, &state->blockIdx);
|
||||
/* code = tMapDataSearch(&state->blockIdxMap, state->pBlockIdxExp, tGetBlockIdx, tCmprBlockIdx,
|
||||
* &state->blockIdx);
|
||||
*/
|
||||
state->pBlockIdx = taosArraySearch(state->aBlockIdx, state->pBlockIdxExp, tCmprBlockIdx, TD_EQ);
|
||||
if (code) goto _err;
|
||||
|
@ -530,7 +538,7 @@ typedef struct TsdbNextRowState {
|
|||
_next_row_fn_t nextRowFn;
|
||||
} TsdbNextRowState;
|
||||
|
||||
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
||||
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) {
|
||||
int32_t code = 0;
|
||||
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
|
||||
|
@ -667,6 +675,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
|||
|
||||
// merge if nMerge > 1
|
||||
if (nMerge > 0) {
|
||||
*dup = false;
|
||||
|
||||
if (nMerge == 1) {
|
||||
code = tsRowFromTsdbRow(pTSchema, merge[nMerge - 1], ppRow);
|
||||
if (code) goto _err;
|
||||
|
@ -944,14 +954,19 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
|
|||
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||
} else {
|
||||
STSRow *pRow = NULL;
|
||||
code = mergeLastRow(uid, pTsdb, &pRow);
|
||||
bool dup = false;
|
||||
code = mergeLastRow(uid, pTsdb, &dup, &pRow);
|
||||
// if table's empty or error, return code of -1
|
||||
if (code < 0 || pRow == NULL) {
|
||||
if (!dup) {
|
||||
taosMemoryFree(pRow);
|
||||
}
|
||||
|
||||
*handle = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
tsdbCacheInsertLastrow(pCache, uid, pRow);
|
||||
tsdbCacheInsertLastrow(pCache, uid, pRow, dup);
|
||||
h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||
}
|
||||
|
|
|
@ -552,7 +552,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
|
|||
pTbData->maxKey = key.ts;
|
||||
|
||||
if (pLastRow) {
|
||||
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow);
|
||||
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow, true);
|
||||
}
|
||||
}
|
||||
pTbData->minVersion = TMIN(pTbData->minVersion, version);
|
||||
|
|
Loading…
Reference in New Issue