fix schema update with sver param

This commit is contained in:
Minglei Jin 2024-10-30 10:22:19 +08:00
parent 6b67c2ebf0
commit d8df6db06a
5 changed files with 55 additions and 48 deletions

View File

@ -343,6 +343,7 @@ typedef struct {
rocksdb_readoptions_t *readoptions; rocksdb_readoptions_t *readoptions;
rocksdb_writebatch_t *writebatch; rocksdb_writebatch_t *writebatch;
TdThreadMutex writeBatchMutex; TdThreadMutex writeBatchMutex;
int32_t sver;
tb_uid_t suid; tb_uid_t suid;
tb_uid_t uid; tb_uid_t uid;
STSchema *pTSchema; STSchema *pTSchema;

View File

@ -222,7 +222,7 @@ int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t
int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, bool hasPrimayKey); int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, bool hasPrimayKey);
int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type); int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type);
int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey); int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey);
void tsdbCacheInvalidateSchema(STsdb* pTsdb, tb_uid_t suid, tb_uid_t uid); void tsdbCacheInvalidateSchema(STsdb* pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);

View File

@ -621,7 +621,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
} }
if (uids) taosArrayDestroy(uids); if (uids) taosArrayDestroy(uids);
tsdbCacheInvalidateSchema(pTsdb, pReq->suid, -1); tsdbCacheInvalidateSchema(pTsdb, pReq->suid, -1, pReq->schemaRow.version);
} }
metaWLock(pMeta); metaWLock(pMeta);
@ -1948,7 +1948,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
} }
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
tsdbCacheInvalidateSchema(pMeta->pVnode->pTsdb, 0, entry.uid); tsdbCacheInvalidateSchema(pMeta->pVnode->pTsdb, 0, entry.uid, pSchema->version);
} }
entry.version = version; entry.version = version;

View File

@ -1136,8 +1136,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
(void)taosThreadMutexLock(&pTsdb->lruMutex); (void)taosThreadMutexLock(&pTsdb->lruMutex);
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i); SLastUpdateCtx *updCtx = &((SLastUpdateCtx *)TARRAY_DATA(updCtxArray))[i];
int8_t lflag = updCtx->lflag; int8_t lflag = updCtx->lflag;
SRowKey *pRowKey = &updCtx->tsdbRowKey.key; SRowKey *pRowKey = &updCtx->tsdbRowKey.key;
SColVal *pColVal = &updCtx->colVal; SColVal *pColVal = &updCtx->colVal;
@ -1147,8 +1146,7 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
} }
SLastKey *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid}; SLastKey *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid};
size_t klen = ROCKS_KEY_LEN; LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
if (h) { if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) { if (pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
@ -1304,28 +1302,30 @@ _exit:
TAOS_RETURN(code); TAOS_RETURN(code);
} }
void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid) { void tsdbCacheInvalidateSchema(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
if (suid) { if (pTsdb->rCache.pTSchema && pTsdb->rCache.suid == suid) {
if (pTsdb->rCache.suid == suid) { if (pTsdb->rCache.suid && sver == pTsdb->rCache.sver) {
pTsdb->rCache.sver = -1;
pTsdb->rCache.suid = -1; pTsdb->rCache.suid = -1;
} } else if (pTsdb->rCache.uid == uid && pTsdb->rCache.sver == sver) {
} else if (pTsdb->rCache.uid == uid) { pTsdb->rCache.sver = -1;
pTsdb->rCache.uid = -1; pTsdb->rCache.uid = -1;
} }
}
} }
static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid) { static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t sver) {
if (suid) { if (pTsdb->rCache.pTSchema && pTsdb->rCache.suid == suid) {
if (pTsdb->rCache.suid == suid) { if (pTsdb->rCache.suid && sver == pTsdb->rCache.sver) {
pTsdb->rCache.uid = uid; return 0;
} else if (pTsdb->rCache.uid == uid && pTsdb->rCache.sver == sver) {
return 0; return 0;
} }
} else if (pTsdb->rCache.uid == uid) {
return 0;
} }
pTsdb->rCache.suid = suid; pTsdb->rCache.suid = suid;
pTsdb->rCache.uid = uid; pTsdb->rCache.uid = uid;
pTsdb->rCache.sver = sver;
tDestroyTSchema(pTsdb->rCache.pTSchema); tDestroyTSchema(pTsdb->rCache.pTSchema);
return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTsdb->rCache.pTSchema); return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTsdb->rCache.pTSchema);
} }
@ -1336,35 +1336,26 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6
// 1. prepare last // 1. prepare last
TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version}; TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version};
STSchema *pTSchema = NULL; STSchema *pTSchema = NULL;
int32_t sver = TSDBROW_SVERSION(&lRow); int32_t sver = TSDBROW_SVERSION(&lRow);
SArray *ctxArray = NULL; SArray *ctxArray = NULL;
SSHashObj *iColHash = NULL; SSHashObj *iColHash = NULL;
TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid), &lino, _exit); TAOS_CHECK_GOTO(tsdbUpdateSkm(pTsdb, suid, uid, sver), &lino, _exit);
pTSchema = pTsdb->rCache.pTSchema; pTSchema = pTsdb->rCache.pTSchema;
/*
TAOS_CHECK_GOTO(metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema), &lino, _exit);
*/
TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version}; TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
int32_t nCol = pTSchema->numOfCols; int32_t nCol = pTSchema->numOfCols;
ctxArray = taosArrayInit(nCol, sizeof(SLastUpdateCtx)); ctxArray = taosArrayInit(nCol * 2, sizeof(SLastUpdateCtx));
iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
// 1. prepare by lrow // 1. prepare by lrow
STsdbRowKey tsdbRowKey = {0}; STsdbRowKey tsdbRowKey = {0};
tsdbRowGetKey(&lRow, &tsdbRowKey); tsdbRowGetKey(&lRow, &tsdbRowKey);
STSDBRowIter iter = {0}; STSDBRowIter iter = {0};
code = tsdbRowIterOpen(&iter, &lRow, pTSchema); TAOS_CHECK_GOTO(tsdbRowIterOpen(&iter, &lRow, pTSchema), &lino, _exit);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("vgId:%d, %s tsdbRowIterOpen failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
tstrerror(code));
TAOS_CHECK_GOTO(code, &lino, _exit);
}
int32_t iCol = 0; int32_t iCol = 0;
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) {
SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal};
@ -1372,16 +1363,20 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6
TAOS_CHECK_GOTO(terrno, &lino, _exit); TAOS_CHECK_GOTO(terrno, &lino, _exit);
} }
if (!COL_VAL_IS_VALUE(pColVal)) { if (COL_VAL_IS_VALUE(pColVal)) {
if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
}
continue;
}
updateCtx.lflag = LFLAG_LAST; updateCtx.lflag = LFLAG_LAST;
if (!taosArrayPush(ctxArray, &updateCtx)) { if (!taosArrayPush(ctxArray, &updateCtx)) {
TAOS_CHECK_GOTO(terrno, &lino, _exit); TAOS_CHECK_GOTO(terrno, &lino, _exit);
} }
} else {
if (!iColHash) {
iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
}
if (tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0)) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
}
}
} }
tsdbRowClose(&iter); tsdbRowClose(&iter);
@ -1425,7 +1420,10 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6
} }
_exit: _exit:
// taosMemoryFreeClear(pTSchema); if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
}
taosArrayDestroy(ctxArray); taosArrayDestroy(ctxArray);
tSimpleHashCleanup(iColHash); tSimpleHashCleanup(iColHash);

