tsdb/cache: invalidate cached schema

This commit is contained in:
Minglei Jin 2024-10-29 14:41:14 +08:00
parent cf3e84e79d
commit e364d5aa66
3 changed files with 19 additions and 2 deletions

View File

@ -222,6 +222,7 @@ int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t
int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, bool hasPrimayKey); int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, bool hasPrimayKey);
int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type); int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type);
int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey); int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey);
void tsdbCacheInvalidateSchema(STsdb* pTsdb, tb_uid_t suid, tb_uid_t uid);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);

View File

@ -620,6 +620,8 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
} }
} }
if (uids) taosArrayDestroy(uids); if (uids) taosArrayDestroy(uids);
tsdbCacheInvalidateSchema(pTsdb, pReq->suid, -1);
} }
metaWLock(pMeta); metaWLock(pMeta);
@ -1945,6 +1947,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
break; break;
} }
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
tsdbCacheInvalidateSchema(pMeta->pVnode->pTsdb, 0, entry.uid);
}
entry.version = version; entry.version = version;
// do actual write // do actual write

View File

@ -493,7 +493,7 @@ int tsdbCacheFlushDirty(const void *key, size_t klen, void *value, void *ud) {
int32_t tsdbCacheCommit(STsdb *pTsdb) { int32_t tsdbCacheCommit(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
char *err = NULL; char *err = NULL;
/*
SLRUCache *pCache = pTsdb->lruCache; SLRUCache *pCache = pTsdb->lruCache;
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
@ -511,7 +511,7 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) {
rocksdb_free(err); rocksdb_free(err);
code = TSDB_CODE_FAILED; code = TSDB_CODE_FAILED;
} }
*/
TAOS_RETURN(code); TAOS_RETURN(code);
} }
@ -1304,6 +1304,16 @@ _exit:
TAOS_RETURN(code); TAOS_RETURN(code);
} }
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid) {
if (suid) {
if (pTsdb->rCache.suid == suid) {
pTsdb->rCache.suid = -1;
}
} else if (pTsdb->rCache.uid == uid) {
pTsdb->rCache.uid = -1;
}
}
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid) { static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid) {
if (suid) { if (suid) {
if (pTsdb->rCache.suid == suid) { if (pTsdb->rCache.suid == suid) {