fix: add error code if malloc failed

This commit is contained in:
Shengliang Guan 2022-05-23 11:08:31 +08:00
parent 6c1d1927c9
commit c38a8055ba
3 changed files with 61 additions and 171 deletions

View File

@ -28,7 +28,7 @@ extern "C" {
#define TAOS_CONN_CLIENT 1
#define IsReq(pMsg) (pMsg->msgType & 1U)
extern int tsRpcHeadSize;
extern int32_t tsRpcHeadSize;
typedef struct {
uint32_t clientIp;
@ -69,10 +69,10 @@ typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN];
uint16_t localPort; // local port
char * label; // for debug purpose
int numOfThreads; // number of threads to handle connections
int sessions; // number of sessions allowed
int32_t numOfThreads; // number of threads to handle connections
int32_t sessions; // number of sessions allowed
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled
int32_t idleTime; // milliseconds, 0 means idle timer is disabled
// the following is for client app ecurity only
char *user; // user name
@ -108,9 +108,9 @@ int32_t rpcInit();
void rpcCleanup();
void * rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *);
void * rpcMallocCont(int contLen);
void * rpcMallocCont(int32_t contLen);
void rpcFreeCont(void *pCont);
void * rpcReallocCont(void *ptr, int contLen);
void * rpcReallocCont(void *ptr, int32_t contLen);
// Because taosd supports multi-process mode
// These functions should not be used on the server side
@ -121,10 +121,10 @@ void rpcRegisterBrokenLinkArg(SRpcMsg *msg);
void rpcReleaseHandle(void *handle, int8_t type); // just release client conn to rpc instance, no close sock
// These functions will not be called in the child process
void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
#ifdef __cplusplus
}

View File

