Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/TD-31890-17

This commit is contained in:
Hongze Cheng 2024-09-24 16:40:48 +08:00
commit 85e3b26a4d
17 changed files with 307 additions and 211 deletions

View File

@ -268,7 +268,7 @@ typedef struct {
uint8_t lvl[3]; // l[0] = 'low', l[1] = 'mid', l[2] = 'high' uint8_t lvl[3]; // l[0] = 'low', l[1] = 'mid', l[2] = 'high'
} TCmprLvlSet; } TCmprLvlSet;
int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level); void tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level);
#define DEFINE_VAR(cmprAlg) \ #define DEFINE_VAR(cmprAlg) \
uint8_t l1 = COMPRESS_L1_TYPE_U32(cmprAlg); \ uint8_t l1 = COMPRESS_L1_TYPE_U32(cmprAlg); \

View File

@ -495,7 +495,7 @@ static int checkAllEntriesInCache(const STagFilterResEntry* pEntry, SArray* pInv
return terrno; return terrno;
} }
} else { } else {
(void)taosLRUCacheRelease(pCache, pRes, false); bool ret = taosLRUCacheRelease(pCache, pRes, false);
} }
} }
@ -562,7 +562,7 @@ int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pK
((double)(*pEntry)->hitTimes) / acc); ((double)(*pEntry)->hitTimes) / acc);
} }
(void)taosLRUCacheRelease(pCache, pHandle, false); bool ret = taosLRUCacheRelease(pCache, pHandle, false);
// unlock meta // unlock meta
(void)taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
@ -618,7 +618,7 @@ static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyL
p->hitTimes = 0; p->hitTimes = 0;
tdListInit(&p->list, keyLen); tdListInit(&p->list, keyLen);
TAOS_CHECK_RETURN(taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES)); TAOS_CHECK_RETURN(taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES));
(void)tdListAppend(&p->list, pKey); TAOS_CHECK_RETURN(tdListAppend(&p->list, pKey));
return 0; return 0;
} }
@ -662,7 +662,10 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
} else { // check if it exists or not } else { // check if it exists or not
size_t size = listNEles(&(*pEntry)->list); size_t size = listNEles(&(*pEntry)->list);
if (size == 0) { if (size == 0) {
(void)tdListAppend(&(*pEntry)->list, pKey); code = tdListAppend(&(*pEntry)->list, pKey);
if (code) {
goto _end;
}
} else { } else {
SListNode* pNode = listHead(&(*pEntry)->list); SListNode* pNode = listHead(&(*pEntry)->list);
uint64_t* p = (uint64_t*)pNode->data; uint64_t* p = (uint64_t*)pNode->data;
@ -671,7 +674,10 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
(void)taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // not equal, append it } else { // not equal, append it
(void)tdListAppend(&(*pEntry)->list, pKey); code = tdListAppend(&(*pEntry)->list, pKey);
if (code) {
goto _end;
}
} }
} }
} }
@ -761,7 +767,7 @@ int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, i
((double)(*pEntry)->hitTimes) / acc); ((double)(*pEntry)->hitTimes) / acc);
} }
(void)taosLRUCacheRelease(pCache, pHandle, false); bool ret = taosLRUCacheRelease(pCache, pHandle, false);
// unlock meta // unlock meta
(void)taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
@ -839,7 +845,10 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
} else { // check if it exists or not } else { // check if it exists or not
size_t size = listNEles(&(*pEntry)->list); size_t size = listNEles(&(*pEntry)->list);
if (size == 0) { if (size == 0) {
(void)tdListAppend(&(*pEntry)->list, pKey); code = tdListAppend(&(*pEntry)->list, pKey);
if (code) {
goto _end;
}
} else { } else {
SListNode* pNode = listHead(&(*pEntry)->list); SListNode* pNode = listHead(&(*pEntry)->list);
uint64_t* p = (uint64_t*)pNode->data; uint64_t* p = (uint64_t*)pNode->data;
@ -848,7 +857,10 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
(void)taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // not equal, append it } else { // not equal, append it
(void)tdListAppend(&(*pEntry)->list, pKey); code = tdListAppend(&(*pEntry)->list, pKey);
if (code) {
goto _end;
}
} }
} }
} }

View File

@ -66,7 +66,10 @@ int metaPrepareAsyncCommit(SMeta *pMeta) {
int32_t lino; int32_t lino;
metaWLock(pMeta); metaWLock(pMeta);
TAOS_UNUSED(ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn)); int32_t ret = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
if (ret < 0) {
metaError("vgId:%d, failed to flush ttl since %s", TD_VID(pMeta->pVnode), tstrerror(ret));
}
metaULock(pMeta); metaULock(pMeta);
code = tdbCommit(pMeta->pEnv, pMeta->txn); code = tdbCommit(pMeta->pEnv, pMeta->txn);

View File

@ -65,7 +65,8 @@ int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
pMeta->pVnode = pVnode; pMeta->pVnode = pVnode;
// create path if not created yet // create path if not created yet
(void)taosMkDir(pMeta->path); code = taosMkDir(pMeta->path);
TSDB_CHECK_CODE(code, lino, _exit);
// open env // open env
code = tdbOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv, rollback, code = tdbOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv, rollback,

View File

@ -87,7 +87,9 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
if (key.version < pReader->sver // if (key.version < pReader->sver //
|| metaGetInfo(pReader->pMeta, key.uid, &info, NULL) == TSDB_CODE_NOT_FOUND) { || metaGetInfo(pReader->pMeta, key.uid, &info, NULL) == TSDB_CODE_NOT_FOUND) {
(void)tdbTbcMoveToNext(pReader->pTbc); if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
}
continue; continue;
} }
@ -110,7 +112,9 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d", metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d",
TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData); TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);
(void)tdbTbcMoveToNext(pReader->pTbc); if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
}
break; break;
} }
@ -233,7 +237,9 @@ static int32_t MoveToSnapShotVersion(SSnapContext* ctx) {
return TAOS_GET_TERRNO(code); return TAOS_GET_TERRNO(code);
} }
if (c < 0) { if (c < 0) {
(void)tdbTbcMoveToPrev((TBC*)ctx->pCur); if (tdbTbcMoveToPrev((TBC*)ctx->pCur) != 0) {
metaTrace("vgId:%d, vnode snapshot move to prev failed", TD_VID(ctx->pMeta->pVnode));
}
} }
return 0; return 0;
} }

View File

