cache/dclp: double check to load from tsdb

This commit is contained in:
Minglei Jin 2023-05-09 09:17:26 +08:00
parent 9b275f7421
commit 064c46bc7b
1 changed files with 64 additions and 33 deletions

View File

@ -190,6 +190,25 @@ void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
*size = length; *size = length;
} }
static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, char const *lstring) {
SLastCol *pLastCol = NULL;
char *err = NULL;
size_t vlen = 0;
char key[ROCKS_KEY_LEN];
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, cid, lstring);
char *value = NULL;
value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, key, klen, &vlen, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
}
pLastCol = tsdbCacheDeserialize(value);
return pLastCol;
}
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) {
int32_t code = 0; int32_t code = 0;
@ -362,6 +381,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
taosMemoryFree(errs); taosMemoryFree(errs);
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
bool freeCol = true;
SArray *pTmpColArray = NULL; SArray *pTmpColArray = NULL;
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
@ -376,6 +396,10 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
} }
} }
} else { } else {
taosThreadMutexLock(&pTsdb->rCache.rMutex);
pLastCol = tsdbCacheLookup(pTsdb, uid, cid, lstring);
if (!pLastCol) {
// recalc: load from tsdb // recalc: load from tsdb
int16_t aCols[1] = {cid}; int16_t aCols[1] = {cid};
int16_t slotIds[1] = {pr->pSlotIds[i]}; int16_t slotIds[1] = {pr->pSlotIds[i]};
@ -394,6 +418,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
// still null, then make up a none col value // still null, then make up a none col value
if (!pLastCol) { if (!pLastCol) {
pLastCol = &noneCol; pLastCol = &noneCol;
freeCol = false;
} }
// store result back to rocks cache // store result back to rocks cache
@ -402,22 +427,28 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen); tsdbCacheSerialize(pLastCol, &value, &vlen);
char key[ROCKS_KEY_LEN]; char key[ROCKS_KEY_LEN];
size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pLastCol->colVal.cid); size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, pLastCol->colVal.cid, lstring);
rocksdb_writebatch_put(wb, key, klen, value, vlen); rocksdb_writebatch_put(wb, key, klen, value, vlen);
taosMemoryFree(value);
char *err = NULL; char *err = NULL;
rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err);
if (NULL != err) { if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err); rocksdb_free(err);
} }
}
taosMemoryFree(value); taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
} }
taosArrayPush(pLastArray, pLastCol); taosArrayPush(pLastArray, pLastCol);
taosArrayDestroy(pTmpColArray); taosArrayDestroy(pTmpColArray);
taosMemoryFree(values_list[i]); if (freeCol) {
taosMemoryFree(pLastCol);
}
} }
taosMemoryFree(values_list); taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes); taosMemoryFree(values_list_sizes);