fix: (last) call rocksdb_write before rocksdb_multi_get
This commit is contained in:
parent
fede52d5b7
commit
44a955218c
|
@ -362,7 +362,6 @@ struct STsdb {
|
|||
SMemTable *imem;
|
||||
STsdbFS fs; // old
|
||||
SLRUCache *lruCache;
|
||||
SCacheFlushState flushState;
|
||||
TdThreadMutex lruMutex;
|
||||
SLRUCache *biCache;
|
||||
TdThreadMutex biMutex;
|
||||
|
|
|
@ -267,7 +267,6 @@ static void rocksMayWrite(STsdb *pTsdb, bool force) {
|
|||
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, count,
|
||||
err);
|
||||
rocksdb_free(err);
|
||||
// pTsdb->flushState.flush_count = 0;
|
||||
}
|
||||
|
||||
rocksdb_writebatch_clear(wb);
|
||||
|
@ -456,47 +455,23 @@ static int32_t tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size
|
|||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
static void tsdbCachePutBatch(SLastCol *pLastCol, const void *key, size_t klen, SCacheFlushState *state) {
|
||||
int32_t code = 0;
|
||||
STsdb *pTsdb = state->pTsdb;
|
||||
SRocksCache *rCache = &pTsdb->rCache;
|
||||
rocksdb_writebatch_t *wb = rCache->writebatch;
|
||||
char *rocks_value = NULL;
|
||||
size_t vlen = 0;
|
||||
|
||||
code = tsdbCacheSerialize(pLastCol, &rocks_value, &vlen);
|
||||
if (code) {
|
||||
tsdbError("tsdb/cache: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
return;
|
||||
}
|
||||
|
||||
rocksdb_writebatch_put(wb, (char *)key, klen, rocks_value, vlen);
|
||||
|
||||
taosMemoryFree(rocks_value);
|
||||
|
||||
if (++state->flush_count >= ROCKS_BATCH_SIZE) {
|
||||
char *err = NULL;
|
||||
|
||||
rocksdb_write(rCache->db, rCache->writeoptions, wb, &err);
|
||||
if (NULL != err) {
|
||||
tsdbError("vgId:%d, %s failed at line %d, count: %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
state->flush_count, err);
|
||||
rocksdb_free(err);
|
||||
}
|
||||
|
||||
rocksdb_writebatch_clear(wb);
|
||||
|
||||
state->flush_count = 0;
|
||||
}
|
||||
}
|
||||
static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol);
|
||||
|
||||
int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
|
||||
SLastCol *pLastCol = (SLastCol *)value;
|
||||
|
||||
if (pLastCol->dirty) {
|
||||
tsdbCachePutBatch(pLastCol, key, klen, (SCacheFlushState *)ud);
|
||||
STsdb *pTsdb = (STsdb *)ud;
|
||||
|
||||
int32_t code = tsdbCachePutToRocksdb(pTsdb, (SLastKey *)key, pLastCol);
|
||||
if (code) {
|
||||
tsdbError("tsdb/cache: vgId:%d, flush dirty lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
pLastCol->dirty = 0;
|
||||
|
||||
rocksMayWrite(pTsdb, false);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -511,7 +486,7 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) {
|
|||
|
||||
(void)taosThreadMutexLock(&pTsdb->lruMutex);
|
||||
|
||||
taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState);
|
||||
taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
|
||||
|
||||
rocksMayWrite(pTsdb, true);
|
||||
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
|
||||
|
@ -602,7 +577,7 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud
|
|||
SLastCol *pLastCol = (SLastCol *)value;
|
||||
|
||||
if (pLastCol->dirty) {
|
||||
tsdbCachePutBatch(pLastCol, key, klen, (SCacheFlushState *)ud);
|
||||
tsdbCacheFlushDirty(key, klen, pLastCol, ud);
|
||||
}
|
||||
|
||||
for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
|
||||
|
@ -639,11 +614,11 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
|
|||
|
||||
SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
|
||||
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
|
||||
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
|
||||
TAOS_LRU_PRIORITY_LOW, pTsdb);
|
||||
if (status != TAOS_LRU_STATUS_OK) {
|
||||
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
||||
tsdbCacheFreeSLastColItem(pLastCol);
|
||||
code = TSDB_CODE_FAILED;
|
||||
pLastCol = NULL;
|
||||
}
|
||||
|
||||
_exit:
|
||||
|
@ -661,7 +636,7 @@ int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
|
|||
SLRUCache *pCache = pTsdb->lruCache;
|
||||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||
|
||||
taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState);
|
||||
taosLRUCacheApply(pCache, tsdbCacheFlushDirty, pTsdb);
|
||||
|
||||
rocksMayWrite(pTsdb, true);
|
||||
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);
|
||||
|
@ -733,6 +708,10 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
|
|||
|
||||
char **values_list = NULL;
|
||||
size_t *values_list_sizes = NULL;
|
||||
|
||||
// was written by caller
|
||||
// rocksMayWrite(pTsdb, true); // flush writebatch cache
|
||||
|
||||
TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, 2, (const char *const *)keys_list, keys_list_sizes, &values_list,
|
||||
&values_list_sizes),
|
||||
NULL, _exit);
|
||||
|
@ -857,7 +836,7 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra
|
|||
taosMemoryFree(pTSchema);
|
||||
}
|
||||
|
||||
rocksMayWrite(pTsdb, true);
|
||||
rocksMayWrite(pTsdb, false);
|
||||
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
|
||||
|
@ -898,7 +877,7 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
|
|||
|
||||
taosMemoryFree(pTSchema);
|
||||
|
||||
rocksMayWrite(pTsdb, true);
|
||||
rocksMayWrite(pTsdb, false);
|
||||
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
|
||||
|
@ -929,7 +908,7 @@ int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool h
|
|||
|
||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||
|
||||
rocksMayWrite(pTsdb, true);
|
||||
rocksMayWrite(pTsdb, false);
|
||||
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
|
||||
|
@ -968,7 +947,7 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool
|
|||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||
}
|
||||
|
||||
rocksMayWrite(pTsdb, true);
|
||||
rocksMayWrite(pTsdb, false);
|
||||
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
|
||||
|
@ -1078,11 +1057,11 @@ static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLa
|
|||
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
|
||||
|
||||
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
|
||||
NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
|
||||
NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
|
||||
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
|
||||
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
||||
tsdbCacheFreeSLastColItem(pLRULastCol);
|
||||
code = TSDB_CODE_FAILED;
|
||||
pLRULastCol = NULL;
|
||||
}
|
||||
|
||||
_exit:
|
||||
|
@ -1172,6 +1151,8 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
|
|||
keys_list_sizes[i] = ROCKS_KEY_LEN;
|
||||
}
|
||||
|
||||
rocksMayWrite(pTsdb, true); // flush writebatch cache
|
||||
|
||||
code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
|
||||
&values_list_sizes);
|
||||
if (code) {
|
||||
|
@ -1238,7 +1219,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
|
|||
taosMemoryFreeClear(pToFree);
|
||||
}
|
||||
|
||||
rocksMayWrite(pTsdb, true);
|
||||
rocksMayWrite(pTsdb, false);
|
||||
|
||||
taosMemoryFree(keys_list);
|
||||
taosMemoryFree(keys_list_sizes);
|
||||
|
@ -1568,32 +1549,22 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
}
|
||||
|
||||
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
|
||||
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
|
||||
TAOS_LRU_PRIORITY_LOW, pTsdb);
|
||||
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
|
||||
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
||||
tsdbCacheFreeSLastColItem(pLastCol);
|
||||
taosMemoryFree(pLastCol);
|
||||
pLastCol = NULL;
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_FAILED);
|
||||
}
|
||||
|
||||
// store result back to rocks cache
|
||||
wb = pTsdb->rCache.writebatch;
|
||||
char *value = NULL;
|
||||
size_t vlen = 0;
|
||||
code = tsdbCacheSerialize(pLastCol, &value, &vlen);
|
||||
code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
|
||||
if (code) {
|
||||
tsdbError("tsdb/cache: vgId:%d, serialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
} else {
|
||||
SLastKey *key = &idxKey->key;
|
||||
size_t klen = ROCKS_KEY_LEN;
|
||||
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
|
||||
taosMemoryFree(value);
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||
TAOS_CHECK_EXIT(code);
|
||||
}
|
||||
}
|
||||
|
||||
if (wb) {
|
||||
rocksMayWrite(pTsdb, false);
|
||||
}
|
||||
rocksMayWrite(pTsdb, false);
|
||||
|
||||
_exit:
|
||||
taosArrayDestroy(lastrowTmpIndexArray);
|
||||
|
@ -1633,6 +1604,8 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
|||
keys_list_sizes[i] = ROCKS_KEY_LEN;
|
||||
}
|
||||
|
||||
rocksMayWrite(pTsdb, true); // flush writebatch cache
|
||||
|
||||
code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
|
||||
&values_list_sizes);
|
||||
if (code) {
|
||||
|
@ -1681,11 +1654,9 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
|||
}
|
||||
|
||||
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
|
||||
NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
|
||||
NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
|
||||
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
|
||||
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
||||
tsdbCacheFreeSLastColItem(pLastCol);
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
taosMemoryFreeClear(pToFree);
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_FAILED);
|
||||
}
|
||||
|
@ -1792,7 +1763,6 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
|||
|
||||
if (remainCols && TARRAY_SIZE(remainCols) > 0) {
|
||||
(void)taosThreadMutexLock(&pTsdb->lruMutex);
|
||||
rocksMayWrite(pTsdb, false);
|
||||
|
||||
for (int i = 0; i < TARRAY_SIZE(remainCols);) {
|
||||
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i];
|
||||
|
@ -1907,6 +1877,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
|||
keys_list_sizes[i] = klen;
|
||||
}
|
||||
|
||||
rocksMayWrite(pTsdb, true); // flush writebatch cache
|
||||
|
||||
TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
|
||||
&values_list, &values_list_sizes),
|
||||
NULL, _exit);
|
||||
|
@ -1941,7 +1913,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
|||
taosMemoryFreeClear(pLastCol);
|
||||
}
|
||||
|
||||
rocksMayWrite(pTsdb, true);
|
||||
rocksMayWrite(pTsdb, false);
|
||||
|
||||
_exit:
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
|
@ -1983,9 +1955,6 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
|
|||
|
||||
(void)taosThreadMutexInit(&pTsdb->lruMutex, NULL);
|
||||
|
||||
pTsdb->flushState.pTsdb = pTsdb;
|
||||
pTsdb->flushState.flush_count = 0;
|
||||
|
||||
_err:
|
||||
if (code) {
|
||||
tsdbError("tsdb/cache: vgId:%d, open failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino, tstrerror(code));
|
||||
|
|
Loading…
Reference in New Issue