diff --git a/include/common/tglobal.h b/include/common/tglobal.h index cb4426f8a9..7df6124152 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -119,6 +119,7 @@ extern SDiskCfg tsDiskCfg[]; // udf extern bool tsStartUdfd; extern char tsUdfdResFuncs[]; +extern char tsUdfdLdLibPath[]; // schemaless extern char tsSmlChildTableName[]; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 95ee2ca2bc..1a94dcf426 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -200,10 +200,10 @@ typedef struct SSyncInfo { int32_t syncInit(); void syncCleanUp(); +bool syncIsInit(); int64_t syncOpen(SSyncInfo* pSyncInfo); void syncStart(int64_t rid); void syncStop(int64_t rid); -int32_t syncSetStandby(int64_t rid); ESyncState syncGetMyRole(int64_t rid); bool syncIsReady(int64_t rid); const char* syncGetMyRoleStr(int64_t rid); @@ -216,7 +216,6 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); // int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize); -bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot); @@ -234,6 +233,9 @@ int32_t syncEndSnapshot(int64_t rid); int32_t syncStepDown(int64_t rid, SyncTerm newTerm); +SSyncNode* syncNodeAcquire(int64_t rid); +void syncNodeRelease(SSyncNode* pNode); + #ifdef __cplusplus } #endif diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 5e9e70aa19..e39d45b061 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -28,10 +28,6 @@ typedef struct SRaftId { SyncGroupId vgId; } SRaftId; -// ------------------ control ------------------- -SSyncNode* syncNodeAcquire(int64_t rid); -void syncNodeRelease(SSyncNode* pNode); - int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo); void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb); diff --git a/include/util/tref.h b/include/util/tref.h index c2cc54cb07..c4b2ec8fa7 100644 --- a/include/util/tref.h +++ b/include/util/tref.h @@ -25,7 +25,8 @@ extern "C" { // open a reference set, max is the mod used by hash, fp is the pointer to free resource function // return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately -int32_t taosOpenRef(int32_t max, void (*fp)(void *)); +typedef void (*RefFp)(void *); +int32_t taosOpenRef(int32_t max, RefFp fp); // close the reference set, refId is the return value by taosOpenRef // return 0 if success. On error, -1 is returned, and terrno is set appropriately diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index fbb9e04a25..660730f0dc 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -163,7 +163,8 @@ int32_t tsTtlUnit = 86400; int32_t tsTtlPushInterval = 86400; int32_t tsGrantHBInterval = 60; int32_t tsUptimeInterval = 300; // seconds -char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits +char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits +char tsUdfdLdLibPath[512] = ""; #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { @@ -424,6 +425,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; + if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1; GRANT_CFG_ADD; return 0; } @@ -722,7 +724,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); - + tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath)); if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 98a24286f6..53e7b1cd26 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -481,7 +481,7 @@ int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg) { mInfo("vgId:%d, process sync ctrl msg", 1); - if (!syncEnvIsStart()) { + if (!syncIsInit()) { mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -518,7 +518,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; int32_t code = 0; - if (!syncEnvIsStart()) { + if (!syncIsInit()) { mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -581,11 +581,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); syncSnapshotRspDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) { - code = syncSetStandby(pMgmt->sync); - SRpcMsg rsp = {.code = code, .info = pMsg->info}; - tmsgSendRsp(&rsp); - } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) { SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg); code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index df4b526775..6853ff5ccc 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,11 +17,44 @@ #include "mndSync.h" #include "mndTrans.h" -static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { +static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + if (pMsg == NULL || pMsg->pCont == NULL) { + return -1; + } + SMsgHead *pHead = pMsg->pCont; pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); + if (msgcb == NULL || msgcb->putToQueueFp == NULL) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + return -1; + } + + int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg); + if (code != 0) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + } + return code; +} + +static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + if (pMsg == NULL || pMsg->pCont == NULL) { + return -1; + } + + SMsgHead *pHead = pMsg->pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + + if (msgcb == NULL || msgcb->putToQueueFp == NULL) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + return -1; + } + int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); if (code != 0) { rpcFreeCont(pMsg->pCont); @@ -212,7 +245,7 @@ int32_t mndInitSync(SMnode *pMnode) { .msgcb = NULL, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg, - .FpEqCtrlMsg = NULL, + .FpEqCtrlMsg = mndSyncEqCtrlMsg, }; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index bf665fd6db..11cea8e88a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -234,7 +234,7 @@ int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t code = 0; const STraceId *trace = &pMsg->info.traceId; - if (!syncEnvIsStart()) { + if (!syncIsInit()) { vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg); terrno = TSDB_CODE_APP_ERROR; return -1; @@ -277,7 +277,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t code = 0; const STraceId *trace = &pMsg->info.traceId; - if (!syncEnvIsStart()) { + if (!syncIsInit()) { vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg); terrno = TSDB_CODE_APP_ERROR; return -1; @@ -370,7 +370,13 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { - if (msgcb == NULL) { + if (pMsg == NULL || pMsg->pCont == NULL) { + return -1; + } + + if (msgcb == NULL || msgcb->putToQueueFp == NULL) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; return -1; } @@ -383,7 +389,13 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { } static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { - if (msgcb == NULL) { + if (pMsg == NULL || pMsg->pCont == NULL) { + return -1; + } + + if (msgcb == NULL || msgcb->putToQueueFp == NULL) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; return -1; } diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index ea59e92e98..459ee583a4 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -117,10 +117,29 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { char dnodeIdEnvItem[32] = {0}; char thrdPoolSizeEnvItem[32] = {0}; snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId); + float numCpuCores = 4; taosGetCpuCores(&numCpuCores); snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores * 2); - char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL}; + + char pathTaosdLdLib[512] = {0}; + size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib); + uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen); + + char udfdPathLdLib[1024] = {0}; + size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath); + strncpy(udfdPathLdLib, tsUdfdLdLibPath, udfdLdLibPathLen); + udfdPathLdLib[udfdLdLibPathLen] = ':'; + strncpy(udfdPathLdLib + udfdLdLibPathLen + 1, pathTaosdLdLib, sizeof(udfdPathLdLib) - udfdLdLibPathLen); + if (udfdLdLibPathLen + taosdLdLibPathLen < 1024) { + fnInfo("udfd LD_LIBRARY_PATH: %s", udfdPathLdLib); + } else { + fnError("can not set correct udfd LD_LIBRARY_PATH"); + } + char ldLibPathEnvItem[1024 + 32] = {0}; + snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", udfdPathLdLib); + + char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, NULL}; options.env = envUdfd; int err = uv_spawn(&pData->loop, &pData->process, &options); diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 06da0eb3df..55ce1470ce 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -20,13 +20,7 @@ extern "C" { #endif -#include -#include -#include #include "syncInt.h" -#include "taosdef.h" -#include "trpc.h" -#include "ttimer.h" #define TIMER_MAX_MS 0x7FFFFFFF #define ENV_TICK_TIMER_MS 1000 @@ -57,12 +51,12 @@ typedef struct SSyncEnv { } SSyncEnv; -extern SSyncEnv* gSyncEnv; +SSyncEnv* syncEnv(); -int32_t syncEnvStart(); -int32_t syncEnvStop(); -int32_t syncEnvStartTimer(); -int32_t syncEnvStopTimer(); +int64_t syncNodeAdd(SSyncNode* pNode); +void syncNodeRemove(int64_t rid); +SSyncNode* syncNodeAcquire(int64_t rid); +void syncNodeRelease(SSyncNode* pNode); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index f4949e1016..843dee1342 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -24,6 +24,8 @@ extern "C" { #include "syncTools.h" #include "tlog.h" #include "ttimer.h" +#include "taosdef.h" +#include "ttimer.h" // clang-format off #define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) @@ -255,9 +257,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, Sync SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); char* syncNodePeerState2Str(const SSyncNode* pSyncNode); -SSyncNode* syncNodeAcquire(int64_t rid); -void syncNodeRelease(SSyncNode* pNode); - // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term); @@ -302,9 +301,6 @@ bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); -void syncStartNormal(int64_t rid); -void syncStartStandBy(int64_t rid); - bool syncNodeCanChange(SSyncNode* pSyncNode); bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg); diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index fb0bfb8bef..c55cd4fdac 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -13,118 +13,111 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncEnv.h" -// #include +#include "tref.h" -SSyncEnv *gSyncEnv = NULL; +static SSyncEnv gSyncEnv = {0}; +static int32_t gNodeRefId = -1; +bool gRaftDetailLog = false; +static void syncEnvTick(void *param, void *tmrId); -// local function ----------------- -static SSyncEnv *doSyncEnvStart(); -static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv); -static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv); -static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv); -static void syncEnvTick(void *param, void *tmrId); -// -------------------------------- +SSyncEnv *syncEnv() { return &gSyncEnv; } -bool syncEnvIsStart() { - if (gSyncEnv == NULL) { - return false; - } +bool syncIsInit() { return atomic_load_8(&gSyncEnv.isStart); } - return atomic_load_8(&(gSyncEnv->isStart)); -} +int32_t syncInit() { + if (syncIsInit()) return 0; -int32_t syncEnvStart() { - int32_t ret = 0; uint32_t seed = (uint32_t)(taosGetTimestampNs() & 0x00000000FFFFFFFF); taosSeedRand(seed); - // gSyncEnv = doSyncEnvStart(gSyncEnv); - gSyncEnv = doSyncEnvStart(); - ASSERT(gSyncEnv != NULL); - sTrace("sync env start ok"); - return ret; -} -int32_t syncEnvStop() { - int32_t ret = doSyncEnvStop(gSyncEnv); - return ret; -} - -int32_t syncEnvStartTimer() { - int32_t ret = doSyncEnvStartTimer(gSyncEnv); - return ret; -} - -int32_t syncEnvStopTimer() { - int32_t ret = doSyncEnvStopTimer(gSyncEnv); - return ret; -} - -// local function ----------------- -static void syncEnvTick(void *param, void *tmrId) { - SSyncEnv *pSyncEnv = (SSyncEnv *)param; - if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) { - ++(pSyncEnv->envTickTimerCounter); - sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 - ", envTickTimerCounter:%" PRIu64 - ", " - "envTickTimerMS:%d, tmrId:%p", - pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, - pSyncEnv->envTickTimerMS, tmrId); - - // do something, tick ... - taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer); - } else { - sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 - ", envTickTimerCounter:%" PRIu64 - ", " - "envTickTimerMS:%d, tmrId:%p", - pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, - pSyncEnv->envTickTimerMS, tmrId); - } -} - -static SSyncEnv *doSyncEnvStart() { - SSyncEnv *pSyncEnv = (SSyncEnv *)taosMemoryMalloc(sizeof(SSyncEnv)); - ASSERT(pSyncEnv != NULL); - memset(pSyncEnv, 0, sizeof(SSyncEnv)); - - pSyncEnv->envTickTimerCounter = 0; - pSyncEnv->envTickTimerMS = ENV_TICK_TIMER_MS; - pSyncEnv->FpEnvTickTimer = syncEnvTick; - atomic_store_64(&pSyncEnv->envTickTimerLogicClock, 0); - atomic_store_64(&pSyncEnv->envTickTimerLogicClockUser, 0); + memset(&gSyncEnv, 0, sizeof(SSyncEnv)); + gSyncEnv.envTickTimerCounter = 0; + gSyncEnv.envTickTimerMS = ENV_TICK_TIMER_MS; + gSyncEnv.FpEnvTickTimer = syncEnvTick; + atomic_store_64(&gSyncEnv.envTickTimerLogicClock, 0); + atomic_store_64(&gSyncEnv.envTickTimerLogicClockUser, 0); // start tmr thread - pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV"); + gSyncEnv.pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV"); + atomic_store_8(&gSyncEnv.isStart, 1); - atomic_store_8(&(pSyncEnv->isStart), 1); - return pSyncEnv; -} - -static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { - ASSERT(pSyncEnv == gSyncEnv); - if (pSyncEnv != NULL) { - atomic_store_8(&(pSyncEnv->isStart), 0); - taosTmrCleanUp(pSyncEnv->pTimerManager); - taosMemoryFree(pSyncEnv); + gNodeRefId = taosOpenRef(200, (RefFp)syncNodeClose); + if (gNodeRefId < 0) { + sError("failed to init node ref"); + syncCleanUp(); + return -1; } - gSyncEnv = NULL; + + sDebug("sync rsetId:%d is open", gNodeRefId); return 0; } -static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv) { - int32_t ret = 0; - taosTmrReset(pSyncEnv->FpEnvTickTimer, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, - &pSyncEnv->pEnvTickTimer); - atomic_store_64(&pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerLogicClockUser); - return ret; +void syncCleanUp() { + atomic_store_8(&gSyncEnv.isStart, 0); + taosTmrCleanUp(gSyncEnv.pTimerManager); + memset(&gSyncEnv, 0, sizeof(SSyncEnv)); + + if (gNodeRefId != -1) { + sDebug("sync rsetId:%d is closed", gNodeRefId); + taosCloseRef(gNodeRefId); + gNodeRefId = -1; + } } -static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv) { +int64_t syncNodeAdd(SSyncNode *pNode) { + pNode->rid = taosAddRef(gNodeRefId, pNode); + if (pNode->rid < 0) return -1; + + sDebug("vgId:%d, sync rid:%" PRId64 " is added to rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId); + return pNode->rid; +} + +void syncNodeRemove(int64_t rid) { taosRemoveRef(gNodeRefId, rid); } + +SSyncNode *syncNodeAcquire(int64_t rid) { + SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid); + if (pNode == NULL) { + sTrace("failed to acquire node from refId:%" PRId64, rid); + } + + return pNode; +} + +void syncNodeRelease(SSyncNode *pNode) { taosReleaseRef(gNodeRefId, pNode->rid); } + +#if 0 +void syncEnvStartTimer() { + taosTmrReset(gSyncEnv.FpEnvTickTimer, gSyncEnv.envTickTimerMS, &gSyncEnv, gSyncEnv.pTimerManager, + &gSyncEnv.pEnvTickTimer); + atomic_store_64(&gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerLogicClockUser); +} + +void syncEnvStopTimer() { int32_t ret = 0; - atomic_add_fetch_64(&pSyncEnv->envTickTimerLogicClockUser, 1); - taosTmrStop(pSyncEnv->pEnvTickTimer); - pSyncEnv->pEnvTickTimer = NULL; + atomic_add_fetch_64(&gSyncEnv.envTickTimerLogicClockUser, 1); + taosTmrStop(gSyncEnv.pEnvTickTimer); + gSyncEnv.pEnvTickTimer = NULL; return ret; } +#endif + +static void syncEnvTick(void *param, void *tmrId) { + SSyncEnv *pSyncEnv = param; + if (atomic_load_64(&gSyncEnv.envTickTimerLogicClockUser) <= atomic_load_64(&gSyncEnv.envTickTimerLogicClock)) { + gSyncEnv.envTickTimerCounter++; + sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 + ", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p", + gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter, + gSyncEnv.envTickTimerMS, tmrId); + + // do something, tick ... + taosTmrReset(syncEnvTick, gSyncEnv.envTickTimerMS, pSyncEnv, gSyncEnv.pTimerManager, &gSyncEnv.pEnvTickTimer); + } else { + sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 + ", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p", + gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter, + gSyncEnv.envTickTimerMS, tmrId); + } +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 59f39a697d..139e4be1b9 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -33,11 +33,6 @@ #include "syncTimeout.h" #include "syncUtil.h" #include "syncVoteMgr.h" -#include "tref.h" - -bool gRaftDetailLog = false; - -static int32_t tsNodeRefId = -1; // ------ local funciton --------- // enqueue message ---- @@ -53,138 +48,36 @@ static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNew int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg); -// --------------------------------- -static void syncNodeFreeCb(void* param) { - syncNodeClose(param); - param = NULL; -} - -int32_t syncInit() { - int32_t ret = 0; - - if (!syncEnvIsStart()) { - tsNodeRefId = taosOpenRef(200, syncNodeFreeCb); - if (tsNodeRefId < 0) { - sError("failed to init node ref"); - syncCleanUp(); - ret = -1; - } else { - sDebug("sync rsetId:%d is open", tsNodeRefId); - ret = syncEnvStart(); - } - } - - return ret; -} - -void syncCleanUp() { - int32_t ret = syncEnvStop(); - ASSERT(ret == 0); - - if (tsNodeRefId != -1) { - sDebug("sync rsetId:%d is closed", tsNodeRefId); - taosCloseRef(tsNodeRefId); - tsNodeRefId = -1; - } -} - int64_t syncOpen(SSyncInfo* pSyncInfo) { - SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); - if (pSyncNode == NULL) { + SSyncNode* pNode = syncNodeOpen(pSyncInfo); + if (pNode == NULL) { sError("vgId:%d, failed to open sync node", pSyncInfo->vgId); return -1; } - pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode); - if (pSyncNode->rid < 0) { - syncNodeClose(pSyncNode); - pSyncNode = NULL; + pNode->rid = syncNodeAdd(pNode); + if (pNode->rid < 0) { + syncNodeClose(pNode); return -1; } - sDebug("vgId:%d, sync rid:%" PRId64 " is added to rsetId:%d", pSyncInfo->vgId, pSyncNode->rid, tsNodeRefId); - return pSyncNode->rid; + return pNode->rid; } void syncStart(int64_t rid) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - return; + SSyncNode* pNode = syncNodeAcquire(rid); + if (pNode != NULL) { + syncNodeStart(pNode); + syncNodeRelease(pNode); } - - if (pSyncNode->pRaftCfg->isStandBy) { - syncNodeStartStandBy(pSyncNode); - } else { - syncNodeStart(pSyncNode); - } - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); -} - -void syncStartNormal(int64_t rid) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - return; - } - syncNodeStart(pSyncNode); - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); -} - -void syncStartStandBy(int64_t rid) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - return; - } - syncNodeStartStandBy(pSyncNode); - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); } void syncStop(int64_t rid) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) return; - int32_t vgId = pSyncNode->vgId; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); - - taosRemoveRef(tsNodeRefId, rid); - sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%d", vgId, rid, tsNodeRefId); -} - -int32_t syncSetStandby(int64_t rid) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - sError("failed to set standby since accquire ref error, rid:%" PRId64, rid); - return -1; + SSyncNode* pNode = syncNodeAcquire(rid); + if (pNode != NULL) { + syncNodeRelease(pNode); + syncNodeRemove(rid); } - - if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - terrno = TSDB_CODE_SYN_IS_LEADER; - } else { - terrno = TSDB_CODE_SYN_STANDBY_NOT_READY; - } - sError("failed to set standby since it is not follower, state:%s rid:%" PRId64, syncStr(pSyncNode->state), rid); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return -1; - } - - // state change - pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; - syncNodeStopHeartbeatTimer(pSyncNode); - - // reset elect timer, long enough - int32_t electMS = TIMER_MAX_MS; - int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS); - ASSERT(ret == 0); - - pSyncNode->pRaftCfg->isStandBy = 1; - raftCfgPersist(pSyncNode->pRaftCfg); - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); - sInfo("vgId:%d, set to standby", pSyncNode->vgId); - return 0; } bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg) { @@ -205,7 +98,7 @@ bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg) { } int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -214,7 +107,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg int32_t ret = 0; if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; sError("invalid new config. vgId:%d", pSyncNode->vgId); return -1; @@ -228,12 +121,12 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig); taosMemoryFree(newconfig); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -241,7 +134,7 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { ASSERT(rid == pSyncNode->rid); if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; sError("invalid new config. vgId:%d", pSyncNode->vgId); return -1; @@ -260,7 +153,7 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { taosMemoryFree(newconfig); ret = syncNodePropose(pSyncNode, &rpcMsg, false); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; #else syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg); @@ -276,13 +169,13 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { syncNodeReplicate(pSyncNode); } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; #endif } int32_t syncLeaderTransfer(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -290,12 +183,12 @@ int32_t syncLeaderTransfer(int64_t rid) { ASSERT(rid == pSyncNode->rid); int32_t ret = syncNodeLeaderTransfer(pSyncNode); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -303,7 +196,7 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { ASSERT(rid == pSyncNode->rid); int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } @@ -359,7 +252,7 @@ char* syncNodePeerState2Str(const SSyncNode* pSyncNode) { } int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -383,7 +276,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { logNum, isEmpty); syncNodeEventLog(pSyncNode, logBuf); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } @@ -412,7 +305,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { syncNodeEventLog(pSyncNode, logBuf); } while (0); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } } @@ -425,7 +318,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { lastApplyIndex, pSyncNode->minMatchIndex); syncNodeEventLog(pSyncNode, logBuf); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } @@ -434,7 +327,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex); syncNodeEventLog(pSyncNode, logBuf); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } else { @@ -443,7 +336,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { lastApplyIndex); syncNodeEventLog(pSyncNode, logBuf); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } @@ -492,12 +385,12 @@ _DEL_WAL: } } while (0); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return code; } int32_t syncEndSnapshot(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -509,9 +402,9 @@ int32_t syncEndSnapshot(int64_t rid) { SSyncLogStoreData* pData = pSyncNode->pLogStore->data; code = walEndSnapshot(pData->pWal); if (code != 0) { - sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr(terrno)); + sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr()); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return -1; } else { do { @@ -525,12 +418,12 @@ int32_t syncEndSnapshot(int64_t rid) { } } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return code; } int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -539,7 +432,7 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { syncNodeStepDown(pSyncNode, newTerm); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } @@ -584,19 +477,19 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) { } bool syncCanLeaderTransfer(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return false; } ASSERT(rid == pSyncNode->rid); if (pSyncNode->replicaNum == 1) { - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return false; } if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return true; } @@ -611,7 +504,7 @@ bool syncCanLeaderTransfer(int64_t rid) { } } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return matchOK; } @@ -621,25 +514,25 @@ int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) { } ESyncState syncGetMyRole(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } ASSERT(rid == pSyncNode->rid); ESyncState state = pSyncNode->state; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return state; } bool syncIsReady(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return false; } ASSERT(rid == pSyncNode->rid); bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); // if false, set error code if (false == b) { @@ -653,14 +546,14 @@ bool syncIsReady(int64_t rid) { } bool syncIsRestoreFinish(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return false; } ASSERT(rid == pSyncNode->rid); bool b = pSyncNode->restoreFinish; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return b; } @@ -669,7 +562,7 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho return -1; } - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return -1; } @@ -681,7 +574,7 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho if (pEntry != NULL) { syncEntryDestory(pEntry); } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return -1; } ASSERT(pEntry != NULL); @@ -692,12 +585,12 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index); syncEntryDestory(pEntry); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return -1; } @@ -706,12 +599,12 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return -1; } @@ -730,7 +623,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex, sMeta->lastConfigIndex); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } @@ -756,67 +649,67 @@ const char* syncGetMyRoleStr(int64_t rid) { } bool syncRestoreFinish(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return false; } ASSERT(rid == pSyncNode->rid); bool restoreFinish = pSyncNode->restoreFinish; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return restoreFinish; } SyncTerm syncGetMyTerm(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } ASSERT(rid == pSyncNode->rid); SyncTerm term = pSyncNode->pRaftStore->currentTerm; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return term; } SyncIndex syncGetLastIndex(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return SYNC_INDEX_INVALID; } ASSERT(rid == pSyncNode->rid); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return lastIndex; } SyncIndex syncGetCommitIndex(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return SYNC_INDEX_INVALID; } ASSERT(rid == pSyncNode->rid); SyncIndex cmtIndex = pSyncNode->commitIndex; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return cmtIndex; } SyncGroupId syncGetVgId(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } ASSERT(rid == pSyncNode->rid); SyncGroupId vgId = pSyncNode->vgId; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return vgId; } void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { memset(pEpSet, 0, sizeof(*pEpSet)); return; @@ -832,11 +725,11 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex; sInfo("vgId:%d, sync get epset in-use:%d", pSyncNode->vgId, pEpSet->inUse); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { memset(pEpSet, 0, sizeof(*pEpSet)); return; @@ -855,11 +748,11 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { } sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } @@ -871,12 +764,12 @@ int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg)); } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } @@ -889,12 +782,12 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) } sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { sTrace("syncSetQ get pSyncNode is NULL, rid:%" PRId64, rid); return; @@ -902,24 +795,24 @@ void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { ASSERT(rid == pSyncNode->rid); pSyncNode->msgcb = msgcb; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } char* sync2SimpleStr(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { sTrace("syncSetRpc get pSyncNode is NULL, rid:%" PRId64, rid); return NULL; } ASSERT(rid == pSyncNode->rid); char* s = syncNode2SimpleStr(pSyncNode); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return s; } void setPingTimerMS(int64_t rid, int32_t pingTimerMS) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return; } @@ -927,22 +820,22 @@ void setPingTimerMS(int64_t rid, int32_t pingTimerMS) { pSyncNode->pingBaseLine = pingTimerMS; pSyncNode->pingTimerMS = pingTimerMS; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } void setElectTimerMS(int64_t rid, int32_t electTimerMS) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return; } ASSERT(rid == pSyncNode->rid); pSyncNode->electBaseLine = electTimerMS; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return; } @@ -950,20 +843,20 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { pSyncNode->hbBaseLine = hbTimerMS; pSyncNode->heartbeatTimerMS = hbTimerMS; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { - taosReleaseRef(tsNodeRefId, rid); + syncNodeRelease(pSyncNode); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; } ASSERT(rid == pSyncNode->rid); int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } @@ -1085,7 +978,7 @@ int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId de int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { int32_t ret = 0; - if (syncEnvIsStart()) { + if (syncIsInit()) { SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData)); pData->pSyncNode = pSyncNode; pData->pTimer = pSyncTimer; @@ -1093,7 +986,7 @@ int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { pData->logicClock = pSyncTimer->logicClock; pSyncTimer->pData = pData; - taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, gSyncEnv->pTimerManager, &pSyncTimer->pTimer); + taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer); } else { sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId); } @@ -1558,8 +1451,8 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) { // timer control -------------- int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t ret = 0; - if (syncEnvIsStart()) { - taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, + if (syncIsInit()) { + taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pPingTimer); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); } else { @@ -1578,7 +1471,7 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t ret = 0; - if (syncEnvIsStart()) { + if (syncIsInit()) { pSyncNode->electTimerMS = ms; SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer)); @@ -1586,7 +1479,7 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { pElectTimer->pSyncNode = pSyncNode; pElectTimer->pData = NULL; - taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, gSyncEnv->pTimerManager, + taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager, &pSyncNode->pElectTimer); } else { @@ -1634,8 +1527,8 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) { static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t ret = 0; - if (syncEnvIsStart()) { - taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, + if (syncIsInit()) { + taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); } else { @@ -2326,17 +2219,6 @@ _END: return; } -SSyncNode* syncNodeAcquire(int64_t rid) { - SSyncNode* pNode = taosAcquireRef(tsNodeRefId, rid); - if (pNode == NULL) { - sTrace("failed to acquire node from refId:%" PRId64, rid); - } - - return pNode; -} - -void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid); } - // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { if (term > pSyncNode->pRaftStore->currentTerm) { @@ -2787,8 +2669,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { } syncTimeoutDestroy(pSyncMsg); - if (syncEnvIsStart()) { - taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, + if (syncIsInit()) { + taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pPingTimer); } else { sError("sync env is stop, syncNodeEqPingTimer"); @@ -2808,7 +2690,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { pSyncNode->vgId, pSyncNode); SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); - if (pSyncNode->FpEqMsg != NULL) { + if (pSyncNode->FpEqMsg != NULL && pSyncNode->msgcb != NULL && pSyncNode->msgcb->putToQueueFp != NULL) { int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); @@ -2833,9 +2715,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { #if 0 // reset timer ms - if (syncEnvIsStart() && pSyncNode->electBaseLine > 0) { + if (syncIsInit() && pSyncNode->electBaseLine > 0) { pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); - taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, + taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pElectTimer); } else { sError("sync env is stop, syncNodeEqElectTimer"); @@ -2870,8 +2752,8 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { } syncTimeoutDestroy(pSyncMsg); - if (syncEnvIsStart()) { - taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, + if (syncIsInit()) { + taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer); } else { sError("sync env is stop, syncNodeEqHeartbeatTimer"); @@ -2931,8 +2813,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { syncHeartbeatDestroy(pSyncMsg); - if (syncEnvIsStart()) { - taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, gSyncEnv->pTimerManager, + if (syncIsInit()) { + taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer); } else { sError("sync env is stop, syncNodeEqHeartbeatTimer"); @@ -3424,6 +3306,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde // ASSERT(pEntry != NULL); if (code != 0 || pEntry == NULL) { syncNodeErrorLog(ths, "get log entry error"); + sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr()); continue; } } diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 5ff73a6406..1c08217099 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -76,7 +76,7 @@ int32_t syncNodeTimerRoutine(SSyncNode* ths) { SSyncLogStoreData* pData = ths->pLogStore->data; int32_t code = walEndSnapshot(pData->pWal); if (code != 0) { - sError("vgId:%d, wal snapshot end error since:%s", ths->vgId, terrstr(terrno)); + sError("vgId:%d, timer wal snapshot end error since:%s", ths->vgId, terrstr()); return -1; } else { do { diff --git a/source/libs/sync/test/syncElectTest.cpp b/source/libs/sync/test/syncElectTest.cpp index 862f7bd0ba..d09879a699 100644 --- a/source/libs/sync/test/syncElectTest.cpp +++ b/source/libs/sync/test/syncElectTest.cpp @@ -98,7 +98,7 @@ int main(int argc, char** argv) { init(); int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); char walPath[128]; diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index 9f1a81e7ed..fdb5cf7ac8 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -152,7 +152,7 @@ int main(int argc, char **argv) { int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); taosRemoveDir("./wal_test"); diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index 8461bfe9b7..191e245b1e 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncEnvTest.cpp b/source/libs/sync/test/syncEnvTest.cpp index a7a819e046..404ab32d58 100644 --- a/source/libs/sync/test/syncEnvTest.cpp +++ b/source/libs/sync/test/syncEnvTest.cpp @@ -22,7 +22,7 @@ int main() { logTest(); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); for (int i = 0; i < 5; ++i) { @@ -37,8 +37,6 @@ int main() { taosMsleep(5000); } - ret = syncEnvStop(); - assert(ret == 0); - + syncCleanUp(); return 0; } diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp index 630d96054b..4a457136c2 100644 --- a/source/libs/sync/test/syncIOSendMsgTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgTest.cpp @@ -82,7 +82,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp index d0843151f4..d654ad06fe 100644 --- a/source/libs/sync/test/syncInitTest.cpp +++ b/source/libs/sync/test/syncInitTest.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); @@ -91,7 +91,7 @@ int main(int argc, char** argv) { initRaftId(pSyncNode); syncNodeClose(pSyncNode); - syncEnvStop(); + syncCleanUp(); // syncIOStop(); // taosCloseLog(); diff --git a/source/libs/sync/test/syncPingSelfTest.cpp b/source/libs/sync/test/syncPingSelfTest.cpp index 99287bf7b0..f44cbb04d5 100644 --- a/source/libs/sync/test/syncPingSelfTest.cpp +++ b/source/libs/sync/test/syncPingSelfTest.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncPingTimerTest.cpp b/source/libs/sync/test/syncPingTimerTest.cpp index cd9440e3e2..fd6342aa84 100644 --- a/source/libs/sync/test/syncPingTimerTest.cpp +++ b/source/libs/sync/test/syncPingTimerTest.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncPingTimerTest2.cpp b/source/libs/sync/test/syncPingTimerTest2.cpp index fa09d04368..295003dff3 100644 --- a/source/libs/sync/test/syncPingTimerTest2.cpp +++ b/source/libs/sync/test/syncPingTimerTest2.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 3dc8518072..50771ac476 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -179,7 +179,7 @@ int main(int argc, char **argv) { int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); // taosRemoveDir(pWalDir); diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp index d4885d0316..e2e8748697 100644 --- a/source/libs/sync/test/syncVotesGrantedTest.cpp +++ b/source/libs/sync/test/syncVotesGrantedTest.cpp @@ -82,7 +82,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncVotesRespondTest.cpp b/source/libs/sync/test/syncVotesRespondTest.cpp index 77262dfc65..881a5331b1 100644 --- a/source/libs/sync/test/syncVotesRespondTest.cpp +++ b/source/libs/sync/test/syncVotesRespondTest.cpp @@ -82,7 +82,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index b185f52f75..fee98ddd52 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -154,7 +154,7 @@ int main(int argc, char **argv) { int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); taosRemoveDir("./wal_test"); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index b83f84e3f2..2354f0f959 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -428,6 +428,7 @@ void transDestoryExHandle(void* handle); int32_t transGetRefMgt(); int32_t transGetInstMgt(); +void transHttpEnvDestroy(); #ifdef __cplusplus } #endif diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 92989a45f5..00854b5ee5 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -20,21 +20,52 @@ #include "thttp.h" #include "taoserror.h" #include "tlog.h" +#include "transComm.h" // clang-format on #define HTTP_RECV_BUF_SIZE 1024 +typedef struct SHttpModule { + uv_loop_t* loop; + SAsyncPool* asyncPool; + TdThread thread; +} SHttpModule; + +typedef struct SHttpMsg { + queue q; + char* server; + int32_t port; + char* cont; + int32_t len; + EHttpCompFlag flag; + int8_t quit; + SHttpModule* http; + +} SHttpMsg; + typedef struct SHttpClient { - uv_connect_t conn; - uv_tcp_t tcp; - uv_write_t req; - uv_buf_t* wbuf; - char* rbuf; - char* addr; - uint16_t port; + uv_connect_t conn; + uv_tcp_t tcp; + uv_write_t req; + uv_buf_t* wbuf; + char* rbuf; + char* addr; + uint16_t port; + struct sockaddr_in dest; } SHttpClient; +static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; +static SHttpModule* thttp = NULL; +static void transHttpEnvInit(); + +static void httpHandleReq(SHttpMsg* msg); +static void httpHandleQuit(SHttpMsg* msg); +static int32_t httpSendQuit(); + +static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag); + static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, EHttpCompFlag flag) { if (flag == HTTP_FLAT) { @@ -53,6 +84,7 @@ static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pH "Content-Length: %d\n\n", server, contLen); } else { + terrno = TSDB_CODE_INVALID_CFG; return -1; } } @@ -126,88 +158,10 @@ _OVER: return code; } -static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) { - taosMemoryFree(cli->wbuf); - taosMemoryFree(cli->rbuf); - taosMemoryFree(cli->addr); - taosMemoryFree(cli); -} -static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) { - SHttpClient* cli = handle->data; - destroyHttpClient(cli); -} -static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - SHttpClient* cli = handle->data; - buf->base = cli->rbuf; - buf->len = HTTP_RECV_BUF_SIZE; -} -static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { - SHttpClient* cli = handle->data; - if (nread < 0) { - uError("http-report recv error:%s", uv_err_name(nread)); - } else { - uTrace("http-report succ to recv %d bytes", (int32_t)nread); - } - if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { - uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); - } else { - destroyHttpClient(cli); - } -} -static void clientSentCb(uv_write_t* req, int32_t status) { - SHttpClient* cli = req->data; - if (status != 0) { - terrno = TAOS_SYSTEM_ERROR(status); - uError("http-report failed to send data %s", uv_strerror(status)); - if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { - uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); - } else { - destroyHttpClient(cli); - } - return; - } else { - uTrace("http-report succ to send data"); - } - status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb); - if (status != 0) { - terrno = TAOS_SYSTEM_ERROR(status); - uError("http-report failed to recv data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); - if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { - uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); - } else { - destroyHttpClient(cli); - } - } -} -static void clientConnCb(uv_connect_t* req, int32_t status) { - SHttpClient* cli = req->data; - if (status != 0) { - terrno = TAOS_SYSTEM_ERROR(status); - uError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); - if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { - uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); - } else { - destroyHttpClient(cli); - } - return; - } - status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); - if (0 != status) { - terrno = TAOS_SYSTEM_ERROR(status); - uError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); - if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { - uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); - } else { - destroyHttpClient(cli); - } - } -} - static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { uint32_t ip = taosGetIpv4FromFqdn(server); if (ip == 0xffffffff) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("http-report failed to get http server:%s since %s", server, errno == 0 ? "invalid http server" : terrstr()); + tError("http-report failed to get http server:%s since %s", server, errno == 0 ? "invalid http server" : terrstr()); return -1; } char buf[128] = {0}; @@ -215,27 +169,206 @@ static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, uv_ip4_addr(buf, port, dest); return 0; } -int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { - struct sockaddr_in dest = {0}; - if (taosBuildDstAddr(server, port, &dest) < 0) { - return -1; - } - if (flag == HTTP_GZIP) { - int32_t dstLen = taosCompressHttpRport(pCont, contLen); - if (dstLen > 0) { - contLen = dstLen; + +static void* httpThread(void* arg) { + SHttpModule* http = (SHttpModule*)arg; + setThreadName("http-cli-send-thread"); + uv_run(http->loop, UV_RUN_DEFAULT); + return NULL; +} + +static void httpDestroyMsg(SHttpMsg* msg) { + if (msg == NULL) return; + + taosMemoryFree(msg->server); + taosMemoryFree(msg->cont); + taosMemoryFree(msg); +} +static void httpAsyncCb(uv_async_t* handle) { + SAsyncItem* item = handle->data; + SHttpModule* http = item->pThrd; + + SHttpMsg *msg = NULL, *quitMsg = NULL; + + queue wq; + taosThreadMutexLock(&item->mtx); + QUEUE_MOVE(&item->qmsg, &wq); + taosThreadMutexUnlock(&item->mtx); + + int count = 0; + while (!QUEUE_IS_EMPTY(&wq)) { + queue* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + msg = QUEUE_DATA(h, SHttpMsg, q); + if (msg->quit) { + quitMsg = msg; } else { - flag = HTTP_FLAT; + httpHandleReq(msg); } } - terrno = 0; + if (quitMsg) httpHandleQuit(quitMsg); +} - char header[2048] = {0}; - int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); +static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) { + taosMemoryFree(cli->wbuf[0].base); + taosMemoryFree(cli->wbuf[1].base); + taosMemoryFree(cli->wbuf); + taosMemoryFree(cli->rbuf); + taosMemoryFree(cli->addr); + taosMemoryFree(cli); +} + +static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) { + SHttpClient* cli = handle->data; + destroyHttpClient(cli); +} + +static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + SHttpClient* cli = handle->data; + buf->base = cli->rbuf; + buf->len = HTTP_RECV_BUF_SIZE; +} + +static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { + SHttpClient* cli = handle->data; + if (nread < 0) { + tError("http-report recv error:%s", uv_err_name(nread)); + } else { + tTrace("http-report succ to recv %d bytes", (int32_t)nread); + } + if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { + uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); + } +} +static void clientSentCb(uv_write_t* req, int32_t status) { + SHttpClient* cli = req->data; + if (status != 0) { + tError("http-report failed to send data, reason: %s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { + uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); + } + return; + } else { + tTrace("http-report succ to send data"); + } + status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb); + if (status != 0) { + tError("http-report failed to recv data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { + uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); + } + } +} +static void clientConnCb(uv_connect_t* req, int32_t status) { + SHttpClient* cli = req->data; + if (status != 0) { + tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { + uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); + } + return; + } + status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); + if (0 != status) { + tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { + uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); + } + } +} + +int32_t httpSendQuit() { + SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg)); + msg->quit = 1; + + SHttpModule* load = atomic_load_ptr(&thttp); + if (load == NULL) { + httpDestroyMsg(msg); + tError("http-report already released"); + return -1; + } else { + msg->http = load; + } + transAsyncSend(load->asyncPool, &(msg->q)); + return 0; +} + +static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag) { + SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); + msg->server = strdup(server); + msg->port = port; + msg->cont = taosMemoryMalloc(contLen); + memcpy(msg->cont, pCont, contLen); + msg->len = contLen; + msg->flag = flag; + msg->quit = 0; + + SHttpModule* load = atomic_load_ptr(&thttp); + if (load == NULL) { + httpDestroyMsg(msg); + tError("http-report already released"); + return -1; + } else { + msg->http = load; + transAsyncSend(load->asyncPool, &(msg->q)); + } + + return 0; +} + +static void httpDestroyClientCb(uv_handle_t* handle) { + SHttpClient* http = handle->data; + destroyHttpClient(http); +} +static void httpWalkCb(uv_handle_t* handle, void* arg) { + // impl later + if (!uv_is_closing(handle)) { + uv_handle_type type = uv_handle_get_type(handle); + if (uv_handle_get_type(handle) == UV_TCP) { + uv_close(handle, httpDestroyClientCb); + } else { + uv_close(handle, NULL); + } + } + return; +} +static void httpHandleQuit(SHttpMsg* msg) { + SHttpModule* http = msg->http; + taosMemoryFree(msg); + + uv_walk(http->loop, httpWalkCb, NULL); +} +static void httpHandleReq(SHttpMsg* msg) { + SHttpModule* http = msg->http; + + struct sockaddr_in dest = {0}; + if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) { + goto END; + } + if (msg->flag == HTTP_GZIP) { + int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len); + if (dstLen > 0) { + msg->len = dstLen; + } else { + msg->flag = HTTP_FLAT; + } + if (dstLen < 0) { + goto END; + } + } + + int32_t len = 2048; + char* header = taosMemoryCalloc(1, len); + int32_t headLen = taosBuildHttpHeader(msg->server, msg->len, header, len, msg->flag); + if (headLen < 0) { + taosMemoryFree(header); + goto END; + } uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); - wb[0] = uv_buf_init((char*)header, headLen); // stack var - wb[1] = uv_buf_init((char*)pCont, contLen); // heap var + wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var + wb[1] = uv_buf_init((char*)msg->cont, msg->len); // heap var SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); cli->conn.data = cli; @@ -243,41 +376,71 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 cli->req.data = cli; cli->wbuf = wb; cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); - cli->addr = tstrdup(server); - cli->port = port; + cli->addr = msg->server; + cli->port = msg->port; + cli->dest = dest; + + taosMemoryFree(msg); + + uv_tcp_init(http->loop, &cli->tcp); - uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t)); - int err = uv_loop_init(loop); - if (err != 0) { - uError("http-report failed to init uv_loop, reason: %s", uv_strerror(err)); - taosMemoryFree(loop); - terrno = TAOS_SYSTEM_ERROR(err); - destroyHttpClient(cli); - return terrno; - } - uv_tcp_init(loop, &cli->tcp); // set up timeout to avoid stuck; int32_t fd = taosCreateSocketWithTimeout(5); - - int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); + int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); if (ret != 0) { - uError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); + tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); destroyHttpClient(cli); - uv_stop(loop); - terrno = TAOS_SYSTEM_ERROR(ret); - } else { - ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb); - if (ret != 0) { - uError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, - cli->port); - destroyHttpClient(cli); - uv_stop(loop); - terrno = TAOS_SYSTEM_ERROR(ret); - } + return; } - uv_run(loop, UV_RUN_DEFAULT); - uv_loop_close(loop); - taosMemoryFree(loop); - return terrno; + ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb); + if (ret != 0) { + tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, + cli->port); + destroyHttpClient(cli); + } + return; + +END: + tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port); + httpDestroyMsg(msg); +} + +int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { + taosThreadOnce(&transHttpInit, transHttpEnvInit); + return taosSendHttpReportImpl(server, port, pCont, contLen, flag); +} + +static void transHttpEnvInit() { + SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule)); + + http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); + uv_loop_init(http->loop); + + http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); + + int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); + if (err != 0) { + taosMemoryFree(http->loop); + taosMemoryFree(http); + http = NULL; + } + atomic_store_ptr(&thttp, http); +} + +void transHttpEnvDestroy() { + SHttpModule* load = atomic_load_ptr(&thttp); + if (load == NULL) { + return; + } + httpSendQuit(); + taosThreadJoin(load->thread, NULL); + + TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsg); + transAsyncPoolDestroy(load->asyncPool); + uv_loop_close(load->loop); + taosMemoryFree(load->loop); + taosMemoryFree(load); + + atomic_store_ptr(&thttp, NULL); } diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 756a8ff2cf..d3db4879d1 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -172,6 +172,8 @@ int32_t rpcInit() { } void rpcCleanup(void) { transCleanup(); + transHttpEnvDestroy(); + return; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 28660934f8..97915e1ded 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -187,18 +187,8 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \ } while (0) -#define CONN_HOST_THREAD_IDX1(idx, exh, refId, pThrd) \ - do { \ - if (exh == NULL) { \ - idx = -1; \ - } else { \ - ASYNC_CHECK_HANDLE((exh), refId); \ - pThrd = (SCliThrd*)(exh)->pThrd; \ - } \ - } while (0) -#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) -#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) -#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) +#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) +#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ do { \ @@ -217,6 +207,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { tDebug("msg found, %" PRIu64 "", ahandle); \ } \ } while (0) + #define CONN_GET_NEXT_SENDMSG(conn) \ do { \ int i = 0; \ @@ -231,21 +222,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { } \ } while (0) -#define CONN_HANDLE_THREAD_QUIT(thrd) \ - do { \ - if (thrd->quit) { \ - return; \ - } \ - } while (0) - -#define CONN_HANDLE_BROKEN(conn) \ - do { \ - if (conn->broken) { \ - cliHandleExcept(conn); \ - return; \ - } \ - } while (0) - #define CONN_SET_PERSIST_BY_APP(conn) \ do { \ if (conn->status == ConnNormal) { \ diff --git a/source/util/src/tref.c b/source/util/src/tref.c index c984ef3f34..aa741b909a 100644 --- a/source/util/src/tref.c +++ b/source/util/src/tref.c @@ -57,7 +57,7 @@ static void taosIncRsetCount(SRefSet *pSet); static void taosDecRsetCount(SRefSet *pSet); static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove); -int32_t taosOpenRef(int32_t max, void (*fp)(void *)) { +int32_t taosOpenRef(int32_t max, RefFp fp) { SRefNode **nodeList; SRefSet *pSet; int64_t *lockedBy;