View File

@ -14,12 +14,12 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tlrucache.h"
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tarray.h" #include "tarray.h"
#include "tdef.h" #include "tdef.h"
#include "tlog.h" #include "tlog.h"
#include "tlrucache.h"
#include "tutil.h" #include "tutil.h"
typedef struct SLRUEntry SLRUEntry; typedef struct SLRUEntry SLRUEntry;
@ -110,7 +110,7 @@ struct SLRUEntryTable {
}; };
static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) { static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) {
table->lengthBits = 4; table->lengthBits = 16;
table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *)); table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *));
if (!table->list) { if (!table->list) {
TAOS_RETURN(terrno); TAOS_RETURN(terrno);
@ -168,7 +168,14 @@ static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable *table, const void *k
while (*entry && ((*entry)->hash != hash || memcmp(key, (*entry)->keyData, keyLen) != 0)) { while (*entry && ((*entry)->hash != hash || memcmp(key, (*entry)->keyData, keyLen) != 0)) {
entry = &(*entry)->nextHash; entry = &(*entry)->nextHash;
} }
/*
SLRUEntry *pentry = table->list[hash >> (32 - table->lengthBits)];
SLRUEntry **entry = &table->list[hash >> (32 - table->lengthBits)];
while (pentry && (pentry->hash != hash || memcmp(key, pentry->keyData, keyLen) != 0)) {
entry = &pentry->nextHash;
pentry = *entry;
}
*/
return entry; return entry;
} }
@ -744,7 +751,8 @@ void taosLRUCacheCleanup(SLRUCache *cache) {
} }
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge, LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
_taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle, LRUPriority priority, void *ud) { _taos_lru_deleter_t deleter, _taos_lru_overwriter_t overwriter, LRUHandle **handle,
LRUPriority priority, void *ud) {
uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen); uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen);
uint32_t shardIndex = hash & cache->shardedCache.shardMask; uint32_t shardIndex = hash & cache->shardedCache.shardMask;