@ -22,7 +22,7 @@ static int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, con
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME); static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME); static void metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs); static int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs);
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME); static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
@ -1441,8 +1441,8 @@ static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
return 0; return 0;
} }
static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) { static void metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) {
if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0; if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return;
STtlDelTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn}; STtlDelTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn};
if (pME->type == TSDB_CHILD_TABLE) { if (pME->type == TSDB_CHILD_TABLE) {
@ -1451,7 +1451,12 @@ static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) {
ctx.ttlDays = pME->ntbEntry.ttlDays; ctx.ttlDays = pME->ntbEntry.ttlDays;
} }
return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx); int32_t ret = ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx);
if (ret < 0) {
metaError("vgId:%d, failed to delete ttl for table:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pME->name,
pME->uid, tstrerror(ret));
}
return;
} }
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid, int8_t *pSysTbl) { static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid, int8_t *pSysTbl) {
@ -1831,12 +1836,19 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
int16_t cid = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId; int16_t cid = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId;
int8_t col_type = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type; int8_t col_type = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type;
(void)tsdbCacheNewNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type); int32_t ret = tsdbCacheNewNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type);
if (ret < 0) {
terrno = ret;
goto _err;
}
} }
SSchema *pCol = &pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1]; SSchema *pCol = &pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1];
uint32_t compress = pAlterTbReq->action == TSDB_ALTER_TABLE_ADD_COLUMN ? createDefaultColCmprByType(pCol->type) uint32_t compress = pAlterTbReq->action == TSDB_ALTER_TABLE_ADD_COLUMN ? createDefaultColCmprByType(pCol->type)
: pAlterTbReq->compress; : pAlterTbReq->compress;
(void)updataTableColCmpr(&entry.colCmpr, pCol, 1, compress); if (updataTableColCmpr(&entry.colCmpr, pCol, 1, compress) != 0) {
metaError("vgId:%d, failed to update table col cmpr:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name,
entry.uid);
}
freeColCmpr = true; freeColCmpr = true;
if (entry.colCmpr.nCols != pSchema->nCols) { if (entry.colCmpr.nCols != pSchema->nCols) {
if (pNewSchema) taosMemoryFree(pNewSchema); if (pNewSchema) taosMemoryFree(pNewSchema);
@ -1876,10 +1888,16 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
int16_t cid = pColumn->colId; int16_t cid = pColumn->colId;
(void)tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, hasPrimayKey); if (tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, hasPrimayKey) != 0) {
metaError("vgId:%d, failed to drop ntable column:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name,
entry.uid);
}
} }
(void)updataTableColCmpr(&entry.colCmpr, &tScheam, 0, 0); if (updataTableColCmpr(&entry.colCmpr, &tScheam, 0, 0) != 0) {
metaError("vgId:%d, failed to update table col cmpr:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name,
entry.uid);
}
if (entry.colCmpr.nCols != pSchema->nCols) { if (entry.colCmpr.nCols != pSchema->nCols) {
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err; goto _err;
@ -1928,20 +1946,36 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
// do actual write // do actual write
metaWLock(pMeta); metaWLock(pMeta);
(void)metaDeleteNcolIdx(pMeta, &oldEntry); if (metaDeleteNcolIdx(pMeta, &oldEntry) < 0) {
(void)metaUpdateNcolIdx(pMeta, &entry); metaError("vgId:%d, failed to delete ncol idx:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
}
if (metaUpdateNcolIdx(pMeta, &entry) < 0) {
metaError("vgId:%d, failed to update ncol idx:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
}
// save to table db // save to table db
(void)metaSaveToTbDb(pMeta, &entry); if (metaSaveToTbDb(pMeta, &entry) < 0) {
metaError("vgId:%d, failed to save to tb db:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
}
(void)metaUpdateUidIdx(pMeta, &entry); if (metaUpdateUidIdx(pMeta, &entry) < 0) {
metaError("vgId:%d, failed to update uid idx:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
}
(void)metaSaveToSkmDb(pMeta, &entry); if (metaSaveToSkmDb(pMeta, &entry) < 0) {
metaError("vgId:%d, failed to save to skm db:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
}
(void)metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs); if (metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs) < 0) {
metaError("vgId:%d, failed to update change time:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
}
metaULock(pMeta); metaULock(pMeta);
(void)metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp); if (metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp) < 0) {
metaError("vgId:%d, failed to update meta rsp:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
}
for (int32_t i = 0; i < entry.colCmpr.nCols; i++) { for (int32_t i = 0; i < entry.colCmpr.nCols; i++) {
SColCmpr *p = &entry.colCmpr.pColCmpr[i]; SColCmpr *p = &entry.colCmpr.pColCmpr[i];
pMetaRsp->pSchemaExt[i].colId = p->id; pMetaRsp->pSchemaExt[i].colId = p->id;
@ -1997,14 +2031,18 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
TBC *pUidIdxc = NULL; TBC *pUidIdxc = NULL;
TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL)); TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL));
(void)tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c); if (tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c) < 0) {
metaTrace("meta/table: failed to move to uid index, uid:%" PRId64, uid);
}
if (c != 0) { if (c != 0) {
tdbTbcClose(pUidIdxc); tdbTbcClose(pUidIdxc);
metaError("meta/table: invalide c: %" PRId32 " update tb tag val failed.", c); metaError("meta/table: invalide c: %" PRId32 " update tb tag val failed.", c);
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST; return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
} }
(void)tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData); if (tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData) != 0) {
metaError("meta/table: failed to get uid index, uid:%" PRId64, uid);
}
oversion = ((SUidIdxVal *)pData)[0].version; oversion = ((SUidIdxVal *)pData)[0].version;
// search table.db // search table.db
@ -2014,7 +2052,9 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
/* get ctbEntry */ /* get ctbEntry */
TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL)); TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL));
(void)tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c); if (tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c) != 0) {
metaError("meta/table: failed to move to tb db, uid:%" PRId64, uid);
}
if (c != 0) { if (c != 0) {
tdbTbcClose(pUidIdxc); tdbTbcClose(pUidIdxc);
tdbTbcClose(pTbDbc); tdbTbcClose(pTbDbc);
@ -2022,29 +2062,43 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST; return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
} }
(void)tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData); if (tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData) != 0) {
metaError("meta/table: failed to get tb db, uid:%" PRId64, uid);
}
if ((ctbEntry.pBuf = taosMemoryMalloc(nData)) == NULL) { if ((ctbEntry.pBuf = taosMemoryMalloc(nData)) == NULL) {
(void)tdbTbcClose(pUidIdxc); tdbTbcClose(pUidIdxc);
(void)tdbTbcClose(pTbDbc); tdbTbcClose(pTbDbc);
return terrno; return terrno;
} }
memcpy(ctbEntry.pBuf, pData, nData); memcpy(ctbEntry.pBuf, pData, nData);
tDecoderInit(&dc1, ctbEntry.pBuf, nData); tDecoderInit(&dc1, ctbEntry.pBuf, nData);
(void)metaDecodeEntry(&dc1, &ctbEntry); ret = metaDecodeEntry(&dc1, &ctbEntry);
if (ret < 0) {
terrno = ret;
goto _err;
}
/* get stbEntry*/ /* get stbEntry*/
(void)tdbTbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal); if (tdbTbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal) != 0) {
metaError("meta/table: failed to get uid index, uid:%" PRId64, ctbEntry.ctbEntry.suid);
}
if (!pVal) { if (!pVal) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto _err; goto _err;
} }
(void)tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = ((SUidIdxVal *)pVal)[0].version}), if (tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = ((SUidIdxVal *)pVal)[0].version}),
sizeof(STbDbKey), (void **)&stbEntry.pBuf, &nVal); sizeof(STbDbKey), (void **)&stbEntry.pBuf, &nVal) != 0) {
metaError("meta/table: failed to get tb db, uid:%" PRId64, ctbEntry.ctbEntry.suid);
}
tdbFree(pVal); tdbFree(pVal);
tDecoderInit(&dc2, stbEntry.pBuf, nVal); tDecoderInit(&dc2, stbEntry.pBuf, nVal);
(void)metaDecodeEntry(&dc2, &stbEntry); ret = metaDecodeEntry(&dc2, &stbEntry);
if (ret < 0) {
terrno = ret;
goto _err;
}
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag; SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
SSchema *pColumn = NULL; SSchema *pColumn = NULL;
@ -2122,12 +2176,18 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaWLock(pMeta); metaWLock(pMeta);
// save to table.db // save to table.db
(void)metaSaveToTbDb(pMeta, &ctbEntry); if (metaSaveToTbDb(pMeta, &ctbEntry) < 0) {
metaError("meta/table: failed to save to tb db:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid);
}
// save to uid.idx // save to uid.idx
(void)metaUpdateUidIdx(pMeta, &ctbEntry); if (metaUpdateUidIdx(pMeta, &ctbEntry) < 0) {
metaError("meta/table: failed to update uid idx:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid);
}
(void)metaUpdateTagIdx(pMeta, &ctbEntry); if (metaUpdateTagIdx(pMeta, &ctbEntry) < 0) {
metaError("meta/table: failed to update tag idx:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid);
}
if (NULL == ctbEntry.ctbEntry.pTags) { if (NULL == ctbEntry.ctbEntry.pTags) {
metaError("meta/table: null tags, update tag val failed."); metaError("meta/table: null tags, update tag val failed.");
@ -2135,13 +2195,22 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
} }
SCtbIdxKey ctbIdxKey = {.suid = ctbEntry.ctbEntry.suid, .uid = uid}; SCtbIdxKey ctbIdxKey = {.suid = ctbEntry.ctbEntry.suid, .uid = uid};
(void)tdbTbUpsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), ctbEntry.ctbEntry.pTags, if (tdbTbUpsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), ctbEntry.ctbEntry.pTags,
((STag *)(ctbEntry.ctbEntry.pTags))->len, pMeta->txn); ((STag *)(ctbEntry.ctbEntry.pTags))->len, pMeta->txn) < 0) {
metaError("meta/table: failed to upsert ctb idx:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid);
}
(void)metaUidCacheClear(pMeta, ctbEntry.ctbEntry.suid); if (metaUidCacheClear(pMeta, ctbEntry.ctbEntry.suid) < 0) {
(void)metaTbGroupCacheClear(pMeta, ctbEntry.ctbEntry.suid); metaError("meta/table: failed to clear uid cache:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid);
}
(void)metaUpdateChangeTime(pMeta, ctbEntry.uid, pAlterTbReq->ctimeMs); if (metaTbGroupCacheClear(pMeta, ctbEntry.ctbEntry.suid) < 0) {
metaError("meta/table: failed to clear group cache:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid);
}
if (metaUpdateChangeTime(pMeta, ctbEntry.uid, pAlterTbReq->ctimeMs) < 0) {
metaError("meta/table: failed to update change time:%s uid:%" PRId64, ctbEntry.name, ctbEntry.uid);
}
metaULock(pMeta); metaULock(pMeta);
@ -2189,21 +2258,27 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
TBC *pUidIdxc = NULL; TBC *pUidIdxc = NULL;
TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL)); TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL));
(void)tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c); if (tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c) < 0) {
metaError("meta/table: failed to move to uid index, uid:%" PRId64, uid);
}
if (c != 0) { if (c != 0) {
tdbTbcClose(pUidIdxc); tdbTbcClose(pUidIdxc);
metaError("meta/table: invalide c: %" PRId32 " update tb options failed.", c); metaError("meta/table: invalide c: %" PRId32 " update tb options failed.", c);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
(void)tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData); if (tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData) < 0) {
metaError("meta/table: failed to get uid index, uid:%" PRId64, uid);
}
oversion = ((SUidIdxVal *)pData)[0].version; oversion = ((SUidIdxVal *)pData)[0].version;
// search table.db // search table.db
TBC *pTbDbc = NULL; TBC *pTbDbc = NULL;
TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL)); TAOS_CHECK_RETURN(tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL));
(void)tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c); if (tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c) < 0) {
metaError("meta/table: failed to move to tb db, uid:%" PRId64, uid);
}
if (c != 0) { if (c != 0) {
tdbTbcClose(pUidIdxc); tdbTbcClose(pUidIdxc);
tdbTbcClose(pTbDbc); tdbTbcClose(pTbDbc);
@ -2211,13 +2286,15 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
(void)tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData); if (tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData) < 0) {
metaError("meta/table: failed to get tb db, uid:%" PRId64, uid);
}
// get table entry // get table entry
SDecoder dc = {0}; SDecoder dc = {0};
if ((entry.pBuf = taosMemoryMalloc(nData)) == NULL) { if ((entry.pBuf = taosMemoryMalloc(nData)) == NULL) {
(void)tdbTbcClose(pUidIdxc); tdbTbcClose(pUidIdxc);
(void)tdbTbcClose(pTbDbc); tdbTbcClose(pTbDbc);
return terrno; return terrno;
} }
memcpy(entry.pBuf, pData, nData); memcpy(entry.pBuf, pData, nData);
@ -2236,9 +2313,9 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
// build SMetaEntry // build SMetaEntry
if (entry.type == TSDB_CHILD_TABLE) { if (entry.type == TSDB_CHILD_TABLE) {
if (pAlterTbReq->updateTTL) { if (pAlterTbReq->updateTTL) {
(void)metaDeleteTtl(pMeta, &entry); metaDeleteTtl(pMeta, &entry);
entry.ctbEntry.ttlDays = pAlterTbReq->newTTL; entry.ctbEntry.ttlDays = pAlterTbReq->newTTL;
(void)metaUpdateTtl(pMeta, &entry); metaUpdateTtl(pMeta, &entry);
} }
if (pAlterTbReq->newCommentLen >= 0) { if (pAlterTbReq->newCommentLen >= 0) {
entry.ctbEntry.commentLen = pAlterTbReq->newCommentLen; entry.ctbEntry.commentLen = pAlterTbReq->newCommentLen;
@ -2246,9 +2323,9 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
} }
} else { } else {
if (pAlterTbReq->updateTTL) { if (pAlterTbReq->updateTTL) {
(void)metaDeleteTtl(pMeta, &entry); metaDeleteTtl(pMeta, &entry);
entry.ntbEntry.ttlDays = pAlterTbReq->newTTL; entry.ntbEntry.ttlDays = pAlterTbReq->newTTL;
(void)metaUpdateTtl(pMeta, &entry); metaUpdateTtl(pMeta, &entry);
} }
if (pAlterTbReq->newCommentLen >= 0) { if (pAlterTbReq->newCommentLen >= 0) {
entry.ntbEntry.commentLen = pAlterTbReq->newCommentLen; entry.ntbEntry.commentLen = pAlterTbReq->newCommentLen;
@ -2257,9 +2334,17 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
} }
// save to table db // save to table db
(void)metaSaveToTbDb(pMeta, &entry); if (metaSaveToTbDb(pMeta, &entry) < 0) {
(void)metaUpdateUidIdx(pMeta, &entry); metaError("meta/table: failed to save to tb db:%s uid:%" PRId64, entry.name, entry.uid);
(void)metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs); }
if (metaUpdateUidIdx(pMeta, &entry) < 0) {
metaError("meta/table: failed to update uid idx:%s uid:%" PRId64, entry.name, entry.uid);
}
if (metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs) < 0) {
metaError("meta/table: failed to update change time:%s uid:%" PRId64, entry.name, entry.uid);
}
metaULock(pMeta); metaULock(pMeta);
@ -2305,7 +2390,10 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb
STbDbKey tbDbKey = {0}; STbDbKey tbDbKey = {0};
tbDbKey.uid = suid; tbDbKey.uid = suid;
tbDbKey.version = ((SUidIdxVal *)pVal)[0].version; tbDbKey.version = ((SUidIdxVal *)pVal)[0].version;
(void)tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pVal, &nVal); ret = tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pVal, &nVal);
if (ret < 0) {
goto _err;
}
tDecoderInit(&dc, pVal, nVal); tDecoderInit(&dc, pVal, nVal);
ret = metaDecodeEntry(&dc, &stbEntry); ret = metaDecodeEntry(&dc, &stbEntry);
if (ret < 0) { if (ret < 0) {
@ -2384,7 +2472,10 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb
tdbTbcClose(pCtbIdxc); tdbTbcClose(pCtbIdxc);
goto _err; goto _err;
} }
(void)tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); ret = tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
if (ret < 0) {
metaError("meta/table: failed to upsert tag idx:%s uid:%" PRId64, stbEntry.name, stbEntry.uid);
}
metaDestroyTagIdxKey(pTagIdxKey); metaDestroyTagIdxKey(pTagIdxKey);
pTagIdxKey = NULL; pTagIdxKey = NULL;
} }
@ -2439,7 +2530,10 @@ static int metaDropTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterT
STbDbKey tbDbKey = {0}; STbDbKey tbDbKey = {0};
tbDbKey.uid = suid; tbDbKey.uid = suid;
tbDbKey.version = ((SUidIdxVal *)pVal)[0].version; tbDbKey.version = ((SUidIdxVal *)pVal)[0].version;
(void)tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pVal, &nVal); ret = tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pVal, &nVal);
if (ret < 0) {
goto _err;
}
tDecoderInit(&dc, pVal, nVal); tDecoderInit(&dc, pVal, nVal);
ret = metaDecodeEntry(&dc, &stbEntry); ret = metaDecodeEntry(&dc, &stbEntry);
@ -2507,7 +2601,10 @@ static int metaDropTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterT
metaWLock(pMeta); metaWLock(pMeta);
for (int i = 0; i < taosArrayGetSize(tagIdxList); i++) { for (int i = 0; i < taosArrayGetSize(tagIdxList); i++) {
SMetaPair *pair = taosArrayGet(tagIdxList, i); SMetaPair *pair = taosArrayGet(tagIdxList, i);
(void)tdbTbDelete(pMeta->pTagIdx, pair->key, pair->nkey, pMeta->txn); ret = tdbTbDelete(pMeta->pTagIdx, pair->key, pair->nkey, pMeta->txn);
if (ret < 0) {
metaError("meta/table: failed to delete tag idx:%s uid:%" PRId64, stbEntry.name, stbEntry.uid);
}
} }
metaULock(pMeta); metaULock(pMeta);
@ -2594,9 +2691,17 @@ int32_t metaUpdateTableColCompress(SMeta *pMeta, int64_t version, SVAlterTbReq *
tbEntry.version = version; tbEntry.version = version;
metaWLock(pMeta); metaWLock(pMeta);
(void)metaSaveToTbDb(pMeta, &tbEntry); if (metaSaveToTbDb(pMeta, &tbEntry) < 0) {
(void)metaUpdateUidIdx(pMeta, &tbEntry); metaError("meta/table: failed to save to tb db:%s uid:%" PRId64, tbEntry.name, tbEntry.uid);
(void)metaUpdateChangeTime(pMeta, suid, pReq->ctimeMs); }
if (metaUpdateUidIdx(pMeta, &tbEntry) < 0) {
metaError("meta/table: failed to update uid idx:%s uid:%" PRId64, tbEntry.name, tbEntry.uid);
}
if (metaUpdateChangeTime(pMeta, suid, pReq->ctimeMs) < 0) {
metaError("meta/table: failed to update change time:%s uid:%" PRId64, tbEntry.name, tbEntry.uid);
}
metaULock(pMeta); metaULock(pMeta);
@ -2691,7 +2796,10 @@ static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
// upsert cache // upsert cache
SMetaInfo info; SMetaInfo info;
metaGetEntryInfo(pME, &info); metaGetEntryInfo(pME, &info);
(void)metaCacheUpsert(pMeta, &info); int32_t ret = metaCacheUpsert(pMeta, &info);
if (ret < 0) {
metaError("vgId:%d, failed to upsert cache, uid: %" PRId64 " %s", TD_VID(pMeta->pVnode), pME->uid, tstrerror(ret));
}
SUidIdxVal uidIdxVal = {.suid = info.suid, .version = info.version, .skmVer = info.skmVer}; SUidIdxVal uidIdxVal = {.suid = info.suid, .version = info.version, .skmVer = info.skmVer};
@ -2706,8 +2814,8 @@ static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbUpsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), pMeta->txn); return tdbTbUpsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), pMeta->txn);
} }
static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) { static void metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0; if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return;
STtlUpdTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn}; STtlUpdTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn};
if (pME->type == TSDB_CHILD_TABLE) { if (pME->type == TSDB_CHILD_TABLE) {
@ -2718,7 +2826,12 @@ static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
ctx.changeTimeMs = pME->ntbEntry.btime; ctx.changeTimeMs = pME->ntbEntry.btime;
} }
return ttlMgrInsertTtl(pMeta->pTtlMgr, &ctx); int32_t ret = ttlMgrInsertTtl(pMeta->pTtlMgr, &ctx);
if (ret < 0) {
metaError("vgId:%d, failed to insert ttl, uid: %" PRId64 " %s", TD_VID(pMeta->pVnode), pME->uid, tstrerror(ret));
}
return;
} }
static int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) { static int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) {
@ -2806,7 +2919,11 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
} }
tbDbKey.uid = pCtbEntry->ctbEntry.suid; tbDbKey.uid = pCtbEntry->ctbEntry.suid;
tbDbKey.version = ((SUidIdxVal *)pData)[0].version; tbDbKey.version = ((SUidIdxVal *)pData)[0].version;
(void)tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData); ret = tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
if (ret < 0) {
metaError("vgId:%d, failed to get stable for update. version:%" PRId64, TD_VID(pMeta->pVnode), pCtbEntry->version);
goto end;
}
tDecoderInit(&dc, pData, nData); tDecoderInit(&dc, pData, nData);
ret = metaDecodeEntry(&dc, &stbEntry); ret = metaDecodeEntry(&dc, &stbEntry);
@ -2854,7 +2971,9 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
ret = -1; ret = -1;
goto end; goto end;
} }
(void)tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); if (tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn) < 0) {
metaError("vgId:%d, failed to update tag index. version:%" PRId64, TD_VID(pMeta->pVnode), pCtbEntry->version);
}
metaDestroyTagIdxKey(pTagIdxKey); metaDestroyTagIdxKey(pTagIdxKey);
pTagIdxKey = NULL; pTagIdxKey = NULL;
} }
@ -2905,7 +3024,11 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
} }
tEncoderInit(&coder, pVal, vLen); tEncoderInit(&coder, pVal, vLen);
(void)tEncodeSSchemaWrapper(&coder, pSW); ret = tEncodeSSchemaWrapper(&coder, pSW);
if (ret < 0) {
rcode = -1;
goto _exit;
}
if (tdbTbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, pMeta->txn) < 0) { if (tdbTbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, pMeta->txn) < 0) {
rcode = -1; rcode = -1;
@ -2966,8 +3089,7 @@ int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
} }
if (pME->type != TSDB_SUPER_TABLE) { if (pME->type != TSDB_SUPER_TABLE) {
code = metaUpdateTtl(pMeta, pME); metaUpdateTtl(pMeta, pME);
VND_CHECK_CODE(code, line, _err);
} }
if (pME->type == TSDB_SUPER_TABLE || pME->type == TSDB_NORMAL_TABLE) { if (pME->type == TSDB_SUPER_TABLE || pME->type == TSDB_NORMAL_TABLE) {
@ -2985,7 +3107,7 @@ _err:
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int32_t colCompressDebug(SHashObj *pColCmprObj) { static void colCompressDebug(SHashObj *pColCmprObj) {
void *p = taosHashIterate(pColCmprObj, NULL); void *p = taosHashIterate(pColCmprObj, NULL);
while (p) { while (p) {
uint32_t cmprAlg = *(uint32_t *)p; uint32_t cmprAlg = *(uint32_t *)p;
@ -2993,14 +3115,14 @@ int32_t colCompressDebug(SHashObj *pColCmprObj) {
p = taosHashIterate(pColCmprObj, p); p = taosHashIterate(pColCmprObj, p);
uint8_t l1, l2, lvl; uint8_t l1, l2, lvl;
(void)tcompressDebug(cmprAlg, &l1, &l2, &lvl); tcompressDebug(cmprAlg, &l1, &l2, &lvl);
const char *l1str = columnEncodeStr(l1); const char *l1str = columnEncodeStr(l1);
const char *l2str = columnCompressStr(l2); const char *l2str = columnCompressStr(l2);
const char *lvlstr = columnLevelStr(lvl); const char *lvlstr = columnLevelStr(lvl);
metaDebug("colId: %d, encode:%s, compress:%s,level:%s", colId, l1str, l2str, lvlstr); metaDebug("colId: %d, encode:%s, compress:%s,level:%s", colId, l1str, l2str, lvlstr);
} }
return 0; return;
} }
int32_t metaGetColCmpr(SMeta *pMeta, tb_uid_t uid, SHashObj **ppColCmprObj) { int32_t metaGetColCmpr(SMeta *pMeta, tb_uid_t uid, SHashObj **ppColCmprObj) {
int rc = 0; int rc = 0;
@ -3063,7 +3185,7 @@ int32_t metaGetColCmpr(SMeta *pMeta, tb_uid_t uid, SHashObj **ppColCmprObj) {
metaULock(pMeta); metaULock(pMeta);
*ppColCmprObj = pColCmprObj; *ppColCmprObj = pColCmprObj;
(void)colCompressDebug(pColCmprObj); colCompressDebug(pColCmprObj);
return 0; return 0;
} }

View File

@ -144,7 +144,7 @@ static void ttlMgrCleanup(STtlManger *pTtlMgr) {
taosMemoryFree(pTtlMgr->logPrefix); taosMemoryFree(pTtlMgr->logPrefix);
taosHashCleanup(pTtlMgr->pTtlCache); taosHashCleanup(pTtlMgr->pTtlCache);
taosHashCleanup(pTtlMgr->pDirtyUids); taosHashCleanup(pTtlMgr->pDirtyUids);
(void)tdbTbClose(pTtlMgr->pTtlIdx); tdbTbClose(pTtlMgr->pTtlIdx);
taosMemoryFree(pTtlMgr); taosMemoryFree(pTtlMgr);
} }
@ -302,7 +302,10 @@ int32_t ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
} }
if (ttlMgrNeedFlush(pTtlMgr)) { if (ttlMgrNeedFlush(pTtlMgr)) {
(void)ttlMgrFlush(pTtlMgr, updCtx->pTxn); int32_t ret = ttlMgrFlush(pTtlMgr, updCtx->pTxn);
if (ret < 0) {
metaError("%s, ttlMgr insert failed to flush since %s", pTtlMgr->logPrefix, tstrerror(ret));
}
} }
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
@ -326,7 +329,10 @@ int32_t ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
} }
if (ttlMgrNeedFlush(pTtlMgr)) { if (ttlMgrNeedFlush(pTtlMgr)) {
(void)ttlMgrFlush(pTtlMgr, delCtx->pTxn); int32_t ret = ttlMgrFlush(pTtlMgr, delCtx->pTxn);
if (ret < 0) {
metaError("%s, ttlMgr del failed to flush since %s", pTtlMgr->logPrefix, tstrerror(ret));
}
} }
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
@ -350,7 +356,8 @@ int32_t ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdC
.changeTimeMsDirty = pUpdCtimeCtx->changeTimeMs}; .changeTimeMsDirty = pUpdCtimeCtx->changeTimeMs};
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT}; STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
code = taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry)); code =
taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
metaError("%s, ttlMgr update ctime failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(code)); metaError("%s, ttlMgr update ctime failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(code));
goto _out; goto _out;
@ -359,13 +366,15 @@ int32_t ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdC
code = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry, code = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
sizeof(dirtryEntry)); sizeof(dirtryEntry));
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
metaError("%s, ttlMgr update ctime failed to update dirty uids since %s", pTtlMgr->logPrefix, metaError("%s, ttlMgr update ctime failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(code));
tstrerror(code));
goto _out; goto _out;
} }
if (ttlMgrNeedFlush(pTtlMgr)) { if (ttlMgrNeedFlush(pTtlMgr)) {
(void)ttlMgrFlush(pTtlMgr, pUpdCtimeCtx->pTxn); int32_t ret = ttlMgrFlush(pTtlMgr, pUpdCtimeCtx->pTxn);
if (ret < 0) {
metaError("%s, ttlMgr update ctime failed to flush since %s", pTtlMgr->logPrefix, tstrerror(ret));
}
} }
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
@ -420,7 +429,7 @@ int32_t ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid)); STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
if (cacheEntry == NULL) { if (cacheEntry == NULL) {
metaError("%s, ttlMgr flush failed to get ttl cache, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix, *pUid, metaError("%s, ttlMgr flush failed to get ttl cache, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix, *pUid,
pEntry->type); pEntry->type);
continue; continue;
} }

