Merge branch '3.0' into enh/TD-30988-3.0

This commit is contained in:
kailixu 2024-07-25 16:20:26 +08:00
commit a7ad94a42c
16 changed files with 955 additions and 1291 deletions

View File

@ -548,8 +548,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage),
pNew->createdTime);
// only occured while sync timeout
terrno = TSDB_CODE_MND_TRANS_SYNC_TIMEOUT;
return -1;
TAOS_RETURN(TSDB_CODE_MND_TRANS_SYNC_TIMEOUT);
}
mndTransUpdateActions(pOld->prepareActions, pNew->prepareActions);
@ -667,8 +666,7 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
void *ptr = taosArrayPush(pArray, pAction);
if (ptr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
return 0;
@ -779,26 +777,29 @@ void mndTransSetChangeless(STrans *pTrans) { pTrans->changeless = true; }
void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; }
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
SSdbRaw *pRaw = mndTransEncode(pTrans);
if (pRaw == NULL) {
mError("trans:%d, failed to encode while sync trans since %s", pTrans->id, terrstr());
return -1;
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
mError("trans:%d, failed to encode while sync trans since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mInfo("trans:%d, sync to other mnodes, stage:%s createTime:%" PRId64, pTrans->id, mndTransStr(pTrans->stage),
pTrans->createdTime);
int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id);
code = mndSyncPropose(pMnode, pRaw, pTrans->id);
if (code != 0) {
mError("trans:%d, failed to sync, errno:%s code:0x%x createTime:%" PRId64 " saved trans:%d", pTrans->id, terrstr(),
code, pTrans->createdTime, pMnode->syncMgmt.transId);
mError("trans:%d, failed to sync, errno:%s code:0x%x createTime:%" PRId64 " saved trans:%d", pTrans->id,
tstrerror(code), code, pTrans->createdTime, pMnode->syncMgmt.transId);
sdbFreeRaw(pRaw);
return -1;
TAOS_RETURN(code);
}
sdbFreeRaw(pRaw);
mInfo("trans:%d, sync finished, createTime:%" PRId64, pTrans->id, pTrans->createdTime);
return 0;
TAOS_RETURN(code);
}
static bool mndCheckDbConflict(const char *conflict, STrans *pTrans) {
@ -890,24 +891,26 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
}
int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (strlen(pTrans->dbname) == 0 && strlen(pTrans->stbname) == 0) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
code = TSDB_CODE_MND_TRANS_CONFLICT;
mError("trans:%d, failed to prepare conflict db not set", pTrans->id);
return -1;
TAOS_RETURN(code);
}
}
if (mndCheckTransConflict(pMnode, pTrans)) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return terrno;
code = TSDB_CODE_MND_TRANS_CONFLICT;
mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
return 0;
TAOS_RETURN(code);
}
int32_t mndTransCheckConflictWithCompact(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
void *pIter = NULL;
bool conflict = false;
SCompactObj *pCompact = NULL;
@ -934,12 +937,12 @@ int32_t mndTransCheckConflictWithCompact(SMnode *pMnode, STrans *pTrans) {
}
if (conflict) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT_COMPACT;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return terrno;
code = TSDB_CODE_MND_TRANS_CONFLICT_COMPACT;
mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
return 0;
TAOS_RETURN(code);
}
static bool mndTransActionsOfSameType(SArray *pActions) {
@ -960,66 +963,65 @@ static bool mndTransActionsOfSameType(SArray *pActions) {
}
static int32_t mndTransCheckParallelActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (pTrans->exec == TRN_EXEC_PARALLEL) {
if (mndTransActionsOfSameType(pTrans->redoActions) == false) {
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
code = TSDB_CODE_MND_TRANS_INVALID_STAGE;
mError("trans:%d, types of parallel redo actions are not the same", pTrans->id);
return -1;
TAOS_RETURN(code);
}
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
if (mndTransActionsOfSameType(pTrans->undoActions) == false) {
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
code = TSDB_CODE_MND_TRANS_INVALID_STAGE;
mError("trans:%d, types of parallel undo actions are not the same", pTrans->id);
return -1;
TAOS_RETURN(code);
}
}
}
return 0;
TAOS_RETURN(code);
}
static int32_t mndTransCheckCommitActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (!pTrans->changeless && taosArrayGetSize(pTrans->commitActions) <= 0) {
terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
code = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
mError("trans:%d, commit actions of non-changeless trans are empty", pTrans->id);
return -1;
TAOS_RETURN(code);
}
if (mndTransActionsOfSameType(pTrans->commitActions) == false) {
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
code = TSDB_CODE_MND_TRANS_INVALID_STAGE;
mError("trans:%d, types of commit actions are not the same", pTrans->id);
return -1;
TAOS_RETURN(code);
}
return 0;
TAOS_RETURN(code);
}
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (pTrans == NULL) return -1;
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
return -1;
}
TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));
if (mndTransCheckParallelActions(pMnode, pTrans) != 0) {
return -1;
}
TAOS_CHECK_RETURN(mndTransCheckParallelActions(pMnode, pTrans));
if (mndTransCheckCommitActions(pMnode, pTrans) != 0) {
return -1;
}
TAOS_CHECK_RETURN(mndTransCheckCommitActions(pMnode, pTrans));
mInfo("trans:%d, prepare transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return -1;
if ((code = mndTransSync(pMnode, pTrans)) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
mInfo("trans:%d, prepare finished", pTrans->id);
STrans *pNew = mndAcquireTrans(pMnode, pTrans->id);
if (pNew == NULL) {
mError("trans:%d, failed to read from sdb since %s", pTrans->id, terrstr());
return -1;
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
mError("trans:%d, failed to read from sdb since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
pNew->pRpcArray = pTrans->pRpcArray;
@ -1032,37 +1034,41 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
mndTransExecute(pMnode, pNew);
mndReleaseTrans(pMnode, pNew);
// TDOD change to TAOS_RETURN(code);
return 0;
}
static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
mInfo("trans:%d, commit transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to commit since %s", pTrans->id, terrstr());
return -1;
if ((code = mndTransSync(pMnode, pTrans)) != 0) {
mError("trans:%d, failed to commit since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
mInfo("trans:%d, commit finished", pTrans->id);
return 0;
TAOS_RETURN(code);
}
static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
mInfo("trans:%d, rollback transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to rollback since %s", pTrans->id, terrstr());
return -1;
if ((code = mndTransSync(pMnode, pTrans)) != 0) {
mError("trans:%d, failed to rollback since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
mInfo("trans:%d, rollback finished", pTrans->id);
return 0;
TAOS_RETURN(code);
}
static int32_t mndTransPreFinish(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
mInfo("trans:%d, pre-finish transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to pre-finish since %s", pTrans->id, terrstr());
return -1;
if ((code = mndTransSync(pMnode, pTrans)) != 0) {
mError("trans:%d, failed to pre-finish since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
mInfo("trans:%d, pre-finish finished", pTrans->id);
return 0;
TAOS_RETURN(code);
}
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
@ -1168,6 +1174,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
int32_t code = 0;
SMnode *pMnode = pRsp->info.node;
int64_t signature = (int64_t)(pRsp->info.ahandle);
int32_t transId = (int32_t)(signature >> 32);
@ -1175,7 +1182,9 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans == NULL) {
mError("trans:%d, failed to get transId from vnode rsp since %s", transId, terrstr());
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
mError("trans:%d, failed to get transId from vnode rsp since %s", transId, tstrerror(code));
goto _OVER;
}
@ -1216,7 +1225,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
_OVER:
mndReleaseTrans(pMnode, pTrans);
return 0;
TAOS_RETURN(code);
}
static void mndTransResetAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
@ -1252,8 +1261,7 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (pAction->rawWritten) return 0;
if (topHalf) {
terrno = TSDB_CODE_MND_TRANS_CTX_SWITCH;
return TSDB_CODE_MND_TRANS_CTX_SWITCH;
TAOS_RETURN(TSDB_CODE_MND_TRANS_CTX_SWITCH);
}
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw);
@ -1272,15 +1280,14 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
mndSetTransLastAction(pTrans, pAction);
}
return code;
TAOS_RETURN(code);
}
// execute at top half
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (pAction->msgSent) return 0;
if (mndCannotExecuteTransAction(pMnode, topHalf)) {
terrno = TSDB_CODE_MND_TRANS_CTX_SWITCH;
return TSDB_CODE_MND_TRANS_CTX_SWITCH;
TAOS_RETURN(TSDB_CODE_MND_TRANS_CTX_SWITCH);
}
int64_t signature = pTrans->id;
@ -1324,7 +1331,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
mndSetTransLastAction(pTrans, pAction);
}
return code;
TAOS_RETURN(code);
}
static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
@ -1822,8 +1829,7 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
} else if (pTrans->stage == TRN_STAGE_UNDO_ACTION) {
pArray = pTrans->undoActions;
} else {
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
return -1;
TAOS_RETURN(TSDB_CODE_MND_TRANS_INVALID_STAGE);
}
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
@ -1846,17 +1852,19 @@ static int32_t mndProcessKillTransReq(SRpcMsg *pReq) {
STrans *pTrans = NULL;
if (tDeserializeSKillTransReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
code = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
mInfo("trans:%d, start to kill", killReq.transId);
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_TRANS) != 0) {
if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_TRANS)) != 0) {
goto _OVER;
}
pTrans = mndAcquireTrans(pMnode, killReq.transId);
if (pTrans == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
@ -1868,7 +1876,7 @@ _OVER:
}
mndReleaseTrans(pMnode, pTrans);
return code;
TAOS_RETURN(code);
}
static int32_t mndCompareTransId(int32_t *pTransId1, int32_t *pTransId2) { return *pTransId1 >= *pTransId2 ? 1 : 0; }

View File

