Merge pull request #14784 from taosdata/fix/tsdb-cache-cfg
tsdbCache: use cachelast & cachelastsize cfg from vnode
This commit is contained in:
commit
8ff644ffcf
|
@ -138,6 +138,8 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
|||
pCfg->dbId = pCreate->dbUid;
|
||||
pCfg->szPage = pCreate->pageSize * 1024;
|
||||
pCfg->szCache = pCreate->pages;
|
||||
pCfg->cacheLast = pCreate->cacheLast;
|
||||
pCfg->cacheLastSize = pCreate->cacheLastSize;
|
||||
pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
|
||||
pCfg->isWeak = true;
|
||||
pCfg->isTsma = pCreate->isTsma;
|
||||
|
|
|
@ -140,7 +140,10 @@ int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList,
|
|||
void **pReader);
|
||||
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds);
|
||||
int32_t tsdbLastrowReaderClose(void *pReader);
|
||||
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid);
|
||||
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
|
||||
|
||||
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
|
||||
size_t tsdbCacheGetCapacity(SVnode *pVnode);
|
||||
|
||||
// tq
|
||||
|
||||
|
@ -210,11 +213,13 @@ struct SVnodeCfg {
|
|||
int32_t vgId;
|
||||
char dbname[TSDB_DB_FNAME_LEN];
|
||||
uint64_t dbId;
|
||||
int32_t cacheLastSize;
|
||||
int32_t szPage;
|
||||
int32_t szCache;
|
||||
uint64_t szBuf;
|
||||
bool isHeap;
|
||||
bool isWeak;
|
||||
int8_t cacheLast;
|
||||
int8_t isTsma;
|
||||
int8_t isRsma;
|
||||
int8_t hashMethod;
|
||||
|
|
|
@ -237,6 +237,10 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
|
|||
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, uint8_t **ppBuf);
|
||||
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf);
|
||||
|
||||
#define TSDB_CACHE_NO(c) ((c).cacheLast == 0)
|
||||
#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0)
|
||||
#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0)
|
||||
|
||||
// tsdbCache
|
||||
int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||
void tsdbCloseCache(SLRUCache *pCache);
|
||||
|
@ -246,8 +250,13 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
|
|||
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
|
||||
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
|
||||
|
||||
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
|
||||
|
||||
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
|
||||
size_t tsdbCacheGetCapacity(SVnode *pVnode);
|
||||
|
||||
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
|
||||
|
||||
// structs =======================
|
||||
|
|
|
@ -15,11 +15,15 @@
|
|||
|
||||
#include "tsdb.h"
|
||||
|
||||
typedef struct {
|
||||
TSKEY ts;
|
||||
SColVal colVal;
|
||||
} SLastCol;
|
||||
|
||||
int32_t tsdbOpenCache(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
SLRUCache *pCache = NULL;
|
||||
// TODO: get cfg from vnode config: pTsdb->pVnode->config.lruCapacity
|
||||
size_t cfgCapacity = 1024 * 1024;
|
||||
size_t cfgCapacity = pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
|
||||
|
||||
pCache = taosLRUCacheInit(cfgCapacity, -1, .5);
|
||||
if (pCache == NULL) {
|
||||
|
@ -61,10 +65,11 @@ static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value)
|
|||
|
||||
static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) { taosArrayDestroy(value); }
|
||||
|
||||
static int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
||||
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
||||
// getTableCacheKey(uid, "lr", key, &keyLen);
|
||||
getTableCacheKey(uid, 0, key, &keyLen);
|
||||
|
@ -83,18 +88,79 @@ static int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKe
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
||||
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
||||
// getTableCacheKey(uid, "l", key, &keyLen);
|
||||
getTableCacheKey(uid, 1, key, &keyLen);
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (h) {
|
||||
// clear last cache anyway, no matter where eKey ends.
|
||||
taosLRUCacheRelease(pCache, h, true);
|
||||
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
|
||||
bool invalidate = false;
|
||||
int16_t nCol = taosArrayGetSize(pLast);
|
||||
|
||||
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
|
||||
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||
if (eKey >= tTsVal->ts) {
|
||||
invalidate = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (invalidate) {
|
||||
taosLRUCacheRelease(pCache, h, true);
|
||||
} else {
|
||||
taosLRUCacheRelease(pCache, h, false);
|
||||
}
|
||||
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
||||
// getTableCacheKey(uid, "lr", key, &keyLen);
|
||||
getTableCacheKey(uid, 0, key, &keyLen);
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (h) {
|
||||
STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||
if (pRow->ts <= eKey) {
|
||||
taosLRUCacheRelease(pCache, h, true);
|
||||
} else {
|
||||
taosLRUCacheRelease(pCache, h, false);
|
||||
}
|
||||
|
||||
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
|
||||
}
|
||||
|
||||
// getTableCacheKey(uid, "l", key, &keyLen);
|
||||
getTableCacheKey(uid, 1, key, &keyLen);
|
||||
h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (h) {
|
||||
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
|
||||
bool invalidate = false;
|
||||
int16_t nCol = taosArrayGetSize(pLast);
|
||||
|
||||
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
|
||||
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||
if (eKey >= tTsVal->ts) {
|
||||
invalidate = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (invalidate) {
|
||||
taosLRUCacheRelease(pCache, h, true);
|
||||
} else {
|
||||
taosLRUCacheRelease(pCache, h, false);
|
||||
}
|
||||
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
|
||||
}
|
||||
|
||||
|
@ -173,11 +239,6 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
|
|||
return code;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
TSKEY ts;
|
||||
SColVal colVal;
|
||||
} SLastCol;
|
||||
|
||||
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
STSRow *cacheRow = NULL;
|
||||
|
@ -405,7 +466,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
|||
case SFSNEXTROW_FS:
|
||||
state->aDFileSet = state->pTsdb->fs->cState->aDFileSet;
|
||||
state->nFileSet = taosArrayGetSize(state->aDFileSet);
|
||||
state->iFileSet = state->nFileSet - 1;
|
||||
state->iFileSet = state->nFileSet;
|
||||
|
||||
state->pBlockData = NULL;
|
||||
|
||||
|
@ -1679,52 +1740,6 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
||||
// getTableCacheKey(uid, "lr", key, &keyLen);
|
||||
getTableCacheKey(uid, 0, key, &keyLen);
|
||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (h) {
|
||||
STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h);
|
||||
if (pRow->ts <= eKey) {
|
||||
taosLRUCacheRelease(pCache, h, true);
|
||||
} else {
|
||||
taosLRUCacheRelease(pCache, h, false);
|
||||
}
|
||||
|
||||
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
|
||||
}
|
||||
|
||||
// getTableCacheKey(uid, "l", key, &keyLen);
|
||||
getTableCacheKey(uid, 1, key, &keyLen);
|
||||
h = taosLRUCacheLookup(pCache, key, keyLen);
|
||||
if (h) {
|
||||
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
|
||||
bool invalidate = false;
|
||||
int16_t nCol = taosArrayGetSize(pLast);
|
||||
|
||||
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
|
||||
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||
if (eKey >= tTsVal->ts) {
|
||||
invalidate = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (invalidate) {
|
||||
taosLRUCacheRelease(pCache, h, true);
|
||||
} else {
|
||||
taosLRUCacheRelease(pCache, h, false);
|
||||
}
|
||||
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -1732,3 +1747,9 @@ int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
|
|||
|
||||
return code;
|
||||
}
|
||||
|
||||
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
|
||||
taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity);
|
||||
}
|
||||
|
||||
size_t tsdbCacheGetCapacity(SVnode *pVnode) { return taosLRUCacheGetCapacity(pVnode->pTsdb->lruCache); }
|
||||
|
|
|
@ -181,8 +181,12 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
|||
pMemTable->maxVersion = TMAX(pMemTable->maxVersion, version);
|
||||
pMemTable->nDel++;
|
||||
|
||||
if (tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
|
||||
tsdbCacheDelete(pTsdb->lruCache, pTbData->uid, eKey);
|
||||
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
|
||||
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);
|
||||
}
|
||||
|
||||
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
|
||||
tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey);
|
||||
}
|
||||
|
||||
tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
||||
|
@ -556,12 +560,14 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
|
|||
pTbData->maxKey = key.ts;
|
||||
}
|
||||
|
||||
if (pLastRow != NULL) {
|
||||
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && pLastRow != NULL) {
|
||||
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, pLastRow, true);
|
||||
}
|
||||
}
|
||||
|
||||
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow, pMemTable->pTsdb);
|
||||
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
|
||||
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow, pMemTable->pTsdb);
|
||||
}
|
||||
|
||||
pTbData->minVersion = TMIN(pTbData->minVersion, version);
|
||||
pTbData->maxVersion = TMAX(pTbData->maxVersion, version);
|
||||
|
|
|
@ -20,6 +20,8 @@ const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
|
|||
.dbId = 0,
|
||||
.szPage = 4096,
|
||||
.szCache = 256,
|
||||
.cacheLast = 3,
|
||||
.cacheLastSize = 8,
|
||||
.szBuf = 96 * 1024 * 1024,
|
||||
.isHeap = false,
|
||||
.isWeak = 0,
|
||||
|
@ -60,6 +62,8 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
|||
if (tjsonAddIntegerToObject(pJson, "dbId", pCfg->dbId) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "szPage", pCfg->szPage) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "szCache", pCfg->szCache) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "cacheLast", pCfg->cacheLast) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "cacheLastSize", pCfg->cacheLastSize) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "szBuf", pCfg->szBuf) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeap) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak) < 0) return -1;
|
||||
|
@ -133,6 +137,10 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
|||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "szCache", pCfg->szCache, code);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "cacheLast", pCfg->cacheLast, code);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "cacheLastSize", pCfg->cacheLastSize, code);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "szBuf", pCfg->szBuf, code);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeap, code);
|
||||
|
|
|
@ -936,6 +936,11 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
|
|||
|
||||
vInfo("vgId:%d, start to alter vnode config, cacheLast:%d cacheLastSize:%d", TD_VID(pVnode), alterReq.cacheLast,
|
||||
alterReq.cacheLastSize);
|
||||
if (pVnode->config.cacheLastSize != alterReq.cacheLastSize) {
|
||||
pVnode->config.cacheLastSize = alterReq.cacheLastSize;
|
||||
// TODO: save config
|
||||
tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue