diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index fc1c039f8b..85a88f65c0 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -252,7 +252,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf); int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(SLRUCache *pCache); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row); -int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row, bool dup); +int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup); int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 11c401e04a..28b121fce8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -89,7 +89,7 @@ static int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) return code; } -int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row, bool dup) { +int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup) { int32_t code = 0; STSRow *cacheRow = NULL; char key[32] = {0}; @@ -100,6 +100,24 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row, boo if (h) { cacheRow = (STSRow *)taosLRUCacheValue(pCache, h); if (row->ts >= cacheRow->ts) { + if (row->ts == cacheRow->ts) { + STSRow *mergedRow; + SRowMerger merger = {0}; + STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); + + tRowMergerInit(&merger, &tsdbRowFromTSRow(0, cacheRow), pTSchema); + + tRowMerge(&merger, &tsdbRowFromTSRow(1, row)); + + tRowMergerGetRow(&merger, &mergedRow); + tRowMergerClear(&merger); + + taosMemoryFreeClear(pTSchema); + + row = mergedRow; + dup = false; + } + if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) { tdRowCpy(cacheRow, row); if (!dup) { @@ -110,7 +128,14 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row, boo } else { taosLRUCacheRelease(pCache, h, true); // tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX); - tsdbCacheInsertLastrow(pCache, uid, row, dup); + + _taos_lru_deleter_t deleter = deleteTableCacheLastrow; + LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, cacheRow, TD_ROW_LEN(cacheRow), deleter, NULL, + TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } + /* tsdbCacheInsertLastrow(pCache, uid, row, dup); */ } } } else { @@ -966,7 +991,7 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH return 0; } - tsdbCacheInsertLastrow(pCache, uid, pRow, dup); + tsdbCacheInsertLastrow(pCache, pTsdb, uid, pRow, dup); h = taosLRUCacheLookup(pCache, key, keyLen); //*ppRow = (STSRow *)taosLRUCacheValue(pCache, h); } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index f8e9e4ee34..784ea77fb8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -548,13 +548,16 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i } while (row.pTSRow); } - if (key.ts > pTbData->maxKey) { - pTbData->maxKey = key.ts; + if (key.ts >= pTbData->maxKey) { + if (key.ts > pTbData->maxKey) { + pTbData->maxKey = key.ts; + } - if (pLastRow) { - tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow, true); + if (pLastRow != NULL) { + tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, pLastRow, true); } } + pTbData->minVersion = TMIN(pTbData->minVersion, version); pTbData->maxVersion = TMAX(pTbData->maxVersion, version);