@ -48,27 +48,23 @@ void metaReaderClear(SMetaReader *pReader) {
}
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
int32_t code = 0;
SMeta *pMeta = pReader->pMeta;
STbDbKey tbDbKey = {.version = version, .uid = uid};
// query table.db
if (tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
goto _err;
return terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
// decode the entry
tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
goto _err;
}
code = metaDecodeEntry(&pReader->coder, &pReader->me);
if (code) return code;
// taosMemoryFreeClear(pReader->me.colCmpr.pColCmpr);
return 0;
_err:
return -1;
}
bool metaIsTableExist(void *pVnode, tb_uid_t uid) {
@ -90,8 +86,7 @@ int metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
// query uid.idx
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
version1 = ((SUidIdxVal *)pReader->pBuf)[0].version;
@ -103,8 +98,7 @@ int metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid) {
SMetaInfo info;
if (metaGetInfo(pMeta, uid, &info, pReader) == TSDB_CODE_NOT_FOUND) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
return metaGetTableEntryByVersion(pReader, info.version, uid);
@ -116,8 +110,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
// query name.idx
if (tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
uid = *(tb_uid_t *)pReader->pBuf;
@ -148,7 +141,7 @@ int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) {
code = metaReaderGetTableEntryByUid(&mr, uid);
if (code < 0) {
metaReaderClear(&mr);
return -1;
return code;
}
STR_TO_VARSTR(tbName, mr.me.name);
@ -164,7 +157,7 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) {
code = metaReaderGetTableEntryByUid(&mr, uid);
if (code < 0) {
metaReaderClear(&mr);
return -1;
return code;
}
strncpy(tbName, mr.me.name, TSDB_TABLE_NAME_LEN);
metaReaderClear(&mr);
@ -181,9 +174,8 @@ int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) {
// query name.idx
if (tdbTbGet(((SMeta *)pReader->pMeta)->pNameIdx, tbName, strlen(tbName) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
metaReaderClear(&mr);
return -1;
return terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
*uid = *(tb_uid_t *)pReader->pBuf;

View File

@ -13,9 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
#include "meta.h"
#include "vnodeInt.h"
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME);
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME);
@ -34,15 +33,15 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
int32_t szBuf = 0;
void *p = NULL;
SMetaReader mr = {0};
int32_t code = 0;
// validate req
// save smaIndex
metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
if (metaReaderGetTableEntryByUidCache(&mr, pCfg->indexUid) == 0) {
#if 1
terrno = TSDB_CODE_TSMA_ALREADY_EXIST;
metaReaderClear(&mr);
return -1; // don't goto _err;
return terrno = TSDB_CODE_TSMA_ALREADY_EXIST;
#else
metaReaderClear(&mr);
return 0;
@ -57,7 +56,8 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
me.name = pCfg->indexName;
me.smaEntry.tsma = pCfg;
if (metaHandleSmaEntry(pMeta, &me) < 0) goto _err;
code = metaHandleSmaEntry(pMeta, &me);
if (code) goto _err;
metaDebug("vgId:%d, tsma is created, name:%s uid:%" PRId64, TD_VID(pMeta->pVnode), pCfg->indexName, pCfg->indexUid);
@ -66,7 +66,7 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
_err:
metaError("vgId:%d, failed to create tsma:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pCfg->indexName,
pCfg->indexUid, tstrerror(terrno));
return -1;
return code;
}
int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid) {
@ -147,24 +147,25 @@ static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) {
}
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) {
int32_t code = 0;
metaWLock(pMeta);
// save to table.db
if (metaSaveSmaToDB(pMeta, pME) < 0) goto _err;
if ((code = metaSaveSmaToDB(pMeta, pME)) < 0) goto _err;
// update uid.idx
if (metaUpdateUidIdx(pMeta, pME) < 0) goto _err;
if ((code = metaUpdateUidIdx(pMeta, pME)) < 0) goto _err;
// update name.idx
if (metaUpdateNameIdx(pMeta, pME) < 0) goto _err;
if ((code = metaUpdateNameIdx(pMeta, pME)) < 0) goto _err;
// update sma.idx
if (metaUpdateSmaIdx(pMeta, pME) < 0) goto _err;
if ((code = metaUpdateSmaIdx(pMeta, pME)) < 0) goto _err;
metaULock(pMeta);
return 0;
_err:
metaULock(pMeta);
return -1;
return code;
}

View File

