Merge pull request #22236 from taosdata/fix/TD-25416
fix(tsdb/cache/del): use lru mutex to fix race with committing
This commit is contained in:
commit
c19feb2fa1
|
@ -1028,55 +1028,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
|
|
||||||
int32_t code = 0;
|
|
||||||
SLRUCache *pCache = pTsdb->lruCache;
|
|
||||||
SArray *pCidList = pr->pCidList;
|
|
||||||
int num_keys = TARRAY_SIZE(pCidList);
|
|
||||||
|
|
||||||
for (int i = 0; i < num_keys; ++i) {
|
|
||||||
SLastCol *pLastCol = NULL;
|
|
||||||
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
|
|
||||||
|
|
||||||
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
|
|
||||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
|
|
||||||
if (!h) {
|
|
||||||
taosThreadMutexLock(&pTsdb->lruMutex);
|
|
||||||
h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
|
|
||||||
if (!h) {
|
|
||||||
pLastCol = tsdbCacheLoadCol(pTsdb, pr, pr->pSlotIds[i], uid, cid, ltype);
|
|
||||||
|
|
||||||
size_t charge = sizeof(*pLastCol);
|
|
||||||
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
|
|
||||||
charge += pLastCol->colVal.value.nData;
|
|
||||||
}
|
|
||||||
|
|
||||||
LRUStatus status = taosLRUCacheInsert(pCache, key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, &h,
|
|
||||||
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
|
|
||||||
if (status != TAOS_LRU_STATUS_OK) {
|
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
|
||||||
|
|
||||||
SLastCol lastCol = *pLastCol;
|
|
||||||
reallocVarData(&lastCol.colVal);
|
|
||||||
|
|
||||||
if (h) {
|
|
||||||
taosLRUCacheRelease(pCache, h, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayPush(pLastArray, &lastCol);
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
|
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
// fetch schema
|
// fetch schema
|
||||||
|
@ -1108,6 +1060,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
||||||
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
|
size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
|
||||||
char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *));
|
||||||
|
taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||||
rocksMayWrite(pTsdb, true, false, false);
|
rocksMayWrite(pTsdb, true, false, false);
|
||||||
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
|
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
|
||||||
|
@ -1137,7 +1090,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
rocksdb_free(values_list[i]);
|
rocksdb_free(values_list[i]);
|
||||||
rocksdb_free(values_list[i + num_keys]);
|
rocksdb_free(values_list[i + num_keys]);
|
||||||
|
|
||||||
taosThreadMutexLock(&pTsdb->lruMutex);
|
// taosThreadMutexLock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
|
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
|
||||||
if (h) {
|
if (h) {
|
||||||
|
@ -1159,7 +1112,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
}
|
}
|
||||||
taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen);
|
taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
// taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||||
}
|
}
|
||||||
for (int i = 0; i < num_keys; ++i) {
|
for (int i = 0; i < num_keys; ++i) {
|
||||||
taosMemoryFree(keys_list[i]);
|
taosMemoryFree(keys_list[i]);
|
||||||
|
@ -1171,6 +1124,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
|
|
||||||
rocksMayWrite(pTsdb, true, false, true);
|
rocksMayWrite(pTsdb, true, false, true);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
taosMemoryFree(pTSchema);
|
taosMemoryFree(pTSchema);
|
||||||
|
|
||||||
|
@ -1311,62 +1266,7 @@ int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
|
|
||||||
int32_t code = 0;
|
|
||||||
char key[32] = {0};
|
|
||||||
int keyLen = 0;
|
|
||||||
|
|
||||||
// getTableCacheKey(uid, "lr", key, &keyLen);
|
|
||||||
getTableCacheKey(uid, 0, key, &keyLen);
|
|
||||||
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
|
|
||||||
if (h) {
|
|
||||||
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
|
|
||||||
bool invalidate = false;
|
|
||||||
int16_t nCol = taosArrayGetSize(pLast);
|
|
||||||
|
|
||||||
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
|
|
||||||
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
|
|
||||||
if (eKey >= tTsVal->ts) {
|
|
||||||
invalidate = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (invalidate) {
|
|
||||||
taosLRUCacheRelease(pCache, h, true);
|
|
||||||
} else {
|
|
||||||
taosLRUCacheRelease(pCache, h, false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// getTableCacheKey(uid, "l", key, &keyLen);
|
|
||||||
getTableCacheKey(uid, 1, key, &keyLen);
|
|
||||||
h = taosLRUCacheLookup(pCache, key, keyLen);
|
|
||||||
if (h) {
|
|
||||||
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
|
|
||||||
bool invalidate = false;
|
|
||||||
int16_t nCol = taosArrayGetSize(pLast);
|
|
||||||
|
|
||||||
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
|
|
||||||
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
|
|
||||||
if (eKey >= tTsVal->ts) {
|
|
||||||
invalidate = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (invalidate) {
|
|
||||||
taosLRUCacheRelease(pCache, h, true);
|
|
||||||
} else {
|
|
||||||
taosLRUCacheRelease(pCache, h, false);
|
|
||||||
}
|
|
||||||
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) {
|
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STSRow *cacheRow = NULL;
|
STSRow *cacheRow = NULL;
|
||||||
|
@ -1767,6 +1667,10 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
|
||||||
}
|
}
|
||||||
|
|
||||||
if (record.version <= pReader->info.verRange.maxVer) {
|
if (record.version <= pReader->info.verRange.maxVer) {
|
||||||
|
/*
|
||||||
|
tsdbError("tomb xx load/cache: vgId:%d fid:%d commit %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
|
||||||
|
TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);
|
||||||
|
*/
|
||||||
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
|
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
|
||||||
taosArrayPush(pInfo->pTombData, &delData);
|
taosArrayPush(pInfo->pTombData, &delData);
|
||||||
}
|
}
|
||||||
|
@ -1977,9 +1881,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
loadDataTomb(state->pr, state->pr->pFileReader);
|
|
||||||
|
|
||||||
state->pr->pCurFileSet = state->pFileSet;
|
state->pr->pCurFileSet = state->pFileSet;
|
||||||
|
|
||||||
|
loadDataTomb(state->pr, state->pr->pFileReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!state->pIndexList) {
|
if (!state->pIndexList) {
|
||||||
|
@ -2017,6 +1921,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
||||||
state->iBrinIndex = indexSize;
|
state->iBrinIndex = indexSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (state->pFileSet != state->pr->pCurFileSet) {
|
||||||
|
state->pr->pCurFileSet = state->pFileSet;
|
||||||
|
}
|
||||||
|
|
||||||
code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid,
|
code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid,
|
||||||
state->pr, state->lastTs, aCols, nCols);
|
state->pr, state->lastTs, aCols, nCols);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
Loading…
Reference in New Issue