View File

@ -127,7 +127,6 @@ int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset) {
goto END; goto END;
} }
buf = taosMemoryCalloc(1, vlen); buf = taosMemoryCalloc(1, vlen);
if (buf == NULL) { if (buf == NULL) {
code = terrno; code = terrno;
@ -152,7 +151,8 @@ int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
TXN* txn = NULL; TXN* txn = NULL;
TQ_ERR_GO_TO_END(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); TQ_ERR_GO_TO_END(
tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_GO_TO_END(tdbTbUpsert(ttb, key, kLen, value, vLen, txn)); TQ_ERR_GO_TO_END(tdbTbUpsert(ttb, key, kLen, value, vLen, txn));
TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn)); TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn));
TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn)); TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn));
@ -168,7 +168,8 @@ int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) {
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
TXN* txn = NULL; TXN* txn = NULL;
TQ_ERR_GO_TO_END(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); TQ_ERR_GO_TO_END(
tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_GO_TO_END(tdbTbDelete(ttb, key, kLen, txn)); TQ_ERR_GO_TO_END(tdbTbDelete(ttb, key, kLen, txn));
TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn)); TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn));
TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn)); TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn));
@ -180,7 +181,7 @@ END:
return code; return code;
} }
int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset){ int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset) {
void* data = taosHashGet(pTq->pOffset, subkey, strlen(subkey)); void* data = taosHashGet(pTq->pOffset, subkey, strlen(subkey));
if (data == NULL) { if (data == NULL) {
int vLen = 0; int vLen = 0;
@ -203,7 +204,7 @@ int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset){
tdbFree(data); tdbFree(data);
*pOffset = taosHashGet(pTq->pOffset, subkey, strlen(subkey)); *pOffset = taosHashGet(pTq->pOffset, subkey, strlen(subkey));
if(*pOffset == NULL){ if (*pOffset == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
} else { } else {
@ -266,8 +267,8 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
initStorageAPI(&reader.api); initStorageAPI(&reader.api);
if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
handle->execHandle.task = handle->execHandle.task = qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId,
qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, &handle->execHandle.numOfCols, handle->consumerId); &handle->execHandle.numOfCols, handle->consumerId);
TQ_NULL_GO_TO_END(handle->execHandle.task); TQ_NULL_GO_TO_END(handle->execHandle.task);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(handle->execHandle.task, &scanner); qExtractStreamScanner(handle->execHandle.task, &scanner);
@ -280,20 +281,21 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
handle->execHandle.pTqReader = tqReaderOpen(pVnode); handle->execHandle.pTqReader = tqReaderOpen(pVnode);
TQ_NULL_GO_TO_END(handle->execHandle.pTqReader); TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta, TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
(SSnapContext**)(&reader.sContext))); (SSnapContext**)(&reader.sContext)));
handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId); handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
TQ_NULL_GO_TO_END(handle->execHandle.task); TQ_NULL_GO_TO_END(handle->execHandle.task);
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0); handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
TQ_NULL_GO_TO_END(handle->pWalReader); TQ_NULL_GO_TO_END(handle->pWalReader);
if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) { if (handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) { if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {
tqError("nodesStringToNode error in sub stable, since %s", terrstr()); tqError("nodesStringToNode error in sub stable, since %s", terrstr());
return TSDB_CODE_SCH_INTERNAL_ERROR; return TSDB_CODE_SCH_INTERNAL_ERROR;
} }
} }
TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid, handle->execHandle.subType, TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid,
handle->fetchMeta, (SSnapContext**)(&reader.sContext))); handle->execHandle.subType, handle->fetchMeta,
(SSnapContext**)(&reader.sContext)));
handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId); handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
TQ_NULL_GO_TO_END(handle->execHandle.task); TQ_NULL_GO_TO_END(handle->execHandle.task);
SArray* tbUidList = NULL; SArray* tbUidList = NULL;
@ -341,7 +343,7 @@ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
handle->execHandle.subType = req->subType; handle->execHandle.subType = req->subType;
handle->fetchMeta = req->withMeta; handle->fetchMeta = req->withMeta;
if (req->subType == TOPIC_SUB_TYPE__COLUMN) { if (req->subType == TOPIC_SUB_TYPE__COLUMN) {
void *tmp = taosStrdup(req->qmsg); void* tmp = taosStrdup(req->qmsg);
if (tmp == NULL) { if (tmp == NULL) {
return terrno; return terrno;
} }
@ -349,12 +351,12 @@ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
} else if (req->subType == TOPIC_SUB_TYPE__DB) { } else if (req->subType == TOPIC_SUB_TYPE__DB) {
handle->execHandle.execDb.pFilterOutTbUid = handle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if(handle->execHandle.execDb.pFilterOutTbUid == NULL){ if (handle->execHandle.execDb.pFilterOutTbUid == NULL) {
return terrno; return terrno;
} }
}else if(req->subType == TOPIC_SUB_TYPE__TABLE){ } else if (req->subType == TOPIC_SUB_TYPE__TABLE) {
handle->execHandle.execTb.suid = req->suid; handle->execHandle.execTb.suid = req->suid;
void *tmp = taosStrdup(req->qmsg); void* tmp = taosStrdup(req->qmsg);
if (tmp == NULL) { if (tmp == NULL) {
return terrno; return terrno;
} }
@ -364,7 +366,7 @@ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal); handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal);
int32_t code = tqMetaInitHandle(pTq, handle); int32_t code = tqMetaInitHandle(pTq, handle);
if (code != 0){ if (code != 0) {
return code; return code;
} }
tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey,
@ -437,10 +439,10 @@ END:
return code; return code;
} }
static int32_t replaceTqPath(char** path){ static int32_t replaceTqPath(char** path) {
char* tpath = NULL; char* tpath = NULL;
int32_t code = tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME); int32_t code = tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME);
if (code != 0){ if (code != 0) {
return code; return code;
} }
taosMemoryFree(*path); taosMemoryFree(*path);
@ -475,7 +477,7 @@ END:
} }
int32_t tqMetaOpen(STQ* pTq) { int32_t tqMetaOpen(STQ* pTq) {
char* maindb = NULL; char* maindb = NULL;
char* offsetNew = NULL; char* offsetNew = NULL;
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME)); TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME));
@ -488,7 +490,7 @@ int32_t tqMetaOpen(STQ* pTq) {
} }
TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME)); TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
if(taosCheckExistFile(offsetNew)){ if (taosCheckExistFile(offsetNew)) {
TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetNew)); TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetNew));
TQ_ERR_GO_TO_END(taosRemoveFile(offsetNew)); TQ_ERR_GO_TO_END(taosRemoveFile(offsetNew));
} }
@ -522,7 +524,7 @@ int32_t tqMetaTransform(STQ* pTq) {
TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME)); TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
if(taosCheckExistFile(offset)) { if (taosCheckExistFile(offset)) {
if (taosCopyFile(offset, offsetNew) < 0) { if (taosCopyFile(offset, offsetNew) < 0) {
tqError("copy offset file error"); tqError("copy offset file error");
} else { } else {
@ -534,44 +536,22 @@ END:
taosMemoryFree(offset); taosMemoryFree(offset);
taosMemoryFree(offsetNew); taosMemoryFree(offsetNew);
int32_t ret = tdbTbClose(pExecStore); tdbTbClose(pExecStore);
if (ret != 0) { tdbTbClose(pCheckStore);
tqError("failed to close tb, ret:%d", ret); tdbClose(pMetaDB);
}
ret = tdbTbClose(pCheckStore);
if (ret != 0) {
tqError("failed to close tb, ret:%d", ret);
}
ret = tdbClose(pMetaDB);
if (ret != 0) {
tqError("failed to close tdb, ret:%d", ret);
}
return code; return code;
} }
void tqMetaClose(STQ* pTq) { void tqMetaClose(STQ* pTq) {
int32_t ret = 0; int32_t ret = 0;
if (pTq->pExecStore) { if (pTq->pExecStore) {
ret = tdbTbClose(pTq->pExecStore); tdbTbClose(pTq->pExecStore);
if (ret != 0) {
tqError("failed to close tb, ret:%d", ret);
}
} }
if (pTq->pCheckStore) { if (pTq->pCheckStore) {
ret = tdbTbClose(pTq->pCheckStore); tdbTbClose(pTq->pCheckStore);
if (ret != 0) {
tqError("failed to close tb, ret:%d", ret);
}
} }
if (pTq->pOffsetStore) { if (pTq->pOffsetStore) {
ret = tdbTbClose(pTq->pOffsetStore); tdbTbClose(pTq->pOffsetStore);
if (ret != 0) {
tqError("failed to close tb, ret:%d", ret);
}
}
ret = tdbClose(pTq->pMetaDB);
if (ret != 0) {
tqError("failed to close tdb, ret:%d", ret);
} }
tdbClose(pTq->pMetaDB);
} }

View File

@ -476,23 +476,14 @@ _err:
if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap); if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
if (pMeta->pTaskDb) { if (pMeta->pTaskDb) {
int32_t ret = tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pTaskDb);
if (ret) {
stError("vgId:%d tdb failed close task db, code:%s", pMeta->vgId, tstrerror(ret));
}
pMeta->pTaskDb = NULL; pMeta->pTaskDb = NULL;
} }
if (pMeta->pCheckpointDb) { if (pMeta->pCheckpointDb) {
int32_t ret = tdbTbClose(pMeta->pCheckpointDb); tdbTbClose(pMeta->pCheckpointDb);
if (ret) {
stError("vgId:%d tdb failed close task checkpointDb, code:%s", pMeta->vgId, tstrerror(ret));
}
} }
if (pMeta->db) { if (pMeta->db) {
int32_t ret = tdbClose(pMeta->db); tdbClose(pMeta->db);
if (ret) {
stError("vgId:%d tdb failed close meta db, code:%s", pMeta->vgId, tstrerror(ret));
}
} }
if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo); if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
@ -598,18 +589,9 @@ void streamMetaCloseImpl(void* arg) {
// already log the error, ignore here // already log the error, ignore here
tdbAbort(pMeta->db, pMeta->txn); tdbAbort(pMeta->db, pMeta->txn);
code = tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pTaskDb);
if (code) { tdbTbClose(pMeta->pCheckpointDb);
stError("vgId:%d failed to close taskDb, code:%s", vgId, tstrerror(code)); tdbClose(pMeta->db);
}
code = tdbTbClose(pMeta->pCheckpointDb);
if (code) {
stError("vgId:%d failed to close checkpointDb, code:%s", vgId, tstrerror(code));
}
code = tdbClose(pMeta->db);
if (code) {
stError("vgId:%d failed to close db, code:%s", vgId, tstrerror(code));
}
taosArrayDestroy(pMeta->pTaskList); taosArrayDestroy(pMeta->pTaskList);
taosArrayDestroy(pMeta->chkpSaved); taosArrayDestroy(pMeta->chkpSaved);