@ -32,7 +32,7 @@ int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapRe
// alloc
pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) {
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
pReader->pMeta = pMeta;
pReader->sver = sver;
@ -261,7 +261,9 @@ static void saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo)
int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
SSnapContext** ctxRet) {
SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
if (ctx == NULL) return -1;
if (ctx == NULL) {
return terrno;
}
*ctxRet = ctx;
ctx->pMeta = pVnode->pMeta;
ctx->snapVersion = snapVersion;
@ -271,12 +273,12 @@ int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8
ctx->withMeta = withMeta;
ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (ctx->idVersion == NULL) {
return -1;
return terrno;
}
ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (ctx->suidInfo == NULL) {
return -1;
return terrno;
}
taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);
@ -426,21 +428,21 @@ static int32_t buildSuperTableInfo(SVCreateStbReq* req, void** pBuf, int32_t* co
int32_t ret = 0;
tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret);
if (ret < 0) {
return -1;
return ret;
}
*contLen += sizeof(SMsgHead);
*pBuf = taosMemoryMalloc(*contLen);
if (NULL == *pBuf) {
return -1;
return terrno;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
if (tEncodeSVCreateStbReq(&encoder, req) < 0) {
if ((ret = tEncodeSVCreateStbReq(&encoder, req)) < 0) {
taosMemoryFreeClear(*pBuf);
tEncoderClear(&encoder);
return -1;
return ret;
}
tEncoderClear(&encoder);
return 0;

View File

@ -36,11 +36,15 @@ static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
int8_t updataTableColCmpr(SColCmprWrapper *pWp, SSchema *pSchema, int8_t add, uint32_t compress) {
static int32_t updataTableColCmpr(SColCmprWrapper *pWp, SSchema *pSchema, int8_t add, uint32_t compress) {
int32_t nCols = pWp->nCols;
int32_t ver = pWp->version;
if (add) {
SColCmpr *p = taosMemoryCalloc(1, sizeof(SColCmpr) * (nCols + 1));
if (p == NULL) {
return terrno;
}
memcpy(p, pWp->pColCmpr, sizeof(SColCmpr) * nCols);
SColCmpr *pCol = p + nCols;
@ -64,7 +68,7 @@ int8_t updataTableColCmpr(SColCmprWrapper *pWp, SSchema *pSchema, int8_t add, ui
pWp->nCols = nCols;
pWp->version = ver;
}
return 1;
return 0;
}
static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) {
pInfo->uid = pEntry->uid;
@ -87,8 +91,7 @@ static int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema
pMetaRsp->pSchemas = taosMemoryMalloc(pSchema->nCols * sizeof(SSchema));
if (NULL == pMetaRsp->pSchemas) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
return terrno = TSDB_CODE_OUT_OF_MEMORY;
}
pMetaRsp->pSchemaExt = taosMemoryMalloc(pSchema->nCols * sizeof(SSchemaExt));
@ -105,9 +108,11 @@ static int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema
}
static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema) {
int32_t code = 0;
#ifdef USE_INVERTED_INDEX
if (pMeta->pTagIvtIdx == NULL || pCtbEntry == NULL) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
void *data = pCtbEntry->ctbEntry.pTags;
const char *tagName = pSchema->name;
@ -118,8 +123,9 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
int32_t nTagData = 0;
SArray *pTagVals = NULL;
if (tTagToValArray((const STag *)data, &pTagVals) != 0) {
return -1;
code = tTagToValArray((const STag *)data, &pTagVals);
if (code) {
return code;
}
SIndexMultiTerm *terms = indexMultiTermCreate();
@ -168,7 +174,7 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSchema *pSchema) {
#ifdef USE_INVERTED_INDEX
if (pMeta->pTagIvtIdx == NULL || pCtbEntry == NULL) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
void *data = pCtbEntry->ctbEntry.pTags;
const char *tagName = pSchema->name;
@ -179,8 +185,9 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche
int32_t nTagData = 0;
SArray *pTagVals = NULL;
if (tTagToValArray((const STag *)data, &pTagVals) != 0) {
return -1;
int32_t code = tTagToValArray((const STag *)data, &pTagVals);
if (code) {
return code;
}
SIndexMultiTerm *terms = indexMultiTermCreate();
@ -247,6 +254,7 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
void *pBuf = NULL;
int32_t szBuf = 0;
void *p = NULL;
int32_t code = 0;
// validate req
void *pData = NULL;
@ -256,14 +264,12 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
tdbFree(pData);
SMetaInfo info;
if (metaGetInfo(pMeta, uid, &info, NULL) == TSDB_CODE_NOT_FOUND) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
if (info.uid == info.suid) {
return 0;
} else {
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
return -1;
return terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
}
}
@ -283,7 +289,8 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
me.colCmpr = pReq->colCmpr;
}
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
code = metaHandleEntry(pMeta, &me);
if (code) goto _err;
++pMeta->pVnode->config.vndStats.numOfSTables;
@ -295,7 +302,7 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
_err:
metaError("vgId:%d, failed to create stb:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name, pReq->suid,
tstrerror(terrno));
return -1;
return code;
}
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tbUidList) {
@ -310,8 +317,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb
rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData);
if (rc < 0 || *(tb_uid_t *)pData != pReq->suid) {
tdbFree(pData);
terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
}
// drop all child tables
@ -424,16 +430,14 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
if (ret < 0 || c) {
tdbTbcClose(pUidIdxc);
terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
}
ret = tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
if (ret < 0) {
tdbTbcClose(pUidIdxc);
terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
}
oversion = ((SUidIdxVal *)pData)[0].version;
@ -444,9 +448,8 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
tdbTbcClose(pUidIdxc);
tdbTbcClose(pTbDbc);
terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
metaError("meta/table: invalide ret: %" PRId32 " or c: %" PRId32 "alter stb failed.", ret, c);
return -1;
return terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
}
ret = tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
@ -454,8 +457,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
tdbTbcClose(pUidIdxc);
tdbTbcClose(pTbDbc);
terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
}
oStbEntry.pBuf = taosMemoryMalloc(nData);
@ -870,8 +872,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
if (pReq->type == TSDB_CHILD_TABLE) {
tb_uid_t suid = metaGetTableEntryUidByName(pMeta, pReq->ctb.stbName);
if (suid != pReq->ctb.suid) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
}
@ -879,17 +880,15 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
if (pReq->type == TSDB_CHILD_TABLE && pReq->ctb.suid != mr.me.ctbEntry.suid) {
terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
metaReaderClear(&mr);
return -1;
return terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
}
pReq->uid = mr.me.uid;
if (pReq->type == TSDB_CHILD_TABLE) {
pReq->ctb.suid = mr.me.ctbEntry.suid;
}
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
metaReaderClear(&mr);
return -1;
return terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
} else if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
terrno = TSDB_CODE_SUCCESS;
}
@ -1001,7 +1000,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
_err:
metaError("vgId:%d, failed to create table:%s type:%s since %s", TD_VID(pMeta->pVnode), pReq->name,
pReq->type == TSDB_CHILD_TABLE ? "child table" : "normal table", tstrerror(terrno));
return -1;
return TSDB_CODE_FAILED;
}
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids, tb_uid_t *tbUid) {
@ -1015,8 +1014,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData);
if (rc < 0) {
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
}
uid = *(tb_uid_t *)pData;
@ -1154,7 +1152,7 @@ int32_t metaTrimTables(SMeta *pMeta) {
SArray *tbUids = taosArrayInit(8, sizeof(int64_t));
if (tbUids == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
code = metaFilterTableByHash(pMeta, tbUids);
@ -1195,7 +1193,7 @@ static int metaBuildBtimeIdxKey(SBtimeIdxKey *btimeKey, const SMetaEntry *pME) {
} else if (pME->type == TSDB_NORMAL_TABLE) {
btime = pME->ntbEntry.btime;
} else {
return -1;
return TSDB_CODE_FAILED;
}
btimeKey->btime = btime;
@ -1208,7 +1206,7 @@ static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
ncolKey->ncol = pME->ntbEntry.schemaRow.nCols;
ncolKey->uid = pME->uid;
} else {
return -1;
return TSDB_CODE_FAILED;
}
return 0;
}
@ -1235,7 +1233,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p
rc = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData);
if (rc < 0) {
return -1;
return rc;
}
int64_t version = ((SUidIdxVal *)pData)[0].version;
@ -1245,7 +1243,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p
rc = metaDecodeEntry(&dc, &e);
if (rc < 0) {
tDecoderClear(&dc);
return -1;
return rc;
}
if (type) *type = e.type;
@ -1408,16 +1406,14 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
int c;
bool freeColCmpr = false;
if (pAlterTbReq->colName == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
metaError("meta/table: null pAlterTbReq->colName");
return -1;
return terrno = TSDB_CODE_INVALID_MSG;
}
// search name index
ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
}
uid = *(tb_uid_t *)pVal;
@ -1432,7 +1428,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
if (c != 0) {
tdbTbcClose(pUidIdxc);
metaError("meta/table: invalide c: %" PRId32 " alt tb column failed.", c);
return -1;
return TSDB_CODE_FAILED;
}
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
@ -1447,7 +1443,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
tdbTbcClose(pUidIdxc);
tdbTbcClose(pTbDbc);
metaError("meta/table: invalide c: %" PRId32 " alt tb column failed.", c);
return -1;
return TSDB_CODE_FAILED;
}
tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
@ -1463,7 +1459,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
tdbTbcClose(pTbDbc);
tDecoderClear(&dc);
metaError("meta/table: invalide ret: %" PRId32 " alt tb column failed.", ret);
return -1;
return ret;
}
if (entry.type != TSDB_NORMAL_TABLE) {
@ -1660,7 +1656,7 @@ _err:
tdbTbcClose(pUidIdxc);
tDecoderClear(&dc);
return -1;
return TSDB_CODE_FAILED;
}
static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
@ -1676,15 +1672,13 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
int nData = 0;
if (pAlterTbReq->tagName == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
return terrno = TSDB_CODE_INVALID_MSG;
}
// search name index
ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
}
uid = *(tb_uid_t *)pVal;
@ -1698,9 +1692,8 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
if (c != 0) {
tdbTbcClose(pUidIdxc);
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
metaError("meta/table: invalide c: %" PRId32 " update tb tag val failed.", c);
return -1;
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
}
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
@ -1717,9 +1710,8 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
if (c != 0) {
tdbTbcClose(pUidIdxc);
tdbTbcClose(pTbDbc);
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
metaError("meta/table: invalide c: %" PRId32 " update tb tag val failed.", c);
return -1;
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
}
tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
@ -1866,8 +1858,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
// search name index
ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
}
uid = *(tb_uid_t *)pVal;
@ -1882,7 +1873,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
if (c != 0) {
tdbTbcClose(pUidIdxc);
metaError("meta/table: invalide c: %" PRId32 " update tb options failed.", c);
return -1;
return TSDB_CODE_FAILED;
}
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
@ -1897,7 +1888,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
tdbTbcClose(pUidIdxc);
tdbTbcClose(pTbDbc);
metaError("meta/table: invalide c: %" PRId32 " update tb options failed.", c);
return -1;
return TSDB_CODE_FAILED;
}
tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
@ -1913,7 +1904,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
tdbTbcClose(pUidIdxc);
tdbTbcClose(pTbDbc);
metaError("meta/table: invalide ret: %" PRId32 " alt tb options failed.", ret);
return -1;
return TSDB_CODE_FAILED;
}
entry.version = version;
@ -1968,15 +1959,13 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb
SDecoder dc = {0};
if (pAlterTbReq->tagName == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
return terrno = TSDB_CODE_INVALID_MSG;
}
// search name index
ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
} else {
uid = *(tb_uid_t *)pVal;
tdbFree(pVal);
@ -2085,7 +2074,7 @@ _err:
// if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
// tdbTbcClose(pTbDbc);
// tdbTbcClose(pUidIdxc);
return -1;
return TSDB_CODE_FAILED;
}
typedef struct SMetaPair {
@ -2106,15 +2095,13 @@ static int metaDropTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterT
SDecoder dc = {0};
if (pAlterTbReq->tagName == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
return terrno = TSDB_CODE_INVALID_MSG;
}
// search name index
ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
}
suid = *(tb_uid_t *)pVal;
tdbFree(pVal);
@ -2200,7 +2187,7 @@ static int metaDropTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterT
// set pCol->flags; INDEX_ON
return 0;
_err:
return -1;
return TSDB_CODE_FAILED;
}
int32_t metaUpdateTableColCompress(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) {
// impl later
@ -2216,8 +2203,7 @@ int32_t metaUpdateTableColCompress(SMeta *pMeta, int64_t version, SVAlterTbReq *
SDecoder dc = {0};
ret = tdbTbGet(pMeta->pNameIdx, pReq->tbName, strlen(pReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
return -1;
return terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
}
suid = *(tb_uid_t *)pVal;
tdbFree(pVal);
@ -2290,7 +2276,7 @@ int32_t metaUpdateTableColCompress(SMeta *pMeta, int64_t version, SVAlterTbReq *
return 0;
_err:
return -1;
return TSDB_CODE_FAILED;
}
int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pMetaRsp) {
@ -2313,8 +2299,7 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMeta
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS:
return metaUpdateTableColCompress(pMeta, version, pReq);
default:
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
return -1;
return terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
break;
}
}
@ -2370,7 +2355,7 @@ _err:
pME->uid, tstrerror(terrno));
taosMemoryFree(pVal);
return -1;
return TSDB_CODE_FAILED;
}
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
@ -2446,8 +2431,7 @@ int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_
*ppTagIdxKey = (STagIdxKey *)taosMemoryMalloc(*nTagIdxKey);
if (*ppTagIdxKey == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
return terrno = TSDB_CODE_OUT_OF_MEMORY;
}
(*ppTagIdxKey)->suid = suid;
@ -2667,7 +2651,7 @@ _err:
metaULock(pMeta);
metaError("vgId:%d, failed to handle meta entry since %s at line:%d, ver:%" PRId64 ", uid:%" PRId64 ", name:%s",
TD_VID(pMeta->pVnode), terrstr(), line, pME->version, pME->uid, pME->name);
return -1;
return TSDB_CODE_FAILED;
}
int32_t colCompressDebug(SHashObj *pColCmprObj) {
@ -2703,7 +2687,7 @@ int32_t metaGetColCmpr(SMeta *pMeta, tb_uid_t uid, SHashObj **ppColCmprObj) {
if (rc < 0) {
taosHashClear(pColCmprObj);
metaULock(pMeta);
return -1;
return TSDB_CODE_FAILED;
}
int64_t version = ((SUidIdxVal *)pData)[0].version;
rc = tdbTbGet(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pData, &nData);
@ -2721,7 +2705,7 @@ int32_t metaGetColCmpr(SMeta *pMeta, tb_uid_t uid, SHashObj **ppColCmprObj) {
tdbFree(pData);
metaULock(pMeta);
taosHashClear(pColCmprObj);
return -1;
return rc;
}
if (useCompress(e.type)) {
SColCmprWrapper *p = &e.colCmpr;

File diff suppressed because it is too large Load Diff

View File

@ -281,12 +281,13 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
int32_t diskPrimary, STfs *pTfs) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
int32_t code = 0;
vnodeGetPrimaryDir(dstPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
if (vnodeLoadInfo(dir, &info) == 0) {
if (info.config.vgId != dstVgId) {
vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
return -1;
return TSDB_CODE_FAILED;
}
return dstVgId;
}
@ -302,13 +303,13 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
return srcVgId;
} else if (info.config.vgId != dstVgId) {
vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
return -1;
return TSDB_CODE_FAILED;
}
vInfo("vgId:%d, rename %s to %s", dstVgId, srcPath, dstPath);
if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs) < 0) {
vError("vgId:%d, failed to rename vnode from %s to %s since %s", dstVgId, srcPath, dstPath, tstrerror(terrno));
return -1;
return TSDB_CODE_FAILED;
}
return dstVgId;
@ -333,8 +334,7 @@ static int32_t vnodeCheckDisk(int32_t diskPrimary, STfs *pTfs) {
}
if (diskPrimary < 0 || diskPrimary >= ndisk) {
vError("disk:%d is unavailable from the %d disks mounted at level 0", diskPrimary, ndisk);
terrno = TSDB_CODE_FS_INVLD_CFG;
return -1;
return terrno = TSDB_CODE_FS_INVLD_CFG;
}
return 0;
}

View File

