Merge pull request #27807 from taosdata/fix/TD-31880-3.0

fix: (last) call rocksdb_write before rocksdb_multi_get
This commit is contained in:
Hongze Cheng 2024-09-12 14:05:29 +08:00 committed by GitHub
commit 19e95c126c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 45 additions and 81 deletions

View File

@ -342,7 +342,6 @@ typedef struct {
rocksdb_writeoptions_t *writeoptions; rocksdb_writeoptions_t *writeoptions;
rocksdb_readoptions_t *readoptions; rocksdb_readoptions_t *readoptions;
rocksdb_writebatch_t *writebatch; rocksdb_writebatch_t *writebatch;
rocksdb_writebatch_t *rwritebatch;
STSchema *pTSchema; STSchema *pTSchema;
} SRocksCache; } SRocksCache;
@ -363,7 +362,6 @@ struct STsdb {
SMemTable *imem; SMemTable *imem;
STsdbFS fs; // old STsdbFS fs; // old
SLRUCache *lruCache; SLRUCache *lruCache;
SCacheFlushState flushState;
TdThreadMutex lruMutex; TdThreadMutex lruMutex;
SLRUCache *biCache; SLRUCache *biCache;
TdThreadMutex biMutex; TdThreadMutex biMutex;

View File

@ -214,10 +214,8 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
} }
rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create();
rocksdb_writebatch_t *rwritebatch = rocksdb_writebatch_create();
pTsdb->rCache.writebatch = writebatch; pTsdb->rCache.writebatch = writebatch;
pTsdb->rCache.rwritebatch = rwritebatch;
pTsdb->rCache.my_comparator = cmp; pTsdb->rCache.my_comparator = cmp;
pTsdb->rCache.options = options; pTsdb->rCache.options = options;
pTsdb->rCache.writeoptions = writeoptions; pTsdb->rCache.writeoptions = writeoptions;
@ -248,7 +246,6 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
rocksdb_close(pTsdb->rCache.db); rocksdb_close(pTsdb->rCache.db);
rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions); rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions);
rocksdb_writebatch_destroy(pTsdb->rCache.writebatch); rocksdb_writebatch_destroy(pTsdb->rCache.writebatch);
rocksdb_writebatch_destroy(pTsdb->rCache.rwritebatch);
rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); rocksdb_readoptions_destroy(pTsdb->rCache.readoptions);
rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions);
rocksdb_options_destroy(pTsdb->rCache.options); rocksdb_options_destroy(pTsdb->rCache.options);
@ -258,8 +255,8 @@ static void tsdbCloseRocksCache(STsdb *pTsdb) {
taosMemoryFree(pTsdb->rCache.pTSchema); taosMemoryFree(pTsdb->rCache.pTSchema);
} }
static void rocksMayWrite(STsdb *pTsdb, bool force, bool read) { static void rocksMayWrite(STsdb *pTsdb, bool force) {
rocksdb_writebatch_t *wb = read ? pTsdb->rCache.rwritebatch : pTsdb->rCache.writebatch; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
int count = rocksdb_writebatch_count(wb); int count = rocksdb_writebatch_count(wb);
if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) { if ((force && count > 0) || count >= ROCKS_BATCH_SIZE) {
@ -270,7 +267,6 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read) {
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count, tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
err); err);
rocksdb_free(err); rocksdb_free(err);
// pTsdb->flushState.flush_count = 0;
} }
rocksdb_writebatch_clear(wb); rocksdb_writebatch_clear(wb);
@ -459,47 +455,23 @@ static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size
TAOS_RETURN(TSDB_CODE_SUCCESS); TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
static void tsdbCachePutBatch(SLastCol *pLastCol, const void *key, size_t klen, SCacheFlushState *state) { static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol);
int32_t code = 0;
STsdb *pTsdb = state->pTsdb;
SRocksCache *rCache = &pTsdb->rCache;
rocksdb_writebatch_t *wb = rCache->writebatch;
char *rocks_value = NULL;
size_t vlen = 0;
code = tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
if (code) {
tsdbError("tsdb/cache: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
return;
}
rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen);
taosMemoryFree(rocks_value);
if (++state->flush_count >= ROCKS_BATCH_SIZE) {
char *err = NULL;
rocksdb_write(rCache->db, rCache->writeoptions, wb, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
state->flush_count, err);
rocksdb_free(err);
}
rocksdb_writebatch_clear(wb);
state->flush_count = 0;
}
}
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) { int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
SLastCol *pLastCol = (SLastCol *)value; SLastCol *pLastCol = (SLastCol *)value;
if (pLastCol->dirty) { if (pLastCol->dirty) {
tsdbCachePutBatch(pLastCol, key, klen, (SCacheFlushState *)ud); STsdb *pTsdb = (STsdb *)ud;
int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol);
if (code) {
tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
pLastCol->dirty = 0; pLastCol->dirty = 0;
rocksMayWrite(pTsdb, false);
} }
return 0; return 0;
@ -514,10 +486,9 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) {
(void)taosThreadMutexLock(&pTsdb->lruMutex); (void)taosThreadMutexLock(&pTsdb->lruMutex);
taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState); taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, true);
rocksMayWrite(pTsdb, true, true);
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
(void)taosThreadMutexUnlock(&pTsdb->lruMutex); (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
@ -606,7 +577,7 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud
SLastCol *pLastCol = (SLastCol *)value; SLastCol *pLastCol = (SLastCol *)value;
if (pLastCol->dirty) { if (pLastCol->dirty) {
tsdbCachePutBatch(pLastCol, key, klen, (SCacheFlushState *)ud); (void)tsdbCacheFlushDirty(key, klen, pLastCol, ud);
} }
for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) { for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
@ -643,11 +614,11 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid}; SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); TAOS_LRU_PRIORITY_LOW, pTsdb);
if (status != TAOS_LRU_STATUS_OK) { if (status != TAOS_LRU_STATUS_OK) {
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status); tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
tsdbCacheFreeSLastColItem(pLastCol);
code = TSDB_CODE_FAILED; code = TSDB_CODE_FAILED;
pLastCol = NULL;
} }
_exit: _exit:
@ -665,10 +636,9 @@ int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
SLRUCache *pCache = pTsdb->lruCache; SLRUCache *pCache = pTsdb->lruCache;
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState); taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, true);
rocksMayWrite(pTsdb, true, true);
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
if (NULL != err) { if (NULL != err) {
@ -738,6 +708,10 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
char **values_list = NULL; char **values_list = NULL;
size_t *values_list_sizes = NULL; size_t *values_list_sizes = NULL;
// was written by caller
// rocksMayWrite(pTsdb, true); // flush writebatch cache
TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list, TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list,
&values_list_sizes), &values_list_sizes),
NULL, _exit); NULL, _exit);
@ -862,7 +836,7 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
} }
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, false);
(void)taosThreadMutexUnlock(&pTsdb->lruMutex); (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
@ -903,7 +877,7 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, false);
(void)taosThreadMutexUnlock(&pTsdb->lruMutex); (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
@ -934,7 +908,7 @@ int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool h
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, false);
(void)taosThreadMutexUnlock(&pTsdb->lruMutex); (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
@ -973,7 +947,7 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey); (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
} }
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, false);
(void)taosThreadMutexUnlock(&pTsdb->lruMutex); (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
@ -1083,11 +1057,11 @@ static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLa
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge)); TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter, LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) { if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status); tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
tsdbCacheFreeSLastColItem(pLRULastCol);
code = TSDB_CODE_FAILED; code = TSDB_CODE_FAILED;
pLRULastCol = NULL;
} }
_exit: _exit:
@ -1177,6 +1151,8 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
keys_list_sizes[i] = ROCKS_KEY_LEN; keys_list_sizes[i] = ROCKS_KEY_LEN;
} }
rocksMayWrite(pTsdb, true); // flush writebatch cache
code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list, code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
&values_list_sizes); &values_list_sizes);
if (code) { if (code) {
@ -1243,7 +1219,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
taosMemoryFreeClear(pToFree); taosMemoryFreeClear(pToFree);
} }
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, false);
taosMemoryFree(keys_list); taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes); taosMemoryFree(keys_list_sizes);
@ -1573,32 +1549,22 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
} }
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); TAOS_LRU_PRIORITY_LOW, pTsdb);
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) { if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status); tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
tsdbCacheFreeSLastColItem(pLastCol); pLastCol = NULL;
taosMemoryFree(pLastCol);
TAOS_CHECK_EXIT(TSDB_CODE_FAILED); TAOS_CHECK_EXIT(TSDB_CODE_FAILED);
} }
// store result back to rocks cache // store result back to rocks cache
wb = pTsdb->rCache.rwritebatch; code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
char *value = NULL;
size_t vlen = 0;
code = tsdbCacheSerialize(pLastCol, &value, &vlen);
if (code) { if (code) {
tsdbError("tsdb/cache: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
} else { TAOS_CHECK_EXIT(code);
SLastKey *key = &idxKey->key;
size_t klen = ROCKS_KEY_LEN;
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
taosMemoryFree(value);
} }
} }
if (wb) { rocksMayWrite(pTsdb, false);
rocksMayWrite(pTsdb, false, true);
}
_exit: _exit:
taosArrayDestroy(lastrowTmpIndexArray); taosArrayDestroy(lastrowTmpIndexArray);
@ -1638,6 +1604,8 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
keys_list_sizes[i] = ROCKS_KEY_LEN; keys_list_sizes[i] = ROCKS_KEY_LEN;
} }
rocksMayWrite(pTsdb, true); // flush writebatch cache
code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list, code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
&values_list_sizes); &values_list_sizes);
if (code) { if (code) {
@ -1686,11 +1654,9 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
} }
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) { if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status); tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
tsdbCacheFreeSLastColItem(pLastCol);
taosMemoryFreeClear(pLastCol);
taosMemoryFreeClear(pToFree); taosMemoryFreeClear(pToFree);
TAOS_CHECK_EXIT(TSDB_CODE_FAILED); TAOS_CHECK_EXIT(TSDB_CODE_FAILED);
} }
@ -1797,6 +1763,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
if (remainCols && TARRAY_SIZE(remainCols) > 0) { if (remainCols && TARRAY_SIZE(remainCols) > 0) {
(void)taosThreadMutexLock(&pTsdb->lruMutex); (void)taosThreadMutexLock(&pTsdb->lruMutex);
for (int i = 0; i < TARRAY_SIZE(remainCols);) { for (int i = 0; i < TARRAY_SIZE(remainCols);) {
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN); LRUHandle *h = taosLRUCacheLookup(pCache, &idxKey->key, ROCKS_KEY_LEN);
@ -1910,6 +1877,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
keys_list_sizes[i] = klen; keys_list_sizes[i] = klen;
} }
rocksMayWrite(pTsdb, true); // flush writebatch cache
TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes, TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
&values_list, &values_list_sizes), &values_list, &values_list_sizes),
NULL, _exit); NULL, _exit);
@ -1944,7 +1913,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
taosMemoryFreeClear(pLastCol); taosMemoryFreeClear(pLastCol);
} }
rocksMayWrite(pTsdb, true, false); rocksMayWrite(pTsdb, false);
_exit: _exit:
(void)taosThreadMutexUnlock(&pTsdb->lruMutex); (void)taosThreadMutexUnlock(&pTsdb->lruMutex);
@ -1986,9 +1955,6 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
(void)taosThreadMutexInit(&pTsdb->lruMutex, NULL); (void)taosThreadMutexInit(&pTsdb->lruMutex, NULL);
pTsdb->flushState.pTsdb = pTsdb;
pTsdb->flushState.flush_count = 0;
_err: _err:
if (code) { if (code) {
tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code)); tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));