View File

@ -34,7 +34,7 @@ typedef struct STxn TXN;
// TDB // TDB
int32_t tdbOpen(const char *dbname, int szPage, int pages, TDB **ppDb, int8_t rollback, int32_t encryptAlgorithm, int32_t tdbOpen(const char *dbname, int szPage, int pages, TDB **ppDb, int8_t rollback, int32_t encryptAlgorithm,
char *encryptKey); char *encryptKey);
int32_t tdbClose(TDB *pDb); void tdbClose(TDB *pDb);
int32_t tdbBegin(TDB *pDb, TXN **pTxn, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg, int32_t tdbBegin(TDB *pDb, TXN **pTxn, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg,
int flags); int flags);
int32_t tdbCommit(TDB *pDb, TXN *pTxn); int32_t tdbCommit(TDB *pDb, TXN *pTxn);
@ -46,7 +46,7 @@ int32_t tdbAlter(TDB *pDb, int pages);
// TTB // TTB
int32_t tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb, int32_t tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb,
int8_t rollback); int8_t rollback);
int32_t tdbTbClose(TTB *pTb); void tdbTbClose(TTB *pTb);
bool tdbTbExist(const char *tbname, TDB *pEnv); bool tdbTbExist(const char *tbname, TDB *pEnv);
int tdbTbDropByName(const char *tbname, TDB *pEnv, TXN *pTxn); int tdbTbDropByName(const char *tbname, TDB *pEnv, TXN *pTxn);
int32_t tdbTbDrop(TTB *pTb); int32_t tdbTbDrop(TTB *pTb);

