Merge pull request #28163 from taosdata/fix/TD-32338-3.0
enh: add lrucache overwriter
This commit is contained in:
commit
fe18c77809
|
@ -25,6 +25,7 @@ extern "C" {
|
||||||
typedef struct SLRUCache SLRUCache;
|
typedef struct SLRUCache SLRUCache;
|
||||||
|
|
||||||
typedef void (*_taos_lru_deleter_t)(const void *key, size_t keyLen, void *value, void *ud);
|
typedef void (*_taos_lru_deleter_t)(const void *key, size_t keyLen, void *value, void *ud);
|
||||||
|
typedef void (*_taos_lru_overwriter_t)(const void *key, size_t keyLen, void *value, void *ud);
|
||||||
typedef int (*_taos_lru_functor_t)(const void *key, size_t keyLen, void *value, void *ud);
|
typedef int (*_taos_lru_functor_t)(const void *key, size_t keyLen, void *value, void *ud);
|
||||||
|
|
||||||
typedef struct LRUHandle LRUHandle;
|
typedef struct LRUHandle LRUHandle;
|
||||||
|
@ -42,7 +43,8 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo
|
||||||
void taosLRUCacheCleanup(SLRUCache *cache);
|
void taosLRUCacheCleanup(SLRUCache *cache);
|
||||||
|
|
||||||
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
|
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
|
||||||
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority, void *ud);
|
_taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle,
|
||||||
|
LRUPriority priority, void *ud);
|
||||||
LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen);
|
LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen);
|
||||||
void taosLRUCacheErase(SLRUCache *cache, const void *key, size_t keyLen);
|
void taosLRUCacheErase(SLRUCache *cache, const void *key, size_t keyLen);
|
||||||
|
|
||||||
|
|
|
@ -646,7 +646,7 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
|
||||||
}
|
}
|
||||||
|
|
||||||
// add to cache.
|
// add to cache.
|
||||||
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL,
|
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL, NULL,
|
||||||
TAOS_LRU_PRIORITY_LOW, NULL);
|
TAOS_LRU_PRIORITY_LOW, NULL);
|
||||||
_end:
|
_end:
|
||||||
(void)taosThreadMutexUnlock(pLock);
|
(void)taosThreadMutexUnlock(pLock);
|
||||||
|
@ -804,7 +804,7 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
|
||||||
}
|
}
|
||||||
|
|
||||||
// add to cache.
|
// add to cache.
|
||||||
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL,
|
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL, NULL,
|
||||||
TAOS_LRU_PRIORITY_LOW, NULL);
|
TAOS_LRU_PRIORITY_LOW, NULL);
|
||||||
_end:
|
_end:
|
||||||
(void)taosThreadMutexUnlock(pLock);
|
(void)taosThreadMutexUnlock(pLock);
|
||||||
|
|
|
@ -597,6 +597,13 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud
|
||||||
taosMemoryFree(value);
|
taosMemoryFree(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void *ud) {
|
||||||
|
SLastCol *pLastCol = (SLastCol *)value;
|
||||||
|
pLastCol->dirty = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
|
||||||
|
|
||||||
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
|
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
|
||||||
int32_t code = 0, lino = 0;
|
int32_t code = 0, lino = 0;
|
||||||
|
|
||||||
|
@ -606,27 +613,10 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
|
||||||
SLastCol emptyCol = {
|
SLastCol emptyCol = {
|
||||||
.rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
.rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||||
|
|
||||||
SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
|
||||||
if (!pLastCol) {
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t charge = 0;
|
|
||||||
*pLastCol = emptyCol;
|
|
||||||
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLastCol, &charge));
|
|
||||||
|
|
||||||
SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
|
SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
|
||||||
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
|
code = tsdbCachePutToLRU(pTsdb, pLastKey, &emptyCol, 1);
|
||||||
TAOS_LRU_PRIORITY_LOW, pTsdb);
|
if (code) {
|
||||||
if (status != TAOS_LRU_STATUS_OK) {
|
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||||
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
|
||||||
code = TSDB_CODE_FAILED;
|
|
||||||
pLastCol = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
|
||||||
taosMemoryFree(pLastCol);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
|
@ -1071,40 +1061,6 @@ typedef struct {
|
||||||
SLastKey key;
|
SLastKey key;
|
||||||
} SIdxKey;
|
} SIdxKey;
|
||||||
|
|
||||||
static int32_t tsdbCacheUpdateValue(SValue *pOld, SValue *pNew) {
|
|
||||||
uint8_t *pFree = NULL;
|
|
||||||
int nData = 0;
|
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pOld->type)) {
|
|
||||||
pFree = pOld->pData;
|
|
||||||
nData = pOld->nData;
|
|
||||||
}
|
|
||||||
|
|
||||||
*pOld = *pNew;
|
|
||||||
if (IS_VAR_DATA_TYPE(pNew->type)) {
|
|
||||||
if (nData < pNew->nData) {
|
|
||||||
pOld->pData = taosMemoryCalloc(1, pNew->nData);
|
|
||||||
if (!pOld->pData) {
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
pOld->pData = pFree;
|
|
||||||
pFree = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pNew->nData) {
|
|
||||||
memcpy(pOld->pData, pNew->pData, pNew->nData);
|
|
||||||
} else {
|
|
||||||
pFree = pOld->pData;
|
|
||||||
pOld->pData = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFreeClear(pFree);
|
|
||||||
|
|
||||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
|
static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus cacheStatus) {
|
||||||
// update rowkey
|
// update rowkey
|
||||||
pLastCol->rowKey.ts = TSKEY_MIN;
|
pLastCol->rowKey.ts = TSKEY_MIN;
|
||||||
|
@ -1128,11 +1084,7 @@ static void tsdbCacheUpdateLastColToNone(SLastCol *pLastCol, ELastCacheStatus ca
|
||||||
}
|
}
|
||||||
|
|
||||||
pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
|
pLastCol->colVal = COL_VAL_NONE(pLastCol->colVal.cid, pLastCol->colVal.value.type);
|
||||||
|
|
||||||
if (!pLastCol->dirty) {
|
|
||||||
pLastCol->dirty = 1;
|
pLastCol->dirty = 1;
|
||||||
}
|
|
||||||
|
|
||||||
pLastCol->cacheStatus = cacheStatus;
|
pLastCol->cacheStatus = cacheStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1155,7 +1107,7 @@ static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
|
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
|
||||||
int32_t code = 0, lino = 0;
|
int32_t code = 0, lino = 0;
|
||||||
|
|
||||||
SLastCol *pLRULastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
SLastCol *pLRULastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||||
|
@ -1165,11 +1117,11 @@ static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLa
|
||||||
|
|
||||||
size_t charge = 0;
|
size_t charge = 0;
|
||||||
*pLRULastCol = *pLastCol;
|
*pLRULastCol = *pLastCol;
|
||||||
pLRULastCol->dirty = 1;
|
pLRULastCol->dirty = dirty;
|
||||||
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
|
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
|
||||||
|
|
||||||
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
|
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
|
||||||
NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
|
tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
|
||||||
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
|
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
|
||||||
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
||||||
code = TSDB_CODE_FAILED;
|
code = TSDB_CODE_FAILED;
|
||||||
|
@ -1216,8 +1168,9 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
|
||||||
if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
|
if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
|
||||||
int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
|
int32_t cmp_res = tRowKeyCompare(&pLastCol->rowKey, pRowKey);
|
||||||
if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
|
if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
|
||||||
SLastCol newLastCol = {.rowKey = *pRowKey, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
SLastCol newLastCol = {
|
||||||
code = tsdbCachePutToLRU(pTsdb, key, &newLastCol);
|
.rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||||
|
code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1296,7 +1249,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
|
||||||
SLastCol *pToFree = pLastCol;
|
SLastCol *pToFree = pLastCol;
|
||||||
|
|
||||||
if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
|
if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
|
||||||
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol)) != TSDB_CODE_SUCCESS) {
|
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
|
||||||
tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
|
tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
taosMemoryFreeClear(pToFree);
|
taosMemoryFreeClear(pToFree);
|
||||||
|
@ -1319,14 +1272,14 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
|
if (NULL == pLastCol || cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
|
||||||
SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal, .dirty = 0, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||||
if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
|
if ((code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
|
||||||
tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
|
tsdbError("tsdb/cache: vgId:%d, put rocks failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
taosMemoryFreeClear(pToFree);
|
taosMemoryFreeClear(pToFree);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
|
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
|
||||||
tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
|
tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
taosMemoryFreeClear(pToFree);
|
taosMemoryFreeClear(pToFree);
|
||||||
|
@ -1681,30 +1634,14 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
// store result back to rocks cache
|
||||||
if (!pTmpLastCol) {
|
code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
|
||||||
TAOS_CHECK_EXIT(terrno);
|
if (code) {
|
||||||
}
|
tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||||
|
|
||||||
size_t charge = 0;
|
|
||||||
*pTmpLastCol = *pLastCol;
|
|
||||||
pLastCol = pTmpLastCol;
|
|
||||||
code = tsdbCacheReallocSLastCol(pLastCol, &charge);
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
|
||||||
taosMemoryFree(pLastCol);
|
|
||||||
TAOS_CHECK_EXIT(code);
|
TAOS_CHECK_EXIT(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
|
code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
|
||||||
TAOS_LRU_PRIORITY_LOW, pTsdb);
|
|
||||||
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
|
|
||||||
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
|
||||||
pLastCol = NULL;
|
|
||||||
TAOS_CHECK_EXIT(TSDB_CODE_FAILED);
|
|
||||||
}
|
|
||||||
|
|
||||||
// store result back to rocks cache
|
|
||||||
code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
|
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||||
TAOS_CHECK_EXIT(code);
|
TAOS_CHECK_EXIT(code);
|
||||||
|
@ -1779,18 +1716,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
||||||
SLastCol *pToFree = pLastCol;
|
SLastCol *pToFree = pLastCol;
|
||||||
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
|
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
|
||||||
if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
|
if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
|
||||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
|
||||||
if (!pTmpLastCol) {
|
if (code) {
|
||||||
taosMemoryFreeClear(pToFree);
|
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||||
TAOS_CHECK_EXIT(terrno);
|
tstrerror(code));
|
||||||
}
|
|
||||||
|
|
||||||
size_t charge = 0;
|
|
||||||
*pTmpLastCol = *pLastCol;
|
|
||||||
pLastCol = pTmpLastCol;
|
|
||||||
code = tsdbCacheReallocSLastCol(pLastCol, &charge);
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
|
||||||
taosMemoryFreeClear(pLastCol);
|
|
||||||
taosMemoryFreeClear(pToFree);
|
taosMemoryFreeClear(pToFree);
|
||||||
TAOS_CHECK_EXIT(code);
|
TAOS_CHECK_EXIT(code);
|
||||||
}
|
}
|
||||||
|
@ -1798,20 +1727,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
||||||
SLastCol lastCol = *pLastCol;
|
SLastCol lastCol = *pLastCol;
|
||||||
code = tsdbCacheReallocSLastCol(&lastCol, NULL);
|
code = tsdbCacheReallocSLastCol(&lastCol, NULL);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
tsdbCacheFreeSLastColItem(pLastCol);
|
|
||||||
taosMemoryFreeClear(pLastCol);
|
|
||||||
taosMemoryFreeClear(pToFree);
|
taosMemoryFreeClear(pToFree);
|
||||||
TAOS_CHECK_EXIT(code);
|
TAOS_CHECK_EXIT(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
|
|
||||||
NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
|
|
||||||
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
|
|
||||||
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
|
||||||
taosMemoryFreeClear(pToFree);
|
|
||||||
TAOS_CHECK_EXIT(TSDB_CODE_FAILED);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
||||||
taosArrayRemove(remainCols, j);
|
taosArrayRemove(remainCols, j);
|
||||||
taosArrayRemove(ignoreFromRocks, j);
|
taosArrayRemove(ignoreFromRocks, j);
|
||||||
|
@ -1999,8 +1918,9 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
|
if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
|
||||||
SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
|
SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
|
||||||
.colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
|
.colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
|
||||||
|
.dirty = 1,
|
||||||
.cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
|
.cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
|
||||||
code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol);
|
code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
|
||||||
}
|
}
|
||||||
if (taosLRUCacheRelease(pTsdb->lruCache, h, false) != TSDB_CODE_SUCCESS) {
|
if (taosLRUCacheRelease(pTsdb->lruCache, h, false) != TSDB_CODE_SUCCESS) {
|
||||||
tsdbError("vgId:%d, %s release lru cache failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
|
tsdbError("vgId:%d, %s release lru cache failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
|
||||||
|
@ -2065,6 +1985,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
|
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
|
||||||
SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
|
SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
|
||||||
.colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
|
.colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
|
||||||
|
.dirty = 0,
|
||||||
.cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
|
.cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
|
||||||
|
|
||||||
if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
|
if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2072,7 +1993,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
|
tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
|
if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
|
||||||
taosMemoryFreeClear(pLastCol);
|
taosMemoryFreeClear(pLastCol);
|
||||||
tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
|
tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
@ -3660,7 +3581,7 @@ int32_t tsdbCacheGetBlockS3(SLRUCache *pCache, STsdbFD *pFD, LRUHandle **handle)
|
||||||
size_t charge = tsS3BlockSize * pFD->szPage;
|
size_t charge = tsS3BlockSize * pFD->szPage;
|
||||||
_taos_lru_deleter_t deleter = deleteBCache;
|
_taos_lru_deleter_t deleter = deleteBCache;
|
||||||
LRUStatus status =
|
LRUStatus status =
|
||||||
taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW, NULL);
|
taosLRUCacheInsert(pCache, key, keyLen, pBlock, charge, deleter, NULL, &h, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||||
if (status != TAOS_LRU_STATUS_OK) {
|
if (status != TAOS_LRU_STATUS_OK) {
|
||||||
// code = -1;
|
// code = -1;
|
||||||
}
|
}
|
||||||
|
@ -3703,7 +3624,7 @@ void tsdbCacheSetPageS3(SLRUCache *pCache, STsdbFD *pFD, int64_t pgno, uint8_t *
|
||||||
memcpy(pPg, pPage, charge);
|
memcpy(pPg, pPage, charge);
|
||||||
|
|
||||||
LRUStatus status =
|
LRUStatus status =
|
||||||
taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
|
taosLRUCacheInsert(pCache, key, keyLen, pPg, charge, deleter, NULL, &handle, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||||
if (status != TAOS_LRU_STATUS_OK) {
|
if (status != TAOS_LRU_STATUS_OK) {
|
||||||
// ignore cache updating if not ok
|
// ignore cache updating if not ok
|
||||||
// code = TSDB_CODE_OUT_OF_MEMORY;
|
// code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -737,7 +737,7 @@ _end:
|
||||||
|
|
||||||
if (NULL != pVal) {
|
if (NULL != pVal) {
|
||||||
insertRet = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
|
insertRet = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
|
||||||
sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
|
sizeof(STableCachedVal), freeCachedMetaItem, NULL, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||||
if (insertRet != TAOS_LRU_STATUS_OK) {
|
if (insertRet != TAOS_LRU_STATUS_OK) {
|
||||||
qWarn("failed to put meta into lru cache, code:%d, %s", insertRet, idStr);
|
qWarn("failed to put meta into lru cache, code:%d, %s", insertRet, idStr);
|
||||||
}
|
}
|
||||||
|
|
|
@ -148,7 +148,7 @@ static int64_t idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_
|
||||||
memcpy(buf + total, blk->buf + blkOffset, nread);
|
memcpy(buf + total, blk->buf + blkOffset, nread);
|
||||||
|
|
||||||
LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,
|
LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,
|
||||||
TAOS_LRU_PRIORITY_LOW, NULL);
|
NULL, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||||
if (s != TAOS_LRU_STATUS_OK) {
|
if (s != TAOS_LRU_STATUS_OK) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@ enum {
|
||||||
struct SLRUEntry {
|
struct SLRUEntry {
|
||||||
void *value;
|
void *value;
|
||||||
_taos_lru_deleter_t deleter;
|
_taos_lru_deleter_t deleter;
|
||||||
|
_taos_lru_overwriter_t overwriter;
|
||||||
void *ud;
|
void *ud;
|
||||||
SLRUEntry *nextHash;
|
SLRUEntry *nextHash;
|
||||||
SLRUEntry *next;
|
SLRUEntry *next;
|
||||||
|
@ -403,6 +404,10 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
|
||||||
if (old != NULL) {
|
if (old != NULL) {
|
||||||
status = TAOS_LRU_STATUS_OK_OVERWRITTEN;
|
status = TAOS_LRU_STATUS_OK_OVERWRITTEN;
|
||||||
|
|
||||||
|
if (old->overwriter) {
|
||||||
|
(*old->overwriter)(old->keyData, old->keyLength, old->value, old->ud);
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
|
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
|
||||||
if (!TAOS_LRU_ENTRY_HAS_REFS(old)) {
|
if (!TAOS_LRU_ENTRY_HAS_REFS(old)) {
|
||||||
taosLRUCacheShardLRURemove(shard, old);
|
taosLRUCacheShardLRURemove(shard, old);
|
||||||
|
@ -440,8 +445,9 @@ _exit:
|
||||||
}
|
}
|
||||||
|
|
||||||
static LRUStatus taosLRUCacheShardInsert(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash,
|
static LRUStatus taosLRUCacheShardInsert(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash,
|
||||||
void *value, size_t charge, _taos_lru_deleter_t deleter, LRUHandle **handle,
|
void *value, size_t charge, _taos_lru_deleter_t deleter,
|
||||||
LRUPriority priority, void *ud) {
|
_taos_lru_overwriter_t overwriter, LRUHandle **handle, LRUPriority priority,
|
||||||
|
void *ud) {
|
||||||
SLRUEntry *e = taosMemoryCalloc(1, sizeof(SLRUEntry) - 1 + keyLen);
|
SLRUEntry *e = taosMemoryCalloc(1, sizeof(SLRUEntry) - 1 + keyLen);
|
||||||
if (!e) {
|
if (!e) {
|
||||||
if (deleter) {
|
if (deleter) {
|
||||||
|
@ -453,6 +459,7 @@ static LRUStatus taosLRUCacheShardInsert(SLRUCacheShard *shard, const void *key,
|
||||||
e->value = value;
|
e->value = value;
|
||||||
e->flags = 0;
|
e->flags = 0;
|
||||||
e->deleter = deleter;
|
e->deleter = deleter;
|
||||||
|
e->overwriter = overwriter;
|
||||||
e->ud = ud;
|
e->ud = ud;
|
||||||
e->keyLength = keyLen;
|
e->keyLength = keyLen;
|
||||||
e->hash = hash;
|
e->hash = hash;
|
||||||
|
@ -726,12 +733,12 @@ void taosLRUCacheCleanup(SLRUCache *cache) {
|
||||||
}
|
}
|
||||||
|
|
||||||
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
|
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
|
||||||
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority, void *ud) {
|
_taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle, LRUPriority priority, void *ud) {
|
||||||
uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen);
|
uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen);
|
||||||
uint32_t shardIndex = hash & cache->shardedCache.shardMask;
|
uint32_t shardIndex = hash & cache->shardedCache.shardMask;
|
||||||
|
|
||||||
return taosLRUCacheShardInsert(&cache->shards[shardIndex], key, keyLen, hash, value, charge, deleter, handle,
|
return taosLRUCacheShardInsert(&cache->shards[shardIndex], key, keyLen, hash, value, charge, deleter, overwriter,
|
||||||
priority, ud);
|
handle, priority, ud);
|
||||||
}
|
}
|
||||||
|
|
||||||
LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen) {
|
LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen) {
|
||||||
|
|
Loading…
Reference in New Issue