optimize
This commit is contained in:
parent
55c3801ca1
commit
5d1cbb54c1
|
@ -24,6 +24,7 @@ target_sources(
|
|||
"src/meta/metaCommit.c"
|
||||
"src/meta/metaEntry.c"
|
||||
"src/meta/metaSnapshot.c"
|
||||
"src/meta/metaCache.c"
|
||||
|
||||
# sma
|
||||
"src/sma/smaEnv.c"
|
||||
|
|
|
@ -25,6 +25,7 @@ extern "C" {
|
|||
|
||||
typedef struct SMetaIdx SMetaIdx;
|
||||
typedef struct SMetaDB SMetaDB;
|
||||
typedef struct SMetaCache SMetaCache;
|
||||
|
||||
// metaDebug ==================
|
||||
// clang-format off
|
||||
|
@ -60,6 +61,13 @@ static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64()
|
|||
// metaTable ==================
|
||||
int metaHandleEntry(SMeta* pMeta, const SMetaEntry* pME);
|
||||
|
||||
// metaCache ==================
|
||||
int32_t metaCacheOpen(SMeta* pMeta);
|
||||
void metaCacheClose(SMeta* pMeta);
|
||||
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo);
|
||||
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid);
|
||||
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo);
|
||||
|
||||
struct SMeta {
|
||||
TdThreadRwlock lock;
|
||||
|
||||
|
@ -84,6 +92,8 @@ struct SMeta {
|
|||
TTB* pStreamDb;
|
||||
|
||||
SMetaIdx* pIdx;
|
||||
|
||||
SMetaCache* pCache;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
|
@ -92,6 +102,12 @@ typedef struct {
|
|||
} STbDbKey;
|
||||
|
||||
#pragma pack(push, 1)
|
||||
typedef struct {
|
||||
tb_uid_t suid;
|
||||
int64_t version;
|
||||
int32_t skmVer;
|
||||
} SUidIdxVal;
|
||||
|
||||
typedef struct {
|
||||
tb_uid_t uid;
|
||||
int32_t sver;
|
||||
|
|
|
@ -130,6 +130,14 @@ int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
|
|||
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
|
||||
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
|
||||
|
||||
typedef struct SMetaInfo {
|
||||
int64_t uid;
|
||||
int64_t suid;
|
||||
int64_t version;
|
||||
int32_t skmVer;
|
||||
} SMetaInfo;
|
||||
int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo);
|
||||
|
||||
// tsdb
|
||||
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
|
||||
int tsdbClose(STsdb** pTsdb);
|
||||
|
|
|
@ -0,0 +1,206 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#include "meta.h"
|
||||
|
||||
#define META_CACHE_BASE_BUCKET 1024
|
||||
|
||||
// (uid , suid) : child table
|
||||
// (uid, 0) : normal table
|
||||
// (suid, suid) : super table
|
||||
typedef struct SMetaCacheEntry SMetaCacheEntry;
|
||||
struct SMetaCacheEntry {
|
||||
SMetaCacheEntry* next;
|
||||
SMetaInfo info;
|
||||
};
|
||||
|
||||
struct SMetaCache {
|
||||
int32_t nEntry;
|
||||
int32_t nBucket;
|
||||
SMetaCacheEntry** aBucket;
|
||||
};
|
||||
|
||||
int32_t metaCacheOpen(SMeta* pMeta) {
|
||||
int32_t code = 0;
|
||||
SMetaCache* pCache = NULL;
|
||||
|
||||
pCache = (SMetaCache*)taosMemoryMalloc(sizeof(SMetaCache));
|
||||
if (pCache == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pCache->nEntry = 0;
|
||||
pCache->nBucket = META_CACHE_BASE_BUCKET;
|
||||
pCache->aBucket = (SMetaCacheEntry**)taosMemoryCalloc(pCache->nBucket, sizeof(SMetaCacheEntry*));
|
||||
if (pCache->aBucket == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pCache);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->pCache = pCache;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
||||
_err:
|
||||
metaError("vgId:%d meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
void metaCacheClose(SMeta* pMeta) {
|
||||
if (pMeta->pCache) {
|
||||
for (int32_t iBucket = 0; iBucket < pMeta->pCache->nBucket; iBucket++) {
|
||||
SMetaCacheEntry* pEntry = pMeta->pCache->aBucket[iBucket];
|
||||
while (pEntry) {
|
||||
SMetaCacheEntry* tEntry = pEntry->next;
|
||||
taosMemoryFree(pEntry);
|
||||
pEntry = tEntry;
|
||||
}
|
||||
}
|
||||
taosMemoryFree(pMeta->pCache->aBucket);
|
||||
taosMemoryFree(pMeta->pCache);
|
||||
pMeta->pCache = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) {
|
||||
int32_t code = 0;
|
||||
int32_t nBucket;
|
||||
|
||||
if (expand) {
|
||||
nBucket = pCache->nBucket * 2;
|
||||
} else {
|
||||
nBucket = pCache->nBucket / 2;
|
||||
}
|
||||
|
||||
SMetaCacheEntry** aBucket = (SMetaCacheEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaCacheEntry*));
|
||||
if (aBucket == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// rehash
|
||||
for (int32_t iBucket = 0; iBucket < pCache->nBucket; iBucket++) {
|
||||
SMetaCacheEntry* pEntry = pCache->aBucket[iBucket];
|
||||
|
||||
while (pEntry) {
|
||||
SMetaCacheEntry* pTEntry = pEntry->next;
|
||||
|
||||
pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket];
|
||||
aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry;
|
||||
|
||||
pEntry = pTEntry;
|
||||
}
|
||||
}
|
||||
|
||||
// final set
|
||||
taosMemoryFree(pCache->aBucket);
|
||||
pCache->nBucket = nBucket;
|
||||
pCache->aBucket = aBucket;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
|
||||
int32_t code = 0;
|
||||
|
||||
// ASSERT(metaIsWLocked(pMeta));
|
||||
|
||||
// search
|
||||
SMetaCache* pCache = pMeta->pCache;
|
||||
int32_t iBucket = TABS(pInfo->uid) % pCache->nBucket;
|
||||
SMetaCacheEntry** ppEntry = &pCache->aBucket[iBucket];
|
||||
while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
|
||||
ppEntry = &(*ppEntry)->next;
|
||||
}
|
||||
|
||||
if (*ppEntry) { // update
|
||||
ASSERT(pInfo->suid == (*ppEntry)->info.suid);
|
||||
if (pInfo->version > (*ppEntry)->info.version) {
|
||||
(*ppEntry)->info.version = pInfo->version;
|
||||
(*ppEntry)->info.skmVer = pInfo->skmVer;
|
||||
}
|
||||
} else { // insert
|
||||
if (pCache->nEntry >= pCache->nBucket) {
|
||||
code = metaRehashCache(pCache, 1);
|
||||
if (code) goto _exit;
|
||||
|
||||
iBucket = TABS(pInfo->uid) % pCache->nBucket;
|
||||
}
|
||||
|
||||
SMetaCacheEntry* pEntryNew = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
|
||||
if (pEntryNew == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
pEntryNew->info = *pInfo;
|
||||
pEntryNew->next = pCache->aBucket[iBucket];
|
||||
pCache->aBucket[iBucket] = pEntryNew;
|
||||
pCache->nEntry++;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
|
||||
int32_t code = 0;
|
||||
|
||||
SMetaCache* pCache = pMeta->pCache;
|
||||
int32_t iBucket = TABS(uid) % pCache->nBucket;
|
||||
SMetaCacheEntry** ppEntry = &pCache->aBucket[iBucket];
|
||||
while (*ppEntry && (*ppEntry)->info.uid != uid) {
|
||||
ppEntry = &(*ppEntry)->next;
|
||||
}
|
||||
|
||||
SMetaCacheEntry* pEntry = *ppEntry;
|
||||
if (pEntry) {
|
||||
*ppEntry = pEntry->next;
|
||||
taosMemoryFree(pEntry);
|
||||
pCache->nEntry--;
|
||||
if (pCache->nEntry < pCache->nBucket / 4 && pCache->nBucket > META_CACHE_BASE_BUCKET) {
|
||||
code = metaRehashCache(pCache, 0);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
} else {
|
||||
code = TSDB_CODE_NOT_FOUND;
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
|
||||
int32_t code = 0;
|
||||
|
||||
SMetaCache* pCache = pMeta->pCache;
|
||||
int32_t iBucket = TABS(uid) % pCache->nBucket;
|
||||
SMetaCacheEntry* pEntry = pCache->aBucket[iBucket];
|
||||
|
||||
while (pEntry && pEntry->info.uid != uid) {
|
||||
pEntry = pEntry->next;
|
||||
}
|
||||
|
||||
if (pEntry) {
|
||||
*pInfo = pEntry->info;
|
||||
} else {
|
||||
code = TSDB_CODE_NOT_FOUND;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
|
@ -73,7 +73,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
|||
}
|
||||
|
||||
// open pUidIdx
|
||||
ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx);
|
||||
ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(SUidIdxVal), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
|
@ -143,6 +143,13 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
int32_t code = metaCacheOpen(pMeta);
|
||||
if (code) {
|
||||
terrno = code;
|
||||
metaError("vgId:%d, failed to open meta cache since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
metaDebug("vgId:%d, meta is opened", TD_VID(pVnode));
|
||||
|
||||
*ppMeta = pMeta;
|
||||
|
@ -169,6 +176,7 @@ _err:
|
|||
|
||||
int metaClose(SMeta *pMeta) {
|
||||
if (pMeta) {
|
||||
if (pMeta->pCache) metaCacheClose(pMeta);
|
||||
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
||||
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
|
||||
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
||||
|
|
|
@ -63,7 +63,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
version = *(int64_t *)pReader->pBuf;
|
||||
version = ((SUidIdxVal *)pReader->pBuf)[0].version;
|
||||
return metaGetTableEntryByVersion(pReader, version, uid);
|
||||
}
|
||||
|
||||
|
@ -160,7 +160,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
|
|||
|
||||
tDecoderClear(&pTbCur->mr.coder);
|
||||
|
||||
metaGetTableEntryByVersion(&pTbCur->mr, *(int64_t *)pTbCur->pVal, *(tb_uid_t *)pTbCur->pKey);
|
||||
metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
|
||||
if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
|
||||
continue;
|
||||
}
|
||||
|
@ -185,7 +185,7 @@ _query:
|
|||
goto _err;
|
||||
}
|
||||
|
||||
version = *(int64_t *)pData;
|
||||
version = ((SUidIdxVal *)pData)[0].version;
|
||||
|
||||
tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData);
|
||||
SMetaEntry me = {0};
|
||||
|
@ -888,3 +888,41 @@ END:
|
|||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo) {
|
||||
int32_t code = 0;
|
||||
void *pData = NULL;
|
||||
int nData = 0;
|
||||
|
||||
metaRLock(pMeta);
|
||||
|
||||
// search cache
|
||||
if (metaCacheGet(pMeta, uid, pInfo) == 0) {
|
||||
metaULock(pMeta);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// search TDB
|
||||
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
|
||||
// not found
|
||||
metaULock(pMeta);
|
||||
code = TSDB_CODE_NOT_FOUND;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
metaULock(pMeta);
|
||||
|
||||
pInfo->uid = uid;
|
||||
pInfo->suid = ((SUidIdxVal *)pData)->suid;
|
||||
pInfo->version = ((SUidIdxVal *)pData)->version;
|
||||
pInfo->skmVer = ((SUidIdxVal *)pData)->skmVer;
|
||||
|
||||
// upsert the cache
|
||||
metaWLock(pMeta);
|
||||
metaCacheUpsert(pMeta, pInfo);
|
||||
metaULock(pMeta);
|
||||
|
||||
_exit:
|
||||
tdbFree(pData);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -27,6 +27,23 @@ static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME);
|
|||
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
|
||||
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type);
|
||||
|
||||
static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) {
|
||||
pInfo->uid = pEntry->uid;
|
||||
pInfo->version = pEntry->version;
|
||||
if (pEntry->type == TSDB_SUPER_TABLE) {
|
||||
pInfo->suid = pEntry->uid;
|
||||
pInfo->skmVer = pEntry->stbEntry.schemaRow.version;
|
||||
} else if (pEntry->type == TSDB_CHILD_TABLE) {
|
||||
pInfo->suid = pEntry->ctbEntry.suid;
|
||||
pInfo->skmVer = 0;
|
||||
} else if (pEntry->type == TSDB_NORMAL_TABLE) {
|
||||
pInfo->suid = 0;
|
||||
pInfo->skmVer = pEntry->ntbEntry.schemaRow.version;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
static int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp) {
|
||||
pMetaRsp->pSchemas = taosMemoryMalloc(pSchema->nCols * sizeof(SSchema));
|
||||
if (NULL == pMetaRsp->pSchemas) {
|
||||
|
@ -164,29 +181,11 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche
|
|||
|
||||
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||
SMetaEntry me = {0};
|
||||
int kLen = 0;
|
||||
int vLen = 0;
|
||||
const void *pKey = NULL;
|
||||
const void *pVal = NULL;
|
||||
void *pBuf = NULL;
|
||||
int32_t szBuf = 0;
|
||||
void *p = NULL;
|
||||
SMetaReader mr = {0};
|
||||
|
||||
// validate req
|
||||
metaReaderInit(&mr, pMeta, 0);
|
||||
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
|
||||
// TODO: just for pass case
|
||||
#if 0
|
||||
terrno = TSDB_CODE_TDB_STB_ALREADY_EXIST;
|
||||
metaReaderClear(&mr);
|
||||
return -1;
|
||||
#else
|
||||
metaReaderClear(&mr);
|
||||
if (tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name), NULL, NULL) == 0) {
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
metaReaderClear(&mr);
|
||||
|
||||
// set structs
|
||||
me.version = version;
|
||||
|
@ -265,8 +264,8 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb
|
|||
// drop super table
|
||||
_drop_super_table:
|
||||
tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData);
|
||||
tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = *(int64_t *)pData, .uid = pReq->suid}, sizeof(STbDbKey),
|
||||
&pMeta->txn);
|
||||
tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = ((SUidIdxVal *)pData)[0].version, .uid = pReq->suid},
|
||||
sizeof(STbDbKey), &pMeta->txn);
|
||||
tdbTbDelete(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pMeta->txn);
|
||||
tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn);
|
||||
tdbTbDelete(pMeta->pSuidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn);
|
||||
|
@ -309,7 +308,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
oversion = *(int64_t *)pData;
|
||||
oversion = ((SUidIdxVal *)pData)[0].version;
|
||||
|
||||
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
|
||||
ret = tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c);
|
||||
|
@ -336,15 +335,14 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
|||
metaSaveToSkmDb(pMeta, &nStbEntry);
|
||||
}
|
||||
|
||||
// if (oStbEntry.stbEntry.schemaTag.sver != pReq->schemaTag.sver) {
|
||||
// // change tag schema
|
||||
// }
|
||||
|
||||
// update table.db
|
||||
metaSaveToTbDb(pMeta, &nStbEntry);
|
||||
|
||||
// update uid index
|
||||
tdbTbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &version, sizeof(version), 0);
|
||||
SMetaInfo info;
|
||||
metaGetEntryInfo(&nStbEntry, &info);
|
||||
tdbTbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t),
|
||||
&(SUidIdxVal){.suid = info.suid, .version = info.version, .skmVer = info.skmVer}, sizeof(SUidIdxVal), 0);
|
||||
|
||||
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
|
||||
metaULock(pMeta);
|
||||
|
@ -503,7 +501,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
|||
SDecoder dc = {0};
|
||||
|
||||
rc = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData);
|
||||
int64_t version = *(int64_t *)pData;
|
||||
int64_t version = ((SUidIdxVal *)pData)[0].version;
|
||||
|
||||
tdbTbGet(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pData, &nData);
|
||||
|
||||
|
@ -517,7 +515,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
|||
int tLen = 0;
|
||||
|
||||
if (tdbTbGet(pMeta->pUidIdx, &e.ctbEntry.suid, sizeof(tb_uid_t), &tData, &tLen) == 0) {
|
||||
version = *(int64_t *)tData;
|
||||
version = ((SUidIdxVal *)tData)[0].version;
|
||||
STbDbKey tbDbKey = {.uid = e.ctbEntry.suid, .version = version};
|
||||
if (tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &tData, &tLen) == 0) {
|
||||
SDecoder tdc = {0};
|
||||
|
@ -556,6 +554,8 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
|||
--pMeta->pVnode->config.vndStats.numOfSTables;
|
||||
}
|
||||
|
||||
metaCacheDrop(pMeta, uid);
|
||||
|
||||
tDecoderClear(&dc);
|
||||
tdbFree(pData);
|
||||
|
||||
|
@ -594,7 +594,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
|||
ASSERT(c == 0);
|
||||
|
||||
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
|
||||
oversion = *(int64_t *)pData;
|
||||
oversion = ((SUidIdxVal *)pData)[0].version;
|
||||
|
||||
// search table.db
|
||||
TBC *pTbDbc = NULL;
|
||||
|
@ -708,7 +708,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
|||
// save to table db
|
||||
metaSaveToTbDb(pMeta, &entry);
|
||||
|
||||
tdbTbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0);
|
||||
metaUpdateUidIdx(pMeta, &entry);
|
||||
|
||||
metaSaveToSkmDb(pMeta, &entry);
|
||||
|
||||
|
@ -784,8 +784,8 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
|
|||
|
||||
/* get stbEntry*/
|
||||
tdbTbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal);
|
||||
tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}), sizeof(STbDbKey),
|
||||
(void **)&stbEntry.pBuf, &nVal);
|
||||
tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = ((SUidIdxVal *)pVal)[0].version}),
|
||||
sizeof(STbDbKey), (void **)&stbEntry.pBuf, &nVal);
|
||||
tdbFree(pVal);
|
||||
tDecoderInit(&dc2, stbEntry.pBuf, nVal);
|
||||
metaDecodeEntry(&dc2, &stbEntry);
|
||||
|
@ -859,7 +859,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
|
|||
metaSaveToTbDb(pMeta, &ctbEntry);
|
||||
|
||||
// save to uid.idx
|
||||
tdbTbUpsert(pMeta->pUidIdx, &ctbEntry.uid, sizeof(tb_uid_t), &version, sizeof(version), &pMeta->txn);
|
||||
metaUpdateUidIdx(pMeta, &ctbEntry);
|
||||
|
||||
if (iCol == 0) {
|
||||
metaUpdateTagIdx(pMeta, &ctbEntry);
|
||||
|
@ -914,7 +914,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
|
|||
ASSERT(c == 0);
|
||||
|
||||
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
|
||||
oversion = *(int64_t *)pData;
|
||||
oversion = ((SUidIdxVal *)pData)[0].version;
|
||||
|
||||
// search table.db
|
||||
TBC *pTbDbc = NULL;
|
||||
|
@ -959,7 +959,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
|
|||
|
||||
// save to table db
|
||||
metaSaveToTbDb(pMeta, &entry);
|
||||
tdbTbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0);
|
||||
metaUpdateUidIdx(pMeta, &entry);
|
||||
metaULock(pMeta);
|
||||
|
||||
tdbTbcClose(pTbDbc);
|
||||
|
@ -1042,7 +1042,14 @@ _err:
|
|||
}
|
||||
|
||||
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||
return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
|
||||
// upsert cache
|
||||
SMetaInfo info;
|
||||
metaGetEntryInfo(pME, &info);
|
||||
metaCacheUpsert(pMeta, &info);
|
||||
|
||||
SUidIdxVal uidIdxVal = {.suid = info.suid, .version = info.version, .skmVer = info.skmVer};
|
||||
|
||||
return tdbTbUpsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &uidIdxVal, sizeof(uidIdxVal), &pMeta->txn);
|
||||
}
|
||||
|
||||
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||
|
@ -1118,7 +1125,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
|||
return -1;
|
||||
}
|
||||
tbDbKey.uid = pCtbEntry->ctbEntry.suid;
|
||||
tbDbKey.version = *(int64_t *)pData;
|
||||
tbDbKey.version = ((SUidIdxVal *)pData)[0].version;
|
||||
tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
|
||||
|
||||
tDecoderInit(&dc, pData, nData);
|
||||
|
|
|
@ -108,29 +108,21 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
|
|||
STbData *pTbData = NULL;
|
||||
tb_uid_t suid = pMsgIter->suid;
|
||||
tb_uid_t uid = pMsgIter->uid;
|
||||
int32_t sverNew;
|
||||
|
||||
// check if table exists (todo: refact)
|
||||
SMetaReader mr = {0};
|
||||
// SMetaEntry me = {0};
|
||||
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
|
||||
if (metaGetTableEntryByUid(&mr, pMsgIter->uid) < 0) {
|
||||
metaReaderClear(&mr);
|
||||
code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||
SMetaInfo info;
|
||||
code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info);
|
||||
if (code) {
|
||||
code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||
goto _err;
|
||||
}
|
||||
if (pRsp->tblFName) strcat(pRsp->tblFName, mr.me.name);
|
||||
|
||||
if (mr.me.type == TSDB_NORMAL_TABLE) {
|
||||
sverNew = mr.me.ntbEntry.schemaRow.version;
|
||||
} else {
|
||||
tDecoderClear(&mr.coder);
|
||||
|
||||
metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid);
|
||||
sverNew = mr.me.stbEntry.schemaRow.version;
|
||||
if (info.suid != suid) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _err;
|
||||
}
|
||||
metaReaderClear(&mr);
|
||||
pRsp->sver = sverNew;
|
||||
if (info.suid) {
|
||||
metaGetInfo(pTsdb->pVnode->pMeta, info.suid, &info);
|
||||
}
|
||||
pRsp->sver = info.skmVer;
|
||||
|
||||
// create/get STbData to op
|
||||
code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
|
||||
|
@ -157,7 +149,17 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
|||
SVBufPool *pPool = pTsdb->pVnode->inUse;
|
||||
TSDBKEY lastKey = {.version = version, .ts = eKey};
|
||||
|
||||
// check if table exists (todo)
|
||||
// check if table exists
|
||||
SMetaInfo info;
|
||||
code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info);
|
||||
if (code) {
|
||||
code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||
goto _err;
|
||||
}
|
||||
if (info.suid != suid) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
|
||||
if (code) {
|
||||
|
|
|
@ -869,7 +869,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
|||
|
||||
submitBlkRsp.uid = createTbReq.uid;
|
||||
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
|
||||
sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
|
||||
sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name);
|
||||
|
||||
msgIter.uid = createTbReq.uid;
|
||||
if (createTbReq.type == TSDB_CHILD_TABLE) {
|
||||
|
|
Loading…
Reference in New Issue