fix: (last) dirty mark correction
This commit is contained in:
parent
10bd0fed21
commit
549d27955e
|
@ -602,6 +602,8 @@ static void tsdbCacheOverWriter(const void *key, size_t klen, void *value, void
|
|||
pLastCol->dirty = 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty);
|
||||
|
||||
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
|
||||
int32_t code = 0, lino = 0;
|
||||
|
||||
|
@ -611,27 +613,10 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
|
|||
SLastCol emptyCol = {
|
||||
.rowKey = emptyRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||
|
||||
SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
if (!pLastCol) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
size_t charge = 0;
|
||||
*pLastCol = emptyCol;
|
||||
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLastCol, &charge));
|
||||
|
||||
SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
|
||||
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
|
||||
tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
|
||||
if (status != TAOS_LRU_STATUS_OK) {
|
||||
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
||||
code = TSDB_CODE_FAILED;
|
||||
pLastCol = NULL;
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosMemoryFree(pLastCol);
|
||||
code = tsdbCachePutToLRU(pTsdb, pLastKey, &emptyCol, 1);
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||
}
|
||||
|
||||
TAOS_RETURN(code);
|
||||
|
@ -1122,7 +1107,7 @@ static int32_t tsdbCachePutToRocksdb(STsdb *pTsdb, SLastKey *pLastKey, SLastCol
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol) {
|
||||
static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLastCol, int8_t dirty) {
|
||||
int32_t code = 0, lino = 0;
|
||||
|
||||
SLastCol *pLRULastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
|
@ -1132,6 +1117,7 @@ static int32_t tsdbCachePutToLRU(STsdb *pTsdb, SLastKey *pLastKey, SLastCol *pLa
|
|||
|
||||
size_t charge = 0;
|
||||
*pLRULastCol = *pLastCol;
|
||||
pLRULastCol->dirty = dirty;
|
||||
TAOS_CHECK_EXIT(tsdbCacheReallocSLastCol(pLRULastCol, &charge));
|
||||
|
||||
LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, pLastKey, ROCKS_KEY_LEN, pLRULastCol, charge, tsdbCacheDeleter,
|
||||
|
@ -1184,7 +1170,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
|
|||
if (cmp_res < 0 || (cmp_res == 0 && !COL_VAL_IS_NONE(pColVal))) {
|
||||
SLastCol newLastCol = {
|
||||
.rowKey = *pRowKey, .colVal = *pColVal, .dirty = 1, .cacheStatus = TSDB_LAST_CACHE_VALID};
|
||||
code = tsdbCachePutToLRU(pTsdb, key, &newLastCol);
|
||||
code = tsdbCachePutToLRU(pTsdb, key, &newLastCol, 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1263,7 +1249,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
|
|||
SLastCol *pToFree = pLastCol;
|
||||
|
||||
if (pLastCol && pLastCol->cacheStatus == TSDB_LAST_CACHE_NO_CACHE) {
|
||||
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol)) != TSDB_CODE_SUCCESS) {
|
||||
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0)) != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
|
||||
tstrerror(code));
|
||||
taosMemoryFreeClear(pToFree);
|
||||
|
@ -1293,7 +1279,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
|
|||
taosMemoryFreeClear(pToFree);
|
||||
break;
|
||||
}
|
||||
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp)) != TSDB_CODE_SUCCESS) {
|
||||
if ((code = tsdbCachePutToLRU(pTsdb, &idxKey->key, &lastColTmp, 0)) != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("tsdb/cache: vgId:%d, put lru failed at line %d since %s.", TD_VID(pTsdb->pVnode), lino,
|
||||
tstrerror(code));
|
||||
taosMemoryFreeClear(pToFree);
|
||||
|
@ -1648,30 +1634,14 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
|
|||
continue;
|
||||
}
|
||||
|
||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
if (!pTmpLastCol) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
|
||||
size_t charge = 0;
|
||||
*pTmpLastCol = *pLastCol;
|
||||
pLastCol = pTmpLastCol;
|
||||
code = tsdbCacheReallocSLastCol(pLastCol, &charge);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosMemoryFree(pLastCol);
|
||||
// store result back to rocks cache
|
||||
code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||
TAOS_CHECK_EXIT(code);
|
||||
}
|
||||
|
||||
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
|
||||
tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
|
||||
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
|
||||
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
||||
pLastCol = NULL;
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_FAILED);
|
||||
}
|
||||
|
||||
// store result back to rocks cache
|
||||
code = tsdbCachePutToRocksdb(pTsdb, &idxKey->key, pLastCol);
|
||||
code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s.", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
|
||||
TAOS_CHECK_EXIT(code);
|
||||
|
@ -1746,18 +1716,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
|||
SLastCol *pToFree = pLastCol;
|
||||
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
|
||||
if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
|
||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
if (!pTmpLastCol) {
|
||||
taosMemoryFreeClear(pToFree);
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
|
||||
size_t charge = 0;
|
||||
*pTmpLastCol = *pLastCol;
|
||||
pLastCol = pTmpLastCol;
|
||||
code = tsdbCacheReallocSLastCol(pLastCol, &charge);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
code = tsdbCachePutToLRU(pTsdb, &idxKey->key, pLastCol, 0);
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
taosMemoryFreeClear(pToFree);
|
||||
TAOS_CHECK_EXIT(code);
|
||||
}
|
||||
|
@ -1765,20 +1727,10 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
|||
SLastCol lastCol = *pLastCol;
|
||||
code = tsdbCacheReallocSLastCol(&lastCol, NULL);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
tsdbCacheFreeSLastColItem(pLastCol);
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
taosMemoryFreeClear(pToFree);
|
||||
TAOS_CHECK_EXIT(code);
|
||||
}
|
||||
|
||||
LRUStatus status = taosLRUCacheInsert(pCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter,
|
||||
tsdbCacheOverWriter, NULL, TAOS_LRU_PRIORITY_LOW, pTsdb);
|
||||
if (TAOS_LRU_STATUS_OK != status && TAOS_LRU_STATUS_OK_OVERWRITTEN != status) {
|
||||
tsdbError("vgId:%d, %s failed at line %d status %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__, status);
|
||||
taosMemoryFreeClear(pToFree);
|
||||
TAOS_CHECK_EXIT(TSDB_CODE_FAILED);
|
||||
}
|
||||
|
||||
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
||||
taosArrayRemove(remainCols, j);
|
||||
taosArrayRemove(ignoreFromRocks, j);
|
||||
|
@ -1966,8 +1918,9 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
|||
if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
|
||||
SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
|
||||
.colVal = COL_VAL_NONE(cid, pTSchema->columns[i].type),
|
||||
.dirty = 1,
|
||||
.cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
|
||||
code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol);
|
||||
code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol, 1);
|
||||
}
|
||||
if (taosLRUCacheRelease(pTsdb->lruCache, h, false) != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s release lru cache failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
|
||||
|
@ -2032,6 +1985,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
|||
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
|
||||
SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
|
||||
.colVal = COL_VAL_NONE(pLastKey->cid, pTSchema->columns[idxKey->idx].type),
|
||||
.dirty = 0,
|
||||
.cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
|
||||
|
||||
if ((code = tsdbCachePutToRocksdb(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2039,7 +1993,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
|||
tsdbError("tsdb/cache/del: vgId:%d, put to rocks failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
goto _exit;
|
||||
}
|
||||
if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol)) != TSDB_CODE_SUCCESS) {
|
||||
if ((code = tsdbCachePutToLRU(pTsdb, pLastKey, &noCacheCol, 0)) != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
tsdbError("tsdb/cache/del: vgId:%d, put to lru failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
goto _exit;
|
||||
|
|
Loading…
Reference in New Issue