@ -17,112 +17,6 @@
#include "mndSync.h"
#include "mndTrans.h"
static int32_t mndInitWal(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
char path[PATH_MAX] = {0};
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
SWalCfg cfg = {
.vgId = 1,
.fsyncPeriod = 0,
.rollPeriod = -1,
.segSize = -1,
.retentionPeriod = -1,
.retentionSize = -1,
.level = TAOS_WAL_FSYNC,
};
pMgmt->pWal = walOpen(path, &cfg);
if (pMgmt->pWal == NULL) return -1;
return 0;
}
static void mndCloseWal(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
if (pMgmt->pWal != NULL) {
walClose(pMgmt->pWal);
pMgmt->pWal = NULL;
}
}
static int32_t mndRestoreWal(SMnode *pMnode) {
// do nothing
return 0;
#if 0
SWal *pWal = pMnode->syncMgmt.pWal;
SSdb *pSdb = pMnode->pSdb;
int64_t lastSdbVer = sdbUpdateVer(pSdb, 0);
int32_t code = -1;
SWalReadHandle *pHandle = walOpenReadHandle(pWal);
if (pHandle == NULL) return -1;
int64_t first = walGetFirstVer(pWal);
int64_t last = walGetLastVer(pWal);
mDebug("start to restore wal, sdbver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, lastSdbVer, first, last);
first = TMAX(lastSdbVer + 1, first);
for (int64_t ver = first; ver >= 0 && ver <= last; ++ver) {
if (walReadWithHandle(pHandle, ver) < 0) {
mError("ver:%" PRId64 ", failed to read from wal since %s", ver, terrstr());
goto _OVER;
}
SWalHead *pHead = pHandle->pHead;
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
if (sdbVer + 1 != ver) {
terrno = TSDB_CODE_SDB_INVALID_WAl_VER;
mError("ver:%" PRId64 ", failed to write to sdb, since inconsistent with sdbver:%" PRId64, ver, sdbVer);
goto _OVER;
}
mTrace("ver:%" PRId64 ", will be restored, content:%p", ver, pHead->head.body);
if (sdbWriteWithoutFree(pSdb, (void *)pHead->head.body) < 0) {
mError("ver:%" PRId64 ", failed to write to sdb since %s", ver, terrstr());
goto _OVER;
}
sdbUpdateVer(pSdb, 1);
mDebug("ver:%" PRId64 ", is restored", ver);
}
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
mDebug("restore wal finished, sdbver:%" PRId64, sdbVer);
mndTransPullup(pMnode);
sdbVer = sdbUpdateVer(pSdb, 0);
mDebug("pullup trans finished, sdbver:%" PRId64, sdbVer);
if (sdbVer != lastSdbVer) {
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
if (sdbWriteFile(pSdb) != 0) {
goto _OVER;
}
if (walCommit(pWal, sdbVer) != 0) {
goto _OVER;
}
if (walBeginSnapshot(pWal, sdbVer) < 0) {
goto _OVER;
}
if (walEndSnapshot(pWal) < 0) {
goto _OVER;
}
}
code = 0;
_OVER:
walCloseReadHandle(pHandle);
return code;
#endif
}
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
@ -131,18 +25,20 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
SMnode *pMnode = pFsm->data;
SSdb *pSdb = pMnode->pSdb;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SyncIndex lastApply = sdbGetApplyIndex(pSdb);
SSdbRaw *pRaw = pMsg->pCont;
if (cbMeta.index > lastApply) {
SSnapshot snapshot = {0};
(*pFsm->FpGetSnapshot)(pFsm, &snapshot);
if (cbMeta.index > snapshot.lastApplyIndex) {
mTrace("ver:%" PRId64 ", apply raw:%p to sdb, role:%s", cbMeta.index, pRaw, syncStr(cbMeta.state));
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyIndex(pMnode->pSdb, cbMeta.index);
sdbWriteWithoutFree(pSdb, pRaw);
sdbSetApplyIndex(pSdb, cbMeta.index);
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
tsem_post(&pMgmt->syncSem);
}
} else {
mTrace("ver:%" PRId64 ", already apply raw:%p to sdb, last:%" PRId64, cbMeta.index, pRaw, lastApply);
mTrace("ver:%" PRId64 ", already apply raw:%p to sdb, last:%" PRId64, cbMeta.index, pRaw, snapshot.lastApplyIndex);
}
}
@ -174,7 +70,20 @@ int32_t mndInitSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
tsem_init(&pMgmt->syncSem, 0, 0);
if (mndInitWal(pMnode) < 0) {
char path[PATH_MAX + 20] = {0};
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
SWalCfg cfg = {
.vgId = 1,
.fsyncPeriod = 0,
.rollPeriod = -1,
.segSize = -1,
.retentionPeriod = -1,
.retentionSize = -1,
.level = TAOS_WAL_FSYNC,
};
pMgmt->pWal = walOpen(path, &cfg);
if (pMgmt->pWal == NULL) {
mError("failed to open wal since %s", terrstr());
return -1;
}
@ -205,56 +114,36 @@ int32_t mndInitSync(SMnode *pMnode) {
void mndCleanupSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
tsem_destroy(&pMgmt->syncSem);
mndCloseWal(pMnode);
if (pMgmt->pWal != NULL) {
walClose(pMgmt->pWal);
}
memset(pMgmt, 0, sizeof(SSyncMgmt));
}
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
SWal *pWal = pMnode->syncMgmt.pWal;
SSdb *pSdb = pMnode->pSdb;
#if 0
int64_t ver = sdbUpdateVer(pSdb, 1);
if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) {
sdbUpdateVer(pSdb, -1);
mError("ver:%" PRId64 ", failed to write raw:%p to wal since %s", ver, pRaw, terrstr());
return -1;
}
mTrace("ver:%" PRId64 ", write to wal, raw:%p", ver, pRaw);
walCommit(pWal, ver);
walFsync(pWal, true);
return 0;
#else
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->errCode = 0;
SRpcMsg rsp = {0};
rsp.code = TDMT_MND_APPLY_MSG;
rsp.contLen = sdbGetRawTotalSize(pRaw);
SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
rsp.pCont = rpcMallocCont(rsp.contLen);
if (rsp.pCont == NULL) return -1;
memcpy(rsp.pCont, pRaw, rsp.contLen);
bool isWeak = false;
int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak);
const bool isWeak = false;
int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak);
if (code == 0) {
tsem_wait(&pMgmt->syncSem);
} else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
terrno = TSDB_CODE_APP_NOT_READY;
mError("failed to propose raw:%p since not leader", pRaw);
return -1;
} else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
mError("failed to propose raw:%p since sync internal error", pRaw);
} else {
assert(0);
terrno = TSDB_CODE_APP_ERROR;
}
if (code != 0) return code;
return pMgmt->errCode;
#endif
}
bool mndIsMaster(SMnode *pMnode) {

View File

@ -17,7 +17,7 @@
#include "transComm.h"
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOfThreads, void* fp, void* shandle) = {
transInitServer, transInitClient};
void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
@ -77,37 +77,38 @@ void rpcClose(void* arg) {
taosMemoryFree(pRpc);
return;
}
void* rpcMallocCont(int contLen) {
int size = contLen + TRANS_MSG_OVERHEAD;
char* start = (char*)taosMemoryCalloc(1, (size_t)size);
void* rpcMallocCont(int32_t contLen) {
int32_t size = contLen + TRANS_MSG_OVERHEAD;
char* start = taosMemoryCalloc(1, size);
if (start == NULL) {
tError("failed to malloc msg, size:%d", size);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
} else {
tTrace("malloc mem:%p size:%d", start, size);
}
return start + sizeof(STransMsgHead);
}
void rpcFreeCont(void* cont) {
// impl
if (cont == NULL) {
return;
}
void rpcFreeCont(void* cont) {
if (cont == NULL) return;
taosMemoryFree((char*)cont - TRANS_MSG_OVERHEAD);
tTrace("free mem: %p", (char*)cont - TRANS_MSG_OVERHEAD);
}
void* rpcReallocCont(void* ptr, int contLen) {
if (ptr == NULL) {
return rpcMallocCont(contLen);
}
char* st = (char*)ptr - TRANS_MSG_OVERHEAD;
int sz = contLen + TRANS_MSG_OVERHEAD;
void* rpcReallocCont(void* ptr, int32_t contLen) {
if (ptr == NULL) return rpcMallocCont(contLen);
char* st = (char*)ptr - TRANS_MSG_OVERHEAD;
int32_t sz = contLen + TRANS_MSG_OVERHEAD;
st = taosMemoryRealloc(st, sz);
if (st == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
return st + TRANS_MSG_OVERHEAD;
}
@ -116,8 +117,8 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
assert(0);
}
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
int32_t rpcReportProgress(void* pConn, char* pCont, int32_t contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
transSendRequest(shandle, pEpSet, pMsg, NULL);
@ -129,8 +130,8 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
transSendRecv(shandle, pEpSet, pMsg, pRsp);
}
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); }
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); }
void rpcRefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);