@ -513,6 +513,7 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
}
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg *pRsp) {
int32_t code = 0;
void *ptr = NULL;
void *pReq;
int32_t len;
@ -520,8 +521,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
if (ver <= pVnode->state.applied) {
vError("vgId:%d, duplicate write request. ver: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), ver,
pVnode->state.applied);
terrno = TSDB_CODE_VND_DUP_REQUEST;
return -1;
return terrno = TSDB_CODE_VND_DUP_REQUEST;
}
vDebug("vgId:%d, start to process write request %s, index:%" PRId64 ", applied:%" PRId64 ", state.applyTerm:%" PRId64
@ -693,7 +693,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
break;
default:
vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
return -1;
return TSDB_CODE_INVALID_MSG;
}
vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code,
@ -701,21 +701,24 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
walApplyVer(pVnode->pWal, ver);
if (tqPushMsg(pVnode->pTq, pMsg->msgType) < 0) {
code = tqPushMsg(pVnode->pTq, pMsg->msgType);
if (code) {
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
return code;
}
// commit if need
if (needCommit) {
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), ver);
if (vnodeAsyncCommit(pVnode) < 0) {
code = vnodeAsyncCommit(pVnode);
if (code) {
vError("vgId:%d, failed to vnode async commit since %s.", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// start a new one
if (vnodeBegin(pVnode) < 0) {
code = vnodeBegin(pVnode);
if (code) {
vError("vgId:%d, failed to begin vnode since %s.", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
@ -727,7 +730,7 @@ _exit:
_err:
vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
tstrerror(terrno), ver);
return -1;
return code;
}
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
@ -987,11 +990,18 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void
if (terrno < 0) goto _end;
strncpy(buf, mr.me.name, TSDB_TABLE_NAME_LEN);
void *p = taosArrayPush(pNames, buf);
if (p == NULL) {
goto _end;
}
expiredTb.name = p;
if (mr.me.type == TSDB_CHILD_TABLE) {
expiredTb.suid = mr.me.ctbEntry.suid;
}
taosArrayPush(rsp.pExpiredTbs, &expiredTb);
if (taosArrayPush(rsp.pExpiredTbs, &expiredTb) == NULL) {
goto _end;
}
}
int32_t ret = 0;
@ -1017,6 +1027,7 @@ _end:
}
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
int32_t code = 0;
SVCreateStbReq req = {0};
SDecoder coder;
@ -1028,18 +1039,20 @@ static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq,
// decode and process req
tDecoderInit(&coder, pReq, len);
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
pRsp->code = terrno;
code = tDecodeSVCreateStbReq(&coder, &req);
if (code) {
pRsp->code = code;
goto _err;
}
if (metaCreateSTable(pVnode->pMeta, ver, &req) < 0) {
pRsp->code = terrno;
code = metaCreateSTable(pVnode->pMeta, ver, &req);
if (code) {
pRsp->code = code;
goto _err;
}
if (tdProcessRSmaCreate(pVnode->pSma, &req) < 0) {
pRsp->code = terrno;
if ((code = tdProcessRSmaCreate(pVnode->pSma, &req)) < 0) {
pRsp->code = code;
goto _err;
}
@ -1048,7 +1061,7 @@ static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq,
_err:
tDecoderClear(&coder);
return -1;
return code;
}
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
@ -1100,7 +1113,11 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
goto _exit;
}
strcpy(str, pCreateReq->name);
taosArrayPush(tbNames, &str);
if (taosArrayPush(tbNames, &str) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
rcode = -1;
goto _exit;
}
}
// validate hash
@ -1185,6 +1202,7 @@ _exit:
}
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
int32_t code = 0;
SVCreateStbReq req = {0};
SDecoder dc = {0};
@ -1196,16 +1214,17 @@ static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq,
tDecoderInit(&dc, pReq, len);
// decode req
if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
code = tDecodeSVCreateStbReq(&dc, &req);
if (code) {
tDecoderClear(&dc);
return -1;
return code;
}
if (metaAlterSTable(pVnode->pMeta, ver, &req) < 0) {
pRsp->code = terrno;
code = metaAlterSTable(pVnode->pMeta, ver, &req);
if (code) {
pRsp->code = code;
tDecoderClear(&dc);
return -1;
return code;
}
tDecoderClear(&dc);
@ -2224,6 +2243,7 @@ _err:
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVCreateStbReq req = {0};
SDecoder dc = {0};
int32_t code = 0;
pRsp->msgType = TDMT_VND_CREATE_INDEX_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
@ -2233,35 +2253,38 @@ static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pRe
tDecoderInit(&dc, pReq, len);
// decode req
if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
tDecoderClear(&dc);
return -1;
return terrno = TSDB_CODE_INVALID_MSG;
}
if (metaAddIndexToSTable(pVnode->pMeta, ver, &req) < 0) {
pRsp->code = terrno;
code = metaAddIndexToSTable(pVnode->pMeta, ver, &req);
if (code) {
pRsp->code = code;
goto _err;
}
tDecoderClear(&dc);
return 0;
_err:
tDecoderClear(&dc);
return -1;
return code;
}
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
SDropIndexReq req = {0};
int32_t code = 0;
pRsp->msgType = TDMT_VND_DROP_INDEX_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
pRsp->contLen = 0;
if (tDeserializeSDropIdxReq(pReq, len, &req)) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
if ((code = tDeserializeSDropIdxReq(pReq, len, &req))) {
return code;
}
if (metaDropIndexFromSTable(pVnode->pMeta, ver, &req) < 0) {
pRsp->code = terrno;
return -1;
code = metaDropIndexFromSTable(pVnode->pMeta, ver, &req);
if (code) {
pRsp->code = code;
return code;
}
return TSDB_CODE_SUCCESS;
}
@ -2290,20 +2313,17 @@ static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pR
static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token) {
SSyncState syncState = syncGetState(pVnode->sync);
if (syncState.state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
return -1;
return terrno = TSDB_CODE_SYN_NOT_LEADER;
}
char token[TSDB_ARB_TOKEN_SIZE] = {0};
if (vnodeGetArbToken(pVnode, token) != 0) {
terrno = TSDB_CODE_NOT_FOUND;
return -1;
return terrno = TSDB_CODE_NOT_FOUND;
}
if (strncmp(token, member0Token, TSDB_ARB_TOKEN_SIZE) != 0 &&
strncmp(token, member1Token, TSDB_ARB_TOKEN_SIZE) != 0) {
terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
return -1;
return terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
}
terrno = TSDB_CODE_SUCCESS;
@ -2324,9 +2344,9 @@ static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t l
SVArbCheckSyncReq syncReq = {0};
if (tDeserializeSVArbCheckSyncReq(pReq, len, &syncReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
code = tDeserializeSVArbCheckSyncReq(pReq, len, &syncReq);
if (code) {
return terrno = code;
}
pRsp->msgType = TDMT_VND_ARB_CHECK_SYNC_RSP;

View File

@ -51,7 +51,12 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
taosLRUCacheSetStrictCapacity(pLogStore->pCache, false);
pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData));
ASSERT(pLogStore->data != NULL);
if (!pLogStore->data) {
taosMemoryFree(pLogStore);
taosLRUCacheCleanup(pLogStore->pCache);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SSyncLogStoreData* pData = pLogStore->data;
pData->pSyncNode = pSyncNode;
@ -60,7 +65,13 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
taosThreadMutexInit(&(pData->mutex), NULL);
pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0);
ASSERT(pData->pWalHandle != NULL);
if (!pData->pWalHandle) {
taosMemoryFree(pLogStore);
taosLRUCacheCleanup(pLogStore->pCache);
taosThreadMutexDestroy(&(pData->mutex));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex;
pLogStore->syncLogCommitIndex = raftlogCommitIndex;
@ -110,7 +121,7 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI
SWal* pWal = pData->pWal;
int32_t code = walRestoreFromSnapshot(pWal, snapshotIndex);
if (code != 0) {
int32_t err = terrno;
int32_t err = code;
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
@ -118,10 +129,10 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI
sNError(pData->pSyncNode,
"wal restore from snapshot error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", snapshotIndex,
err, errStr, sysErr, sysErrStr);
return -1;
TAOS_RETURN(err);
}
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) {
@ -224,7 +235,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
sNError(pData->pSyncNode, "wal write error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
pEntry->index, err, errStr, sysErr, sysErrStr);
return -1;
TAOS_RETURN(err);
}
code = walFsync(pWal, forceSync);
@ -235,7 +247,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed);
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
// entry found, return 0
@ -253,10 +265,10 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
SWalReader* pWalHandle = pData->pWalHandle;
if (pWalHandle == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("vgId:%d, wal handle is NULL", pData->pSyncNode->vgId);
taosThreadMutexUnlock(&(pData->mutex));
return -1;
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
}
int64_t ts2 = taosGetTimestampNs();
@ -266,7 +278,7 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
// code = walReadVerCached(pWalHandle, index);
if (code != 0) {
int32_t err = terrno;
int32_t err = code;
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
@ -286,7 +298,8 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
*/
taosThreadMutexUnlock(&(pData->mutex));
return code;
TAOS_RETURN(code);
}
*ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
@ -319,7 +332,7 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
", elapsed-build:%" PRId64,
index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild);
return code;
TAOS_RETURN(code);
}
// truncate semantic
@ -329,7 +342,7 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
int32_t code = walRollback(pWal, fromIndex);
if (code != 0) {
int32_t err = terrno;
int32_t err = code;
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
@ -339,7 +352,8 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
// event log
sNTrace(pData->pSyncNode, "log truncate, from-index:%" PRId64, fromIndex);
return code;
TAOS_RETURN(code);
}
// entry found, return 0
@ -352,16 +366,16 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
*ppLastEntry = NULL;
if (walIsEmpty(pWal)) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
} else {
SyncIndex lastIndex = raftLogLastIndex(pLogStore);
ASSERT(lastIndex >= SYNC_INDEX_BEGIN);
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
return code;
TAOS_RETURN(code);
}
return -1;
TAOS_RETURN(TSDB_CODE_FAILED);
}
int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
@ -375,20 +389,22 @@ int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
if (index < snapshotVer || index > wallastVer) {
// ignore
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
int32_t code = walCommit(pWal, index);
if (code != 0) {
int32_t err = terrno;
int32_t err = code;
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
pData->pSyncNode->vgId, index, err, errStr, sysErr, sysErrStr);
return -1;
TAOS_RETURN(code);
}
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore) {
@ -405,5 +421,6 @@ SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) {
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
return walGetCommittedVer(pWal);
}