View File

@ -90,7 +90,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i
return 0; return 0;
} }
int tdbClose(TDB *pDb) { void tdbClose(TDB *pDb) {
SPager *pPager; SPager *pPager;
if (pDb) { if (pDb) {
@ -109,7 +109,7 @@ int tdbClose(TDB *pDb) {
tdbOsFree(pDb); tdbOsFree(pDb);
} }
return 0; return;
} }
int32_t tdbAlter(TDB *pDb, int pages) { return tdbPCacheAlter(pDb->pCache, pages); } int32_t tdbAlter(TDB *pDb, int pages) { return tdbPCacheAlter(pDb->pCache, pages); }

View File

@ -130,12 +130,12 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
return 0; return 0;
} }
int tdbTbClose(TTB *pTb) { void tdbTbClose(TTB *pTb) {
if (pTb) { if (pTb) {
tdbBtreeClose(pTb->pBt); tdbBtreeClose(pTb->pBt);
tdbOsFree(pTb); tdbOsFree(pTb);
} }
return 0; return;
} }
bool tdbTbExist(const char *tbname, TDB *pEnv) { bool tdbTbExist(const char *tbname, TDB *pEnv) {

View File

@ -197,8 +197,7 @@ static void insertOfp(void) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
} }
// TEST(TdbOVFLPagesTest, DISABLED_TbInsertTest) { // TEST(TdbOVFLPagesTest, DISABLED_TbInsertTest) {
@ -247,8 +246,7 @@ TEST(TdbOVFLPagesTest, TbGetTest) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
} }
// TEST(TdbOVFLPagesTest, DISABLED_TbDeleteTest) { // TEST(TdbOVFLPagesTest, DISABLED_TbDeleteTest) {
@ -357,8 +355,7 @@ tdbBegin(pEnv, &txn);
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
} }
// TEST(tdb_test, DISABLED_simple_insert1) { // TEST(tdb_test, DISABLED_simple_insert1) {
@ -492,6 +489,5 @@ TEST(tdb_test, simple_insert1) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
} }

