tsdbCache/insert: fix row merging from mem

This commit is contained in:
Minglei Jin 2022-07-01 11:15:22 +08:00
parent e5944b08fb
commit e3bb10218a
3 changed files with 36 additions and 8 deletions

View File

@ -252,7 +252,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf);
int32_t tsdbOpenCache(STsdb *pTsdb); int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(SLRUCache *pCache); void tsdbCloseCache(SLRUCache *pCache);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row, bool dup); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup);
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);

View File

@ -89,7 +89,7 @@ static int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey)
return code; return code;
} }
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row, bool dup) { int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup) {
int32_t code = 0; int32_t code = 0;
STSRow *cacheRow = NULL; STSRow *cacheRow = NULL;
char key[32] = {0}; char key[32] = {0};
@ -100,6 +100,24 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row, boo
if (h) { if (h) {
cacheRow = (STSRow *)taosLRUCacheValue(pCache, h); cacheRow = (STSRow *)taosLRUCacheValue(pCache, h);
if (row->ts >= cacheRow->ts) { if (row->ts >= cacheRow->ts) {
if (row->ts == cacheRow->ts) {
STSRow *mergedRow;
SRowMerger merger = {0};
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
tRowMergerInit(&merger, &tsdbRowFromTSRow(0, cacheRow), pTSchema);
tRowMerge(&merger, &tsdbRowFromTSRow(1, row));
tRowMergerGetRow(&merger, &mergedRow);
tRowMergerClear(&merger);
taosMemoryFreeClear(pTSchema);
row = mergedRow;
dup = false;
}
if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) { if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) {
tdRowCpy(cacheRow, row); tdRowCpy(cacheRow, row);
if (!dup) { if (!dup) {
@ -110,7 +128,14 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row, boo
} else { } else {
taosLRUCacheRelease(pCache, h, true); taosLRUCacheRelease(pCache, h, true);
// tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX); // tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX);
tsdbCacheInsertLastrow(pCache, uid, row, dup);
_taos_lru_deleter_t deleter = deleteTableCacheLastrow;
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, cacheRow, TD_ROW_LEN(cacheRow), deleter, NULL,
TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
/* tsdbCacheInsertLastrow(pCache, uid, row, dup); */
} }
} }
} else { } else {
@ -966,7 +991,7 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
return 0; return 0;
} }
tsdbCacheInsertLastrow(pCache, uid, pRow, dup); tsdbCacheInsertLastrow(pCache, pTsdb, uid, pRow, dup);
h = taosLRUCacheLookup(pCache, key, keyLen); h = taosLRUCacheLookup(pCache, key, keyLen);
//*ppRow = (STSRow *)taosLRUCacheValue(pCache, h); //*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
} }

View File

@ -548,13 +548,16 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
} while (row.pTSRow); } while (row.pTSRow);
} }
if (key.ts > pTbData->maxKey) { if (key.ts >= pTbData->maxKey) {
pTbData->maxKey = key.ts; if (key.ts > pTbData->maxKey) {
pTbData->maxKey = key.ts;
}
if (pLastRow) { if (pLastRow != NULL) {
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow, true); tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, pLastRow, true);
} }
} }
pTbData->minVersion = TMIN(pTbData->minVersion, version); pTbData->minVersion = TMIN(pTbData->minVersion, version);
pTbData->maxVersion = TMAX(pTbData->maxVersion, version); pTbData->maxVersion = TMAX(pTbData->maxVersion, version);