View File

@ -25,17 +25,17 @@ static int32_t raftStoreDecode(const SJson *pJson, SRaftStore *pStore) {
int32_t code = 0;
tjsonGetNumberValue(pJson, "current_term", pStore->currentTerm, code);
if (code < 0) return -1;
if (code < 0) TAOS_RETURN(TSDB_CODE_FAILED);
tjsonGetNumberValue(pJson, "vote_for_addr", pStore->voteFor.addr, code);
if (code < 0) return -1;
if (code < 0) TAOS_RETURN(TSDB_CODE_FAILED);
tjsonGetInt32ValueFromDouble(pJson, "vote_for_vgid", pStore->voteFor.vgId, code);
if (code < 0) return -1;
if (code < 0) TAOS_RETURN(TSDB_CODE_FAILED);
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
int32_t raftStoreReadFile(SSyncNode *pNode) {
int32_t code = -1;
int32_t code = -1, lino = 0;
TdFilePtr pFile = NULL;
char *pData = NULL;
SJson *pJson = NULL;
@ -52,41 +52,38 @@ int32_t raftStoreReadFile(SSyncNode *pNode) {
pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to open raft store file:%s since %s", pNode->vgId, file, terrstr());
goto _OVER;
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
}
int64_t size = 0;
if (taosFStatFile(pFile, &size, NULL) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to fstat raft store file:%s since %s", pNode->vgId, file, terrstr());
goto _OVER;
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
}
pData = taosMemoryMalloc(size + 1);
if (pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
}
if (taosReadFile(pFile, pData, size) != size) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
goto _OVER;
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
}
pData[size] = '\0';
pJson = tjsonParse(pData);
if (pJson == NULL) {
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
goto _OVER;
TAOS_CHECK_GOTO(TSDB_CODE_INVALID_JSON_FORMAT, &lino, _OVER);
}
if (raftStoreDecode(pJson, pStore) < 0) {
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
goto _OVER;
TAOS_CHECK_GOTO(TSDB_CODE_INVALID_JSON_FORMAT, &lino, _OVER);
}
code = 0;
@ -100,18 +97,20 @@ _OVER:
if (code != 0) {
sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
}
return code;
TAOS_RETURN(code);
}
static int32_t raftStoreEncode(SJson *pJson, SRaftStore *pStore) {
if (tjsonAddIntegerToObject(pJson, "current_term", pStore->currentTerm) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "vote_for_addr", pStore->voteFor.addr) < 0) return -1;
if (tjsonAddDoubleToObject(pJson, "vote_for_vgid", pStore->voteFor.vgId) < 0) return -1;
return 0;
if (tjsonAddIntegerToObject(pJson, "current_term", pStore->currentTerm) < 0) TAOS_RETURN(TSDB_CODE_FAILED);
if (tjsonAddIntegerToObject(pJson, "vote_for_addr", pStore->voteFor.addr) < 0) TAOS_RETURN(TSDB_CODE_FAILED);
if (tjsonAddDoubleToObject(pJson, "vote_for_vgid", pStore->voteFor.vgId) < 0) TAOS_RETURN(TSDB_CODE_FAILED);
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
int32_t raftStoreWriteFile(SSyncNode *pNode) {
int32_t code = -1;
int32_t code = -1, lino = 0;
char *buffer = NULL;
SJson *pJson = NULL;
TdFilePtr pFile = NULL;
@ -120,23 +119,23 @@ int32_t raftStoreWriteFile(SSyncNode *pNode) {
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s.bak", realfile);
terrno = TSDB_CODE_OUT_OF_MEMORY;
pJson = tjsonCreateObject();
if (pJson == NULL) goto _OVER;
if (raftStoreEncode(pJson, pStore) != 0) goto _OVER;
if (pJson == NULL) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
if (raftStoreEncode(pJson, pStore) != 0) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
buffer = tjsonToString(pJson);
if (buffer == NULL) goto _OVER;
terrno = 0;
if (buffer == NULL) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
if (pFile == NULL) goto _OVER;
if (pFile == NULL) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
int32_t len = strlen(buffer);
if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER;
if (taosFsyncFile(pFile) < 0) goto _OVER;
if (taosWriteFile(pFile, buffer, len) <= 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
if (taosFsyncFile(pFile) < 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
taosCloseFile(&pFile);
if (taosRenameFile(file, realfile) != 0) goto _OVER;
if (taosRenameFile(file, realfile) != 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
code = 0;
sInfo("vgId:%d, succeed to write raft store file:%s, term:%" PRId64, pNode->vgId, realfile, pStore->currentTerm);
@ -147,7 +146,6 @@ _OVER:
if (pFile != NULL) taosCloseFile(&pFile);
if (code != 0) {
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to write raft store file:%s since %s", pNode->vgId, realfile, terrstr());
}
return code;

View File

@ -52,7 +52,8 @@ int32_t syncNodeReplicateReset(SSyncNode* pNode, SRaftId* pDestId) {
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId);
syncLogReplReset(pMgr);
taosThreadMutexUnlock(&pBuf->mutex);
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
int32_t syncNodeReplicate(SSyncNode* pNode) {
@ -60,13 +61,14 @@ int32_t syncNodeReplicate(SSyncNode* pNode) {
taosThreadMutexLock(&pBuf->mutex);
int32_t ret = syncNodeReplicateWithoutLock(pNode);
taosThreadMutexUnlock(&pBuf->mutex);
return ret;
TAOS_RETURN(ret);
}
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
if ((pNode->state != TAOS_SYNC_STATE_LEADER && pNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) ||
pNode->raftCfg.cfg.totalReplicaNum == 1) {
return -1;
TAOS_RETURN(TSDB_CODE_FAILED);
}
for (int32_t i = 0; i < pNode->totalReplicaNum; i++) {
if (syncUtilSameId(&pNode->replicasId[i], &pNode->myRaftId)) {
@ -75,14 +77,16 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
(void)syncLogReplStart(pMgr, pNode);
}
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
SyncAppendEntries* pMsg = pRpcMsg->pCont;
pMsg->destId = *destRaftId;
syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg);
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
@ -112,5 +116,5 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
}
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}

View File

@ -20,7 +20,6 @@
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
#include "syncUtil.h"
// TLA+ Spec
// HandleRequestVoteRequest(i, j, m) ==
@ -95,7 +94,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
syncLogRecvRequestVote(ths, pMsg, -1, "not in my config");
return -1;
TAOS_RETURN(TSDB_CODE_FAILED);
}
bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);
@ -122,8 +122,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
// send msg
SRpcMsg rpcMsg = {0};
ret = syncBuildRequestVoteReply(&rpcMsg, ths->vgId);
ASSERT(ret == 0);
TAOS_CHECK_RETURN(syncBuildRequestVoteReply(&rpcMsg, ths->vgId));
SyncRequestVoteReply* pReply = rpcMsg.pCont;
pReply->srcId = ths->myRaftId;
@ -138,5 +138,6 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
if (resetElect) syncNodeResetElectTimer(ths);
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}

View File

@ -45,19 +45,22 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
syncLogRecvRequestVoteReply(ths, pMsg, "not in my config");
return -1;
TAOS_RETURN(TSDB_CODE_FAILED);
}
SyncTerm currentTerm = raftStoreGetTerm(ths);
// drop stale response
if (pMsg->term < currentTerm) {
syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response");
return -1;
TAOS_RETURN(TSDB_CODE_FAILED);
}
if (pMsg->term > currentTerm) {
syncLogRecvRequestVoteReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term);
return -1;
TAOS_RETURN(TSDB_CODE_FAILED);
}
syncLogRecvRequestVoteReply(ths, pMsg, "");
@ -69,7 +72,8 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (ths->pVotesRespond->term != pMsg->term) {
sNError(ths, "vote respond error vote-respond-mgr term:%" PRIu64 ", msg term:%" PRIu64 "",
ths->pVotesRespond->term, pMsg->term);
return -1;
TAOS_RETURN(TSDB_CODE_FAILED);
}
votesRespondAdd(ths->pVotesRespond, pMsg);
@ -93,5 +97,5 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
}
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}

View File