View File

@ -468,8 +468,7 @@ TEST(TdbPageDefragmentTest, DISABLED_simple_insert1) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
} }
// TEST(TdbPageDefragmentTest, DISABLED_seq_insert) { // TEST(TdbPageDefragmentTest, DISABLED_seq_insert) {
@ -551,8 +550,7 @@ TEST(TdbPageDefragmentTest, seq_insert) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
} }
// TEST(TdbPageDefragmentTest, DISABLED_seq_delete) { // TEST(TdbPageDefragmentTest, DISABLED_seq_delete) {
@ -635,8 +633,7 @@ TEST(TdbPageDefragmentTest, seq_delete) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
} }
// TEST(TdbPageDefragmentTest, DISABLED_defragment_insert) { // TEST(TdbPageDefragmentTest, DISABLED_defragment_insert) {
@ -717,6 +714,5 @@ TEST(TdbPageDefragmentTest, defragment_insert) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
} }

View File

@ -123,7 +123,7 @@ static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, in
static TDB *openEnv(char const *envName, int const pageSize, int const pageNum) { static TDB *openEnv(char const *envName, int const pageSize, int const pageNum) {
TDB *pEnv = NULL; TDB *pEnv = NULL;
int ret = tdbOpen(envName, pageSize, pageNum, &pEnv, 0 , 0, NULL); int ret = tdbOpen(envName, pageSize, pageNum, &pEnv, 0, 0, NULL);
if (ret) { if (ret) {
pEnv = NULL; pEnv = NULL;
} }
@ -187,8 +187,7 @@ static void insertOfp(void) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
} }
static void clearDb(char const *db) { taosRemoveDir(db); } static void clearDb(char const *db) { taosRemoveDir(db); }
@ -471,8 +470,7 @@ TEST(TdbPageRecycleTest, DISABLED_simple_insert1) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
} }
static void insertDb(int nData) { static void insertDb(int nData) {
@ -537,8 +535,7 @@ static void insertDb(int nData) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
system("ls -l ./tdb"); system("ls -l ./tdb");
} }
@ -607,8 +604,7 @@ static void deleteDb(int nData) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
system("ls -l ./tdb"); system("ls -l ./tdb");
} }
@ -675,8 +671,7 @@ static void deleteOfp(void) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
} }
// TEST(TdbPageRecycleTest, DISABLED_seq_delete_ofp) { // TEST(TdbPageRecycleTest, DISABLED_seq_delete_ofp) {
@ -761,8 +756,7 @@ TEST(TdbPageRecycleTest, recycly_seq_insert_ofp_nocommit) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
system("ls -l ./tdb"); system("ls -l ./tdb");
} }
@ -828,8 +822,7 @@ TEST(TdbPageRecycleTest, recycly_delete_interior_ofp_nocommit) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
system("ls -l ./tdb"); system("ls -l ./tdb");
} }

