diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index fcb00ddf01..754a203471 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -28,7 +28,7 @@ extern "C" { #define TAOS_CONN_CLIENT 1 #define IsReq(pMsg) (pMsg->msgType & 1U) -extern int tsRpcHeadSize; +extern int32_t tsRpcHeadSize; typedef struct { uint32_t clientIp; @@ -69,10 +69,10 @@ typedef struct SRpcInit { char localFqdn[TSDB_FQDN_LEN]; uint16_t localPort; // local port char * label; // for debug purpose - int numOfThreads; // number of threads to handle connections - int sessions; // number of sessions allowed + int32_t numOfThreads; // number of threads to handle connections + int32_t sessions; // number of sessions allowed int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS - int idleTime; // milliseconds, 0 means idle timer is disabled + int32_t idleTime; // milliseconds, 0 means idle timer is disabled // the following is for client app ecurity only char *user; // user name @@ -108,9 +108,9 @@ int32_t rpcInit(); void rpcCleanup(); void * rpcOpen(const SRpcInit *pRpc); void rpcClose(void *); -void * rpcMallocCont(int contLen); +void * rpcMallocCont(int32_t contLen); void rpcFreeCont(void *pCont); -void * rpcReallocCont(void *ptr, int contLen); +void * rpcReallocCont(void *ptr, int32_t contLen); // Because taosd supports multi-process mode // These functions should not be used on the server side @@ -121,10 +121,10 @@ void rpcRegisterBrokenLinkArg(SRpcMsg *msg); void rpcReleaseHandle(void *handle, int8_t type); // just release client conn to rpc instance, no close sock // These functions will not be called in the child process -void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); -void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); -int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); -void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); +void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); +int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); +void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index df7949f6a5..825b52dd9b 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,112 +17,6 @@ #include "mndSync.h" #include "mndTrans.h" -static int32_t mndInitWal(SMnode *pMnode) { - SSyncMgmt *pMgmt = &pMnode->syncMgmt; - - char path[PATH_MAX] = {0}; - snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP); - SWalCfg cfg = { - .vgId = 1, - .fsyncPeriod = 0, - .rollPeriod = -1, - .segSize = -1, - .retentionPeriod = -1, - .retentionSize = -1, - .level = TAOS_WAL_FSYNC, - }; - pMgmt->pWal = walOpen(path, &cfg); - if (pMgmt->pWal == NULL) return -1; - - return 0; -} - -static void mndCloseWal(SMnode *pMnode) { - SSyncMgmt *pMgmt = &pMnode->syncMgmt; - if (pMgmt->pWal != NULL) { - walClose(pMgmt->pWal); - pMgmt->pWal = NULL; - } -} - -static int32_t mndRestoreWal(SMnode *pMnode) { - // do nothing - return 0; - -#if 0 - - SWal *pWal = pMnode->syncMgmt.pWal; - SSdb *pSdb = pMnode->pSdb; - int64_t lastSdbVer = sdbUpdateVer(pSdb, 0); - int32_t code = -1; - - SWalReadHandle *pHandle = walOpenReadHandle(pWal); - if (pHandle == NULL) return -1; - - int64_t first = walGetFirstVer(pWal); - int64_t last = walGetLastVer(pWal); - mDebug("start to restore wal, sdbver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, lastSdbVer, first, last); - - first = TMAX(lastSdbVer + 1, first); - for (int64_t ver = first; ver >= 0 && ver <= last; ++ver) { - if (walReadWithHandle(pHandle, ver) < 0) { - mError("ver:%" PRId64 ", failed to read from wal since %s", ver, terrstr()); - goto _OVER; - } - - SWalHead *pHead = pHandle->pHead; - int64_t sdbVer = sdbUpdateVer(pSdb, 0); - if (sdbVer + 1 != ver) { - terrno = TSDB_CODE_SDB_INVALID_WAl_VER; - mError("ver:%" PRId64 ", failed to write to sdb, since inconsistent with sdbver:%" PRId64, ver, sdbVer); - goto _OVER; - } - - mTrace("ver:%" PRId64 ", will be restored, content:%p", ver, pHead->head.body); - if (sdbWriteWithoutFree(pSdb, (void *)pHead->head.body) < 0) { - mError("ver:%" PRId64 ", failed to write to sdb since %s", ver, terrstr()); - goto _OVER; - } - - sdbUpdateVer(pSdb, 1); - mDebug("ver:%" PRId64 ", is restored", ver); - } - - int64_t sdbVer = sdbUpdateVer(pSdb, 0); - mDebug("restore wal finished, sdbver:%" PRId64, sdbVer); - - mndTransPullup(pMnode); - sdbVer = sdbUpdateVer(pSdb, 0); - mDebug("pullup trans finished, sdbver:%" PRId64, sdbVer); - - if (sdbVer != lastSdbVer) { - mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer); - if (sdbWriteFile(pSdb) != 0) { - goto _OVER; - } - - if (walCommit(pWal, sdbVer) != 0) { - goto _OVER; - } - - if (walBeginSnapshot(pWal, sdbVer) < 0) { - goto _OVER; - } - - if (walEndSnapshot(pWal) < 0) { - goto _OVER; - } - } - - code = 0; - -_OVER: - walCloseReadHandle(pHandle); - return code; - -#endif -} - int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } @@ -131,18 +25,20 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM SMnode *pMnode = pFsm->data; SSdb *pSdb = pMnode->pSdb; SSyncMgmt *pMgmt = &pMnode->syncMgmt; - SyncIndex lastApply = sdbGetApplyIndex(pSdb); SSdbRaw *pRaw = pMsg->pCont; - if (cbMeta.index > lastApply) { + SSnapshot snapshot = {0}; + (*pFsm->FpGetSnapshot)(pFsm, &snapshot); + + if (cbMeta.index > snapshot.lastApplyIndex) { mTrace("ver:%" PRId64 ", apply raw:%p to sdb, role:%s", cbMeta.index, pRaw, syncStr(cbMeta.state)); - sdbWriteWithoutFree(pMnode->pSdb, pRaw); - sdbSetApplyIndex(pMnode->pSdb, cbMeta.index); + sdbWriteWithoutFree(pSdb, pRaw); + sdbSetApplyIndex(pSdb, cbMeta.index); if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { tsem_post(&pMgmt->syncSem); } } else { - mTrace("ver:%" PRId64 ", already apply raw:%p to sdb, last:%" PRId64, cbMeta.index, pRaw, lastApply); + mTrace("ver:%" PRId64 ", already apply raw:%p to sdb, last:%" PRId64, cbMeta.index, pRaw, snapshot.lastApplyIndex); } } @@ -174,7 +70,20 @@ int32_t mndInitSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; tsem_init(&pMgmt->syncSem, 0, 0); - if (mndInitWal(pMnode) < 0) { + char path[PATH_MAX + 20] = {0}; + snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP); + SWalCfg cfg = { + .vgId = 1, + .fsyncPeriod = 0, + .rollPeriod = -1, + .segSize = -1, + .retentionPeriod = -1, + .retentionSize = -1, + .level = TAOS_WAL_FSYNC, + }; + + pMgmt->pWal = walOpen(path, &cfg); + if (pMgmt->pWal == NULL) { mError("failed to open wal since %s", terrstr()); return -1; } @@ -205,56 +114,36 @@ int32_t mndInitSync(SMnode *pMnode) { void mndCleanupSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; tsem_destroy(&pMgmt->syncSem); - mndCloseWal(pMnode); + if (pMgmt->pWal != NULL) { + walClose(pMgmt->pWal); + } + + memset(pMgmt, 0, sizeof(SSyncMgmt)); } int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { - SWal *pWal = pMnode->syncMgmt.pWal; - SSdb *pSdb = pMnode->pSdb; - -#if 0 - int64_t ver = sdbUpdateVer(pSdb, 1); - if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) { - sdbUpdateVer(pSdb, -1); - mError("ver:%" PRId64 ", failed to write raw:%p to wal since %s", ver, pRaw, terrstr()); - return -1; - } - - mTrace("ver:%" PRId64 ", write to wal, raw:%p", ver, pRaw); - walCommit(pWal, ver); - walFsync(pWal, true); - - return 0; - -#else - SSyncMgmt *pMgmt = &pMnode->syncMgmt; pMgmt->errCode = 0; - SRpcMsg rsp = {0}; - rsp.code = TDMT_MND_APPLY_MSG; - rsp.contLen = sdbGetRawTotalSize(pRaw); + SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; rsp.pCont = rpcMallocCont(rsp.contLen); + if (rsp.pCont == NULL) return -1; memcpy(rsp.pCont, pRaw, rsp.contLen); - bool isWeak = false; - int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak); + const bool isWeak = false; + int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak); if (code == 0) { tsem_wait(&pMgmt->syncSem); } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { terrno = TSDB_CODE_APP_NOT_READY; - mError("failed to propose raw:%p since not leader", pRaw); - return -1; } else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - mError("failed to propose raw:%p since sync internal error", pRaw); } else { - assert(0); + terrno = TSDB_CODE_APP_ERROR; } if (code != 0) return code; return pMgmt->errCode; -#endif } bool mndIsMaster(SMnode *pMnode) { diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 5627dbfbf5..9e71c87fa5 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -17,7 +17,7 @@ #include "transComm.h" -void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { +void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOfThreads, void* fp, void* shandle) = { transInitServer, transInitClient}; void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient}; @@ -77,37 +77,38 @@ void rpcClose(void* arg) { taosMemoryFree(pRpc); return; } -void* rpcMallocCont(int contLen) { - int size = contLen + TRANS_MSG_OVERHEAD; - char* start = (char*)taosMemoryCalloc(1, (size_t)size); +void* rpcMallocCont(int32_t contLen) { + int32_t size = contLen + TRANS_MSG_OVERHEAD; + char* start = taosMemoryCalloc(1, size); if (start == NULL) { tError("failed to malloc msg, size:%d", size); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } else { tTrace("malloc mem:%p size:%d", start, size); } + return start + sizeof(STransMsgHead); } -void rpcFreeCont(void* cont) { - // impl - if (cont == NULL) { - return; - } +void rpcFreeCont(void* cont) { + if (cont == NULL) return; taosMemoryFree((char*)cont - TRANS_MSG_OVERHEAD); tTrace("free mem: %p", (char*)cont - TRANS_MSG_OVERHEAD); } -void* rpcReallocCont(void* ptr, int contLen) { - if (ptr == NULL) { - return rpcMallocCont(contLen); - } - char* st = (char*)ptr - TRANS_MSG_OVERHEAD; - int sz = contLen + TRANS_MSG_OVERHEAD; + +void* rpcReallocCont(void* ptr, int32_t contLen) { + if (ptr == NULL) return rpcMallocCont(contLen); + + char* st = (char*)ptr - TRANS_MSG_OVERHEAD; + int32_t sz = contLen + TRANS_MSG_OVERHEAD; st = taosMemoryRealloc(st, sz); if (st == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + return st + TRANS_MSG_OVERHEAD; } @@ -116,8 +117,8 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) { assert(0); } -int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } -void rpcCancelRequest(int64_t rid) { return; } +int32_t rpcReportProgress(void* pConn, char* pCont, int32_t contLen) { return -1; } +void rpcCancelRequest(int64_t rid) { return; } void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { transSendRequest(shandle, pEpSet, pMsg, NULL); @@ -129,8 +130,8 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { transSendRecv(shandle, pEpSet, pMsg, pRsp); } -void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } -int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); } +void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } +int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); } void rpcRefHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);