@ -114,7 +114,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
ret = tdbBegin(pEnv, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
if (ret < 0) {
tdbOsFree(pBt);
return -1;
return ret;
}
SBtreeInitPageArg zArg;
@ -124,7 +124,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
if (ret < 0) {
tdbAbort(pEnv, txn);
tdbOsFree(pBt);
return -1;
return ret;
}
ret = tdbPagerWrite(pPager, pPage);
@ -132,7 +132,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
tdbError("failed to write page since %s", terrstr());
tdbAbort(pEnv, txn);
tdbOsFree(pBt);
return -1;
return ret;
}
if (strcmp(TDB_MAINDB_NAME, tbname)) {
@ -145,20 +145,23 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
if (ret < 0) {
tdbAbort(pEnv, txn);
tdbOsFree(pBt);
return -1;
return ret;
}
}
tdbPCacheRelease(pPager->pCache, pPage, txn);
tdbCommit(pPager->pEnv, txn);
tdbPostCommit(pPager->pEnv, txn);
ret = tdbCommit(pPager->pEnv, txn);
if (ret) return ret;
ret = tdbPostCommit(pPager->pEnv, txn);
if (ret) return ret;
}
if (pgno == 0) {
tdbError("tdb/btree-open: pgno cannot be zero.");
tdbOsFree(pBt);
return -1;
ASSERT(0);
}
pBt->root = pgno;
/*
@ -200,7 +203,7 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in
if (ret < 0) {
tdbBtcClose(&btc);
tdbError("tdb/btree-insert: btc move to failed with ret: %d.", ret);
return -1;
return ret;
}
if (btc.idx == -1) {
@ -212,7 +215,7 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in
// dup key not allowed with insert
tdbBtcClose(&btc);
tdbError("tdb/btree-insert: dup key. pKey: %p, kLen: %d, btc: %p, pTxn: %p", pKey, kLen, &btc, pTxn);
return -1;
return TSDB_CODE_DUP_KEY;
}
}
@ -220,7 +223,7 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in
if (ret < 0) {
tdbBtcClose(&btc);
tdbError("tdb/btree-insert: btc upsert failed with ret: %d.", ret);
return -1;
return ret;
}
tdbBtcClose(&btc);
@ -245,18 +248,19 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
if (ret < 0) {
tdbBtcClose(&btc);
tdbError("tdb/btree-delete: btc move to failed with ret: %d.", ret);
return -1;
return ret;
}
if (btc.idx < 0 || c != 0) {
tdbBtcClose(&btc);
return -1;
return TSDB_CODE_NOT_FOUND;
}
// delete the key
if (tdbBtcDelete(&btc) < 0) {
ret = tdbBtcDelete(&btc);
if (ret < 0) {
tdbBtcClose(&btc);
return -1;
return ret;
}
/*
SArray *ofps = btc.coder.ofps;
@ -341,13 +345,12 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
if (ret < 0) {
tdbBtcClose(&btc);
tdbError("tdb/btree-pget: btc move to failed with ret: %d.", ret);
return -1;
return ret;
}
if (btc.idx < 0 || cret) {
tdbBtcClose(&btc);
return -1;
return TSDB_CODE_NOT_FOUND;
}
pCell = tdbPageGetCell(btc.pPage, btc.idx);
@ -355,7 +358,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
if (ret < 0) {
tdbBtcClose(&btc);
tdbError("tdb/btree-pget: decode cell failed with ret: %d.", ret);
return -1;
return ret;
}
if (ppKey) {
@ -363,7 +366,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
if (pTKey == NULL) {
tdbBtcClose(&btc);
tdbError("tdb/btree-pget: realloc pTKey failed.");
return -1;
return terrno;
}
*ppKey = pTKey;
*pkLen = cd.kLen;
@ -375,7 +378,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
if (pTVal == NULL) {
tdbBtcClose(&btc);
tdbError("tdb/btree-pget: realloc pTVal failed.");
return -1;
return terrno;
}
*ppVal = pTVal;
*vLen = cd.vLen;
@ -495,7 +498,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild, TXN
zArg.pBt = pBt;
ret = tdbPagerFetchPage(pPager, &pgnoChild, &pChild, tdbBtreeInitPage, &zArg, pTxn);
if (ret < 0) {
return -1;
return ret;
}
if (!leaf) {
@ -505,7 +508,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild, TXN
ret = tdbPagerWrite(pPager, pChild);
if (ret < 0) {
tdbError("failed to write page since %s", terrstr());
return -1;
return ret;
}
// Copy the root page content to the child page
@ -516,7 +519,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild, TXN
zArg.pBt = pBt;
ret = tdbBtreeInitPage(pRoot, &zArg, 0);
if (ret < 0) {
return -1;
return ret;
}
pIntHdr = (SIntHdr *)(pRoot->pData);
@ -557,13 +560,13 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
}
for (int i = 0; i < nOlds; i++) {
if (ASSERT(sIdx + i <= nCells)) {
return -1;
return TSDB_CODE_FAILED;
}
SPgno pgno;
if (sIdx + i == nCells) {
if (ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pParent))) {
return -1;
return TSDB_CODE_FAILED;
}
pgno = ((SIntHdr *)(pParent->pData))->pgno;
} else {
@ -575,13 +578,13 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
&((SBtreeInitPageArg){.pBt = pBt, .flags = 0}), pTxn);
if (ret < 0) {
tdbError("tdb/btree-balance: fetch page failed with ret: %d.", ret);
return -1;
return TSDB_CODE_FAILED;
}
ret = tdbPagerWrite(pBt->pPager, pOlds[i]);
if (ret < 0) {
tdbError("failed to write page since %s", terrstr());
return -1;
return TSDB_CODE_FAILED;
}
}
// copy the parent key out if child pages are not leaf page
@ -608,7 +611,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
ret = tdbPagerWrite(pBt->pPager, pParent);
if (ret < 0) {
tdbError("failed to write page since %s", terrstr());
return -1;
return ret;
}
// drop the cells on parent page
@ -718,7 +721,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
}
if (ASSERT(infoNews[iNew - 1].cnt > 0)) {
return -1;
return TSDB_CODE_FAILED;
}
if (infoNews[iNew].size + szRCell >= infoNews[iNew - 1].size - szRCell) {
@ -763,13 +766,13 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
ret = tdbPagerFetchPage(pBt->pPager, &pgno, pNews + iNew, tdbBtreeInitPage, &iarg, pTxn);
if (ret < 0) {
tdbError("tdb/btree-balance: fetch page failed with ret: %d.", ret);
return -1;
return ret;
}
ret = tdbPagerWrite(pBt->pPager, pNews[iNew]);
if (ret < 0) {
tdbError("failed to write page since %s", terrstr());
return -1;
return ret;
}
}
}
@ -808,10 +811,10 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
szCell = tdbBtreeCellSize(pPage, pCell, 0, NULL, NULL);
if (ASSERT(nNewCells <= infoNews[iNew].cnt)) {
return -1;
return TSDB_CODE_FAILED;
}
if (ASSERT(iNew < nNews)) {
return -1;
return TSDB_CODE_FAILED;
}
if (nNewCells < infoNews[iNew].cnt) {
@ -852,10 +855,10 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
}
} else {
if (ASSERT(childNotLeaf)) {
return -1;
return TSDB_CODE_FAILED;
}
if (ASSERT(iNew < nNews - 1)) {
return -1;
return TSDB_CODE_FAILED;
}
// set current new page right-most child
@ -863,7 +866,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
// insert to parent as divider cell
if (ASSERT(iNew < nNews - 1)) {
return -1;
return TSDB_CODE_FAILED;
}
((SPgno *)pCell)[0] = TDB_PAGE_PGNO(pNews[iNew]);
tdbPageInsertCell(pParent, sIdx++, pCell, szCell, 0);
@ -880,7 +883,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
if (childNotLeaf) {
if (ASSERT(TDB_PAGE_TOTAL_CELLS(pNews[nNews - 1]) == infoNews[nNews - 1].cnt)) {
return -1;
return TSDB_CODE_FAILED;
}
((SIntHdr *)(pNews[nNews - 1]->pData))->pgno = rPgno;
@ -961,7 +964,7 @@ static int tdbBtreeBalance(SBTC *pBtc) {
ret = tdbBtreeBalanceDeeper(pBtc->pBt, pPage, &(pBtc->pgStack[1]), pBtc->pTxn);
if (ret < 0) {
return -1;
return ret;
}
pBtc->idx = 0;
@ -975,7 +978,7 @@ static int tdbBtreeBalance(SBTC *pBtc) {
ret = tdbBtreeBalanceNonRoot(pBtc->pBt, pParent, pBtc->idxStack[pBtc->iPage - 1], pBtc->pTxn);
if (ret < 0) {
return -1;
return ret;
}
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage, pBtc->pTxn);
@ -998,14 +1001,14 @@ static int tdbFetchOvflPage(SPgno *pPgno, SPage **ppOfp, TXN *pTxn, SBTree *pBt)
iArg.flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL);
ret = tdbPagerFetchPage(pBt->pPager, pPgno, ppOfp, tdbBtreeInitPage, &iArg, pTxn);
if (ret < 0) {
return -1;
return ret;
}
// mark dirty
ret = tdbPagerWrite(pBt->pPager, *ppOfp);
if (ret < 0) {
tdbError("failed to write page since %s", terrstr());
return -1;
return ret;
}
tdbPCacheRelease(pBt->pPager->pCache, *ppOfp, pTxn);
@ -1021,7 +1024,7 @@ static int tdbLoadOvflPage(SPgno *pPgno, SPage **ppOfp, TXN *pTxn, SBTree *pBt)
iArg.flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL);
ret = tdbPagerFetchPage(pBt->pPager, pPgno, ppOfp, tdbBtreeInitPage, &iArg, pTxn);
if (ret < 0) {
return -1;
return ret;
}
return ret;
@ -1058,13 +1061,13 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
ret = tdbFetchOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
return ret;
}
// local buffer for cell
SCell *pBuf = tdbRealloc(NULL, pBt->pageSize);
if (pBuf == NULL) {
return -1;
return ret;
}
int nLeft = nPayload;
@ -1078,7 +1081,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
if (nLocal > nHeader + kLen + sizeof(SPgno)) {
if (ASSERT(pVal != NULL && vLen != 0)) {
tdbFree(pBuf);
return -1;
return TSDB_CODE_FAILED;
}
memcpy(pCell + nHeader + kLen, pVal, nLocal - nHeader - kLen - sizeof(SPgno));
nLeft -= nLocal - nHeader - kLen - sizeof(SPgno);
@ -1103,7 +1106,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
tdbFree(pBuf);
return -1;
return ret;
}
} else {
pgno = 0;
@ -1115,7 +1118,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
if (ret < 0) {
tdbFree(pBuf);
return -1;
return ret;
}
ofp = nextOfp;
@ -1163,7 +1166,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
tdbFree(pBuf);
return -1;
return ret;
}
}
} else {
@ -1171,7 +1174,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
tdbFree(pBuf);
return -1;
return ret;
}
}
@ -1179,7 +1182,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
if (ret < 0) {
return -1;
return ret;
}
ofp = nextOfp;
@ -1203,7 +1206,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
tdbFree(pBuf);
return -1;
return ret;
}
} else {
pgno = 0;
@ -1214,13 +1217,13 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
if (ofp == NULL) {
tdbFree(pBuf);
return -1;
return ret;
}
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
if (ret < 0) {
tdbFree(pBuf);
return -1;
return ret;
}
ofp = nextOfp;
@ -1245,13 +1248,13 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo
int ret;
if (ASSERT(pPage->kLen == TDB_VARIANT_LEN || pPage->kLen == kLen)) {
return -1;
return TSDB_CODE_FAILED;
}
if (ASSERT(pPage->vLen == TDB_VARIANT_LEN || pPage->vLen == vLen)) {
return -1;
return TSDB_CODE_FAILED;
}
if (ASSERT(pKey != NULL && kLen > 0)) {
return -1;
return TSDB_CODE_FAILED;
}
nPayload = 0;
@ -1263,7 +1266,7 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo
if (!leaf) {
if (pPage->vLen != sizeof(SPgno)) {
tdbError("tdb/btree-encode-cell: invalid cell.");
return -1;
return TSDB_CODE_INVALID_PARA;
}
((SPgno *)(pCell + nHeader))[0] = ((SPgno *)pVal)[0];
@ -1290,7 +1293,7 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo
if (ret < 0) {
// TODO
tdbError("tdb/btree-encode-cell: encode payload failed with ret: %d.", ret);
return -1;
return ret;
}
*szCell = nHeader + nPayload;
@ -1309,7 +1312,7 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
if (pDecoder->pVal) {
if (TDB_BTREE_PAGE_IS_LEAF(pPage)) {
tdbError("tdb/btree-decode-payload: leaf page with non-null pVal.");
return -1;
return TSDB_CODE_INVALID_DATA_FMT;
}
nPayload = pDecoder->kLen;
} else {
@ -1344,7 +1347,7 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
// read partial val to local
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) {
return -1;
return terrno;
}
TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
@ -1361,7 +1364,7 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
while (pgno != 0) {
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
return ret;
}
/*
if (pDecoder->ofps) {
@ -1389,7 +1392,7 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
// load partial key and nextPgno
pDecoder->pKey = tdbRealloc(pDecoder->pKey, kLen);
if (pDecoder->pKey == NULL) {
return -1;
return terrno;
}
TDB_CELLDECODER_SET_FREE_KEY(pDecoder);
@ -1406,7 +1409,7 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
// printf("tdb decode-ofp, pTxn: %p, pgno:%u by cell:%p\n", pTxn, pgno, pCell);
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
return ret;
}
/*
if (pDecoder->ofps) {
@ -1439,7 +1442,7 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
// read partial val to local
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) {
return -1;
return terrno;
}
TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
@ -1459,7 +1462,7 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
while (nLeft > 0) {
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
return ret;
}
ofpCell = tdbPageGetCell(ofp, 0);
@ -1480,7 +1483,7 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
if (!pDecoder->pVal) {
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) {
return -1;
return terrno;
}
TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
}
@ -1529,7 +1532,7 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
if (!leaf) {
if (pPage->vLen != sizeof(SPgno)) {
tdbError("tdb/btree-decode-cell: invalid cell.");
return -1;
return TSDB_CODE_INVALID_DATA_FMT;
}
pDecoder->pgno = ((SPgno *)(pCell + nHeader))[0];
@ -1546,7 +1549,7 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
if (pPage->vLen == TDB_VARIANT_LEN) {
if (!leaf) {
tdbError("tdb/btree-decode-cell: not a leaf page.");
return -1;
return TSDB_CODE_INVALID_DATA_FMT;
}
nHeader += tdbGetVarInt(pCell + nHeader, &(pDecoder->vLen));
} else {
@ -1556,7 +1559,7 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
// 2. Decode payload part
ret = tdbBtreeDecodePayload(pPage, pCell, nHeader, pDecoder, pTxn, pBt);
if (ret < 0) {
return -1;
return ret;
}
return 0;
@ -1610,7 +1613,7 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
while (pgno != 0) {
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
return ret;
}
SCell *ofpCell = tdbPageGetCell(ofp, 0);
@ -1627,7 +1630,7 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
ret = tdbPagerWrite(pBt->pPager, ofp);
if (ret < 0) {
tdbError("failed to write page since %s", terrstr());
return -1;
return ret;
}
/*
tdbPageDropCell(ofp, 0, pTxn, pBt);
@ -1664,12 +1667,13 @@ int tdbBtcOpen(SBTC *pBtc, SBTree *pBt, TXN *pTxn) {
if (pTxn == NULL) {
TXN *pTxn = tdbOsCalloc(1, sizeof(*pTxn));
if (!pTxn) {
return -1;
return terrno;
}
if (tdbTxnOpen(pTxn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
int32_t ret = tdbTxnOpen(pTxn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
if (ret < 0) {
tdbOsFree(pTxn);
return -1;
return ret;
}
pBtc->pTxn = pTxn;
@ -1698,12 +1702,12 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
&((SBtreeInitPageArg){.pBt = pBt, .flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF}), pBtc->pTxn);
if (ret < 0) {
tdbError("tdb/btc-move-tofirst: fetch page failed with ret: %d.", ret);
return -1;
return ret;
}
if (!TDB_BTREE_PAGE_IS_ROOT(pBtc->pPage)) {
tdbError("tdb/btc-move-tofirst: not a root page");
return -1;
return ret;
}
pBtc->iPage = 0;
@ -1713,7 +1717,7 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
// no any data, point to an invalid position
if (!TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
tdbError("tdb/btc-move-to-first: not a leaf page.");
return -1;
return TSDB_CODE_FAILED;
}
pBtc->idx = -1;
@ -1722,7 +1726,7 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
} else {
// TODO
tdbError("tdb/btc-move-to-first: move from a dirty cursor.");
return -1;
return TSDB_CODE_FAILED;
#if 0
// move from a position
int iPage = 0;
@ -1755,7 +1759,7 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
ret = tdbBtcMoveDownward(pBtc);
if (ret < 0) {
tdbError("tdb/btc-move-tofirst: btc move downward failed with ret: %d.", ret);
return -1;
return ret;
}
pBtc->idx = 0;
@ -1780,7 +1784,7 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
&((SBtreeInitPageArg){.pBt = pBt, .flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF}), pBtc->pTxn);
if (ret < 0) {
tdbError("tdb/btc-move-tolast: fetch page failed with ret: %d.", ret);
return -1;
return ret;
}
nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
@ -1791,7 +1795,7 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
// no data at all, point to an invalid position
if (!TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
tdbError("tdb/btc-move-to-last: not a leaf page.");
return -1;
return TSDB_CODE_FAILED;
}
pBtc->idx = -1;
@ -1800,7 +1804,7 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
} else {
// TODO
tdbError("tdb/btc-move-to-last: move from a dirty cursor.");
return -1;
return TSDB_CODE_FAILED;
#if 0
int iPage = 0;
@ -1838,7 +1842,7 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
ret = tdbBtcMoveDownward(pBtc);
if (ret < 0) {
tdbError("tdb/btc-move-tolast: btc move downward failed with ret: %d.", ret);
return -1;
return ret;
}
nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
@ -1860,7 +1864,7 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
// current cursor points to an invalid position
if (pBtc->idx < 0) {
return -1;
return TSDB_CODE_FAILED;
}
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
@ -1868,12 +1872,12 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
ret = tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd, pBtc->pTxn, pBtc->pBt);
if (ret < 0) {
tdbError("tdb/btree-next: decode cell failed with ret: %d.", ret);
return -1;
return ret;
}
pKey = tdbRealloc(*ppKey, cd.kLen);
if (pKey == NULL) {
return -1;
return terrno;
}
*ppKey = pKey;
@ -1885,7 +1889,7 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
pVal = tdbRealloc(*ppVal, cd.vLen);
if (pVal == NULL) {
tdbFree(pKey);
return -1;
return terrno;
}
memcpy(pVal, cd.pVal, cd.vLen);
@ -1909,7 +1913,7 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
ret = tdbBtcMoveToNext(pBtc);
if (ret < 0) {
tdbError("tdb/btree-next: btc move to next failed with ret: %d.", ret);
return -1;
return ret;
}
return 0;
@ -1923,7 +1927,7 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
// current cursor points to an invalid position
if (pBtc->idx < 0) {
return -1;
return TSDB_CODE_FAILED;
}
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
@ -1931,12 +1935,12 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
ret = tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd, pBtc->pTxn, pBtc->pBt);
if (ret < 0) {
tdbError("tdb/btree-prev: decode cell failed with ret: %d.", ret);
return -1;
return ret;
}
pKey = tdbRealloc(*ppKey, cd.kLen);
if (pKey == NULL) {
return -1;
return terrno;
}
*ppKey = pKey;
@ -1948,7 +1952,7 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
pVal = tdbRealloc(*ppVal, cd.vLen);
if (pVal == NULL) {
tdbFree(pKey);
return -1;
return terrno;
}
*ppVal = pVal;
@ -1959,7 +1963,7 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
ret = tdbBtcMoveToPrev(pBtc);
if (ret < 0) {
tdbError("tdb/btree-prev: btc move to prev failed with ret: %d.", ret);
return -1;
return ret;
}
return 0;
@ -1972,10 +1976,10 @@ int tdbBtcMoveToNext(SBTC *pBtc) {
if (!TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
tdbError("tdb/btc-move-to-next: not a leaf page.");
return -1;
return TSDB_CODE_FAILED;
}
if (pBtc->idx < 0) return -1;
if (pBtc->idx < 0) return TSDB_CODE_FAILED;
pBtc->idx++;
if (pBtc->idx < TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) {
@ -1994,7 +1998,7 @@ int tdbBtcMoveToNext(SBTC *pBtc) {
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
tdbError("tdb/btree-decode-cell: should not be a leaf page here.");
return -1;
return TSDB_CODE_FAILED;
}
if (pBtc->idx <= TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) {
break;
@ -2008,7 +2012,7 @@ int tdbBtcMoveToNext(SBTC *pBtc) {
ret = tdbBtcMoveDownward(pBtc);
if (ret < 0) {
tdbError("tdb/btc-move-tonext: btc move downward failed with ret: %d.", ret);
return -1;
return ret;
}
pBtc->idx = 0;
@ -2018,7 +2022,7 @@ int tdbBtcMoveToNext(SBTC *pBtc) {
}
int tdbBtcMoveToPrev(SBTC *pBtc) {
if (pBtc->idx < 0) return -1;
if (pBtc->idx < 0) return TSDB_CODE_FAILED;
pBtc->idx--;
if (pBtc->idx >= 0) {
@ -2061,17 +2065,17 @@ static int tdbBtcMoveDownward(SBTC *pBtc) {
if (pBtc->idx < 0) {
tdbError("tdb/btc-move-downward: invalid idx: %d.", pBtc->idx);
return -1;
return TSDB_CODE_FAILED;
}
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
tdbError("tdb/btc-move-downward: should not be a leaf page here.");
return -1;
return TSDB_CODE_FAILED;
}
if (TDB_BTREE_PAGE_IS_OVFL(pBtc->pPage)) {
tdbError("tdb/btc-move-downward: should not be a ovfl page here.");
return -1;
return TSDB_CODE_FAILED;
}
if (pBtc->idx < TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) {
@ -2083,7 +2087,7 @@ static int tdbBtcMoveDownward(SBTC *pBtc) {
if (!pgno) {
tdbError("tdb/btc-move-downward: invalid pgno.");
return -1;
return TSDB_CODE_FAILED;
}
pBtc->pgStack[pBtc->iPage] = pBtc->pPage;
@ -2096,14 +2100,14 @@ static int tdbBtcMoveDownward(SBTC *pBtc) {
&((SBtreeInitPageArg){.pBt = pBtc->pBt, .flags = 0}), pBtc->pTxn);
if (ret < 0) {
tdbError("tdb/btc-move-downward: fetch page failed with ret: %d.", ret);
return -1;
return TSDB_CODE_FAILED;
}
return 0;
}
static int tdbBtcMoveUpward(SBTC *pBtc) {
if (pBtc->iPage == 0) return -1;
if (pBtc->iPage == 0) return TSDB_CODE_FAILED;
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage, pBtc->pTxn);
@ -2118,7 +2122,7 @@ int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int
SCell *pCell;
if (pBtc->idx < 0 || pBtc->idx >= TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) {
return -1;
return TSDB_CODE_FAILED;
}
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
@ -2152,14 +2156,14 @@ int tdbBtcDelete(SBTC *pBtc) {
if (idx < 0 || idx >= nCells) {
tdbError("tdb/btc-delete: idx: %d out of range[%d, %d).", idx, 0, nCells);
return -1;
return TSDB_CODE_FAILED;
}
// drop the cell on the leaf
ret = tdbPagerWrite(pPager, pBtc->pPage);
if (ret < 0) {
tdbError("failed to write page since %s", terrstr());
return -1;
return ret;
}
bool destroyOfps = false;
@ -2200,7 +2204,7 @@ int tdbBtcDelete(SBTC *pBtc) {
ret = tdbPagerWrite(pPager, pPage);
if (ret < 0) {
tdbError("failed to write page since %s", terrstr());
return -1;
return ret;
}
// update the cell with new key
@ -2211,7 +2215,7 @@ int tdbBtcDelete(SBTC *pBtc) {
if (ret < 0) {
tdbOsFree(pCell);
tdbError("tdb/btc-delete: page update cell failed with ret: %d.", ret);
return -1;
return ret;
}
tdbOsFree(pCell);
@ -2229,7 +2233,7 @@ int tdbBtcDelete(SBTC *pBtc) {
ret = tdbBtreeBalance(pBtc);
if (ret < 0) {
tdbError("tdb/btc-delete: btree balance failed with ret: %d.", ret);
return -1;
return ret;
}
}
@ -2242,7 +2246,7 @@ int tdbBtcDelete(SBTC *pBtc) {
// delete the leaf page and do balance
if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) != 0) {
tdbError("tdb/btc-delete: page to be deleted should be empty.");
return -1;
return TSDB_CODE_FAILED;
}
// printf("tdb/btc-delete: btree balance delete pgno: %d.\n", TDB_PAGE_PGNO(pBtc->pPage));
@ -2250,7 +2254,7 @@ int tdbBtcDelete(SBTC *pBtc) {
ret = tdbBtreeBalance(pBtc);
if (ret < 0) {
tdbError("tdb/btc-delete: btree balance failed with ret: %d.", ret);
return -1;
return ret;
}
}
}
@ -2268,7 +2272,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
if (pBtc->idx < 0) {
tdbError("tdb/btc-upsert: invalid idx: %d.", pBtc->idx);
return -1;
return TSDB_CODE_FAILED;
}
// alloc space
@ -2276,7 +2280,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
pBuf = tdbRealloc(pBtc->pBt->pBuf, pBtc->pBt->pageSize > szBuf ? szBuf : pBtc->pBt->pageSize);
if (pBuf == NULL) {
tdbError("tdb/btc-upsert: realloc pBuf failed.");
return -1;
return terrno;
}
pBtc->pBt->pBuf = pBuf;
pCell = (SCell *)pBtc->pBt->pBuf;
@ -2285,35 +2289,35 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
ret = tdbBtreeEncodeCell(pBtc->pPage, pKey, kLen, pData, nData, pCell, &szCell, pBtc->pTxn, pBtc->pBt);
if (ret < 0) {
tdbError("tdb/btc-upsert: btree encode cell failed with ret: %d.", ret);
return -1;
return ret;
}
// mark dirty
ret = tdbPagerWrite(pBtc->pBt->pPager, pBtc->pPage);
if (ret < 0) {
tdbError("failed to write page since %s", terrstr());
return -1;
return ret;
}
// insert or update
if (insert) {
if (pBtc->idx > nCells) {
tdbError("tdb/btc-upsert: invalid idx: %d, nCells: %d.", pBtc->idx, nCells);
return -1;
return TSDB_CODE_FAILED;
}
ret = tdbPageInsertCell(pBtc->pPage, pBtc->idx, pCell, szCell, 0);
} else {
if (pBtc->idx >= nCells) {
tdbError("tdb/btc-upsert: invalid idx: %d, nCells: %d.", pBtc->idx, nCells);
return -1;
return TSDB_CODE_FAILED;
}
ret = tdbPageUpdateCell(pBtc->pPage, pBtc->idx, pCell, szCell, pBtc->pTxn, pBtc->pBt);
}
if (ret < 0) {
tdbError("tdb/btc-upsert: page insert/update cell failed with ret: %d.", ret);
return -1;
return ret;
}
/*
bool destroyOfps = false;
@ -2327,7 +2331,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
ret = tdbBtreeBalance(pBtc);
if (ret < 0) {
tdbError("tdb/btc-upsert: btree balance failed with ret: %d.", ret);
return -1;
return ret;
}
}
/*
@ -2365,7 +2369,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
if (ret < 0) {
// TODO
tdbError("tdb/btc-move-to: fetch page failed with ret: %d.", ret);
return -1;
return ret;
}
pBtc->iPage = 0;
@ -2375,7 +2379,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
} else {
// TODO
tdbError("tdb/btc-move-to: move from a dirty cursor.");
return -1;
return TSDB_CODE_FAILED;
#if 0
SPage *pPage;
int idx;
@ -2499,7 +2503,7 @@ int tdbBtcClose(SBTC *pBtc) {
for (;;) {
if (NULL == pBtc->pPage) {
tdbError("tdb/btc-close: null ptr pPage.");
return -1;
return TSDB_CODE_FAILED;
}
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage, pBtc->pTxn);

View File

@ -862,7 +862,7 @@ static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno, TXN *pTxn) {
// Try to allocate from the free list of the pager
ret = tdbPagerAllocFreePage(pPager, ppgno, pTxn);
if (ret < 0) {
return -1;
return ret;
}
if (*ppgno != 0) return 0;
@ -875,7 +875,7 @@ static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno, TXN *pTxn) {
if (*ppgno == 0) {
tdbError("tdb/pager:%p, alloc new page failed.", pPager);
return -1;
return TSDB_CODE_FAILED;
}
return 0;
}
@ -907,7 +907,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
if (nRead < pPage->pageSize) {
tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32, pPager, pgno, nRead, pPage->pageSize);
TDB_UNLOCK_PAGE(pPage);
return -1;
return TAOS_SYSTEM_ERROR(errno);
}
int32_t encryptAlgorithm = pPager->pEnv->encryptAlgorithm;
@ -954,7 +954,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " init page failed.", pPager, pgno, nRead,
pPage->pageSize);
TDB_UNLOCK_PAGE(pPage);
return -1;
return ret;
}
tmemory_barrier();
@ -975,7 +975,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
} else {
tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " lock page failed.", pPager, pgno, nRead,
pPage->pageSize);
return -1;
return TSDB_CODE_FAILED;
}
return 0;
@ -1127,14 +1127,12 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) {
if (tdbOsClose(jfd) < 0) {
tdbError("failed to close jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
return terrno = TAOS_SYSTEM_ERROR(errno);
}
if (tdbOsRemove(jFileName) < 0 && errno != ENOENT) {
tdbError("failed to remove file due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
return terrno = TAOS_SYSTEM_ERROR(errno);
}
return 0;

View File

@ -20,6 +20,7 @@
#include "tarray.h"
#include "tdef.h"
#include "tlog.h"
#include "tutil.h"
typedef struct SLRUEntry SLRUEntry;
typedef struct SLRUEntryTable SLRUEntryTable;
@ -114,13 +115,13 @@ static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) {
table->lengthBits = 4;
table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *));
if (!table->list) {
return -1;
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
table->elems = 0;
table->maxLengthBits = maxUpperHashBits;
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
static void taosLRUEntryTableApply(SLRUEntryTable *table, _taos_lru_table_func_t func, uint32_t begin, uint32_t end) {
@ -349,9 +350,7 @@ static void taosLRUCacheShardSetCapacity(SLRUCacheShard *shard, size_t capacity)
static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool strict, double highPriPoolRatio,
int maxUpperHashBits) {
if (taosLRUEntryTableInit(&shard->table, maxUpperHashBits) < 0) {
return -1;
}
TAOS_CHECK_RETURN(taosLRUEntryTableInit(&shard->table, maxUpperHashBits));
taosThreadMutexInit(&shard->mutex, NULL);
@ -372,7 +371,7 @@ static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool st
taosLRUCacheShardSetCapacity(shard, capacity);
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) {
@ -671,16 +670,13 @@ static int getDefaultCacheShardBits(size_t capacity) {
SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoolRatio) {
if (numShardBits >= 20) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
if (highPriPoolRatio < 0.0 || highPriPoolRatio > 1.0) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
SLRUCache *cache = taosMemoryCalloc(1, sizeof(SLRUCache));
if (!cache) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
@ -692,14 +688,15 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo
cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard));
if (!cache->shards) {
taosMemoryFree(cache);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
bool strictCapacity = 1;
size_t perShard = (capacity + (numShards - 1)) / numShards;
for (int i = 0; i < numShards; ++i) {
taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits);
if (TSDB_CODE_SUCCESS !=
taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits))
return NULL;
}
cache->numShards = numShards;