Merge pull request #18640 from taosdata/fix/TS-2189-xx
fix(tsdb/cache): use lru erase to invalidate cache entries
This commit is contained in:
commit
14a384e93f
|
@ -228,23 +228,23 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
|
|||
invalidate = true;
|
||||
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
SLastCol lastCol = {.ts = keyTs, .colVal = colVal};
|
||||
if (IS_VAR_DATA_TYPE(colVal.type) && colVal.value.nData > 0) {
|
||||
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||
taosMemoryFree(pLastCol->colVal.value.pData);
|
||||
} else { // new inserting key is greater than cached, update cached entry
|
||||
SLastCol lastCol = {.ts = keyTs, .colVal = colVal};
|
||||
if (IS_VAR_DATA_TYPE(colVal.type) && colVal.value.nData > 0) {
|
||||
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||
taosMemoryFree(pLastCol->colVal.value.pData);
|
||||
|
||||
lastCol.colVal.value.pData = taosMemoryMalloc(colVal.value.nData);
|
||||
if (lastCol.colVal.value.pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _invalidate;
|
||||
lastCol.colVal.value.pData = taosMemoryMalloc(colVal.value.nData);
|
||||
if (lastCol.colVal.value.pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _invalidate;
|
||||
}
|
||||
memcpy(lastCol.colVal.value.pData, colVal.value.pData, colVal.value.nData);
|
||||
}
|
||||
memcpy(lastCol.colVal.value.pData, colVal.value.pData, colVal.value.nData);
|
||||
}
|
||||
|
||||
taosArraySet(pLast, iCol, &lastCol);
|
||||
taosArraySet(pLast, iCol, &lastCol);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -253,65 +253,10 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
|
|||
taosMemoryFreeClear(pTSchema);
|
||||
|
||||
taosLRUCacheRelease(pCache, h, invalidate);
|
||||
/*
|
||||
cacheRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||
if (row->ts >= cacheRow->ts) {
|
||||
if (row->ts == cacheRow->ts) {
|
||||
STSRow *mergedRow = NULL;
|
||||
SRowMerger merger = {0};
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
||||
|
||||
tRowMergerInit(&merger, &tsdbRowFromTSRow(0, cacheRow), pTSchema);
|
||||
|
||||
tRowMerge(&merger, &tsdbRowFromTSRow(1, row));
|
||||
|
||||
tRowMergerGetRow(&merger, &mergedRow);
|
||||
tRowMergerClear(&merger);
|
||||
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
|
||||
row = mergedRow;
|
||||
dup = false;
|
||||
}
|
||||
|
||||
if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) {
|
||||
tdRowCpy(cacheRow, row);
|
||||
if (!dup) {
|
||||
taosMemoryFree(row);
|
||||
}
|
||||
|
||||
taosLRUCacheRelease(pCache, h, false);
|
||||
} else {
|
||||
taosLRUCacheRelease(pCache, h, true);
|
||||
// tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX);
|
||||
if (dup) {
|
||||
cacheRow = tdRowDup(row);
|
||||
} else {
|
||||
cacheRow = row;
|
||||
}
|
||||
_taos_lru_deleter_t deleter = deleteTableCacheLastrow;
|
||||
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, cacheRow, TD_ROW_LEN(cacheRow), deleter, NULL,
|
||||
TAOS_LRU_PRIORITY_LOW);
|
||||
if (status != TAOS_LRU_STATUS_OK) {
|
||||
code = -1;
|
||||
}
|
||||
// tsdbCacheInsertLastrow(pCache, uid, row, dup);
|
||||
}
|
||||
}*/
|
||||
} /*else {
|
||||
if (dup) {
|
||||
cacheRow = tdRowDup(row);
|
||||
} else {
|
||||
cacheRow = row;
|
||||
if (invalidate) {
|
||||
taosLRUCacheErase(pCache, key, keyLen);
|
||||
}
|
||||
|
||||
_taos_lru_deleter_t deleter = deleteTableCacheLastrow;
|
||||
LRUStatus status =
|
||||
taosLRUCacheInsert(pCache, key, keyLen, cacheRow, TD_ROW_LEN(cacheRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW);
|
||||
if (status != TAOS_LRU_STATUS_OK) {
|
||||
code = -1;
|
||||
}
|
||||
}*/
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -349,28 +294,28 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb
|
|||
|
||||
SColVal colVal = {0};
|
||||
tTSRowGetVal(row, pTSchema, iCol, &colVal);
|
||||
if (!COL_VAL_IS_VALUE(&colVal)) {
|
||||
if (COL_VAL_IS_VALUE(&colVal)) {
|
||||
if (keyTs == tTsVal1->ts && COL_VAL_IS_VALUE(tColVal)) {
|
||||
invalidate = true;
|
||||
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
SLastCol lastCol = {.ts = keyTs, .colVal = colVal};
|
||||
if (IS_VAR_DATA_TYPE(colVal.type) && colVal.value.nData > 0) {
|
||||
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||
taosMemoryFree(pLastCol->colVal.value.pData);
|
||||
} else {
|
||||
SLastCol lastCol = {.ts = keyTs, .colVal = colVal};
|
||||
if (IS_VAR_DATA_TYPE(colVal.type) && colVal.value.nData > 0) {
|
||||
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||
taosMemoryFree(pLastCol->colVal.value.pData);
|
||||
|
||||
lastCol.colVal.value.pData = taosMemoryMalloc(colVal.value.nData);
|
||||
if (lastCol.colVal.value.pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _invalidate;
|
||||
lastCol.colVal.value.pData = taosMemoryMalloc(colVal.value.nData);
|
||||
if (lastCol.colVal.value.pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _invalidate;
|
||||
}
|
||||
memcpy(lastCol.colVal.value.pData, colVal.value.pData, colVal.value.nData);
|
||||
}
|
||||
memcpy(lastCol.colVal.value.pData, colVal.value.pData, colVal.value.nData);
|
||||
}
|
||||
|
||||
taosArraySet(pLast, iCol, &lastCol);
|
||||
taosArraySet(pLast, iCol, &lastCol);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -379,9 +324,9 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb
|
|||
taosMemoryFreeClear(pTSchema);
|
||||
|
||||
taosLRUCacheRelease(pCache, h, invalidate);
|
||||
|
||||
// clear last cache anyway, lazy load when get last lookup
|
||||
// taosLRUCacheRelease(pCache, h, true);
|
||||
if (invalidate) {
|
||||
taosLRUCacheErase(pCache, key, keyLen);
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
Loading…
Reference in New Issue