fix(tsdb/cache): rewrite cache update to fix cpu usage
This commit is contained in:
parent
5612f7ff93
commit
d04a33f349
|
@ -344,6 +344,11 @@ static void tsdbCacheDeleter(const void *key, size_t keyLen, void *value) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int idx;
|
||||||
|
SLastKey key;
|
||||||
|
} SIdxKey;
|
||||||
|
|
||||||
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) {
|
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -370,113 +375,166 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
|
||||||
tsdbRowClose(&iter);
|
tsdbRowClose(&iter);
|
||||||
|
|
||||||
// 3, build keys & multi get from rocks
|
// 3, build keys & multi get from rocks
|
||||||
int num_keys = TARRAY_SIZE(aColVal);
|
int num_keys = TARRAY_SIZE(aColVal);
|
||||||
char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
TSKEY keyTs = TSDBROW_TS(pRow);
|
||||||
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
|
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||||
char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN * 2);
|
|
||||||
|
SArray *oColVal = taosArrayInit(num_keys, sizeof(SColVal));
|
||||||
|
SArray *remainCols = NULL;
|
||||||
|
SLRUCache *pCache = pTsdb->lruCache;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
for (int i = 0; i < num_keys; ++i) {
|
for (int i = 0; i < num_keys; ++i) {
|
||||||
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
|
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
|
||||||
int16_t cid = pColVal->cid;
|
int16_t cid = pColVal->cid;
|
||||||
|
|
||||||
memcpy(key_list + i * ROCKS_KEY_LEN, &(SLastKey){.ltype = 1, .uid = uid, .cid = cid}, ROCKS_KEY_LEN);
|
SLastKey *key = &(SLastKey){.ltype = 0, .uid = uid, .cid = cid};
|
||||||
memcpy(key_list + i * ROCKS_KEY_LEN + num_keys * ROCKS_KEY_LEN, &(SLastKey){.ltype = 0, .uid = uid, .cid = cid},
|
size_t klen = ROCKS_KEY_LEN;
|
||||||
ROCKS_KEY_LEN);
|
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
|
||||||
keys_list[i] = key_list + i * ROCKS_KEY_LEN;
|
if (h) {
|
||||||
keys_list[num_keys + i] = key_list + i * ROCKS_KEY_LEN + num_keys * ROCKS_KEY_LEN;
|
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
||||||
keys_list_sizes[i] = ROCKS_KEY_LEN;
|
|
||||||
keys_list_sizes[num_keys + i] = ROCKS_KEY_LEN;
|
|
||||||
}
|
|
||||||
char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
|
||||||
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
|
|
||||||
char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
|
||||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
|
||||||
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
|
|
||||||
keys_list_sizes, values_list, values_list_sizes, errs);
|
|
||||||
for (int i = 0; i < num_keys * 2; ++i) {
|
|
||||||
rocksdb_free(errs[i]);
|
|
||||||
}
|
|
||||||
taosMemoryFree(key_list);
|
|
||||||
taosMemoryFree(keys_list);
|
|
||||||
taosMemoryFree(keys_list_sizes);
|
|
||||||
taosMemoryFree(errs);
|
|
||||||
|
|
||||||
TSKEY keyTs = TSDBROW_TS(pRow);
|
if (pLastCol->ts <= keyTs) {
|
||||||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
uint8_t *pVal = NULL;
|
||||||
for (int i = 0; i < num_keys; ++i) {
|
int nData = pLastCol->colVal.value.nData;
|
||||||
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
|
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||||
|
pVal = pLastCol->colVal.value.pData;
|
||||||
|
}
|
||||||
|
pLastCol->ts = keyTs;
|
||||||
|
pLastCol->colVal = *pColVal;
|
||||||
|
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||||
|
if (nData < pColVal->value.nData) {
|
||||||
|
taosMemoryFree(pVal);
|
||||||
|
pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData);
|
||||||
|
} else {
|
||||||
|
pLastCol->colVal.value.pData = pVal;
|
||||||
|
}
|
||||||
|
if (pColVal->value.nData) {
|
||||||
|
memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// if (!COL_VAL_IS_NONE(pColVal)) {
|
char *value = NULL;
|
||||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
|
size_t vlen = 0;
|
||||||
|
tsdbCacheSerialize(pLastCol, &value, &vlen);
|
||||||
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
|
// tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
|
||||||
char *value = NULL;
|
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
|
||||||
size_t vlen = 0;
|
taosMemoryFree(value);
|
||||||
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
|
|
||||||
SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid};
|
|
||||||
size_t klen = ROCKS_KEY_LEN;
|
|
||||||
rocksdb_writebatch_put(wb, (char *)&key, klen, value, vlen);
|
|
||||||
|
|
||||||
pLastCol = (SLastCol *)value;
|
|
||||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
|
||||||
*pTmpLastCol = *pLastCol;
|
|
||||||
pLastCol = pTmpLastCol;
|
|
||||||
|
|
||||||
reallocVarData(&pLastCol->colVal);
|
|
||||||
size_t charge = sizeof(*pLastCol);
|
|
||||||
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
|
|
||||||
charge += pLastCol->colVal.value.nData;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
|
taosLRUCacheRelease(pCache, h, false);
|
||||||
NULL, TAOS_LRU_PRIORITY_LOW);
|
} else {
|
||||||
if (status != TAOS_LRU_STATUS_OK) {
|
if (!remainCols) {
|
||||||
code = -1;
|
remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
|
||||||
}
|
}
|
||||||
|
taosArrayPush(remainCols, &(SIdxKey){i, *key});
|
||||||
taosMemoryFree(value);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (COL_VAL_IS_VALUE(pColVal)) {
|
if (COL_VAL_IS_VALUE(pColVal)) {
|
||||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
key->ltype = 1;
|
||||||
|
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
|
||||||
|
if (h) {
|
||||||
|
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
||||||
|
|
||||||
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
|
if (pLastCol->ts <= keyTs) {
|
||||||
char *value = NULL;
|
uint8_t *pVal = NULL;
|
||||||
size_t vlen = 0;
|
int nData = pLastCol->colVal.value.nData;
|
||||||
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
|
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||||
SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
|
pVal = pLastCol->colVal.value.pData;
|
||||||
|
}
|
||||||
|
pLastCol->ts = keyTs;
|
||||||
|
pLastCol->colVal = *pColVal;
|
||||||
|
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||||
|
if (nData < pColVal->value.nData) {
|
||||||
|
taosMemoryFree(pVal);
|
||||||
|
pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData);
|
||||||
|
} else {
|
||||||
|
pLastCol->colVal.value.pData = pVal;
|
||||||
|
}
|
||||||
|
if (pColVal->value.nData) {
|
||||||
|
memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rocksdb_writebatch_put(wb, (char *)&key, ROCKS_KEY_LEN, value, vlen);
|
char *value = NULL;
|
||||||
|
size_t vlen = 0;
|
||||||
pLastCol = (SLastCol *)value;
|
tsdbCacheSerialize(pLastCol, &value, &vlen);
|
||||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
|
||||||
*pTmpLastCol = *pLastCol;
|
taosMemoryFree(value);
|
||||||
pLastCol = pTmpLastCol;
|
|
||||||
|
|
||||||
reallocVarData(&pLastCol->colVal);
|
|
||||||
size_t charge = sizeof(*pLastCol);
|
|
||||||
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
|
|
||||||
charge += pLastCol->colVal.value.nData;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
|
taosLRUCacheRelease(pCache, h, false);
|
||||||
NULL, TAOS_LRU_PRIORITY_LOW);
|
} else {
|
||||||
if (status != TAOS_LRU_STATUS_OK) {
|
if (!remainCols) {
|
||||||
code = -1;
|
remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey));
|
||||||
}
|
}
|
||||||
|
taosArrayPush(remainCols, &(SIdxKey){i, *key});
|
||||||
taosMemoryFree(value);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//}
|
|
||||||
|
|
||||||
rocksdb_free(values_list[i]);
|
|
||||||
rocksdb_free(values_list[i + num_keys]);
|
|
||||||
}
|
}
|
||||||
taosMemoryFree(values_list);
|
|
||||||
taosMemoryFree(values_list_sizes);
|
num_keys = TARRAY_SIZE(remainCols);
|
||||||
|
if (remainCols && num_keys > 0) {
|
||||||
|
char **keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
|
||||||
|
size_t *keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
|
||||||
|
for (int i = 0; i < num_keys; ++i) {
|
||||||
|
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
|
||||||
|
|
||||||
|
keys_list[i] = (char *)&idxKey->key;
|
||||||
|
keys_list_sizes[i] = ROCKS_KEY_LEN;
|
||||||
|
}
|
||||||
|
char **values_list = taosMemoryCalloc(num_keys, sizeof(char *));
|
||||||
|
size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
|
||||||
|
char **errs = taosMemoryCalloc(num_keys, sizeof(char *));
|
||||||
|
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list,
|
||||||
|
keys_list_sizes, values_list, values_list_sizes, errs);
|
||||||
|
for (int i = 0; i < num_keys; ++i) {
|
||||||
|
rocksdb_free(errs[i]);
|
||||||
|
}
|
||||||
|
taosMemoryFree(errs);
|
||||||
|
taosMemoryFree(keys_list);
|
||||||
|
taosMemoryFree(keys_list_sizes);
|
||||||
|
taosMemoryFree(values_list_sizes);
|
||||||
|
|
||||||
|
for (int i = 0; i < num_keys; ++i) {
|
||||||
|
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
|
||||||
|
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx);
|
||||||
|
|
||||||
|
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
||||||
|
|
||||||
|
if (idxKey->key.ltype == 0) {
|
||||||
|
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
|
||||||
|
char *value = NULL;
|
||||||
|
size_t vlen = 0;
|
||||||
|
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
|
||||||
|
// SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid};
|
||||||
|
rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
|
||||||
|
taosMemoryFree(value);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (COL_VAL_IS_VALUE(pColVal)) {
|
||||||
|
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
||||||
|
|
||||||
|
if (NULL == pLastCol || pLastCol->ts <= keyTs) {
|
||||||
|
char *value = NULL;
|
||||||
|
size_t vlen = 0;
|
||||||
|
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen);
|
||||||
|
// SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
|
||||||
|
rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen);
|
||||||
|
taosMemoryFree(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rocksdb_free(values_list[i]);
|
||||||
|
}
|
||||||
|
taosMemoryFree(values_list);
|
||||||
|
|
||||||
|
taosArrayDestroy(remainCols);
|
||||||
|
}
|
||||||
|
|
||||||
rocksMayWrite(pTsdb, true, false, false);
|
rocksMayWrite(pTsdb, true, false, false);
|
||||||
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
taosArrayDestroy(aColVal);
|
taosArrayDestroy(aColVal);
|
||||||
|
@ -651,11 +709,6 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl
|
||||||
return pLastCol;
|
return pLastCol;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int idx;
|
|
||||||
SLastKey key;
|
|
||||||
} SIdxKey;
|
|
||||||
|
|
||||||
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
|
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
|
||||||
SCacheRowsReader *pr, int8_t ltype) {
|
SCacheRowsReader *pr, int8_t ltype) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
Loading…
Reference in New Issue