Merge pull request #16145 from taosdata/refact/meta_3.0
Refact: meta 3.0
This commit is contained in:
commit
f5b6fb7b9b
|
@ -24,6 +24,7 @@ target_sources(
|
||||||
"src/meta/metaCommit.c"
|
"src/meta/metaCommit.c"
|
||||||
"src/meta/metaEntry.c"
|
"src/meta/metaEntry.c"
|
||||||
"src/meta/metaSnapshot.c"
|
"src/meta/metaSnapshot.c"
|
||||||
|
"src/meta/metaCache.c"
|
||||||
|
|
||||||
# sma
|
# sma
|
||||||
"src/sma/smaEnv.c"
|
"src/sma/smaEnv.c"
|
||||||
|
|
|
@ -25,6 +25,7 @@ extern "C" {
|
||||||
|
|
||||||
typedef struct SMetaIdx SMetaIdx;
|
typedef struct SMetaIdx SMetaIdx;
|
||||||
typedef struct SMetaDB SMetaDB;
|
typedef struct SMetaDB SMetaDB;
|
||||||
|
typedef struct SMetaCache SMetaCache;
|
||||||
|
|
||||||
// metaDebug ==================
|
// metaDebug ==================
|
||||||
// clang-format off
|
// clang-format off
|
||||||
|
@ -60,6 +61,13 @@ static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64()
|
||||||
// metaTable ==================
|
// metaTable ==================
|
||||||
int metaHandleEntry(SMeta* pMeta, const SMetaEntry* pME);
|
int metaHandleEntry(SMeta* pMeta, const SMetaEntry* pME);
|
||||||
|
|
||||||
|
// metaCache ==================
|
||||||
|
int32_t metaCacheOpen(SMeta* pMeta);
|
||||||
|
void metaCacheClose(SMeta* pMeta);
|
||||||
|
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo);
|
||||||
|
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid);
|
||||||
|
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo);
|
||||||
|
|
||||||
struct SMeta {
|
struct SMeta {
|
||||||
TdThreadRwlock lock;
|
TdThreadRwlock lock;
|
||||||
|
|
||||||
|
@ -84,6 +92,8 @@ struct SMeta {
|
||||||
TTB* pStreamDb;
|
TTB* pStreamDb;
|
||||||
|
|
||||||
SMetaIdx* pIdx;
|
SMetaIdx* pIdx;
|
||||||
|
|
||||||
|
SMetaCache* pCache;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -92,6 +102,12 @@ typedef struct {
|
||||||
} STbDbKey;
|
} STbDbKey;
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
|
typedef struct {
|
||||||
|
tb_uid_t suid;
|
||||||
|
int64_t version;
|
||||||
|
int32_t skmVer;
|
||||||
|
} SUidIdxVal;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
int32_t sver;
|
int32_t sver;
|
||||||
|
|
|
@ -130,6 +130,14 @@ int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
|
||||||
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
|
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
|
||||||
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
|
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
|
||||||
|
|
||||||
|
typedef struct SMetaInfo {
|
||||||
|
int64_t uid;
|
||||||
|
int64_t suid;
|
||||||
|
int64_t version;
|
||||||
|
int32_t skmVer;
|
||||||
|
} SMetaInfo;
|
||||||
|
int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo);
|
||||||
|
|
||||||
// tsdb
|
// tsdb
|
||||||
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
|
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
|
||||||
int tsdbClose(STsdb** pTsdb);
|
int tsdbClose(STsdb** pTsdb);
|
||||||
|
|
|
@ -0,0 +1,206 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#include "meta.h"
|
||||||
|
|
||||||
|
#define META_CACHE_BASE_BUCKET 1024
|
||||||
|
|
||||||
|
// (uid , suid) : child table
|
||||||
|
// (uid, 0) : normal table
|
||||||
|
// (suid, suid) : super table
|
||||||
|
typedef struct SMetaCacheEntry SMetaCacheEntry;
|
||||||
|
struct SMetaCacheEntry {
|
||||||
|
SMetaCacheEntry* next;
|
||||||
|
SMetaInfo info;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct SMetaCache {
|
||||||
|
int32_t nEntry;
|
||||||
|
int32_t nBucket;
|
||||||
|
SMetaCacheEntry** aBucket;
|
||||||
|
};
|
||||||
|
|
||||||
|
int32_t metaCacheOpen(SMeta* pMeta) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SMetaCache* pCache = NULL;
|
||||||
|
|
||||||
|
pCache = (SMetaCache*)taosMemoryMalloc(sizeof(SMetaCache));
|
||||||
|
if (pCache == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
pCache->nEntry = 0;
|
||||||
|
pCache->nBucket = META_CACHE_BASE_BUCKET;
|
||||||
|
pCache->aBucket = (SMetaCacheEntry**)taosMemoryCalloc(pCache->nBucket, sizeof(SMetaCacheEntry*));
|
||||||
|
if (pCache->aBucket == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
taosMemoryFree(pCache);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
pMeta->pCache = pCache;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
metaError("vgId:%d meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void metaCacheClose(SMeta* pMeta) {
|
||||||
|
if (pMeta->pCache) {
|
||||||
|
for (int32_t iBucket = 0; iBucket < pMeta->pCache->nBucket; iBucket++) {
|
||||||
|
SMetaCacheEntry* pEntry = pMeta->pCache->aBucket[iBucket];
|
||||||
|
while (pEntry) {
|
||||||
|
SMetaCacheEntry* tEntry = pEntry->next;
|
||||||
|
taosMemoryFree(pEntry);
|
||||||
|
pEntry = tEntry;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosMemoryFree(pMeta->pCache->aBucket);
|
||||||
|
taosMemoryFree(pMeta->pCache);
|
||||||
|
pMeta->pCache = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t nBucket;
|
||||||
|
|
||||||
|
if (expand) {
|
||||||
|
nBucket = pCache->nBucket * 2;
|
||||||
|
} else {
|
||||||
|
nBucket = pCache->nBucket / 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMetaCacheEntry** aBucket = (SMetaCacheEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaCacheEntry*));
|
||||||
|
if (aBucket == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// rehash
|
||||||
|
for (int32_t iBucket = 0; iBucket < pCache->nBucket; iBucket++) {
|
||||||
|
SMetaCacheEntry* pEntry = pCache->aBucket[iBucket];
|
||||||
|
|
||||||
|
while (pEntry) {
|
||||||
|
SMetaCacheEntry* pTEntry = pEntry->next;
|
||||||
|
|
||||||
|
pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket];
|
||||||
|
aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry;
|
||||||
|
|
||||||
|
pEntry = pTEntry;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// final set
|
||||||
|
taosMemoryFree(pCache->aBucket);
|
||||||
|
pCache->nBucket = nBucket;
|
||||||
|
pCache->aBucket = aBucket;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
// ASSERT(metaIsWLocked(pMeta));
|
||||||
|
|
||||||
|
// search
|
||||||
|
SMetaCache* pCache = pMeta->pCache;
|
||||||
|
int32_t iBucket = TABS(pInfo->uid) % pCache->nBucket;
|
||||||
|
SMetaCacheEntry** ppEntry = &pCache->aBucket[iBucket];
|
||||||
|
while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
|
||||||
|
ppEntry = &(*ppEntry)->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*ppEntry) { // update
|
||||||
|
ASSERT(pInfo->suid == (*ppEntry)->info.suid);
|
||||||
|
if (pInfo->version > (*ppEntry)->info.version) {
|
||||||
|
(*ppEntry)->info.version = pInfo->version;
|
||||||
|
(*ppEntry)->info.skmVer = pInfo->skmVer;
|
||||||
|
}
|
||||||
|
} else { // insert
|
||||||
|
if (pCache->nEntry >= pCache->nBucket) {
|
||||||
|
code = metaRehashCache(pCache, 1);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
iBucket = TABS(pInfo->uid) % pCache->nBucket;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMetaCacheEntry* pEntryNew = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
|
||||||
|
if (pEntryNew == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
pEntryNew->info = *pInfo;
|
||||||
|
pEntryNew->next = pCache->aBucket[iBucket];
|
||||||
|
pCache->aBucket[iBucket] = pEntryNew;
|
||||||
|
pCache->nEntry++;
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SMetaCache* pCache = pMeta->pCache;
|
||||||
|
int32_t iBucket = TABS(uid) % pCache->nBucket;
|
||||||
|
SMetaCacheEntry** ppEntry = &pCache->aBucket[iBucket];
|
||||||
|
while (*ppEntry && (*ppEntry)->info.uid != uid) {
|
||||||
|
ppEntry = &(*ppEntry)->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMetaCacheEntry* pEntry = *ppEntry;
|
||||||
|
if (pEntry) {
|
||||||
|
*ppEntry = pEntry->next;
|
||||||
|
taosMemoryFree(pEntry);
|
||||||
|
pCache->nEntry--;
|
||||||
|
if (pCache->nEntry < pCache->nBucket / 4 && pCache->nBucket > META_CACHE_BASE_BUCKET) {
|
||||||
|
code = metaRehashCache(pCache, 0);
|
||||||
|
if (code) goto _exit;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
code = TSDB_CODE_NOT_FOUND;
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SMetaCache* pCache = pMeta->pCache;
|
||||||
|
int32_t iBucket = TABS(uid) % pCache->nBucket;
|
||||||
|
SMetaCacheEntry* pEntry = pCache->aBucket[iBucket];
|
||||||
|
|
||||||
|
while (pEntry && pEntry->info.uid != uid) {
|
||||||
|
pEntry = pEntry->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pEntry) {
|
||||||
|
*pInfo = pEntry->info;
|
||||||
|
} else {
|
||||||
|
code = TSDB_CODE_NOT_FOUND;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
|
@ -73,7 +73,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// open pUidIdx
|
// open pUidIdx
|
||||||
ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx);
|
ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(SUidIdxVal), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("vgId:%d, failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno));
|
metaError("vgId:%d, failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -143,6 +143,13 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t code = metaCacheOpen(pMeta);
|
||||||
|
if (code) {
|
||||||
|
terrno = code;
|
||||||
|
metaError("vgId:%d, failed to open meta cache since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
metaDebug("vgId:%d, meta is opened", TD_VID(pVnode));
|
metaDebug("vgId:%d, meta is opened", TD_VID(pVnode));
|
||||||
|
|
||||||
*ppMeta = pMeta;
|
*ppMeta = pMeta;
|
||||||
|
@ -169,6 +176,7 @@ _err:
|
||||||
|
|
||||||
int metaClose(SMeta *pMeta) {
|
int metaClose(SMeta *pMeta) {
|
||||||
if (pMeta) {
|
if (pMeta) {
|
||||||
|
if (pMeta->pCache) metaCacheClose(pMeta);
|
||||||
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
||||||
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
|
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
|
||||||
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
||||||
|
|
|
@ -63,7 +63,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
version = *(int64_t *)pReader->pBuf;
|
version = ((SUidIdxVal *)pReader->pBuf)[0].version;
|
||||||
return metaGetTableEntryByVersion(pReader, version, uid);
|
return metaGetTableEntryByVersion(pReader, version, uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,7 +160,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
|
||||||
|
|
||||||
tDecoderClear(&pTbCur->mr.coder);
|
tDecoderClear(&pTbCur->mr.coder);
|
||||||
|
|
||||||
metaGetTableEntryByVersion(&pTbCur->mr, *(int64_t *)pTbCur->pVal, *(tb_uid_t *)pTbCur->pKey);
|
metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
|
||||||
if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
|
if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -185,7 +185,7 @@ _query:
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
version = *(int64_t *)pData;
|
version = ((SUidIdxVal *)pData)[0].version;
|
||||||
|
|
||||||
tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData);
|
tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData);
|
||||||
SMetaEntry me = {0};
|
SMetaEntry me = {0};
|
||||||
|
@ -888,3 +888,41 @@ END:
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo) {
|
||||||
|
int32_t code = 0;
|
||||||
|
void *pData = NULL;
|
||||||
|
int nData = 0;
|
||||||
|
|
||||||
|
metaRLock(pMeta);
|
||||||
|
|
||||||
|
// search cache
|
||||||
|
if (metaCacheGet(pMeta, uid, pInfo) == 0) {
|
||||||
|
metaULock(pMeta);
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// search TDB
|
||||||
|
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
|
||||||
|
// not found
|
||||||
|
metaULock(pMeta);
|
||||||
|
code = TSDB_CODE_NOT_FOUND;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
metaULock(pMeta);
|
||||||
|
|
||||||
|
pInfo->uid = uid;
|
||||||
|
pInfo->suid = ((SUidIdxVal *)pData)->suid;
|
||||||
|
pInfo->version = ((SUidIdxVal *)pData)->version;
|
||||||
|
pInfo->skmVer = ((SUidIdxVal *)pData)->skmVer;
|
||||||
|
|
||||||
|
// upsert the cache
|
||||||
|
metaWLock(pMeta);
|
||||||
|
metaCacheUpsert(pMeta, pInfo);
|
||||||
|
metaULock(pMeta);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
tdbFree(pData);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
|
@ -28,9 +28,9 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
|
||||||
int vLen = 0;
|
int vLen = 0;
|
||||||
const void *pKey = NULL;
|
const void *pKey = NULL;
|
||||||
const void *pVal = NULL;
|
const void *pVal = NULL;
|
||||||
void * pBuf = NULL;
|
void *pBuf = NULL;
|
||||||
int32_t szBuf = 0;
|
int32_t szBuf = 0;
|
||||||
void * p = NULL;
|
void *p = NULL;
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
|
|
||||||
// validate req
|
// validate req
|
||||||
|
@ -83,8 +83,8 @@ int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid) {
|
||||||
|
|
||||||
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) {
|
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
STbDbKey tbDbKey;
|
STbDbKey tbDbKey;
|
||||||
void * pKey = NULL;
|
void *pKey = NULL;
|
||||||
void * pVal = NULL;
|
void *pVal = NULL;
|
||||||
int kLen = 0;
|
int kLen = 0;
|
||||||
int vLen = 0;
|
int vLen = 0;
|
||||||
SEncoder coder = {0};
|
SEncoder coder = {0};
|
||||||
|
@ -130,7 +130,8 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
|
SUidIdxVal uidIdxVal = {.suid = pME->smaEntry.tsma->indexUid, .version = pME->version, .skmVer = 0};
|
||||||
|
return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &uidIdxVal, sizeof(uidIdxVal), &pMeta->txn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
|
|
|
@ -27,6 +27,23 @@ static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
|
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
|
||||||
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type);
|
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type);
|
||||||
|
|
||||||
|
static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) {
|
||||||
|
pInfo->uid = pEntry->uid;
|
||||||
|
pInfo->version = pEntry->version;
|
||||||
|
if (pEntry->type == TSDB_SUPER_TABLE) {
|
||||||
|
pInfo->suid = pEntry->uid;
|
||||||
|
pInfo->skmVer = pEntry->stbEntry.schemaRow.version;
|
||||||
|
} else if (pEntry->type == TSDB_CHILD_TABLE) {
|
||||||
|
pInfo->suid = pEntry->ctbEntry.suid;
|
||||||
|
pInfo->skmVer = 0;
|
||||||
|
} else if (pEntry->type == TSDB_NORMAL_TABLE) {
|
||||||
|
pInfo->suid = 0;
|
||||||
|
pInfo->skmVer = pEntry->ntbEntry.schemaRow.version;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp) {
|
static int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp) {
|
||||||
pMetaRsp->pSchemas = taosMemoryMalloc(pSchema->nCols * sizeof(SSchema));
|
pMetaRsp->pSchemas = taosMemoryMalloc(pSchema->nCols * sizeof(SSchema));
|
||||||
if (NULL == pMetaRsp->pSchemas) {
|
if (NULL == pMetaRsp->pSchemas) {
|
||||||
|
@ -164,29 +181,11 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche
|
||||||
|
|
||||||
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
SMetaEntry me = {0};
|
SMetaEntry me = {0};
|
||||||
int kLen = 0;
|
|
||||||
int vLen = 0;
|
|
||||||
const void *pKey = NULL;
|
|
||||||
const void *pVal = NULL;
|
|
||||||
void *pBuf = NULL;
|
|
||||||
int32_t szBuf = 0;
|
|
||||||
void *p = NULL;
|
|
||||||
SMetaReader mr = {0};
|
|
||||||
|
|
||||||
// validate req
|
// validate req
|
||||||
metaReaderInit(&mr, pMeta, 0);
|
if (tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name), NULL, NULL) == 0) {
|
||||||
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
|
|
||||||
// TODO: just for pass case
|
|
||||||
#if 0
|
|
||||||
terrno = TSDB_CODE_TDB_STB_ALREADY_EXIST;
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
return -1;
|
|
||||||
#else
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
return 0;
|
return 0;
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
metaReaderClear(&mr);
|
|
||||||
|
|
||||||
// set structs
|
// set structs
|
||||||
me.version = version;
|
me.version = version;
|
||||||
|
@ -265,8 +264,8 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb
|
||||||
// drop super table
|
// drop super table
|
||||||
_drop_super_table:
|
_drop_super_table:
|
||||||
tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData);
|
tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData);
|
||||||
tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = *(int64_t *)pData, .uid = pReq->suid}, sizeof(STbDbKey),
|
tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = ((SUidIdxVal *)pData)[0].version, .uid = pReq->suid},
|
||||||
&pMeta->txn);
|
sizeof(STbDbKey), &pMeta->txn);
|
||||||
tdbTbDelete(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pMeta->txn);
|
tdbTbDelete(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pMeta->txn);
|
||||||
tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn);
|
tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn);
|
||||||
tdbTbDelete(pMeta->pSuidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn);
|
tdbTbDelete(pMeta->pSuidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn);
|
||||||
|
@ -309,7 +308,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
oversion = *(int64_t *)pData;
|
oversion = ((SUidIdxVal *)pData)[0].version;
|
||||||
|
|
||||||
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
|
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
|
||||||
ret = tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c);
|
ret = tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c);
|
||||||
|
@ -336,15 +335,14 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
metaSaveToSkmDb(pMeta, &nStbEntry);
|
metaSaveToSkmDb(pMeta, &nStbEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (oStbEntry.stbEntry.schemaTag.sver != pReq->schemaTag.sver) {
|
|
||||||
// // change tag schema
|
|
||||||
// }
|
|
||||||
|
|
||||||
// update table.db
|
// update table.db
|
||||||
metaSaveToTbDb(pMeta, &nStbEntry);
|
metaSaveToTbDb(pMeta, &nStbEntry);
|
||||||
|
|
||||||
// update uid index
|
// update uid index
|
||||||
tdbTbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &version, sizeof(version), 0);
|
SMetaInfo info;
|
||||||
|
metaGetEntryInfo(&nStbEntry, &info);
|
||||||
|
tdbTbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t),
|
||||||
|
&(SUidIdxVal){.suid = info.suid, .version = info.version, .skmVer = info.skmVer}, sizeof(SUidIdxVal), 0);
|
||||||
|
|
||||||
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
|
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
|
@ -503,7 +501,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
||||||
SDecoder dc = {0};
|
SDecoder dc = {0};
|
||||||
|
|
||||||
rc = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData);
|
rc = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData);
|
||||||
int64_t version = *(int64_t *)pData;
|
int64_t version = ((SUidIdxVal *)pData)[0].version;
|
||||||
|
|
||||||
tdbTbGet(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pData, &nData);
|
tdbTbGet(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pData, &nData);
|
||||||
|
|
||||||
|
@ -517,7 +515,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
||||||
int tLen = 0;
|
int tLen = 0;
|
||||||
|
|
||||||
if (tdbTbGet(pMeta->pUidIdx, &e.ctbEntry.suid, sizeof(tb_uid_t), &tData, &tLen) == 0) {
|
if (tdbTbGet(pMeta->pUidIdx, &e.ctbEntry.suid, sizeof(tb_uid_t), &tData, &tLen) == 0) {
|
||||||
version = *(int64_t *)tData;
|
version = ((SUidIdxVal *)tData)[0].version;
|
||||||
STbDbKey tbDbKey = {.uid = e.ctbEntry.suid, .version = version};
|
STbDbKey tbDbKey = {.uid = e.ctbEntry.suid, .version = version};
|
||||||
if (tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &tData, &tLen) == 0) {
|
if (tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &tData, &tLen) == 0) {
|
||||||
SDecoder tdc = {0};
|
SDecoder tdc = {0};
|
||||||
|
@ -556,6 +554,8 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
||||||
--pMeta->pVnode->config.vndStats.numOfSTables;
|
--pMeta->pVnode->config.vndStats.numOfSTables;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metaCacheDrop(pMeta, uid);
|
||||||
|
|
||||||
tDecoderClear(&dc);
|
tDecoderClear(&dc);
|
||||||
tdbFree(pData);
|
tdbFree(pData);
|
||||||
|
|
||||||
|
@ -594,7 +594,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
||||||
ASSERT(c == 0);
|
ASSERT(c == 0);
|
||||||
|
|
||||||
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
|
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
|
||||||
oversion = *(int64_t *)pData;
|
oversion = ((SUidIdxVal *)pData)[0].version;
|
||||||
|
|
||||||
// search table.db
|
// search table.db
|
||||||
TBC *pTbDbc = NULL;
|
TBC *pTbDbc = NULL;
|
||||||
|
@ -708,7 +708,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
||||||
// save to table db
|
// save to table db
|
||||||
metaSaveToTbDb(pMeta, &entry);
|
metaSaveToTbDb(pMeta, &entry);
|
||||||
|
|
||||||
tdbTbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0);
|
metaUpdateUidIdx(pMeta, &entry);
|
||||||
|
|
||||||
metaSaveToSkmDb(pMeta, &entry);
|
metaSaveToSkmDb(pMeta, &entry);
|
||||||
|
|
||||||
|
@ -764,7 +764,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
|
||||||
ASSERT(c == 0);
|
ASSERT(c == 0);
|
||||||
|
|
||||||
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
|
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
|
||||||
oversion = *(int64_t *)pData;
|
oversion = ((SUidIdxVal *)pData)[0].version;
|
||||||
|
|
||||||
// search table.db
|
// search table.db
|
||||||
TBC *pTbDbc = NULL;
|
TBC *pTbDbc = NULL;
|
||||||
|
@ -784,8 +784,8 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
|
||||||
|
|
||||||
/* get stbEntry*/
|
/* get stbEntry*/
|
||||||
tdbTbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal);
|
tdbTbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal);
|
||||||
tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}), sizeof(STbDbKey),
|
tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = ((SUidIdxVal *)pVal)[0].version}),
|
||||||
(void **)&stbEntry.pBuf, &nVal);
|
sizeof(STbDbKey), (void **)&stbEntry.pBuf, &nVal);
|
||||||
tdbFree(pVal);
|
tdbFree(pVal);
|
||||||
tDecoderInit(&dc2, stbEntry.pBuf, nVal);
|
tDecoderInit(&dc2, stbEntry.pBuf, nVal);
|
||||||
metaDecodeEntry(&dc2, &stbEntry);
|
metaDecodeEntry(&dc2, &stbEntry);
|
||||||
|
@ -859,7 +859,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
|
||||||
metaSaveToTbDb(pMeta, &ctbEntry);
|
metaSaveToTbDb(pMeta, &ctbEntry);
|
||||||
|
|
||||||
// save to uid.idx
|
// save to uid.idx
|
||||||
tdbTbUpsert(pMeta->pUidIdx, &ctbEntry.uid, sizeof(tb_uid_t), &version, sizeof(version), &pMeta->txn);
|
metaUpdateUidIdx(pMeta, &ctbEntry);
|
||||||
|
|
||||||
if (iCol == 0) {
|
if (iCol == 0) {
|
||||||
metaUpdateTagIdx(pMeta, &ctbEntry);
|
metaUpdateTagIdx(pMeta, &ctbEntry);
|
||||||
|
@ -914,7 +914,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
|
||||||
ASSERT(c == 0);
|
ASSERT(c == 0);
|
||||||
|
|
||||||
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
|
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
|
||||||
oversion = *(int64_t *)pData;
|
oversion = ((SUidIdxVal *)pData)[0].version;
|
||||||
|
|
||||||
// search table.db
|
// search table.db
|
||||||
TBC *pTbDbc = NULL;
|
TBC *pTbDbc = NULL;
|
||||||
|
@ -959,7 +959,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
|
||||||
|
|
||||||
// save to table db
|
// save to table db
|
||||||
metaSaveToTbDb(pMeta, &entry);
|
metaSaveToTbDb(pMeta, &entry);
|
||||||
tdbTbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0);
|
metaUpdateUidIdx(pMeta, &entry);
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
|
|
||||||
tdbTbcClose(pTbDbc);
|
tdbTbcClose(pTbDbc);
|
||||||
|
@ -1042,7 +1042,14 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
|
// upsert cache
|
||||||
|
SMetaInfo info;
|
||||||
|
metaGetEntryInfo(pME, &info);
|
||||||
|
metaCacheUpsert(pMeta, &info);
|
||||||
|
|
||||||
|
SUidIdxVal uidIdxVal = {.suid = info.suid, .version = info.version, .skmVer = info.skmVer};
|
||||||
|
|
||||||
|
return tdbTbUpsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &uidIdxVal, sizeof(uidIdxVal), &pMeta->txn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
|
@ -1118,7 +1125,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
tbDbKey.uid = pCtbEntry->ctbEntry.suid;
|
tbDbKey.uid = pCtbEntry->ctbEntry.suid;
|
||||||
tbDbKey.version = *(int64_t *)pData;
|
tbDbKey.version = ((SUidIdxVal *)pData)[0].version;
|
||||||
tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
|
tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
|
||||||
|
|
||||||
tDecoderInit(&dc, pData, nData);
|
tDecoderInit(&dc, pData, nData);
|
||||||
|
|
|
@ -108,29 +108,21 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
|
||||||
STbData *pTbData = NULL;
|
STbData *pTbData = NULL;
|
||||||
tb_uid_t suid = pMsgIter->suid;
|
tb_uid_t suid = pMsgIter->suid;
|
||||||
tb_uid_t uid = pMsgIter->uid;
|
tb_uid_t uid = pMsgIter->uid;
|
||||||
int32_t sverNew;
|
|
||||||
|
|
||||||
// check if table exists (todo: refact)
|
SMetaInfo info;
|
||||||
SMetaReader mr = {0};
|
code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info);
|
||||||
// SMetaEntry me = {0};
|
if (code) {
|
||||||
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
|
code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||||
if (metaGetTableEntryByUid(&mr, pMsgIter->uid) < 0) {
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
if (pRsp->tblFName) strcat(pRsp->tblFName, mr.me.name);
|
if (info.suid != suid) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
if (mr.me.type == TSDB_NORMAL_TABLE) {
|
goto _err;
|
||||||
sverNew = mr.me.ntbEntry.schemaRow.version;
|
|
||||||
} else {
|
|
||||||
tDecoderClear(&mr.coder);
|
|
||||||
|
|
||||||
metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid);
|
|
||||||
sverNew = mr.me.stbEntry.schemaRow.version;
|
|
||||||
}
|
}
|
||||||
metaReaderClear(&mr);
|
if (info.suid) {
|
||||||
pRsp->sver = sverNew;
|
metaGetInfo(pTsdb->pVnode->pMeta, info.suid, &info);
|
||||||
|
}
|
||||||
|
pRsp->sver = info.skmVer;
|
||||||
|
|
||||||
// create/get STbData to op
|
// create/get STbData to op
|
||||||
code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
|
code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
|
||||||
|
@ -157,7 +149,17 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
||||||
SVBufPool *pPool = pTsdb->pVnode->inUse;
|
SVBufPool *pPool = pTsdb->pVnode->inUse;
|
||||||
TSDBKEY lastKey = {.version = version, .ts = eKey};
|
TSDBKEY lastKey = {.version = version, .ts = eKey};
|
||||||
|
|
||||||
// check if table exists (todo)
|
// check if table exists
|
||||||
|
SMetaInfo info;
|
||||||
|
code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info);
|
||||||
|
if (code) {
|
||||||
|
code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
if (info.suid != suid) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
|
code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
|
@ -869,7 +869,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
|
|
||||||
submitBlkRsp.uid = createTbReq.uid;
|
submitBlkRsp.uid = createTbReq.uid;
|
||||||
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
|
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
|
||||||
sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname);
|
sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name);
|
||||||
|
|
||||||
msgIter.uid = createTbReq.uid;
|
msgIter.uid = createTbReq.uid;
|
||||||
if (createTbReq.type == TSDB_CHILD_TABLE) {
|
if (createTbReq.type == TSDB_CHILD_TABLE) {
|
||||||
|
|
Loading…
Reference in New Issue