remove void in wal lruCache and tsdbCache
This commit is contained in:
parent
e1c099dd3d
commit
8ba8210f64
|
@ -577,7 +577,10 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud
|
|||
SLastCol *pLastCol = (SLastCol *)value;
|
||||
|
||||
if (pLastCol->dirty) {
|
||||
(void)tsdbCacheFlushDirty(key, klen, pLastCol, ud);
|
||||
if (tsdbCacheFlushDirty(key, klen, pLastCol, ud) != 0) {
|
||||
STsdb *pTsdb = (STsdb *)ud;
|
||||
tsdbError("tsdb/cache: vgId:%d, flush cache %s failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
|
||||
}
|
||||
}
|
||||
|
||||
for (uint8_t i = 0; i < pLastCol->rowKey.numOfPKs; ++i) {
|
||||
|
@ -719,14 +722,30 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
|
|||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||
{
|
||||
SLastCol *pLastCol = NULL;
|
||||
(void)tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol);
|
||||
code = tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
if (pLastCol != NULL) {
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
}
|
||||
goto _exit;
|
||||
}
|
||||
if (NULL != pLastCol) {
|
||||
rocksdb_writebatch_delete(wb, keys_list[0], klen);
|
||||
}
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
|
||||
pLastCol = NULL;
|
||||
(void)tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol);
|
||||
code = tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
if (pLastCol != NULL) {
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
}
|
||||
goto _exit;
|
||||
}
|
||||
if (NULL != pLastCol) {
|
||||
rocksdb_writebatch_delete(wb, keys_list[1], klen);
|
||||
}
|
||||
|
@ -738,7 +757,10 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
|
|||
for (int i = 0; i < 2; i++) {
|
||||
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
|
||||
if (h) {
|
||||
(void)taosLRUCacheRelease(pTsdb->lruCache, h, true);
|
||||
if (taosLRUCacheRelease(pTsdb->lruCache, h, true)) {
|
||||
tsdbError("vgId:%d, %s release lru cache failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
|
||||
goto _exit;
|
||||
}
|
||||
taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
|
||||
}
|
||||
}
|
||||
|
@ -765,8 +787,20 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap
|
|||
int16_t cid = pSchemaRow->pSchema[i].colId;
|
||||
int8_t col_type = pSchemaRow->pSchema[i].type;
|
||||
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
|
||||
code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
STSchema *pTSchema = NULL;
|
||||
|
@ -781,8 +815,20 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap
|
|||
int16_t cid = pTSchema->columns[i].colId;
|
||||
int8_t col_type = pTSchema->columns[i].type;
|
||||
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
|
||||
code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST_ROW);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, LFLAG_LAST);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(pTSchema);
|
||||
|
@ -798,7 +844,13 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra
|
|||
|
||||
(void)taosThreadMutexLock(&pTsdb->lruMutex);
|
||||
|
||||
(void)tsdbCacheCommitNoLock(pTsdb);
|
||||
code = tsdbCacheCommitNoLock(pTsdb);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (pSchemaRow != NULL) {
|
||||
bool hasPrimayKey = false;
|
||||
|
@ -810,7 +862,13 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra
|
|||
int16_t cid = pSchemaRow->pSchema[i].colId;
|
||||
int8_t col_type = pSchemaRow->pSchema[i].type;
|
||||
|
||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||
code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
STSchema *pTSchema = NULL;
|
||||
|
@ -830,7 +888,13 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra
|
|||
int16_t cid = pTSchema->columns[i].colId;
|
||||
int8_t col_type = pTSchema->columns[i].type;
|
||||
|
||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||
code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(pTSchema);
|
||||
|
@ -848,7 +912,13 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
|
|||
|
||||
(void)taosThreadMutexLock(&pTsdb->lruMutex);
|
||||
|
||||
(void)tsdbCacheCommitNoLock(pTsdb);
|
||||
code = tsdbCacheCommitNoLock(pTsdb);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
STSchema *pTSchema = NULL;
|
||||
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
|
||||
|
@ -871,7 +941,13 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
|
|||
int16_t cid = pTSchema->columns[i].colId;
|
||||
int8_t col_type = pTSchema->columns[i].type;
|
||||
|
||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||
code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -889,12 +965,22 @@ int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t
|
|||
|
||||
(void)taosThreadMutexLock(&pTsdb->lruMutex);
|
||||
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
|
||||
|
||||
code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
// rocksMayWrite(pTsdb, true, false, false);
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
//(void)tsdbCacheCommit(pTsdb);
|
||||
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
@ -904,9 +990,21 @@ int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool h
|
|||
|
||||
(void)taosThreadMutexLock(&pTsdb->lruMutex);
|
||||
|
||||
(void)tsdbCacheCommitNoLock(pTsdb);
|
||||
code = tsdbCacheCommitNoLock(pTsdb);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||
code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
rocksMayWrite(pTsdb, false);
|
||||
|
||||
|
@ -923,14 +1021,24 @@ int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t
|
|||
for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
|
||||
tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
|
||||
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
|
||||
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
|
||||
code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
code = tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s new table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
|
||||
// rocksMayWrite(pTsdb, true, false, false);
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
//(void)tsdbCacheCommit(pTsdb);
|
||||
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
|
@ -939,12 +1047,24 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool
|
|||
|
||||
(void)taosThreadMutexLock(&pTsdb->lruMutex);
|
||||
|
||||
(void)tsdbCacheCommitNoLock(pTsdb);
|
||||
code = tsdbCacheCommitNoLock(pTsdb);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s commit with no lock failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
|
||||
int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
|
||||
|
||||
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||
code = tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s drop table column failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
|
||||
rocksMayWrite(pTsdb, false);
|
||||
|
@ -1109,7 +1229,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
|
|||
}
|
||||
}
|
||||
|
||||
(void)taosLRUCacheRelease(pCache, h, false);
|
||||
code = taosLRUCacheRelease(pCache, h, false);
|
||||
TAOS_CHECK_EXIT(code);
|
||||
} else {
|
||||
if (!remainCols) {
|
||||
|
@ -1151,7 +1271,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
|
|||
keys_list_sizes[i] = ROCKS_KEY_LEN;
|
||||
}
|
||||
|
||||
rocksMayWrite(pTsdb, true); // flush writebatch cache
|
||||
rocksMayWrite(pTsdb, true); // flush writebatch cache
|
||||
|
||||
code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
|
||||
&values_list_sizes);
|
||||
|
@ -1169,7 +1289,15 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
|
|||
SColVal *pColVal = &updCtx->colVal;
|
||||
|
||||
SLastCol *pLastCol = NULL;
|
||||
(void)tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
|
||||
code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
if (pLastCol != NULL) {
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
}
|
||||
goto _exit;
|
||||
}
|
||||
/*
|
||||
if (code) {
|
||||
tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
|
@ -1268,7 +1396,12 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6
|
|||
tsdbRowGetKey(&lRow, &tsdbRowKey);
|
||||
|
||||
STSDBRowIter iter = {0};
|
||||
(void)tsdbRowIterOpen(&iter, &lRow, pTSchema);
|
||||
code = tsdbRowIterOpen(&iter, &lRow, pTSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s tsdbRowIterOpen failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
int32_t iCol = 0;
|
||||
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
|
||||
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
|
||||
|
@ -1312,13 +1445,23 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6
|
|||
if (!taosArrayPush(ctxArray, &updateCtx)) {
|
||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||
}
|
||||
(void)tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
|
||||
code = tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s tSimpleHashIterateRemove failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__,
|
||||
__LINE__, tstrerror(code));
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 3. do update
|
||||
(void)tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
|
||||
code = tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s tsdbCacheUpdate failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
|
@ -1384,7 +1527,12 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo
|
|||
|
||||
// 2. prepare last row
|
||||
STSDBRowIter iter = {0};
|
||||
(void)tsdbRowIterOpen(&iter, &lRow, pTSchema);
|
||||
code = tsdbRowIterOpen(&iter, &lRow, pTSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s tsdbRowIterOpen failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
|
||||
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
|
||||
if (!taosArrayPush(ctxArray, &updateCtx)) {
|
||||
|
@ -1394,7 +1542,12 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo
|
|||
tsdbRowClose(&iter);
|
||||
|
||||
// 3. do update
|
||||
(void)tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
|
||||
code = tsdbCacheUpdate(pTsdb, suid, uid, ctxArray);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s tsdbCacheUpdate failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
|
@ -1604,7 +1757,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
|||
keys_list_sizes[i] = ROCKS_KEY_LEN;
|
||||
}
|
||||
|
||||
rocksMayWrite(pTsdb, true); // flush writebatch cache
|
||||
rocksMayWrite(pTsdb, true); // flush writebatch cache
|
||||
|
||||
code = tsdbCacheGetValuesFromRocks(pTsdb, num_keys, (const char *const *)keys_list, keys_list_sizes, &values_list,
|
||||
&values_list_sizes);
|
||||
|
@ -1624,7 +1777,15 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
|
|||
continue;
|
||||
}
|
||||
|
||||
(void)tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
|
||||
code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
if (pLastCol != NULL) {
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
}
|
||||
goto _exit;
|
||||
}
|
||||
SLastCol *pToFree = pLastCol;
|
||||
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
|
||||
if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
|
||||
|
@ -1757,7 +1918,11 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
|||
}
|
||||
|
||||
if (h) {
|
||||
(void)taosLRUCacheRelease(pCache, h, false);
|
||||
code = taosLRUCacheRelease(pCache, h, false);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s release lru cache failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1786,7 +1951,11 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
|||
}
|
||||
|
||||
if (h) {
|
||||
(void)taosLRUCacheRelease(pCache, h, false);
|
||||
code = taosLRUCacheRelease(pCache, h, false);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s release lru cache failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1820,7 +1989,13 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
|||
int numKeys = 0;
|
||||
SArray *remainCols = NULL;
|
||||
|
||||
(void)tsdbCacheCommit(pTsdb);
|
||||
code = tsdbCacheCommit(pTsdb);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s commit failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
(void)taosThreadMutexUnlock(&pTsdb->lruMutex);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
(void)taosThreadMutexLock(&pTsdb->lruMutex);
|
||||
|
||||
|
@ -1837,7 +2012,11 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
|||
.cacheStatus = TSDB_LAST_CACHE_NO_CACHE};
|
||||
code = tsdbCachePutToLRU(pTsdb, &lastKey, &noneCol);
|
||||
}
|
||||
(void)taosLRUCacheRelease(pTsdb->lruCache, h, false);
|
||||
code = taosLRUCacheRelease(pTsdb->lruCache, h, false);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s release lru cache failed at line %d.", TD_VID(pTsdb->pVnode), __func__, __LINE__);
|
||||
goto _exit;
|
||||
}
|
||||
TAOS_CHECK_EXIT(code);
|
||||
} else {
|
||||
if (!remainCols) {
|
||||
|
@ -1871,7 +2050,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
|||
code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
SIdxKey* idxKey = taosArrayGet(remainCols, i);
|
||||
SIdxKey *idxKey = taosArrayGet(remainCols, i);
|
||||
|
||||
((SLastKey *)key)[0] = idxKey->key;
|
||||
|
||||
|
@ -1879,7 +2058,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
|||
keys_list_sizes[i] = klen;
|
||||
}
|
||||
|
||||
rocksMayWrite(pTsdb, true); // flush writebatch cache
|
||||
rocksMayWrite(pTsdb, true); // flush writebatch cache
|
||||
|
||||
TAOS_CHECK_GOTO(tsdbCacheGetValuesFromRocks(pTsdb, numKeys, (const char *const *)keys_list, keys_list_sizes,
|
||||
&values_list, &values_list_sizes),
|
||||
|
@ -1888,8 +2067,16 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
|||
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
|
||||
for (int i = 0; i < numKeys; ++i) {
|
||||
SLastCol *pLastCol = NULL;
|
||||
(void)tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
|
||||
SIdxKey* idxKey = taosArrayGet(remainCols, i);
|
||||
code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
if (pLastCol != NULL) {
|
||||
taosMemoryFreeClear(pLastCol);
|
||||
}
|
||||
goto _exit;
|
||||
}
|
||||
SIdxKey *idxKey = taosArrayGet(remainCols, i);
|
||||
SLastKey *pLastKey = &idxKey->key;
|
||||
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
|
||||
SLastCol noCacheCol = {.rowKey.ts = TSKEY_MIN,
|
||||
|
@ -2389,7 +2576,12 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
|
||||
state->pr->pCurFileSet = state->pFileSet;
|
||||
|
||||
(void)loadDataTomb(state->pr, state->pr->pFileReader);
|
||||
code = loadDataTomb(state->pr, state->pr->pFileReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s load tomb failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
TAOS_CHECK_GOTO(code, &lino, _err);
|
||||
}
|
||||
|
||||
TAOS_CHECK_GOTO(tsdbDataFileReadBrinBlk(state->pr->pFileReader, &state->pr->pBlkArray), &lino, _err);
|
||||
}
|
||||
|
@ -2467,7 +2659,12 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
|
||||
if (!state->pLastRow) {
|
||||
if (state->pLastIter) {
|
||||
(void)lastIterClose(&state->pLastIter);
|
||||
code = lastIterClose(&state->pLastIter);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
|
||||
clearLastFileSet(state);
|
||||
|
@ -2575,7 +2772,12 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
|
||||
if (!state->pLastRow) {
|
||||
if (state->pLastIter) {
|
||||
(void)lastIterClose(&state->pLastIter);
|
||||
code = lastIterClose(&state->pLastIter);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s close last iter failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
|
||||
*ppRow = &state->row;
|
||||
|
@ -2599,7 +2801,12 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
} else {
|
||||
// TODO: merge rows and *ppRow = mergedRow
|
||||
SRowMerger *pMerger = &state->rowMerger;
|
||||
(void)tsdbRowMergerInit(pMerger, state->pTSchema);
|
||||
code = tsdbRowMergerInit(pMerger, state->pTSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("vgId:%d, %s init row merger failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
|
||||
tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema), &lino, _err);
|
||||
TAOS_CHECK_GOTO(tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema), &lino, _err);
|
||||
|
@ -2765,7 +2972,11 @@ int32_t clearNextRowFromFS(void *iter) {
|
|||
}
|
||||
|
||||
if (state->pLastIter) {
|
||||
(void)lastIterClose(&state->pLastIter);
|
||||
code = lastIterClose(&state->pLastIter);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
|
||||
if (state->pBlockData) {
|
||||
|
@ -2798,7 +3009,11 @@ int32_t clearNextRowFromFS(void *iter) {
|
|||
|
||||
static void clearLastFileSet(SFSNextRowIter *state) {
|
||||
if (state->pLastIter) {
|
||||
(void)lastIterClose(&state->pLastIter);
|
||||
int code = lastIterClose(&state->pLastIter);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("%s close last iter failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (state->pBlockData) {
|
||||
|
@ -3363,7 +3578,11 @@ _err:
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { (void)taosLRUCacheRelease(pCache, h, false); }
|
||||
void tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) {
|
||||
if (taosLRUCacheRelease(pCache, h, false)) {
|
||||
tsdbError("%s release lru cache failed at line %d.", __func__, __LINE__);
|
||||
}
|
||||
}
|
||||
|
||||
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
|
||||
taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity);
|
||||
|
|
|
@ -57,7 +57,10 @@ static FORCE_INLINE int32_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx, in
|
|||
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
|
||||
|
||||
int64_t fileSize = 0;
|
||||
(void)taosStatFile(fnameStr, &fileSize, NULL, NULL);
|
||||
if (taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0) {
|
||||
code = terrno;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
|
||||
if (pFile == NULL) {
|
||||
|
@ -356,6 +359,7 @@ static int32_t walLogEntriesComplete(const SWal* pWal) {
|
|||
}
|
||||
|
||||
static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
|
||||
int32_t code = 0;
|
||||
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
|
||||
if (!pFileInfo) {
|
||||
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||
|
@ -365,7 +369,10 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
|
|||
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
|
||||
|
||||
int64_t fileSize = 0;
|
||||
(void)taosStatFile(fnameStr, &fileSize, NULL, NULL);
|
||||
if (taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
int64_t records = TMAX(0, pFileInfo->lastVer - pFileInfo->firstVer + 1);
|
||||
int64_t lastEndOffset = records * sizeof(SWalIdxEntry);
|
||||
|
||||
|
@ -381,7 +388,10 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
|
|||
wInfo("vgId:%d, trim idx file. file: %s, size: %" PRId64 ", offset: %" PRId64, pWal->cfg.vgId, fnameStr, fileSize,
|
||||
lastEndOffset);
|
||||
|
||||
(void)taosFtruncateFile(pFile, lastEndOffset);
|
||||
code = taosFtruncateFile(pFile, lastEndOffset);
|
||||
if (code < 0) {
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
(void)taosCloseFile(&pFile);
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
|
@ -395,8 +405,14 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
|
|||
regex_t logRegPattern;
|
||||
regex_t idxRegPattern;
|
||||
|
||||
(void)regcomp(&logRegPattern, logPattern, REG_EXTENDED);
|
||||
(void)regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
|
||||
if (regcomp(&logRegPattern, logPattern, REG_EXTENDED) != 0) {
|
||||
wError("failed to compile log pattern, error:%s", tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
if (regcomp(&idxRegPattern, idxPattern, REG_EXTENDED) != 0) {
|
||||
wError("failed to compile idx pattern");
|
||||
return terrno;
|
||||
}
|
||||
|
||||
TdDirPtr pDir = taosOpenDir(pWal->path);
|
||||
if (pDir == NULL) {
|
||||
|
@ -420,14 +436,22 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
|
|||
if (!taosArrayPush(actualLog, &fileInfo)) {
|
||||
regfree(&logRegPattern);
|
||||
regfree(&idxRegPattern);
|
||||
(void)taosCloseDir(&pDir);
|
||||
int32_t ret = taosCloseDir(&pDir);
|
||||
if (ret != 0) {
|
||||
wError("failed to close dir, ret:%s", tstrerror(ret));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(void)taosCloseDir(&pDir);
|
||||
int32_t ret = taosCloseDir(&pDir);
|
||||
if (ret != 0) {
|
||||
wError("failed to close dir, ret:%s", tstrerror(ret));
|
||||
return terrno;
|
||||
}
|
||||
regfree(&logRegPattern);
|
||||
regfree(&idxRegPattern);
|
||||
|
||||
|
@ -684,7 +708,9 @@ _err:
|
|||
int64_t walGetVerRetention(SWal* pWal, int64_t bytes) {
|
||||
int64_t ver = -1;
|
||||
int64_t totSize = 0;
|
||||
(void)taosThreadRwlockRdlock(&pWal->mutex);
|
||||
if (taosThreadRwlockRdlock(&pWal->mutex) != 0) {
|
||||
wError("vgId:%d failed to lock %p", pWal->cfg.vgId, &pWal->mutex);
|
||||
}
|
||||
int32_t fileIdx = taosArrayGetSize(pWal->fileInfoSet);
|
||||
while (--fileIdx) {
|
||||
SWalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
|
||||
|
@ -694,7 +720,9 @@ int64_t walGetVerRetention(SWal* pWal, int64_t bytes) {
|
|||
}
|
||||
totSize += pInfo->fileSize;
|
||||
}
|
||||
(void)taosThreadRwlockUnlock(&pWal->mutex);
|
||||
if (taosThreadRwlockUnlock(&pWal->mutex) != 0) {
|
||||
wError("vgId:%d failed to lock %p", pWal->cfg.vgId, &pWal->mutex);
|
||||
}
|
||||
return ver + 1;
|
||||
}
|
||||
|
||||
|
@ -765,21 +793,35 @@ int32_t walMetaSerialize(SWal* pWal, char** serialized) {
|
|||
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
(void)cJSON_AddItemToObject(pRoot, "meta", pMeta);
|
||||
if (cJSON_AddItemToObject(pRoot, "meta", pMeta) != 0) {
|
||||
wError("vgId:%d, failed to add meta to root", pWal->cfg.vgId);
|
||||
}
|
||||
(void)sprintf(buf, "%" PRId64, pWal->vers.firstVer);
|
||||
(void)cJSON_AddStringToObject(pMeta, "firstVer", buf);
|
||||
if (cJSON_AddStringToObject(pMeta, "firstVer", buf) == NULL) {
|
||||
wError("vgId:%d, failed to add firstVer to meta", pWal->cfg.vgId);
|
||||
}
|
||||
(void)sprintf(buf, "%" PRId64, pWal->vers.snapshotVer);
|
||||
(void)cJSON_AddStringToObject(pMeta, "snapshotVer", buf);
|
||||
if (cJSON_AddStringToObject(pMeta, "snapshotVer", buf) == NULL) {
|
||||
wError("vgId:%d, failed to add snapshotVer to meta", pWal->cfg.vgId);
|
||||
}
|
||||
(void)sprintf(buf, "%" PRId64, pWal->vers.commitVer);
|
||||
(void)cJSON_AddStringToObject(pMeta, "commitVer", buf);
|
||||
if (cJSON_AddStringToObject(pMeta, "commitVer", buf) == NULL) {
|
||||
wError("vgId:%d, failed to add commitVer to meta", pWal->cfg.vgId);
|
||||
}
|
||||
(void)sprintf(buf, "%" PRId64, pWal->vers.lastVer);
|
||||
(void)cJSON_AddStringToObject(pMeta, "lastVer", buf);
|
||||
if (cJSON_AddStringToObject(pMeta, "lastVer", buf) == NULL) {
|
||||
wError("vgId:%d, failed to add lastVer to meta", pWal->cfg.vgId);
|
||||
}
|
||||
|
||||
(void)cJSON_AddItemToObject(pRoot, "files", pFiles);
|
||||
if (cJSON_AddItemToObject(pRoot, "files", pFiles) != 0) {
|
||||
wError("vgId:%d, failed to add files to root", pWal->cfg.vgId);
|
||||
}
|
||||
SWalFileInfo* pData = pWal->fileInfoSet->pData;
|
||||
for (int i = 0; i < sz; i++) {
|
||||
SWalFileInfo* pInfo = &pData[i];
|
||||
(void)cJSON_AddItemToArray(pFiles, pField = cJSON_CreateObject());
|
||||
if (cJSON_AddItemToArray(pFiles, pField = cJSON_CreateObject()) != 0) {
|
||||
wError("vgId:%d, failed to add field to files", pWal->cfg.vgId);
|
||||
}
|
||||
if (pField == NULL) {
|
||||
cJSON_Delete(pRoot);
|
||||
|
||||
|
@ -788,15 +830,25 @@ int32_t walMetaSerialize(SWal* pWal, char** serialized) {
|
|||
// cjson only support int32_t or double
|
||||
// string are used to prohibit the loss of precision
|
||||
(void)sprintf(buf, "%" PRId64, pInfo->firstVer);
|
||||
(void)cJSON_AddStringToObject(pField, "firstVer", buf);
|
||||
if (cJSON_AddStringToObject(pField, "firstVer", buf) == NULL) {
|
||||
wError("vgId:%d, failed to add firstVer to field", pWal->cfg.vgId);
|
||||
}
|
||||
(void)sprintf(buf, "%" PRId64, pInfo->lastVer);
|
||||
(void)cJSON_AddStringToObject(pField, "lastVer", buf);
|
||||
if (cJSON_AddStringToObject(pField, "lastVer", buf) == NULL) {
|
||||
wError("vgId:%d, failed to add lastVer to field", pWal->cfg.vgId);
|
||||
}
|
||||
(void)sprintf(buf, "%" PRId64, pInfo->createTs);
|
||||
(void)cJSON_AddStringToObject(pField, "createTs", buf);
|
||||
if (cJSON_AddStringToObject(pField, "createTs", buf) == NULL) {
|
||||
wError("vgId:%d, failed to add createTs to field", pWal->cfg.vgId);
|
||||
}
|
||||
(void)sprintf(buf, "%" PRId64, pInfo->closeTs);
|
||||
(void)cJSON_AddStringToObject(pField, "closeTs", buf);
|
||||
if (cJSON_AddStringToObject(pField, "closeTs", buf) == NULL) {
|
||||
wError("vgId:%d, failed to add closeTs to field", pWal->cfg.vgId);
|
||||
}
|
||||
(void)sprintf(buf, "%" PRId64, pInfo->fileSize);
|
||||
(void)cJSON_AddStringToObject(pField, "fileSize", buf);
|
||||
if (cJSON_AddStringToObject(pField, "fileSize", buf) == NULL) {
|
||||
wError("vgId:%d, failed to add fileSize to field", pWal->cfg.vgId);
|
||||
}
|
||||
}
|
||||
char* pSerialized = cJSON_Print(pRoot);
|
||||
cJSON_Delete(pRoot);
|
||||
|
@ -874,7 +926,10 @@ _err:
|
|||
static int walFindCurMetaVer(SWal* pWal) {
|
||||
const char* pattern = "^meta-ver[0-9]+$";
|
||||
regex_t walMetaRegexPattern;
|
||||
(void)regcomp(&walMetaRegexPattern, pattern, REG_EXTENDED);
|
||||
if (regcomp(&walMetaRegexPattern, pattern, REG_EXTENDED) != 0) {
|
||||
wError("failed to compile wal meta pattern, error %s", tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
TdDirPtr pDir = taosOpenDir(pWal->path);
|
||||
if (pDir == NULL) {
|
||||
|
@ -896,7 +951,10 @@ static int walFindCurMetaVer(SWal* pWal) {
|
|||
}
|
||||
wDebug("vgId:%d, wal find current meta: %s is not meta file", pWal->cfg.vgId, name);
|
||||
}
|
||||
(void)taosCloseDir(&pDir);
|
||||
if (taosCloseDir(&pDir) != 0) {
|
||||
wError("failed to close dir, ret:%s", tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
regfree(&walMetaRegexPattern);
|
||||
return metaVer;
|
||||
}
|
||||
|
@ -979,8 +1037,16 @@ int32_t walSaveMeta(SWal* pWal) {
|
|||
|
||||
// delete old file
|
||||
if (metaVer > -1) {
|
||||
(void)walBuildMetaName(pWal, metaVer, fnameStr);
|
||||
(void)taosRemoveFile(fnameStr);
|
||||
n = walBuildMetaName(pWal, metaVer, fnameStr);
|
||||
if (n >= sizeof(fnameStr)) {
|
||||
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||
}
|
||||
code = taosRemoveFile(fnameStr);
|
||||
if (code) {
|
||||
wError("vgId:%d, failed to remove file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
|
||||
} else {
|
||||
wInfo("vgId:%d, remove old meta file: %s", pWal->cfg.vgId, fnameStr);
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(serialized);
|
||||
|
@ -994,6 +1060,7 @@ _err:
|
|||
|
||||
int32_t walLoadMeta(SWal* pWal) {
|
||||
int32_t code = 0;
|
||||
int n = 0;
|
||||
// find existing meta file
|
||||
int metaVer = walFindCurMetaVer(pWal);
|
||||
if (metaVer == -1) {
|
||||
|
@ -1002,12 +1069,23 @@ int32_t walLoadMeta(SWal* pWal) {
|
|||
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||
}
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
(void)walBuildMetaName(pWal, metaVer, fnameStr);
|
||||
n = walBuildMetaName(pWal, metaVer, fnameStr);
|
||||
if (n >= sizeof(fnameStr)) {
|
||||
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||
}
|
||||
// read metafile
|
||||
int64_t fileSize = 0;
|
||||
(void)taosStatFile(fnameStr, &fileSize, NULL, NULL);
|
||||
if (taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (fileSize == 0) {
|
||||
(void)taosRemoveFile(fnameStr);
|
||||
code = taosRemoveFile(fnameStr);
|
||||
if (code) {
|
||||
wError("vgId:%d, failed to remove file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
|
||||
} else {
|
||||
wInfo("vgId:%d, remove old meta file: %s", pWal->cfg.vgId, fnameStr);
|
||||
}
|
||||
wDebug("vgId:%d, wal find empty meta ver %d", pWal->cfg.vgId, metaVer);
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||
|
@ -1046,6 +1124,9 @@ int32_t walRemoveMeta(SWal* pWal) {
|
|||
int metaVer = walFindCurMetaVer(pWal);
|
||||
if (metaVer == -1) return 0;
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
(void)walBuildMetaName(pWal, metaVer, fnameStr);
|
||||
int n = walBuildMetaName(pWal, metaVer, fnameStr);
|
||||
if (n >= sizeof(fnameStr)) {
|
||||
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||
}
|
||||
return taosRemoveFile(fnameStr);
|
||||
}
|
||||
|
|
|
@ -160,7 +160,10 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
|||
pWal->writeHead.magic = WAL_MAGIC;
|
||||
|
||||
// load meta
|
||||
(void)walLoadMeta(pWal);
|
||||
if (walLoadMeta(pWal) < 0) {
|
||||
wError("vgId:%d, failed to load meta since %s", pWal->cfg.vgId, tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (walCheckAndRepairMeta(pWal) < 0) {
|
||||
wError("vgId:%d, cannot open wal since repair meta file failed", pWal->cfg.vgId);
|
||||
|
@ -233,7 +236,9 @@ int32_t walPersist(SWal *pWal) {
|
|||
|
||||
void walClose(SWal *pWal) {
|
||||
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
|
||||
(void)walSaveMeta(pWal);
|
||||
if (walSaveMeta(pWal) < 0) {
|
||||
wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, tstrerror(terrno));
|
||||
}
|
||||
TAOS_UNUSED(taosCloseFile(&pWal->pLogFile));
|
||||
pWal->pLogFile = NULL;
|
||||
(void)taosCloseFile(&pWal->pIdxFile);
|
||||
|
@ -257,10 +262,14 @@ void walClose(SWal *pWal) {
|
|||
if (pWal->cfg.level == TAOS_WAL_SKIP) {
|
||||
wInfo("vgId:%d, remove all wals, path:%s", pWal->cfg.vgId, pWal->path);
|
||||
taosRemoveDir(pWal->path);
|
||||
(void)taosMkDir(pWal->path);
|
||||
if (taosMkDir(pWal->path) != 0) {
|
||||
wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, tstrerror(terrno));
|
||||
}
|
||||
}
|
||||
|
||||
(void)taosRemoveRef(tsWal.refSetId, pWal->refId);
|
||||
if (taosRemoveRef(tsWal.refSetId, pWal->refId) < 0) {
|
||||
wError("vgId:%d, failed to remove ref for Wal since %s", pWal->cfg.vgId, tstrerror(terrno));
|
||||
}
|
||||
}
|
||||
|
||||
static void walFreeObj(void *wal) {
|
||||
|
@ -285,7 +294,9 @@ static bool walNeedFsync(SWal *pWal) {
|
|||
|
||||
static void walUpdateSeq() {
|
||||
taosMsleep(WAL_REFRESH_MS);
|
||||
(void)atomic_add_fetch_32((volatile int32_t *)&tsWal.seq, 1);
|
||||
if (atomic_add_fetch_32((volatile int32_t *)&tsWal.seq, 1) < 0) {
|
||||
wError("failed to update wal seq since %s", strerror(errno));
|
||||
}
|
||||
}
|
||||
|
||||
static void walFsyncAll() {
|
||||
|
|
|
@ -52,8 +52,11 @@ void walCloseRef(SWal *pWal, int64_t refId) {
|
|||
} else {
|
||||
wDebug("vgId:%d, wal close ref null, refId %" PRId64, pWal->cfg.vgId, refId);
|
||||
}
|
||||
|
||||
(void)taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t));
|
||||
int32_t code = 0;
|
||||
code = taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t));
|
||||
if (code) {
|
||||
wError("vgId:%d, wal remove ref failed, refId %" PRId64 ", error:%s", pWal->cfg.vgId, refId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -14,12 +14,12 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "tlrucache.h"
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "tarray.h"
|
||||
#include "tdef.h"
|
||||
#include "tlog.h"
|
||||
#include "tlrucache.h"
|
||||
#include "tutil.h"
|
||||
|
||||
typedef struct SLRUEntry SLRUEntry;
|
||||
|
@ -305,7 +305,15 @@ static void taosLRUCacheShardEvictLRU(SLRUCacheShard *shard, size_t charge, SArr
|
|||
SLRUEntry *old = shard->lru.next;
|
||||
|
||||
taosLRUCacheShardLRURemove(shard, old);
|
||||
(void)taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
|
||||
SLRUEntry *e = taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
|
||||
if (e != NULL) {
|
||||
TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
|
||||
if (!TAOS_LRU_ENTRY_HAS_REFS(e)) {
|
||||
taosLRUCacheShardLRURemove(shard, e);
|
||||
|
||||
shard->usage -= e->totalCharge;
|
||||
}
|
||||
}
|
||||
|
||||
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
|
||||
shard->usage -= old->totalCharge;
|
||||
|
@ -529,7 +537,14 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) {
|
|||
while (shard->lru.next != &shard->lru) {
|
||||
SLRUEntry *old = shard->lru.next;
|
||||
taosLRUCacheShardLRURemove(shard, old);
|
||||
(void)taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
|
||||
SLRUEntry *e = taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash);
|
||||
if (e != NULL) {
|
||||
TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
|
||||
if (!TAOS_LRU_ENTRY_HAS_REFS(e)) {
|
||||
taosLRUCacheShardLRURemove(shard, e);
|
||||
shard->usage -= e->totalCharge;
|
||||
}
|
||||
}
|
||||
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
|
||||
shard->usage -= old->totalCharge;
|
||||
|
||||
|
@ -574,7 +589,14 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b
|
|||
lastReference = taosLRUEntryUnref(e);
|
||||
if (lastReference && TAOS_LRU_ENTRY_IN_CACHE(e)) {
|
||||
if (shard->usage > shard->capacity || eraseIfLastRef) {
|
||||
(void)taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash);
|
||||
SLRUEntry *e = taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash);
|
||||
if (e != NULL) {
|
||||
TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
|
||||
if (!TAOS_LRU_ENTRY_HAS_REFS(e)) {
|
||||
taosLRUCacheShardLRURemove(shard, e);
|
||||
shard->usage -= e->totalCharge;
|
||||
}
|
||||
}
|
||||
TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
|
||||
} else {
|
||||
taosLRUCacheShardLRUInsert(shard, e);
|
||||
|
|
Loading…
Reference in New Issue