View File

@ -231,8 +231,7 @@ TEST(tdb_test, DISABLED_simple_insert1) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
} }
TEST(tdb_test, DISABLED_simple_insert2) { TEST(tdb_test, DISABLED_simple_insert2) {
@ -315,8 +314,7 @@ TEST(tdb_test, DISABLED_simple_insert2) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
} }
TEST(tdb_test, DISABLED_simple_delete1) { TEST(tdb_test, DISABLED_simple_delete1) {
@ -620,8 +618,7 @@ TEST(tdb_test, multi_thread_query) {
tdbTbClose(pDb); tdbTbClose(pDb);
// Close Env // Close Env
ret = tdbClose(pEnv); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
} }
TEST(tdb_test, DISABLED_multi_thread1) { TEST(tdb_test, DISABLED_multi_thread1) {
@ -745,7 +742,6 @@ TEST(tdb_test, DISABLED_multi_thread1) {
tdbTbClose(pTb); tdbTbClose(pTb);
// Close Env // Close Env
ret = tdbClose(pDb); tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
#endif #endif
} }

View File

@ -1766,12 +1766,12 @@ int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, in
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 0); FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_BIGINT, 0);
} }
int32_t tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level) { void tcompressDebug(uint32_t cmprAlg, uint8_t *l1Alg, uint8_t *l2Alg, uint8_t *level) {
DEFINE_VAR(cmprAlg) DEFINE_VAR(cmprAlg)
*l1Alg = l1; *l1Alg = l1;
*l2Alg = l2; *l2Alg = l2;
*level = lvl; *level = lvl;
return 0; return;
} }
int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint8_t l2Disabled, uint8_t lvlDiabled, uint8_t lvlDefault, int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint8_t l2Disabled, uint8_t lvlDiabled, uint8_t lvlDefault,