enh: refactor return code
This commit is contained in:
parent
26c7943279
commit
10263b6b4e
|
@ -22,7 +22,6 @@ set(
|
||||||
|
|
||||||
# meta
|
# meta
|
||||||
"src/meta/metaOpen.c"
|
"src/meta/metaOpen.c"
|
||||||
"src/meta/metaIdx.c"
|
|
||||||
"src/meta/metaTable.c"
|
"src/meta/metaTable.c"
|
||||||
"src/meta/metaSma.c"
|
"src/meta/metaSma.c"
|
||||||
"src/meta/metaQuery.c"
|
"src/meta/metaQuery.c"
|
||||||
|
|
|
@ -318,7 +318,7 @@ void* tdUidStoreFree(STbUidStore* pStore);
|
||||||
|
|
||||||
// SMetaSnapReader ========================================
|
// SMetaSnapReader ========================================
|
||||||
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader);
|
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader);
|
||||||
int32_t metaSnapReaderClose(SMetaSnapReader** ppReader);
|
void metaSnapReaderClose(SMetaSnapReader** ppReader);
|
||||||
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData);
|
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData);
|
||||||
// SMetaSnapWriter ========================================
|
// SMetaSnapWriter ========================================
|
||||||
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter);
|
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter);
|
||||||
|
|
|
@ -117,91 +117,81 @@ static void freeCacheEntryFp(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaCacheOpen(SMeta* pMeta) {
|
int32_t metaCacheOpen(SMeta* pMeta) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SMetaCache* pCache = NULL;
|
int32_t lino;
|
||||||
|
|
||||||
pCache = (SMetaCache*)taosMemoryMalloc(sizeof(SMetaCache));
|
pMeta->pCache = (SMetaCache*)taosMemoryCalloc(1, sizeof(SMetaCache));
|
||||||
if (pCache == NULL) {
|
if (pMeta->pCache == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// open entry cache
|
// open entry cache
|
||||||
pCache->sEntryCache.nEntry = 0;
|
pMeta->pCache->sEntryCache.nEntry = 0;
|
||||||
pCache->sEntryCache.nBucket = META_CACHE_BASE_BUCKET;
|
pMeta->pCache->sEntryCache.nBucket = META_CACHE_BASE_BUCKET;
|
||||||
pCache->sEntryCache.aBucket =
|
pMeta->pCache->sEntryCache.aBucket =
|
||||||
(SMetaCacheEntry**)taosMemoryCalloc(pCache->sEntryCache.nBucket, sizeof(SMetaCacheEntry*));
|
(SMetaCacheEntry**)taosMemoryCalloc(pMeta->pCache->sEntryCache.nBucket, sizeof(SMetaCacheEntry*));
|
||||||
if (pCache->sEntryCache.aBucket == NULL) {
|
if (pMeta->pCache->sEntryCache.aBucket == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// open stats cache
|
// open stats cache
|
||||||
pCache->sStbStatsCache.nEntry = 0;
|
pMeta->pCache->sStbStatsCache.nEntry = 0;
|
||||||
pCache->sStbStatsCache.nBucket = META_CACHE_STATS_BUCKET;
|
pMeta->pCache->sStbStatsCache.nBucket = META_CACHE_STATS_BUCKET;
|
||||||
pCache->sStbStatsCache.aBucket =
|
pMeta->pCache->sStbStatsCache.aBucket =
|
||||||
(SMetaStbStatsEntry**)taosMemoryCalloc(pCache->sStbStatsCache.nBucket, sizeof(SMetaStbStatsEntry*));
|
(SMetaStbStatsEntry**)taosMemoryCalloc(pMeta->pCache->sStbStatsCache.nBucket, sizeof(SMetaStbStatsEntry*));
|
||||||
if (pCache->sStbStatsCache.aBucket == NULL) {
|
if (pMeta->pCache->sStbStatsCache.aBucket == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
goto _err2;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
|
pMeta->pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
|
||||||
if (pCache->sTagFilterResCache.pUidResCache == NULL) {
|
if (pMeta->pCache->sTagFilterResCache.pUidResCache == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = terrno, lino, _exit);
|
||||||
goto _err2;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pCache->sTagFilterResCache.accTimes = 0;
|
pMeta->pCache->sTagFilterResCache.accTimes = 0;
|
||||||
pCache->sTagFilterResCache.pTableEntry =
|
pMeta->pCache->sTagFilterResCache.pTableEntry =
|
||||||
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
|
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
|
||||||
if (pCache->sTagFilterResCache.pTableEntry == NULL) {
|
if (pMeta->pCache->sTagFilterResCache.pTableEntry == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = terrno, lino, _exit);
|
||||||
goto _err2;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
|
taosHashSetFreeFp(pMeta->pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
|
||||||
taosThreadMutexInit(&pCache->sTagFilterResCache.lock, NULL);
|
taosThreadMutexInit(&pMeta->pCache->sTagFilterResCache.lock, NULL);
|
||||||
|
|
||||||
pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
|
pMeta->pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
|
||||||
if (pCache->STbGroupResCache.pResCache == NULL) {
|
if (pMeta->pCache->STbGroupResCache.pResCache == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = terrno, lino, _exit);
|
||||||
goto _err2;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pCache->STbGroupResCache.accTimes = 0;
|
pMeta->pCache->STbGroupResCache.accTimes = 0;
|
||||||
pCache->STbGroupResCache.pTableEntry =
|
pMeta->pCache->STbGroupResCache.pTableEntry =
|
||||||
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
|
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
|
||||||
if (pCache->STbGroupResCache.pTableEntry == NULL) {
|
if (pMeta->pCache->STbGroupResCache.pTableEntry == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = terrno, lino, _exit);
|
||||||
goto _err2;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashSetFreeFp(pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp);
|
taosHashSetFreeFp(pMeta->pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp);
|
||||||
taosThreadMutexInit(&pCache->STbGroupResCache.lock, NULL);
|
taosThreadMutexInit(&pMeta->pCache->STbGroupResCache.lock, NULL);
|
||||||
|
|
||||||
pCache->STbFilterCache.pStb = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
pMeta->pCache->STbFilterCache.pStb =
|
||||||
if (pCache->STbFilterCache.pStb == NULL) {
|
taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
if (pMeta->pCache->STbFilterCache.pStb == NULL) {
|
||||||
goto _err2;
|
TSDB_CHECK_CODE(code = terrno, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
pCache->STbFilterCache.pStbName =
|
pMeta->pCache->STbFilterCache.pStbName =
|
||||||
taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
|
taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
|
||||||
if (pCache->STbFilterCache.pStbName == NULL) {
|
if (pMeta->pCache->STbFilterCache.pStbName == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = terrno, lino, _exit);
|
||||||
goto _err2;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeta->pCache = pCache;
|
_exit:
|
||||||
return code;
|
if (code) {
|
||||||
|
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
|
||||||
_err2:
|
metaCacheClose(pMeta);
|
||||||
entryCacheClose(pMeta);
|
} else {
|
||||||
|
metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
|
||||||
_err:
|
}
|
||||||
taosMemoryFree(pCache);
|
|
||||||
metaError("vgId:%d, meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -289,8 +279,7 @@ int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
|
||||||
}
|
}
|
||||||
} else { // insert
|
} else { // insert
|
||||||
if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) {
|
if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) {
|
||||||
code = metaRehashCache(pCache, 1);
|
TAOS_UNUSED(metaRehashCache(pCache, 1));
|
||||||
if (code) goto _exit;
|
|
||||||
|
|
||||||
iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
|
iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
|
||||||
}
|
}
|
||||||
|
@ -328,8 +317,7 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
|
||||||
pCache->sEntryCache.nEntry--;
|
pCache->sEntryCache.nEntry--;
|
||||||
if (pCache->sEntryCache.nEntry < pCache->sEntryCache.nBucket / 4 &&
|
if (pCache->sEntryCache.nEntry < pCache->sEntryCache.nBucket / 4 &&
|
||||||
pCache->sEntryCache.nBucket > META_CACHE_BASE_BUCKET) {
|
pCache->sEntryCache.nBucket > META_CACHE_BASE_BUCKET) {
|
||||||
code = metaRehashCache(pCache, 0);
|
TAOS_UNUSED(metaRehashCache(pCache, 0));
|
||||||
if (code) goto _exit;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
code = TSDB_CODE_NOT_FOUND;
|
code = TSDB_CODE_NOT_FOUND;
|
||||||
|
@ -351,7 +339,9 @@ int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pEntry) {
|
if (pEntry) {
|
||||||
*pInfo = pEntry->info;
|
if (pInfo) {
|
||||||
|
*pInfo = pEntry->info;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
code = TSDB_CODE_NOT_FOUND;
|
code = TSDB_CODE_NOT_FOUND;
|
||||||
}
|
}
|
||||||
|
@ -415,9 +405,7 @@ int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo) {
|
||||||
(*ppEntry)->info.ctbNum = pInfo->ctbNum;
|
(*ppEntry)->info.ctbNum = pInfo->ctbNum;
|
||||||
} else { // insert
|
} else { // insert
|
||||||
if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
|
if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
|
||||||
code = metaRehashStatsCache(pCache, 1);
|
TAOS_UNUSED(metaRehashStatsCache(pCache, 1));
|
||||||
if (code) goto _exit;
|
|
||||||
|
|
||||||
iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
|
iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -454,8 +442,7 @@ int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid) {
|
||||||
pCache->sStbStatsCache.nEntry--;
|
pCache->sStbStatsCache.nEntry--;
|
||||||
if (pCache->sStbStatsCache.nEntry < pCache->sStbStatsCache.nBucket / 4 &&
|
if (pCache->sStbStatsCache.nEntry < pCache->sStbStatsCache.nBucket / 4 &&
|
||||||
pCache->sStbStatsCache.nBucket > META_CACHE_STATS_BUCKET) {
|
pCache->sStbStatsCache.nBucket > META_CACHE_STATS_BUCKET) {
|
||||||
code = metaRehashStatsCache(pCache, 0);
|
TAOS_UNUSED(metaRehashStatsCache(pCache, 0));
|
||||||
if (code) goto _exit;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
code = TSDB_CODE_NOT_FOUND;
|
code = TSDB_CODE_NOT_FOUND;
|
||||||
|
@ -477,7 +464,9 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pEntry) {
|
if (pEntry) {
|
||||||
*pInfo = pEntry->info;
|
if (pInfo) {
|
||||||
|
*pInfo = pEntry->info;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
code = TSDB_CODE_NOT_FOUND;
|
code = TSDB_CODE_NOT_FOUND;
|
||||||
}
|
}
|
||||||
|
@ -502,7 +491,9 @@ static int checkAllEntriesInCache(const STagFilterResEntry* pEntry, SArray* pInv
|
||||||
// check whether it is existed in LRU cache, and remove it from linked list if not.
|
// check whether it is existed in LRU cache, and remove it from linked list if not.
|
||||||
LRUHandle* pRes = taosLRUCacheLookup(pCache, buf, len);
|
LRUHandle* pRes = taosLRUCacheLookup(pCache, buf, len);
|
||||||
if (pRes == NULL) { // remove the item in the linked list
|
if (pRes == NULL) { // remove the item in the linked list
|
||||||
taosArrayPush(pInvalidRes, &pNode);
|
if (taosArrayPush(pInvalidRes, &pNode) == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
taosLRUCacheRelease(pCache, pRes, false);
|
taosLRUCacheRelease(pCache, pRes, false);
|
||||||
}
|
}
|
||||||
|
@ -626,7 +617,7 @@ static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyL
|
||||||
|
|
||||||
p->hitTimes = 0;
|
p->hitTimes = 0;
|
||||||
tdListInit(&p->list, keyLen);
|
tdListInit(&p->list, keyLen);
|
||||||
taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
|
TAOS_CHECK_RETURN(taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES));
|
||||||
tdListAppend(&p->list, pKey);
|
tdListAppend(&p->list, pKey);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -956,9 +947,7 @@ int32_t metaInitTbFilterCache(SMeta* pMeta) {
|
||||||
}
|
}
|
||||||
if (tbNum && pTbArr) {
|
if (tbNum && pTbArr) {
|
||||||
for (int32_t i = 0; i < tbNum; ++i) {
|
for (int32_t i = 0; i < tbNum; ++i) {
|
||||||
if (metaPutTbToFilterCache(pMeta, pTbArr[i], 1) != 0) {
|
TAOS_CHECK_RETURN(metaPutTbToFilterCache(pMeta, pTbArr[i], 1));
|
||||||
return terrno ? terrno : -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
|
|
|
@ -21,7 +21,10 @@ static FORCE_INLINE void *metaMalloc(void *pPool, size_t size) {
|
||||||
static FORCE_INLINE void metaFree(void *pPool, void *p) { vnodeBufPoolFree((SVBufPool *)pPool, p); }
|
static FORCE_INLINE void metaFree(void *pPool, void *p) { vnodeBufPoolFree((SVBufPool *)pPool, p); }
|
||||||
|
|
||||||
// begin a meta txn
|
// begin a meta txn
|
||||||
int metaBegin(SMeta *pMeta, int8_t heap) {
|
int32_t metaBegin(SMeta *pMeta, int8_t heap) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
|
||||||
void *(*xMalloc)(void *, size_t) = NULL;
|
void *(*xMalloc)(void *, size_t) = NULL;
|
||||||
void (*xFree)(void *, void *) = NULL;
|
void (*xFree)(void *, void *) = NULL;
|
||||||
void *xArg = NULL;
|
void *xArg = NULL;
|
||||||
|
@ -36,12 +39,19 @@ int metaBegin(SMeta *pMeta, int8_t heap) {
|
||||||
xArg = pMeta->pVnode->inUse;
|
xArg = pMeta->pVnode->inUse;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbBegin(pMeta->pEnv, &pMeta->txn, xMalloc, xFree, xArg, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
code = tdbBegin(pMeta->pEnv, &pMeta->txn, xMalloc, xFree, xArg, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
return -1;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
code = tdbCommit(pMeta->pEnv, pMeta->txn);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
metaError("vgId:%d %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__,
|
||||||
|
tstrerror(terrno));
|
||||||
|
} else {
|
||||||
|
metaDebug("vgId:%d %s success", TD_VID(pMeta->pVnode), __func__);
|
||||||
}
|
}
|
||||||
|
|
||||||
tdbCommit(pMeta->pEnv, pMeta->txn);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,20 +59,36 @@ int metaBegin(SMeta *pMeta, int8_t heap) {
|
||||||
TXN *metaGetTxn(SMeta *pMeta) { return pMeta->txn; }
|
TXN *metaGetTxn(SMeta *pMeta) { return pMeta->txn; }
|
||||||
int metaCommit(SMeta *pMeta, TXN *txn) { return tdbCommit(pMeta->pEnv, txn); }
|
int metaCommit(SMeta *pMeta, TXN *txn) { return tdbCommit(pMeta->pEnv, txn); }
|
||||||
int metaFinishCommit(SMeta *pMeta, TXN *txn) { return tdbPostCommit(pMeta->pEnv, txn); }
|
int metaFinishCommit(SMeta *pMeta, TXN *txn) { return tdbPostCommit(pMeta->pEnv, txn); }
|
||||||
int metaPrepareAsyncCommit(SMeta *pMeta) {
|
|
||||||
// return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn);
|
int metaPrepareAsyncCommit(SMeta *pMeta) {
|
||||||
int code = 0;
|
// return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn);
|
||||||
|
int code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
|
||||||
metaWLock(pMeta);
|
metaWLock(pMeta);
|
||||||
code = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
|
TAOS_UNUSED(ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn));
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
|
|
||||||
code = tdbCommit(pMeta->pEnv, pMeta->txn);
|
code = tdbCommit(pMeta->pEnv, pMeta->txn);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
pMeta->changed = false;
|
pMeta->changed = false;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
metaError("vgId:%d %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__,
|
||||||
|
tstrerror(terrno));
|
||||||
|
} else {
|
||||||
|
metaDebug("vgId:%d %s success", TD_VID(pMeta->pVnode), __func__);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// abort the meta txn
|
// abort the meta txn
|
||||||
int metaAbort(SMeta *pMeta) {
|
int metaAbort(SMeta *pMeta) {
|
||||||
if (!pMeta->txn) return 0;
|
if (!pMeta->txn) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int code = tdbAbort(pMeta->pEnv, pMeta->txn);
|
int code = tdbAbort(pMeta->pEnv, pMeta->txn);
|
||||||
if (code) {
|
if (code) {
|
||||||
metaError("vgId:%d, failed to abort meta since %s", TD_VID(pMeta->pVnode), tstrerror(terrno));
|
metaError("vgId:%d, failed to abort meta since %s", TD_VID(pMeta->pVnode), tstrerror(terrno));
|
||||||
|
|
|
@ -17,159 +17,166 @@
|
||||||
|
|
||||||
int meteEncodeColCmprEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
int meteEncodeColCmprEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
||||||
const SColCmprWrapper *pw = &pME->colCmpr;
|
const SColCmprWrapper *pw = &pME->colCmpr;
|
||||||
if (tEncodeI32v(pCoder, pw->nCols) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pw->nCols));
|
||||||
if (tEncodeI32v(pCoder, pw->version) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pw->version));
|
||||||
uDebug("encode cols:%d", pw->nCols);
|
uDebug("encode cols:%d", pw->nCols);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pw->nCols; i++) {
|
for (int32_t i = 0; i < pw->nCols; i++) {
|
||||||
SColCmpr *p = &pw->pColCmpr[i];
|
SColCmpr *p = &pw->pColCmpr[i];
|
||||||
if (tEncodeI16v(pCoder, p->id) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeI16v(pCoder, p->id));
|
||||||
if (tEncodeU32(pCoder, p->alg) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeU32(pCoder, p->alg));
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int meteDecodeColCmprEntry(SDecoder *pDecoder, SMetaEntry *pME) {
|
int meteDecodeColCmprEntry(SDecoder *pDecoder, SMetaEntry *pME) {
|
||||||
SColCmprWrapper *pWrapper = &pME->colCmpr;
|
SColCmprWrapper *pWrapper = &pME->colCmpr;
|
||||||
if (tDecodeI32v(pDecoder, &pWrapper->nCols) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeI32v(pDecoder, &pWrapper->nCols));
|
||||||
if (pWrapper->nCols == 0) {
|
if (pWrapper->nCols == 0) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tDecodeI32v(pDecoder, &pWrapper->version) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeI32v(pDecoder, &pWrapper->version));
|
||||||
uDebug("dencode cols:%d", pWrapper->nCols);
|
uDebug("dencode cols:%d", pWrapper->nCols);
|
||||||
pWrapper->pColCmpr = (SColCmpr *)tDecoderMalloc(pDecoder, pWrapper->nCols * sizeof(SColCmpr));
|
pWrapper->pColCmpr = (SColCmpr *)tDecoderMalloc(pDecoder, pWrapper->nCols * sizeof(SColCmpr));
|
||||||
if (pWrapper->pColCmpr == NULL) return -1;
|
if (pWrapper->pColCmpr == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < pWrapper->nCols; i++) {
|
for (int i = 0; i < pWrapper->nCols; i++) {
|
||||||
SColCmpr *p = &pWrapper->pColCmpr[i];
|
SColCmpr *p = &pWrapper->pColCmpr[i];
|
||||||
if (tDecodeI16v(pDecoder, &p->id) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeI16v(pDecoder, &p->id));
|
||||||
if (tDecodeU32(pDecoder, &p->alg) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeU32(pDecoder, &p->alg));
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
static FORCE_INLINE void metatInitDefaultSColCmprWrapper(SDecoder *pDecoder, SColCmprWrapper *pCmpr,
|
static FORCE_INLINE int32_t metatInitDefaultSColCmprWrapper(SDecoder *pDecoder, SColCmprWrapper *pCmpr,
|
||||||
SSchemaWrapper *pSchema) {
|
SSchemaWrapper *pSchema) {
|
||||||
pCmpr->nCols = pSchema->nCols;
|
pCmpr->nCols = pSchema->nCols;
|
||||||
pCmpr->pColCmpr = (SColCmpr *)tDecoderMalloc(pDecoder, pCmpr->nCols * sizeof(SColCmpr));
|
if ((pCmpr->pColCmpr = (SColCmpr *)tDecoderMalloc(pDecoder, pCmpr->nCols * sizeof(SColCmpr))) == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pCmpr->nCols; i++) {
|
for (int32_t i = 0; i < pCmpr->nCols; i++) {
|
||||||
SColCmpr *pColCmpr = &pCmpr->pColCmpr[i];
|
SColCmpr *pColCmpr = &pCmpr->pColCmpr[i];
|
||||||
SSchema *pColSchema = &pSchema->pSchema[i];
|
SSchema *pColSchema = &pSchema->pSchema[i];
|
||||||
pColCmpr->id = pColSchema->colId;
|
pColCmpr->id = pColSchema->colId;
|
||||||
pColCmpr->alg = createDefaultColCmprByType(pColSchema->type);
|
pColCmpr->alg = createDefaultColCmprByType(pColSchema->type);
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
TAOS_CHECK_RETURN(tStartEncode(pCoder));
|
||||||
|
TAOS_CHECK_RETURN(tEncodeI64(pCoder, pME->version));
|
||||||
|
TAOS_CHECK_RETURN(tEncodeI8(pCoder, pME->type));
|
||||||
|
TAOS_CHECK_RETURN(tEncodeI64(pCoder, pME->uid));
|
||||||
|
|
||||||
if (tEncodeI64(pCoder, pME->version) < 0) return -1;
|
if (pME->name == NULL) {
|
||||||
if (tEncodeI8(pCoder, pME->type) < 0) return -1;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
if (tEncodeI64(pCoder, pME->uid) < 0) return -1;
|
}
|
||||||
if (pME->name == NULL || tEncodeCStr(pCoder, pME->name) < 0) return -1;
|
|
||||||
|
TAOS_CHECK_RETURN(tEncodeCStr(pCoder, pME->name));
|
||||||
|
|
||||||
if (pME->type == TSDB_SUPER_TABLE) {
|
if (pME->type == TSDB_SUPER_TABLE) {
|
||||||
if (tEncodeI8(pCoder, pME->flags) < 0) return -1; // TODO: need refactor?
|
TAOS_CHECK_RETURN(tEncodeI8(pCoder, pME->flags));
|
||||||
if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaRow) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaRow));
|
||||||
if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag));
|
||||||
if (TABLE_IS_ROLLUP(pME->flags)) {
|
if (TABLE_IS_ROLLUP(pME->flags)) {
|
||||||
if (tEncodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam));
|
||||||
}
|
}
|
||||||
} else if (pME->type == TSDB_CHILD_TABLE) {
|
} else if (pME->type == TSDB_CHILD_TABLE) {
|
||||||
if (tEncodeI64(pCoder, pME->ctbEntry.btime) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeI64(pCoder, pME->ctbEntry.btime));
|
||||||
if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeI32(pCoder, pME->ctbEntry.ttlDays));
|
||||||
if (tEncodeI32v(pCoder, pME->ctbEntry.commentLen) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pME->ctbEntry.commentLen));
|
||||||
if (pME->ctbEntry.commentLen > 0) {
|
if (pME->ctbEntry.commentLen > 0) {
|
||||||
if (tEncodeCStr(pCoder, pME->ctbEntry.comment) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeCStr(pCoder, pME->ctbEntry.comment));
|
||||||
}
|
}
|
||||||
if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeI64(pCoder, pME->ctbEntry.suid));
|
||||||
if (tEncodeTag(pCoder, (const STag *)pME->ctbEntry.pTags) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeTag(pCoder, (const STag *)pME->ctbEntry.pTags));
|
||||||
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
||||||
if (tEncodeI64(pCoder, pME->ntbEntry.btime) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeI64(pCoder, pME->ntbEntry.btime));
|
||||||
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeI32(pCoder, pME->ntbEntry.ttlDays));
|
||||||
if (tEncodeI32v(pCoder, pME->ntbEntry.commentLen) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pME->ntbEntry.commentLen));
|
||||||
if (pME->ntbEntry.commentLen > 0) {
|
if (pME->ntbEntry.commentLen > 0) {
|
||||||
if (tEncodeCStr(pCoder, pME->ntbEntry.comment) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeCStr(pCoder, pME->ntbEntry.comment));
|
||||||
}
|
}
|
||||||
if (tEncodeI32v(pCoder, pME->ntbEntry.ncid) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pME->ntbEntry.ncid));
|
||||||
if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schemaRow) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schemaRow));
|
||||||
} else if (pME->type == TSDB_TSMA_TABLE) {
|
} else if (pME->type == TSDB_TSMA_TABLE) {
|
||||||
if (tEncodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1;
|
TAOS_CHECK_RETURN(tEncodeTSma(pCoder, pME->smaEntry.tsma));
|
||||||
} else {
|
} else {
|
||||||
metaError("meta/entry: invalide table type: %" PRId8 " encode failed.", pME->type);
|
metaError("meta/entry: invalide table type: %" PRId8 " encode failed.", pME->type);
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
if (meteEncodeColCmprEntry(pCoder, pME) < 0) return -1;
|
TAOS_CHECK_RETURN(meteEncodeColCmprEntry(pCoder, pME));
|
||||||
|
|
||||||
tEndEncode(pCoder);
|
tEndEncode(pCoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
TAOS_CHECK_RETURN(tStartDecode(pCoder));
|
||||||
|
TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->version));
|
||||||
if (tDecodeI64(pCoder, &pME->version) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeI8(pCoder, &pME->type));
|
||||||
if (tDecodeI8(pCoder, &pME->type) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->uid));
|
||||||
if (tDecodeI64(pCoder, &pME->uid) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeCStr(pCoder, &pME->name));
|
||||||
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
|
|
||||||
|
|
||||||
if (pME->type == TSDB_SUPER_TABLE) {
|
if (pME->type == TSDB_SUPER_TABLE) {
|
||||||
if (tDecodeI8(pCoder, &pME->flags) < 0) return -1; // TODO: need refactor?
|
TAOS_CHECK_RETURN(tDecodeI8(pCoder, &pME->flags));
|
||||||
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaRow) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaRow));
|
||||||
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag));
|
||||||
if (TABLE_IS_ROLLUP(pME->flags)) {
|
if (TABLE_IS_ROLLUP(pME->flags)) {
|
||||||
if (tDecodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam));
|
||||||
}
|
}
|
||||||
} else if (pME->type == TSDB_CHILD_TABLE) {
|
} else if (pME->type == TSDB_CHILD_TABLE) {
|
||||||
if (tDecodeI64(pCoder, &pME->ctbEntry.btime) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->ctbEntry.btime));
|
||||||
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeI32(pCoder, &pME->ctbEntry.ttlDays));
|
||||||
if (tDecodeI32v(pCoder, &pME->ctbEntry.commentLen) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeI32v(pCoder, &pME->ctbEntry.commentLen));
|
||||||
if (pME->ctbEntry.commentLen > 0) {
|
if (pME->ctbEntry.commentLen > 0) {
|
||||||
if (tDecodeCStr(pCoder, &pME->ctbEntry.comment) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeCStr(pCoder, &pME->ctbEntry.comment));
|
||||||
}
|
}
|
||||||
if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->ctbEntry.suid));
|
||||||
if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO)
|
TAOS_CHECK_RETURN(tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags));
|
||||||
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
||||||
if (tDecodeI64(pCoder, &pME->ntbEntry.btime) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pME->ntbEntry.btime));
|
||||||
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeI32(pCoder, &pME->ntbEntry.ttlDays));
|
||||||
if (tDecodeI32v(pCoder, &pME->ntbEntry.commentLen) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeI32v(pCoder, &pME->ntbEntry.commentLen));
|
||||||
if (pME->ntbEntry.commentLen > 0) {
|
if (pME->ntbEntry.commentLen > 0) {
|
||||||
if (tDecodeCStr(pCoder, &pME->ntbEntry.comment) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeCStr(pCoder, &pME->ntbEntry.comment));
|
||||||
}
|
}
|
||||||
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeI32v(pCoder, &pME->ntbEntry.ncid));
|
||||||
if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schemaRow) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schemaRow));
|
||||||
} else if (pME->type == TSDB_TSMA_TABLE) {
|
} else if (pME->type == TSDB_TSMA_TABLE) {
|
||||||
pME->smaEntry.tsma = tDecoderMalloc(pCoder, sizeof(STSma));
|
pME->smaEntry.tsma = tDecoderMalloc(pCoder, sizeof(STSma));
|
||||||
if (!pME->smaEntry.tsma) {
|
if (!pME->smaEntry.tsma) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
if (tDecodeTSma(pCoder, pME->smaEntry.tsma, true) < 0) return -1;
|
TAOS_CHECK_RETURN(tDecodeTSma(pCoder, pME->smaEntry.tsma, true));
|
||||||
} else {
|
} else {
|
||||||
metaError("meta/entry: invalide table type: %" PRId8 " decode failed.", pME->type);
|
metaError("meta/entry: invalide table type: %" PRId8 " decode failed.", pME->type);
|
||||||
return -1;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
if (pME->type == TSDB_SUPER_TABLE) {
|
if (pME->type == TSDB_SUPER_TABLE) {
|
||||||
if (TABLE_IS_COL_COMPRESSED(pME->flags)) {
|
if (TABLE_IS_COL_COMPRESSED(pME->flags)) {
|
||||||
if (meteDecodeColCmprEntry(pCoder, pME) < 0) return -1;
|
TAOS_CHECK_RETURN(meteDecodeColCmprEntry(pCoder, pME));
|
||||||
|
|
||||||
if (pME->colCmpr.nCols == 0) {
|
if (pME->colCmpr.nCols == 0) {
|
||||||
metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->stbEntry.schemaRow);
|
TAOS_CHECK_RETURN(metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->stbEntry.schemaRow));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->stbEntry.schemaRow);
|
TAOS_CHECK_RETURN(metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->stbEntry.schemaRow));
|
||||||
TABLE_SET_COL_COMPRESSED(pME->flags);
|
TABLE_SET_COL_COMPRESSED(pME->flags);
|
||||||
}
|
}
|
||||||
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
||||||
if (!tDecodeIsEnd(pCoder)) {
|
if (!tDecodeIsEnd(pCoder)) {
|
||||||
uDebug("set type: %d, tableName:%s", pME->type, pME->name);
|
uDebug("set type: %d, tableName:%s", pME->type, pME->name);
|
||||||
if (meteDecodeColCmprEntry(pCoder, pME) < 0) return -1;
|
TAOS_CHECK_RETURN(meteDecodeColCmprEntry(pCoder, pME));
|
||||||
if (pME->colCmpr.nCols == 0) {
|
if (pME->colCmpr.nCols == 0) {
|
||||||
metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->ntbEntry.schemaRow);
|
TAOS_CHECK_RETURN(metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->ntbEntry.schemaRow));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
uDebug("set default type: %d, tableName:%s", pME->type, pME->name);
|
uDebug("set default type: %d, tableName:%s", pME->type, pME->name);
|
||||||
metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->ntbEntry.schemaRow);
|
TAOS_CHECK_RETURN(metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->ntbEntry.schemaRow));
|
||||||
}
|
}
|
||||||
TABLE_SET_COL_COMPRESSED(pME->flags);
|
TABLE_SET_COL_COMPRESSED(pME->flags);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,118 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
#include "index.h"
|
|
||||||
#endif
|
|
||||||
#include "meta.h"
|
|
||||||
|
|
||||||
struct SMetaIdx {
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
SIndex *pIdx;
|
|
||||||
#endif
|
|
||||||
/* data */
|
|
||||||
#ifdef WINDOWS
|
|
||||||
size_t avoidCompilationErrors;
|
|
||||||
#endif
|
|
||||||
};
|
|
||||||
|
|
||||||
int metaOpenIdx(SMeta *pMeta) {
|
|
||||||
#if 0
|
|
||||||
char idxDir[128]; // TODO
|
|
||||||
char * err = NULL;
|
|
||||||
rocksdb_options_t *options = rocksdb_options_create();
|
|
||||||
|
|
||||||
// TODO
|
|
||||||
sprintf(idxDir, "%s/index", pMeta->path);
|
|
||||||
|
|
||||||
if (pMeta->pCache) {
|
|
||||||
rocksdb_options_set_row_cache(options, pMeta->pCache);
|
|
||||||
}
|
|
||||||
rocksdb_options_set_create_if_missing(options, 1);
|
|
||||||
|
|
||||||
pMeta->pIdx = rocksdb_open(options, idxDir, &err);
|
|
||||||
if (pMeta->pIdx == NULL) {
|
|
||||||
// TODO: handle error
|
|
||||||
rocksdb_options_destroy(options);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
rocksdb_options_destroy(options);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
// SIndexOpts opts;
|
|
||||||
// if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) {
|
|
||||||
// return -1;
|
|
||||||
//}
|
|
||||||
|
|
||||||
#endif
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef BUILD_NO_CALL
|
|
||||||
void metaCloseIdx(SMeta *pMeta) { /* TODO */
|
|
||||||
#if 0
|
|
||||||
if (pMeta->pIdx) {
|
|
||||||
rocksdb_close(pMeta->pIdx);
|
|
||||||
pMeta->pIdx = NULL;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
// SIndexOpts opts;
|
|
||||||
// if (indexClose(pMeta->pIdx->pIdx) != 0) {
|
|
||||||
// return -1;
|
|
||||||
//}
|
|
||||||
// return 0;
|
|
||||||
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbCfg) {
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
// if (pTbCfgs->type == META_CHILD_TABLE) {
|
|
||||||
// char buf[8] = {0};
|
|
||||||
// int16_t colId = (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId;
|
|
||||||
// sprintf(buf, "%d", colId); // colname
|
|
||||||
|
|
||||||
// char *pTagVal = (char *)tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId);
|
|
||||||
|
|
||||||
// tb_uid_t suid = pTbCfg->ctbCfg.suid; // super id
|
|
||||||
// tb_uid_t tuid = 0; // child table uid
|
|
||||||
// SIndexMultiTerm *terms = indexMultiTermCreate();
|
|
||||||
// SIndexTerm *term =
|
|
||||||
// indexTermCreate(suid, ADD_VALUE, TSDB_DATA_TYPE_BINARY, buf, strlen(buf), pTagVal, strlen(pTagVal), tuid);
|
|
||||||
// indexMultiTermAdd(terms, term);
|
|
||||||
|
|
||||||
// int ret = indexPut(pMeta->pIdx->pIdx, terms);
|
|
||||||
// indexMultiTermDestroy(terms);
|
|
||||||
// return ret;
|
|
||||||
//} else {
|
|
||||||
// return DB_DONOTINDEX;
|
|
||||||
//}
|
|
||||||
#endif
|
|
||||||
// TODO
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid) {
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
|
|
||||||
#endif
|
|
||||||
// TODO
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
|
@ -39,13 +39,14 @@ static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&p
|
||||||
|
|
||||||
static void metaCleanup(SMeta **ppMeta);
|
static void metaCleanup(SMeta **ppMeta);
|
||||||
|
|
||||||
int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
||||||
SMeta *pMeta = NULL;
|
SMeta *pMeta = NULL;
|
||||||
int ret;
|
int32_t code = 0;
|
||||||
int offset;
|
int32_t lino;
|
||||||
char path[TSDB_FILENAME_LEN] = {0};
|
int32_t offset;
|
||||||
|
char path[TSDB_FILENAME_LEN] = {0};
|
||||||
*ppMeta = NULL;
|
char indexFullPath[128] = {0};
|
||||||
|
sprintf(indexFullPath, "%s/%s", pMeta->path, "invert");
|
||||||
|
|
||||||
// create handle
|
// create handle
|
||||||
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
|
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
|
||||||
|
@ -53,8 +54,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
||||||
snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VNODE_META_DIR);
|
snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VNODE_META_DIR);
|
||||||
|
|
||||||
if ((pMeta = taosMemoryCalloc(1, sizeof(*pMeta) + strlen(path) + 1)) == NULL) {
|
if ((pMeta = taosMemoryCalloc(1, sizeof(*pMeta) + strlen(path) + 1)) == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
metaInitLock(pMeta);
|
metaInitLock(pMeta);
|
||||||
|
@ -69,163 +69,103 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
||||||
taosMkDir(pMeta->path);
|
taosMkDir(pMeta->path);
|
||||||
|
|
||||||
// open env
|
// open env
|
||||||
ret = tdbOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv, rollback,
|
code = tdbOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv, rollback,
|
||||||
pVnode->config.tdbEncryptAlgorithm, pVnode->config.tdbEncryptKey);
|
pVnode->config.tdbEncryptAlgorithm, pVnode->config.tdbEncryptKey);
|
||||||
if (ret < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to open meta env since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// open pTbDb
|
// open pTbDb
|
||||||
ret = tdbTbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb, 0);
|
code = tdbTbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb, 0);
|
||||||
if (ret < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to open meta table db since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// open pSkmDb
|
// open pSkmDb
|
||||||
ret = tdbTbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb, 0);
|
code = tdbTbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb, 0);
|
||||||
if (ret < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to open meta schema db since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// open pUidIdx
|
// open pUidIdx
|
||||||
ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(SUidIdxVal), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx, 0);
|
code = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(SUidIdxVal), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx, 0);
|
||||||
if (ret < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// open pNameIdx
|
// open pNameIdx
|
||||||
ret = tdbTbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx, 0);
|
code = tdbTbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx, 0);
|
||||||
if (ret < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to open meta name index since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// open pCtbIdx
|
// open pCtbIdx
|
||||||
ret = tdbTbOpen("ctb.idx", sizeof(SCtbIdxKey), -1, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx, 0);
|
code = tdbTbOpen("ctb.idx", sizeof(SCtbIdxKey), -1, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx, 0);
|
||||||
if (ret < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to open meta child table index since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// open pSuidIdx
|
// open pSuidIdx
|
||||||
ret = tdbTbOpen("suid.idx", sizeof(tb_uid_t), 0, uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pSuidIdx, 0);
|
code = tdbTbOpen("suid.idx", sizeof(tb_uid_t), 0, uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pSuidIdx, 0);
|
||||||
if (ret < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to open meta super table index since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
char indexFullPath[128] = {0};
|
TAOS_UNUSED(taosMkDir(indexFullPath));
|
||||||
sprintf(indexFullPath, "%s/%s", pMeta->path, "invert");
|
|
||||||
taosMkDir(indexFullPath);
|
|
||||||
|
|
||||||
SIndexOpts opts = {.cacheSize = 8 * 1024 * 1024};
|
SIndexOpts opts = {.cacheSize = 8 * 1024 * 1024};
|
||||||
ret = indexOpen(&opts, indexFullPath, (SIndex **)&pMeta->pTagIvtIdx);
|
code = indexOpen(&opts, indexFullPath, (SIndex **)&pMeta->pTagIvtIdx);
|
||||||
if (ret < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx, 0);
|
code = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx, 0);
|
||||||
if (ret < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// open pTtlMgr ("ttlv1.idx")
|
// open pTtlMgr ("ttlv1.idx")
|
||||||
char logPrefix[128] = {0};
|
char logPrefix[128] = {0};
|
||||||
sprintf(logPrefix, "vgId:%d", TD_VID(pVnode));
|
sprintf(logPrefix, "vgId:%d", TD_VID(pVnode));
|
||||||
ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0, logPrefix, tsTtlFlushThreshold);
|
code = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0, logPrefix, tsTtlFlushThreshold);
|
||||||
if (ret < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// open pSmaIdx
|
// open pSmaIdx
|
||||||
ret = tdbTbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx, 0);
|
code = tdbTbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx, 0);
|
||||||
if (ret < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to open meta sma index since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// idx table create time
|
// idx table create time
|
||||||
ret = tdbTbOpen("ctime.idx", sizeof(SBtimeIdxKey), 0, btimeIdxCmpr, pMeta->pEnv, &pMeta->pBtimeIdx, 0);
|
code = tdbTbOpen("ctime.idx", sizeof(SBtimeIdxKey), 0, btimeIdxCmpr, pMeta->pEnv, &pMeta->pBtimeIdx, 0);
|
||||||
if (ret < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to open meta ctime index since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// idx num of col, normal table only
|
// idx num of col, normal table only
|
||||||
ret = tdbTbOpen("ncol.idx", sizeof(SNcolIdxKey), 0, ncolIdxCmpr, pMeta->pEnv, &pMeta->pNcolIdx, 0);
|
code = tdbTbOpen("ncol.idx", sizeof(SNcolIdxKey), 0, ncolIdxCmpr, pMeta->pEnv, &pMeta->pNcolIdx, 0);
|
||||||
if (ret < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to open meta ncol index since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pStreamDb, 0);
|
code = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pStreamDb, 0);
|
||||||
if (ret < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// open index
|
code = metaCacheOpen(pMeta);
|
||||||
if (metaOpenIdx(pMeta) < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to open meta index since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = metaCacheOpen(pMeta);
|
code = metaInitTbFilterCache(pMeta);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
terrno = code;
|
metaError("vgId:%d %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, __LINE__, tstrerror(code));
|
||||||
metaError("vgId:%d, failed to open meta cache since %s", TD_VID(pVnode), tstrerror(terrno));
|
metaCleanup(&pMeta);
|
||||||
goto _err;
|
*ppMeta = NULL;
|
||||||
|
} else {
|
||||||
|
metaDebug("vgId:%d %s success", TD_VID(pVnode), __func__);
|
||||||
|
*ppMeta = pMeta;
|
||||||
}
|
}
|
||||||
|
return code;
|
||||||
if (metaInitTbFilterCache(pMeta) != 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
metaDebug("vgId:%d, meta is opened", TD_VID(pVnode));
|
|
||||||
|
|
||||||
*ppMeta = pMeta;
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
metaCleanup(&pMeta);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaUpgrade(SVnode *pVnode, SMeta **ppMeta) {
|
int32_t metaUpgrade(SVnode *pVnode, SMeta **ppMeta) {
|
||||||
int code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SMeta *pMeta = *ppMeta;
|
int32_t lino;
|
||||||
|
SMeta *pMeta = *ppMeta;
|
||||||
|
|
||||||
if (ttlMgrNeedUpgrade(pMeta->pEnv)) {
|
if (ttlMgrNeedUpgrade(pMeta->pEnv)) {
|
||||||
code = metaBegin(pMeta, META_BEGIN_HEAP_OS);
|
code = metaBegin(pMeta, META_BEGIN_HEAP_OS);
|
||||||
if (code < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to upgrade meta, meta begin failed since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = ttlMgrUpgrade(pMeta->pTtlMgr, pMeta);
|
code = ttlMgrUpgrade(pMeta->pTtlMgr, pMeta);
|
||||||
if (code < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to upgrade meta ttl since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = metaCommit(pMeta, pMeta->txn);
|
code = metaCommit(pMeta, pMeta->txn);
|
||||||
if (code < 0) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
metaError("vgId:%d, failed to upgrade meta ttl, meta commit failed since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
_exit:
|
||||||
|
if (code) {
|
||||||
_err:
|
metaError("vgId:%d %s failed at %s:%d since %s", TD_VID(pVnode), __func__, __FILE__, __LINE__, tstrerror(code));
|
||||||
metaCleanup(ppMeta);
|
metaCleanup(ppMeta);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -235,33 +175,42 @@ int metaClose(SMeta **ppMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaAlterCache(SMeta *pMeta, int32_t nPage) {
|
int metaAlterCache(SMeta *pMeta, int32_t nPage) {
|
||||||
|
int32_t code = 0;
|
||||||
metaWLock(pMeta);
|
metaWLock(pMeta);
|
||||||
|
code = tdbAlter(pMeta->pEnv, nPage);
|
||||||
if (tdbAlter(pMeta->pEnv, nPage) < 0) {
|
|
||||||
metaULock(pMeta);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
return 0;
|
|
||||||
|
if (code) {
|
||||||
|
metaError("vgId:%d %s failed since %s", TD_VID(pMeta->pVnode), __func__, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaRLock(SMeta *pMeta) {
|
int32_t metaRLock(SMeta *pMeta) {
|
||||||
metaTrace("meta rlock %p", &pMeta->lock);
|
metaTrace("meta rlock %p", &pMeta->lock);
|
||||||
int32_t ret = taosThreadRwlockRdlock(&pMeta->lock);
|
int32_t code = taosThreadRwlockRdlock(&pMeta->lock);
|
||||||
return ret;
|
if (code) {
|
||||||
|
return TAOS_SYSTEM_ERROR(code);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaWLock(SMeta *pMeta) {
|
int32_t metaWLock(SMeta *pMeta) {
|
||||||
metaTrace("meta wlock %p", &pMeta->lock);
|
metaTrace("meta wlock %p", &pMeta->lock);
|
||||||
int32_t ret = taosThreadRwlockWrlock(&pMeta->lock);
|
int32_t code = taosThreadRwlockWrlock(&pMeta->lock);
|
||||||
return ret;
|
if (code) {
|
||||||
|
return TAOS_SYSTEM_ERROR(code);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaULock(SMeta *pMeta) {
|
int32_t metaULock(SMeta *pMeta) {
|
||||||
metaTrace("meta ulock %p", &pMeta->lock);
|
metaTrace("meta ulock %p", &pMeta->lock);
|
||||||
int32_t ret = taosThreadRwlockUnlock(&pMeta->lock);
|
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
|
||||||
return ret;
|
if (code) {
|
||||||
|
return TAOS_SYSTEM_ERROR(code);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void metaCleanup(SMeta **ppMeta) {
|
static void metaCleanup(SMeta **ppMeta) {
|
||||||
|
|
|
@ -25,14 +25,14 @@ struct SMetaSnapReader {
|
||||||
|
|
||||||
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
|
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
int32_t c = 0;
|
int32_t c = 0;
|
||||||
SMetaSnapReader* pReader = NULL;
|
SMetaSnapReader* pReader = NULL;
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
|
pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
|
||||||
if (pReader == NULL) {
|
if (pReader == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
pReader->pMeta = pMeta;
|
pReader->pMeta = pMeta;
|
||||||
pReader->sver = sver;
|
pReader->sver = sver;
|
||||||
|
@ -40,36 +40,29 @@ int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapRe
|
||||||
|
|
||||||
// impl
|
// impl
|
||||||
code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
|
code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
|
||||||
if (code) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
taosMemoryFree(pReader);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
|
code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
taosMemoryFree(pReader);
|
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
|
||||||
goto _err;
|
metaSnapReaderClose(&pReader);
|
||||||
|
*ppReader = NULL;
|
||||||
|
} else {
|
||||||
|
metaInfo("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
|
||||||
|
*ppReader = pReader;
|
||||||
}
|
}
|
||||||
|
|
||||||
metaInfo("vgId:%d, vnode snapshot meta reader opened", TD_VID(pMeta->pVnode));
|
|
||||||
|
|
||||||
*ppReader = pReader;
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
metaError("vgId:%d, vnode snapshot meta reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
|
|
||||||
*ppReader = NULL;
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) {
|
void metaSnapReaderClose(SMetaSnapReader** ppReader) {
|
||||||
int32_t code = 0;
|
if (ppReader && *ppReader) {
|
||||||
|
tdbTbcClose((*ppReader)->pTbc);
|
||||||
tdbTbcClose((*ppReader)->pTbc);
|
taosMemoryFree(*ppReader);
|
||||||
taosMemoryFree(*ppReader);
|
*ppReader = NULL;
|
||||||
*ppReader = NULL;
|
}
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
|
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
|
||||||
|
@ -106,7 +99,7 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
|
||||||
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
|
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
|
||||||
if (*ppData == NULL) {
|
if (*ppData == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
||||||
|
@ -122,10 +115,10 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
if (code) {
|
||||||
|
metaError("vgId:%d, vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode),
|
||||||
_err:
|
tstrerror(code));
|
||||||
metaError("vgId:%d, vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode), tstrerror(code));
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -138,26 +131,30 @@ struct SMetaSnapWriter {
|
||||||
|
|
||||||
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
|
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
SMetaSnapWriter* pWriter;
|
SMetaSnapWriter* pWriter;
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
pWriter = (SMetaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
pWriter = (SMetaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
||||||
if (pWriter == NULL) {
|
if (pWriter == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
pWriter->pMeta = pMeta;
|
pWriter->pMeta = pMeta;
|
||||||
pWriter->sver = sver;
|
pWriter->sver = sver;
|
||||||
pWriter->ever = ever;
|
pWriter->ever = ever;
|
||||||
|
|
||||||
metaBegin(pMeta, META_BEGIN_HEAP_NIL);
|
code = metaBegin(pMeta, META_BEGIN_HEAP_NIL);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
*ppWriter = pWriter;
|
_exit:
|
||||||
return code;
|
if (code) {
|
||||||
|
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
|
||||||
_err:
|
taosMemoryFree(pWriter);
|
||||||
metaError("vgId:%d, meta snapshot writer open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
|
*ppWriter = NULL;
|
||||||
*ppWriter = NULL;
|
} else {
|
||||||
|
metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
|
||||||
|
*ppWriter = pWriter;
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,25 +186,24 @@ _err:
|
||||||
|
|
||||||
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t line = 0;
|
int32_t lino = 0;
|
||||||
SMeta* pMeta = pWriter->pMeta;
|
SMeta* pMeta = pWriter->pMeta;
|
||||||
SMetaEntry metaEntry = {0};
|
SMetaEntry metaEntry = {0};
|
||||||
SDecoder* pDecoder = &(SDecoder){0};
|
SDecoder* pDecoder = &(SDecoder){0};
|
||||||
|
|
||||||
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||||
code = metaDecodeEntry(pDecoder, &metaEntry);
|
code = metaDecodeEntry(pDecoder, &metaEntry);
|
||||||
VND_CHECK_CODE(code, line, _err);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
code = metaHandleEntry(pMeta, &metaEntry);
|
code = metaHandleEntry(pMeta, &metaEntry);
|
||||||
VND_CHECK_CODE(code, line, _err);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
tDecoderClear(pDecoder);
|
tDecoderClear(pDecoder);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
|
||||||
tDecoderClear(pDecoder);
|
|
||||||
metaError("vgId:%d, vnode snapshot meta write failed since %s at line:%d", TD_VID(pMeta->pVnode), terrstr(), line);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct STableInfoForChildTable {
|
typedef struct STableInfoForChildTable {
|
||||||
|
@ -468,19 +464,17 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
|
||||||
return c;
|
return c;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid){
|
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
SSchemaWrapper *schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1);
|
SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1);
|
||||||
if (schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY){
|
if (schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
|
||||||
ret = true;
|
ret = true;
|
||||||
}
|
}
|
||||||
tDeleteSchemaWrapper(schema);
|
tDeleteSchemaWrapper(schema);
|
||||||
ctx->hasPrimaryKey = ret;
|
ctx->hasPrimaryKey = ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool taosXGetTablePrimaryKey(SSnapContext* ctx){
|
bool taosXGetTablePrimaryKey(SSnapContext* ctx) { return ctx->hasPrimaryKey; }
|
||||||
return ctx->hasPrimaryKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
|
int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
|
@ -312,8 +312,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
goto _exit;
|
goto _exit;
|
||||||
} else {
|
} else {
|
||||||
pReader->metaDone = 1;
|
pReader->metaDone = 1;
|
||||||
code = metaSnapReaderClose(&pReader->pMetaReader);
|
metaSnapReaderClose(&pReader->pMetaReader);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue