Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/3.0_mhli

This commit is contained in:
Minghao Li 2022-11-01 10:47:42 +08:00
commit 094c14e272
32 changed files with 599 additions and 532 deletions

View File

@ -119,6 +119,7 @@ extern SDiskCfg tsDiskCfg[];
// udf // udf
extern bool tsStartUdfd; extern bool tsStartUdfd;
extern char tsUdfdResFuncs[]; extern char tsUdfdResFuncs[];
extern char tsUdfdLdLibPath[];
// schemaless // schemaless
extern char tsSmlChildTableName[]; extern char tsSmlChildTableName[];

View File

@ -200,10 +200,10 @@ typedef struct SSyncInfo {
int32_t syncInit(); int32_t syncInit();
void syncCleanUp(); void syncCleanUp();
bool syncIsInit();
int64_t syncOpen(SSyncInfo* pSyncInfo); int64_t syncOpen(SSyncInfo* pSyncInfo);
void syncStart(int64_t rid); void syncStart(int64_t rid);
void syncStop(int64_t rid); void syncStop(int64_t rid);
int32_t syncSetStandby(int64_t rid);
ESyncState syncGetMyRole(int64_t rid); ESyncState syncGetMyRole(int64_t rid);
bool syncIsReady(int64_t rid); bool syncIsReady(int64_t rid);
const char* syncGetMyRoleStr(int64_t rid); const char* syncGetMyRoleStr(int64_t rid);
@ -216,7 +216,6 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
// int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize); // int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize);
bool syncEnvIsStart();
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid); bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot); int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot);
@ -234,6 +233,9 @@ int32_t syncEndSnapshot(int64_t rid);
int32_t syncStepDown(int64_t rid, SyncTerm newTerm); int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -28,10 +28,6 @@ typedef struct SRaftId {
SyncGroupId vgId; SyncGroupId vgId;
} SRaftId; } SRaftId;
// ------------------ control -------------------
SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode);
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg);
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo); int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo);
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb); void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb);

View File

@ -25,7 +25,8 @@ extern "C" {
// open a reference set, max is the mod used by hash, fp is the pointer to free resource function // open a reference set, max is the mod used by hash, fp is the pointer to free resource function
// return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately // return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately
int32_t taosOpenRef(int32_t max, void (*fp)(void *)); typedef void (*RefFp)(void *);
int32_t taosOpenRef(int32_t max, RefFp fp);
// close the reference set, refId is the return value by taosOpenRef // close the reference set, refId is the return value by taosOpenRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately // return 0 if success. On error, -1 is returned, and terrno is set appropriately

View File

@ -163,7 +163,8 @@ int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 86400; int32_t tsTtlPushInterval = 86400;
int32_t tsGrantHBInterval = 60; int32_t tsGrantHBInterval = 60;
int32_t tsUptimeInterval = 300; // seconds int32_t tsUptimeInterval = 300; // seconds
char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits
char tsUdfdLdLibPath[512] = "";
#ifndef _STORAGE #ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) { int32_t taosSetTfsCfg(SConfig *pCfg) {
@ -424,6 +425,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1;
GRANT_CFG_ADD; GRANT_CFG_ADD;
return 0; return 0;
} }
@ -722,7 +724,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs));
tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath));
if (tsQueryBufferSize >= 0) { if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
} }

View File

@ -481,7 +481,7 @@ int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg) {
mInfo("vgId:%d, process sync ctrl msg", 1); mInfo("vgId:%d, process sync ctrl msg", 1);
if (!syncEnvIsStart()) { if (!syncIsInit()) {
mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
@ -518,7 +518,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
int32_t code = 0; int32_t code = 0;
if (!syncEnvIsStart()) { if (!syncIsInit()) {
mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
@ -581,11 +581,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg); syncSnapshotRspDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
code = syncSetStandby(pMgmt->sync);
SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) { } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg); SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg); code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);

View File

@ -17,11 +17,44 @@
#include "mndSync.h" #include "mndSync.h"
#include "mndTrans.h" #include "mndTrans.h"
static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) {
return -1;
}
SMsgHead *pHead = pMsg->pCont; SMsgHead *pHead = pMsg->pCont;
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return -1;
}
int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
if (code != 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
return code;
}
static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) {
return -1;
}
SMsgHead *pHead = pMsg->pCont;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return -1;
}
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
if (code != 0) { if (code != 0) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
@ -212,7 +245,7 @@ int32_t mndInitSync(SMnode *pMnode) {
.msgcb = NULL, .msgcb = NULL,
.FpSendMsg = mndSyncSendMsg, .FpSendMsg = mndSyncSendMsg,
.FpEqMsg = mndSyncEqMsg, .FpEqMsg = mndSyncEqMsg,
.FpEqCtrlMsg = NULL, .FpEqCtrlMsg = mndSyncEqCtrlMsg,
}; };
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);

View File

@ -234,7 +234,7 @@ int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t code = 0; int32_t code = 0;
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
if (!syncEnvIsStart()) { if (!syncIsInit()) {
vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg); vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg);
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
return -1; return -1;
@ -277,7 +277,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t code = 0; int32_t code = 0;
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
if (!syncEnvIsStart()) { if (!syncIsInit()) {
vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg); vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg);
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
return -1; return -1;
@ -370,7 +370,13 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (msgcb == NULL) { if (pMsg == NULL || pMsg->pCont == NULL) {
return -1;
}
if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return -1; return -1;
} }
@ -383,7 +389,13 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
} }
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (msgcb == NULL) { if (pMsg == NULL || pMsg->pCont == NULL) {
return -1;
}
if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return -1; return -1;
} }

View File

@ -117,10 +117,29 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
char dnodeIdEnvItem[32] = {0}; char dnodeIdEnvItem[32] = {0};
char thrdPoolSizeEnvItem[32] = {0}; char thrdPoolSizeEnvItem[32] = {0};
snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId); snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId);
float numCpuCores = 4; float numCpuCores = 4;
taosGetCpuCores(&numCpuCores); taosGetCpuCores(&numCpuCores);
snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores * 2); snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores * 2);
char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL};
char pathTaosdLdLib[512] = {0};
size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib);
uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);
char udfdPathLdLib[1024] = {0};
size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath);
strncpy(udfdPathLdLib, tsUdfdLdLibPath, udfdLdLibPathLen);
udfdPathLdLib[udfdLdLibPathLen] = ':';
strncpy(udfdPathLdLib + udfdLdLibPathLen + 1, pathTaosdLdLib, sizeof(udfdPathLdLib) - udfdLdLibPathLen);
if (udfdLdLibPathLen + taosdLdLibPathLen < 1024) {
fnInfo("udfd LD_LIBRARY_PATH: %s", udfdPathLdLib);
} else {
fnError("can not set correct udfd LD_LIBRARY_PATH");
}
char ldLibPathEnvItem[1024 + 32] = {0};
snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", udfdPathLdLib);
char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, NULL};
options.env = envUdfd; options.env = envUdfd;
int err = uv_spawn(&pData->loop, &pData->process, &options); int err = uv_spawn(&pData->loop, &pData->process, &options);

View File

@ -20,13 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h" #include "syncInt.h"
#include "taosdef.h"
#include "trpc.h"
#include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF #define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000 #define ENV_TICK_TIMER_MS 1000
@ -57,12 +51,12 @@ typedef struct SSyncEnv {
} SSyncEnv; } SSyncEnv;
extern SSyncEnv* gSyncEnv; SSyncEnv* syncEnv();
int32_t syncEnvStart(); int64_t syncNodeAdd(SSyncNode* pNode);
int32_t syncEnvStop(); void syncNodeRemove(int64_t rid);
int32_t syncEnvStartTimer(); SSyncNode* syncNodeAcquire(int64_t rid);
int32_t syncEnvStopTimer(); void syncNodeRelease(SSyncNode* pNode);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -24,6 +24,8 @@ extern "C" {
#include "syncTools.h" #include "syncTools.h"
#include "tlog.h" #include "tlog.h"
#include "ttimer.h" #include "ttimer.h"
#include "taosdef.h"
#include "ttimer.h"
// clang-format off // clang-format off
#define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
@ -255,9 +257,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, Sync
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
char* syncNodePeerState2Str(const SSyncNode* pSyncNode); char* syncNodePeerState2Str(const SSyncNode* pSyncNode);
SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode);
// raft state change -------------- // raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term); void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term);
@ -302,9 +301,6 @@ bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
void syncStartNormal(int64_t rid);
void syncStartStandBy(int64_t rid);
bool syncNodeCanChange(SSyncNode* pSyncNode); bool syncNodeCanChange(SSyncNode* pSyncNode);
bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg); bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg);

View File

@ -13,118 +13,111 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "syncEnv.h" #include "syncEnv.h"
// #include <ASSERT.h> #include "tref.h"
SSyncEnv *gSyncEnv = NULL; static SSyncEnv gSyncEnv = {0};
static int32_t gNodeRefId = -1;
// local function ----------------- bool gRaftDetailLog = false;
static SSyncEnv *doSyncEnvStart();
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv);
static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv);
static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv);
static void syncEnvTick(void *param, void *tmrId); static void syncEnvTick(void *param, void *tmrId);
// --------------------------------
bool syncEnvIsStart() { SSyncEnv *syncEnv() { return &gSyncEnv; }
if (gSyncEnv == NULL) {
return false;
}
return atomic_load_8(&(gSyncEnv->isStart)); bool syncIsInit() { return atomic_load_8(&gSyncEnv.isStart); }
}
int32_t syncInit() {
if (syncIsInit()) return 0;
int32_t syncEnvStart() {
int32_t ret = 0;
uint32_t seed = (uint32_t)(taosGetTimestampNs() & 0x00000000FFFFFFFF); uint32_t seed = (uint32_t)(taosGetTimestampNs() & 0x00000000FFFFFFFF);
taosSeedRand(seed); taosSeedRand(seed);
// gSyncEnv = doSyncEnvStart(gSyncEnv);
gSyncEnv = doSyncEnvStart();
ASSERT(gSyncEnv != NULL);
sTrace("sync env start ok");
return ret;
}
int32_t syncEnvStop() { memset(&gSyncEnv, 0, sizeof(SSyncEnv));
int32_t ret = doSyncEnvStop(gSyncEnv); gSyncEnv.envTickTimerCounter = 0;
return ret; gSyncEnv.envTickTimerMS = ENV_TICK_TIMER_MS;
} gSyncEnv.FpEnvTickTimer = syncEnvTick;
atomic_store_64(&gSyncEnv.envTickTimerLogicClock, 0);
int32_t syncEnvStartTimer() { atomic_store_64(&gSyncEnv.envTickTimerLogicClockUser, 0);
int32_t ret = doSyncEnvStartTimer(gSyncEnv);
return ret;
}
int32_t syncEnvStopTimer() {
int32_t ret = doSyncEnvStopTimer(gSyncEnv);
return ret;
}
// local function -----------------
static void syncEnvTick(void *param, void *tmrId) {
SSyncEnv *pSyncEnv = (SSyncEnv *)param;
if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) {
++(pSyncEnv->envTickTimerCounter);
sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
", envTickTimerCounter:%" PRIu64
", "
"envTickTimerMS:%d, tmrId:%p",
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
pSyncEnv->envTickTimerMS, tmrId);
// do something, tick ...
taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer);
} else {
sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
", envTickTimerCounter:%" PRIu64
", "
"envTickTimerMS:%d, tmrId:%p",
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
pSyncEnv->envTickTimerMS, tmrId);
}
}
static SSyncEnv *doSyncEnvStart() {
SSyncEnv *pSyncEnv = (SSyncEnv *)taosMemoryMalloc(sizeof(SSyncEnv));
ASSERT(pSyncEnv != NULL);
memset(pSyncEnv, 0, sizeof(SSyncEnv));
pSyncEnv->envTickTimerCounter = 0;
pSyncEnv->envTickTimerMS = ENV_TICK_TIMER_MS;
pSyncEnv->FpEnvTickTimer = syncEnvTick;
atomic_store_64(&pSyncEnv->envTickTimerLogicClock, 0);
atomic_store_64(&pSyncEnv->envTickTimerLogicClockUser, 0);
// start tmr thread // start tmr thread
pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV"); gSyncEnv.pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV");
atomic_store_8(&gSyncEnv.isStart, 1);
atomic_store_8(&(pSyncEnv->isStart), 1); gNodeRefId = taosOpenRef(200, (RefFp)syncNodeClose);
return pSyncEnv; if (gNodeRefId < 0) {
sError("failed to init node ref");
syncCleanUp();
return -1;
} }
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { sDebug("sync rsetId:%d is open", gNodeRefId);
ASSERT(pSyncEnv == gSyncEnv);
if (pSyncEnv != NULL) {
atomic_store_8(&(pSyncEnv->isStart), 0);
taosTmrCleanUp(pSyncEnv->pTimerManager);
taosMemoryFree(pSyncEnv);
}
gSyncEnv = NULL;
return 0; return 0;
} }
static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv) { void syncCleanUp() {
int32_t ret = 0; atomic_store_8(&gSyncEnv.isStart, 0);
taosTmrReset(pSyncEnv->FpEnvTickTimer, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, taosTmrCleanUp(gSyncEnv.pTimerManager);
&pSyncEnv->pEnvTickTimer); memset(&gSyncEnv, 0, sizeof(SSyncEnv));
atomic_store_64(&pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerLogicClockUser);
return ret; if (gNodeRefId != -1) {
sDebug("sync rsetId:%d is closed", gNodeRefId);
taosCloseRef(gNodeRefId);
gNodeRefId = -1;
}
} }
static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv) { int64_t syncNodeAdd(SSyncNode *pNode) {
pNode->rid = taosAddRef(gNodeRefId, pNode);
if (pNode->rid < 0) return -1;
sDebug("vgId:%d, sync rid:%" PRId64 " is added to rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId);
return pNode->rid;
}
void syncNodeRemove(int64_t rid) { taosRemoveRef(gNodeRefId, rid); }
SSyncNode *syncNodeAcquire(int64_t rid) {
SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid);
if (pNode == NULL) {
sTrace("failed to acquire node from refId:%" PRId64, rid);
}
return pNode;
}
void syncNodeRelease(SSyncNode *pNode) { taosReleaseRef(gNodeRefId, pNode->rid); }
#if 0
void syncEnvStartTimer() {
taosTmrReset(gSyncEnv.FpEnvTickTimer, gSyncEnv.envTickTimerMS, &gSyncEnv, gSyncEnv.pTimerManager,
&gSyncEnv.pEnvTickTimer);
atomic_store_64(&gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerLogicClockUser);
}
void syncEnvStopTimer() {
int32_t ret = 0; int32_t ret = 0;
atomic_add_fetch_64(&pSyncEnv->envTickTimerLogicClockUser, 1); atomic_add_fetch_64(&gSyncEnv.envTickTimerLogicClockUser, 1);
taosTmrStop(pSyncEnv->pEnvTickTimer); taosTmrStop(gSyncEnv.pEnvTickTimer);
pSyncEnv->pEnvTickTimer = NULL; gSyncEnv.pEnvTickTimer = NULL;
return ret; return ret;
} }
#endif
static void syncEnvTick(void *param, void *tmrId) {
SSyncEnv *pSyncEnv = param;
if (atomic_load_64(&gSyncEnv.envTickTimerLogicClockUser) <= atomic_load_64(&gSyncEnv.envTickTimerLogicClock)) {
gSyncEnv.envTickTimerCounter++;
sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p",
gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter,
gSyncEnv.envTickTimerMS, tmrId);
// do something, tick ...
taosTmrReset(syncEnvTick, gSyncEnv.envTickTimerMS, pSyncEnv, gSyncEnv.pTimerManager, &gSyncEnv.pEnvTickTimer);
} else {
sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p",
gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter,
gSyncEnv.envTickTimerMS, tmrId);
}
}

View File

@ -33,11 +33,6 @@
#include "syncTimeout.h" #include "syncTimeout.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "syncVoteMgr.h" #include "syncVoteMgr.h"
#include "tref.h"
bool gRaftDetailLog = false;
static int32_t tsNodeRefId = -1;
// ------ local funciton --------- // ------ local funciton ---------
// enqueue message ---- // enqueue message ----
@ -53,138 +48,36 @@ static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNew
int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg); int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg);
// ---------------------------------
static void syncNodeFreeCb(void* param) {
syncNodeClose(param);
param = NULL;
}
int32_t syncInit() {
int32_t ret = 0;
if (!syncEnvIsStart()) {
tsNodeRefId = taosOpenRef(200, syncNodeFreeCb);
if (tsNodeRefId < 0) {
sError("failed to init node ref");
syncCleanUp();
ret = -1;
} else {
sDebug("sync rsetId:%d is open", tsNodeRefId);
ret = syncEnvStart();
}
}
return ret;
}
void syncCleanUp() {
int32_t ret = syncEnvStop();
ASSERT(ret == 0);
if (tsNodeRefId != -1) {
sDebug("sync rsetId:%d is closed", tsNodeRefId);
taosCloseRef(tsNodeRefId);
tsNodeRefId = -1;
}
}
int64_t syncOpen(SSyncInfo* pSyncInfo) { int64_t syncOpen(SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); SSyncNode* pNode = syncNodeOpen(pSyncInfo);
if (pSyncNode == NULL) { if (pNode == NULL) {
sError("vgId:%d, failed to open sync node", pSyncInfo->vgId); sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
return -1; return -1;
} }
pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode); pNode->rid = syncNodeAdd(pNode);
if (pSyncNode->rid < 0) { if (pNode->rid < 0) {
syncNodeClose(pSyncNode); syncNodeClose(pNode);
pSyncNode = NULL;
return -1; return -1;
} }
sDebug("vgId:%d, sync rid:%" PRId64 " is added to rsetId:%d", pSyncInfo->vgId, pSyncNode->rid, tsNodeRefId); return pNode->rid;
return pSyncNode->rid;
} }
void syncStart(int64_t rid) { void syncStart(int64_t rid) {
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); SSyncNode* pNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pNode != NULL) {
return; syncNodeStart(pNode);
syncNodeRelease(pNode);
} }
if (pSyncNode->pRaftCfg->isStandBy) {
syncNodeStartStandBy(pSyncNode);
} else {
syncNodeStart(pSyncNode);
}
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}
void syncStartNormal(int64_t rid) {
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return;
}
syncNodeStart(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}
void syncStartStandBy(int64_t rid) {
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return;
}
syncNodeStartStandBy(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
} }
void syncStop(int64_t rid) { void syncStop(int64_t rid) {
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); SSyncNode* pNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) return; if (pNode != NULL) {
int32_t vgId = pSyncNode->vgId; syncNodeRelease(pNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRemove(rid);
taosRemoveRef(tsNodeRefId, rid);
sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%d", vgId, rid, tsNodeRefId);
} }
int32_t syncSetStandby(int64_t rid) {
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("failed to set standby since accquire ref error, rid:%" PRId64, rid);
return -1;
}
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_IS_LEADER;
} else {
terrno = TSDB_CODE_SYN_STANDBY_NOT_READY;
}
sError("failed to set standby since it is not follower, state:%s rid:%" PRId64, syncStr(pSyncNode->state), rid);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return -1;
}
// state change
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
syncNodeStopHeartbeatTimer(pSyncNode);
// reset elect timer, long enough
int32_t electMS = TIMER_MAX_MS;
int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS);
ASSERT(ret == 0);
pSyncNode->pRaftCfg->isStandBy = 1;
raftCfgPersist(pSyncNode->pRaftCfg);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
sInfo("vgId:%d, set to standby", pSyncNode->vgId);
return 0;
} }
bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg) { bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg) {
@ -205,7 +98,7 @@ bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg) {
} }
int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) { int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
@ -214,7 +107,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
int32_t ret = 0; int32_t ret = 0;
if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
sError("invalid new config. vgId:%d", pSyncNode->vgId); sError("invalid new config. vgId:%d", pSyncNode->vgId);
return -1; return -1;
@ -228,12 +121,12 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig); snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig);
taosMemoryFree(newconfig); taosMemoryFree(newconfig);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return ret; return ret;
} }
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
@ -241,7 +134,7 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
sError("invalid new config. vgId:%d", pSyncNode->vgId); sError("invalid new config. vgId:%d", pSyncNode->vgId);
return -1; return -1;
@ -260,7 +153,7 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
taosMemoryFree(newconfig); taosMemoryFree(newconfig);
ret = syncNodePropose(pSyncNode, &rpcMsg, false); ret = syncNodePropose(pSyncNode, &rpcMsg, false);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return ret; return ret;
#else #else
syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg); syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
@ -276,13 +169,13 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
syncNodeReplicate(pSyncNode); syncNodeReplicate(pSyncNode);
} }
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return 0; return 0;
#endif #endif
} }
int32_t syncLeaderTransfer(int64_t rid) { int32_t syncLeaderTransfer(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
@ -290,12 +183,12 @@ int32_t syncLeaderTransfer(int64_t rid) {
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
int32_t ret = syncNodeLeaderTransfer(pSyncNode); int32_t ret = syncNodeLeaderTransfer(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return ret; return ret;
} }
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
@ -303,7 +196,7 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader); int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return ret; return ret;
} }
@ -359,7 +252,7 @@ char* syncNodePeerState2Str(const SSyncNode* pSyncNode) {
} }
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
@ -383,7 +276,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
logNum, isEmpty); logNum, isEmpty);
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return 0; return 0;
} }
@ -412,7 +305,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
} while (0); } while (0);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return 0; return 0;
} }
} }
@ -425,7 +318,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
lastApplyIndex, pSyncNode->minMatchIndex); lastApplyIndex, pSyncNode->minMatchIndex);
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return 0; return 0;
} }
@ -434,7 +327,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex); snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex);
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return 0; return 0;
} else { } else {
@ -443,7 +336,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
lastApplyIndex); lastApplyIndex);
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return 0; return 0;
} }
@ -492,12 +385,12 @@ _DEL_WAL:
} }
} while (0); } while (0);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return code; return code;
} }
int32_t syncEndSnapshot(int64_t rid) { int32_t syncEndSnapshot(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
@ -509,9 +402,9 @@ int32_t syncEndSnapshot(int64_t rid) {
SSyncLogStoreData* pData = pSyncNode->pLogStore->data; SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
code = walEndSnapshot(pData->pWal); code = walEndSnapshot(pData->pWal);
if (code != 0) { if (code != 0) {
sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr(terrno)); sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr());
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return -1; return -1;
} else { } else {
do { do {
@ -525,12 +418,12 @@ int32_t syncEndSnapshot(int64_t rid) {
} }
} }
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return code; return code;
} }
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
@ -539,7 +432,7 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
syncNodeStepDown(pSyncNode, newTerm); syncNodeStepDown(pSyncNode, newTerm);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return 0; return 0;
} }
@ -584,19 +477,19 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
} }
bool syncCanLeaderTransfer(int64_t rid) { bool syncCanLeaderTransfer(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return false; return false;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
if (pSyncNode->replicaNum == 1) { if (pSyncNode->replicaNum == 1) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return false; return false;
} }
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return true; return true;
} }
@ -611,7 +504,7 @@ bool syncCanLeaderTransfer(int64_t rid) {
} }
} }
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return matchOK; return matchOK;
} }
@ -621,25 +514,25 @@ int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
} }
ESyncState syncGetMyRole(int64_t rid) { ESyncState syncGetMyRole(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR; return TAOS_SYNC_STATE_ERROR;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
ESyncState state = pSyncNode->state; ESyncState state = pSyncNode->state;
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return state; return state;
} }
bool syncIsReady(int64_t rid) { bool syncIsReady(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return false; return false;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish; bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish;
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
// if false, set error code // if false, set error code
if (false == b) { if (false == b) {
@ -653,14 +546,14 @@ bool syncIsReady(int64_t rid) {
} }
bool syncIsRestoreFinish(int64_t rid) { bool syncIsRestoreFinish(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return false; return false;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
bool b = pSyncNode->restoreFinish; bool b = pSyncNode->restoreFinish;
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return b; return b;
} }
@ -669,7 +562,7 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho
return -1; return -1;
} }
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return -1; return -1;
} }
@ -681,7 +574,7 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho
if (pEntry != NULL) { if (pEntry != NULL) {
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
} }
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return -1; return -1;
} }
ASSERT(pEntry != NULL); ASSERT(pEntry != NULL);
@ -692,12 +585,12 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho
pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index); pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return 0; return 0;
} }
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return -1; return -1;
} }
@ -706,12 +599,12 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex); sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return 0; return 0;
} }
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) { int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return -1; return -1;
} }
@ -730,7 +623,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex, sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex,
sMeta->lastConfigIndex); sMeta->lastConfigIndex);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return 0; return 0;
} }
@ -756,67 +649,67 @@ const char* syncGetMyRoleStr(int64_t rid) {
} }
bool syncRestoreFinish(int64_t rid) { bool syncRestoreFinish(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return false; return false;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
bool restoreFinish = pSyncNode->restoreFinish; bool restoreFinish = pSyncNode->restoreFinish;
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return restoreFinish; return restoreFinish;
} }
SyncTerm syncGetMyTerm(int64_t rid) { SyncTerm syncGetMyTerm(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR; return TAOS_SYNC_STATE_ERROR;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
SyncTerm term = pSyncNode->pRaftStore->currentTerm; SyncTerm term = pSyncNode->pRaftStore->currentTerm;
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return term; return term;
} }
SyncIndex syncGetLastIndex(int64_t rid) { SyncIndex syncGetLastIndex(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return SYNC_INDEX_INVALID; return SYNC_INDEX_INVALID;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return lastIndex; return lastIndex;
} }
SyncIndex syncGetCommitIndex(int64_t rid) { SyncIndex syncGetCommitIndex(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return SYNC_INDEX_INVALID; return SYNC_INDEX_INVALID;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
SyncIndex cmtIndex = pSyncNode->commitIndex; SyncIndex cmtIndex = pSyncNode->commitIndex;
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return cmtIndex; return cmtIndex;
} }
SyncGroupId syncGetVgId(int64_t rid) { SyncGroupId syncGetVgId(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR; return TAOS_SYNC_STATE_ERROR;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
SyncGroupId vgId = pSyncNode->vgId; SyncGroupId vgId = pSyncNode->vgId;
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return vgId; return vgId;
} }
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
memset(pEpSet, 0, sizeof(*pEpSet)); memset(pEpSet, 0, sizeof(*pEpSet));
return; return;
@ -832,11 +725,11 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex; pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex;
sInfo("vgId:%d, sync get epset in-use:%d", pSyncNode->vgId, pEpSet->inUse); sInfo("vgId:%d, sync get epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
} }
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
memset(pEpSet, 0, sizeof(*pEpSet)); memset(pEpSet, 0, sizeof(*pEpSet));
return; return;
@ -855,11 +748,11 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
} }
sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse); sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
} }
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR; return TAOS_SYNC_STATE_ERROR;
} }
@ -871,12 +764,12 @@ int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg)); memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg));
} }
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return ret; return ret;
} }
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) { int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR; return TAOS_SYNC_STATE_ERROR;
} }
@ -889,12 +782,12 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo)
} }
sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle); sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return ret; return ret;
} }
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
sTrace("syncSetQ get pSyncNode is NULL, rid:%" PRId64, rid); sTrace("syncSetQ get pSyncNode is NULL, rid:%" PRId64, rid);
return; return;
@ -902,24 +795,24 @@ void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
pSyncNode->msgcb = msgcb; pSyncNode->msgcb = msgcb;
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
} }
char* sync2SimpleStr(int64_t rid) { char* sync2SimpleStr(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
sTrace("syncSetRpc get pSyncNode is NULL, rid:%" PRId64, rid); sTrace("syncSetRpc get pSyncNode is NULL, rid:%" PRId64, rid);
return NULL; return NULL;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
char* s = syncNode2SimpleStr(pSyncNode); char* s = syncNode2SimpleStr(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return s; return s;
} }
void setPingTimerMS(int64_t rid, int32_t pingTimerMS) { void setPingTimerMS(int64_t rid, int32_t pingTimerMS) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return; return;
} }
@ -927,22 +820,22 @@ void setPingTimerMS(int64_t rid, int32_t pingTimerMS) {
pSyncNode->pingBaseLine = pingTimerMS; pSyncNode->pingBaseLine = pingTimerMS;
pSyncNode->pingTimerMS = pingTimerMS; pSyncNode->pingTimerMS = pingTimerMS;
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
} }
void setElectTimerMS(int64_t rid, int32_t electTimerMS) { void setElectTimerMS(int64_t rid, int32_t electTimerMS) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return; return;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
pSyncNode->electBaseLine = electTimerMS; pSyncNode->electBaseLine = electTimerMS;
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
} }
void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return; return;
} }
@ -950,20 +843,20 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
pSyncNode->hbBaseLine = hbTimerMS; pSyncNode->hbBaseLine = hbTimerMS;
pSyncNode->heartbeatTimerMS = hbTimerMS; pSyncNode->heartbeatTimerMS = hbTimerMS;
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
} }
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
taosReleaseRef(tsNodeRefId, rid); syncNodeRelease(pSyncNode);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak); int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); syncNodeRelease(pSyncNode);
return ret; return ret;
} }
@ -1085,7 +978,7 @@ int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId de
int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
int32_t ret = 0; int32_t ret = 0;
if (syncEnvIsStart()) { if (syncIsInit()) {
SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData)); SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
pData->pSyncNode = pSyncNode; pData->pSyncNode = pSyncNode;
pData->pTimer = pSyncTimer; pData->pTimer = pSyncTimer;
@ -1093,7 +986,7 @@ int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
pData->logicClock = pSyncTimer->logicClock; pData->logicClock = pSyncTimer->logicClock;
pSyncTimer->pData = pData; pSyncTimer->pData = pData;
taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, gSyncEnv->pTimerManager, &pSyncTimer->pTimer); taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer);
} else { } else {
sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
} }
@ -1558,8 +1451,8 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) {
// timer control -------------- // timer control --------------
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
if (syncEnvIsStart()) { if (syncIsInit()) {
taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
&pSyncNode->pPingTimer); &pSyncNode->pPingTimer);
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
} else { } else {
@ -1578,7 +1471,7 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
int32_t ret = 0; int32_t ret = 0;
if (syncEnvIsStart()) { if (syncIsInit()) {
pSyncNode->electTimerMS = ms; pSyncNode->electTimerMS = ms;
SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer)); SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer));
@ -1586,7 +1479,7 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
pElectTimer->pSyncNode = pSyncNode; pElectTimer->pSyncNode = pSyncNode;
pElectTimer->pData = NULL; pElectTimer->pData = NULL;
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, gSyncEnv->pTimerManager, taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager,
&pSyncNode->pElectTimer); &pSyncNode->pElectTimer);
} else { } else {
@ -1634,8 +1527,8 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) { static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
if (syncEnvIsStart()) { if (syncIsInit()) {
taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
&pSyncNode->pHeartbeatTimer); &pSyncNode->pHeartbeatTimer);
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
} else { } else {
@ -2326,17 +2219,6 @@ _END:
return; return;
} }
SSyncNode* syncNodeAcquire(int64_t rid) {
SSyncNode* pNode = taosAcquireRef(tsNodeRefId, rid);
if (pNode == NULL) {
sTrace("failed to acquire node from refId:%" PRId64, rid);
}
return pNode;
}
void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid); }
// raft state change -------------- // raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->pRaftStore->currentTerm) { if (term > pSyncNode->pRaftStore->currentTerm) {
@ -2787,8 +2669,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
} }
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
if (syncEnvIsStart()) { if (syncIsInit()) {
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager,
&pSyncNode->pPingTimer); &pSyncNode->pPingTimer);
} else { } else {
sError("sync env is stop, syncNodeEqPingTimer"); sError("sync env is stop, syncNodeEqPingTimer");
@ -2808,7 +2690,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
pSyncNode->vgId, pSyncNode); pSyncNode->vgId, pSyncNode);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) { if (pSyncNode->FpEqMsg != NULL && pSyncNode->msgcb != NULL && pSyncNode->msgcb->putToQueueFp != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
@ -2833,9 +2715,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
#if 0 #if 0
// reset timer ms // reset timer ms
if (syncEnvIsStart() && pSyncNode->electBaseLine > 0) { if (syncIsInit() && pSyncNode->electBaseLine > 0) {
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager,
&pSyncNode->pElectTimer); &pSyncNode->pElectTimer);
} else { } else {
sError("sync env is stop, syncNodeEqElectTimer"); sError("sync env is stop, syncNodeEqElectTimer");
@ -2870,8 +2752,8 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
} }
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
if (syncEnvIsStart()) { if (syncIsInit()) {
taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager,
&pSyncNode->pHeartbeatTimer); &pSyncNode->pHeartbeatTimer);
} else { } else {
sError("sync env is stop, syncNodeEqHeartbeatTimer"); sError("sync env is stop, syncNodeEqHeartbeatTimer");
@ -2931,8 +2813,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
syncHeartbeatDestroy(pSyncMsg); syncHeartbeatDestroy(pSyncMsg);
if (syncEnvIsStart()) { if (syncIsInit()) {
taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, gSyncEnv->pTimerManager, taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager,
&pSyncTimer->pTimer); &pSyncTimer->pTimer);
} else { } else {
sError("sync env is stop, syncNodeEqHeartbeatTimer"); sError("sync env is stop, syncNodeEqHeartbeatTimer");
@ -3424,6 +3306,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
// ASSERT(pEntry != NULL); // ASSERT(pEntry != NULL);
if (code != 0 || pEntry == NULL) { if (code != 0 || pEntry == NULL) {
syncNodeErrorLog(ths, "get log entry error"); syncNodeErrorLog(ths, "get log entry error");
sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
continue; continue;
} }
} }

View File

@ -76,7 +76,7 @@ int32_t syncNodeTimerRoutine(SSyncNode* ths) {
SSyncLogStoreData* pData = ths->pLogStore->data; SSyncLogStoreData* pData = ths->pLogStore->data;
int32_t code = walEndSnapshot(pData->pWal); int32_t code = walEndSnapshot(pData->pWal);
if (code != 0) { if (code != 0) {
sError("vgId:%d, wal snapshot end error since:%s", ths->vgId, terrstr(terrno)); sError("vgId:%d, timer wal snapshot end error since:%s", ths->vgId, terrstr());
return -1; return -1;
} else { } else {
do { do {

View File

@ -98,7 +98,7 @@ int main(int argc, char** argv) {
init(); init();
int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
char walPath[128]; char walPath[128];

View File

@ -152,7 +152,7 @@ int main(int argc, char **argv) {
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
taosRemoveDir("./wal_test"); taosRemoveDir("./wal_test");

View File

@ -81,7 +81,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();

View File

@ -22,7 +22,7 @@ int main() {
logTest(); logTest();
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
for (int i = 0; i < 5; ++i) { for (int i = 0; i < 5; ++i) {
@ -37,8 +37,6 @@ int main() {
taosMsleep(5000); taosMsleep(5000);
} }
ret = syncEnvStop(); syncCleanUp();
assert(ret == 0);
return 0; return 0;
} }

View File

@ -82,7 +82,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();

View File

@ -81,7 +81,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();
@ -91,7 +91,7 @@ int main(int argc, char** argv) {
initRaftId(pSyncNode); initRaftId(pSyncNode);
syncNodeClose(pSyncNode); syncNodeClose(pSyncNode);
syncEnvStop(); syncCleanUp();
// syncIOStop(); // syncIOStop();
// taosCloseLog(); // taosCloseLog();

View File

@ -81,7 +81,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();

View File

@ -81,7 +81,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();

View File

@ -81,7 +81,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();

View File

@ -179,7 +179,7 @@ int main(int argc, char **argv) {
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
// taosRemoveDir(pWalDir); // taosRemoveDir(pWalDir);

View File

@ -82,7 +82,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();

View File

@ -82,7 +82,7 @@ int main(int argc, char** argv) {
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();

View File

@ -154,7 +154,7 @@ int main(int argc, char **argv) {
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
assert(ret == 0); assert(ret == 0);
ret = syncEnvStart(); ret = syncInit();
assert(ret == 0); assert(ret == 0);
taosRemoveDir("./wal_test"); taosRemoveDir("./wal_test");

View File

@ -428,6 +428,7 @@ void transDestoryExHandle(void* handle);
int32_t transGetRefMgt(); int32_t transGetRefMgt();
int32_t transGetInstMgt(); int32_t transGetInstMgt();
void transHttpEnvDestroy();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -20,11 +20,30 @@
#include "thttp.h" #include "thttp.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "transComm.h"
// clang-format on // clang-format on
#define HTTP_RECV_BUF_SIZE 1024 #define HTTP_RECV_BUF_SIZE 1024
typedef struct SHttpModule {
uv_loop_t* loop;
SAsyncPool* asyncPool;
TdThread thread;
} SHttpModule;
typedef struct SHttpMsg {
queue q;
char* server;
int32_t port;
char* cont;
int32_t len;
EHttpCompFlag flag;
int8_t quit;
SHttpModule* http;
} SHttpMsg;
typedef struct SHttpClient { typedef struct SHttpClient {
uv_connect_t conn; uv_connect_t conn;
uv_tcp_t tcp; uv_tcp_t tcp;
@ -33,8 +52,20 @@ typedef struct SHttpClient {
char* rbuf; char* rbuf;
char* addr; char* addr;
uint16_t port; uint16_t port;
struct sockaddr_in dest;
} SHttpClient; } SHttpClient;
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
static SHttpModule* thttp = NULL;
static void transHttpEnvInit();
static void httpHandleReq(SHttpMsg* msg);
static void httpHandleQuit(SHttpMsg* msg);
static int32_t httpSendQuit();
static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag);
static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen,
EHttpCompFlag flag) { EHttpCompFlag flag) {
if (flag == HTTP_FLAT) { if (flag == HTTP_FLAT) {
@ -53,6 +84,7 @@ static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pH
"Content-Length: %d\n\n", "Content-Length: %d\n\n",
server, contLen); server, contLen);
} else { } else {
terrno = TSDB_CODE_INVALID_CFG;
return -1; return -1;
} }
} }
@ -126,88 +158,10 @@ _OVER:
return code; return code;
} }
static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) {
taosMemoryFree(cli->wbuf);
taosMemoryFree(cli->rbuf);
taosMemoryFree(cli->addr);
taosMemoryFree(cli);
}
static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) {
SHttpClient* cli = handle->data;
destroyHttpClient(cli);
}
static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SHttpClient* cli = handle->data;
buf->base = cli->rbuf;
buf->len = HTTP_RECV_BUF_SIZE;
}
static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
SHttpClient* cli = handle->data;
if (nread < 0) {
uError("http-report recv error:%s", uv_err_name(nread));
} else {
uTrace("http-report succ to recv %d bytes", (int32_t)nread);
}
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
} else {
destroyHttpClient(cli);
}
}
static void clientSentCb(uv_write_t* req, int32_t status) {
SHttpClient* cli = req->data;
if (status != 0) {
terrno = TAOS_SYSTEM_ERROR(status);
uError("http-report failed to send data %s", uv_strerror(status));
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
} else {
destroyHttpClient(cli);
}
return;
} else {
uTrace("http-report succ to send data");
}
status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb);
if (status != 0) {
terrno = TAOS_SYSTEM_ERROR(status);
uError("http-report failed to recv data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
} else {
destroyHttpClient(cli);
}
}
}
static void clientConnCb(uv_connect_t* req, int32_t status) {
SHttpClient* cli = req->data;
if (status != 0) {
terrno = TAOS_SYSTEM_ERROR(status);
uError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
} else {
destroyHttpClient(cli);
}
return;
}
status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
if (0 != status) {
terrno = TAOS_SYSTEM_ERROR(status);
uError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
} else {
destroyHttpClient(cli);
}
}
}
static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
uint32_t ip = taosGetIpv4FromFqdn(server); uint32_t ip = taosGetIpv4FromFqdn(server);
if (ip == 0xffffffff) { if (ip == 0xffffffff) {
terrno = TAOS_SYSTEM_ERROR(errno); tError("http-report failed to get http server:%s since %s", server, errno == 0 ? "invalid http server" : terrstr());
uError("http-report failed to get http server:%s since %s", server, errno == 0 ? "invalid http server" : terrstr());
return -1; return -1;
} }
char buf[128] = {0}; char buf[128] = {0};
@ -215,27 +169,206 @@ static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port,
uv_ip4_addr(buf, port, dest); uv_ip4_addr(buf, port, dest);
return 0; return 0;
} }
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
struct sockaddr_in dest = {0};
if (taosBuildDstAddr(server, port, &dest) < 0) {
return -1;
}
if (flag == HTTP_GZIP) {
int32_t dstLen = taosCompressHttpRport(pCont, contLen);
if (dstLen > 0) {
contLen = dstLen;
} else {
flag = HTTP_FLAT;
}
}
terrno = 0;
char header[2048] = {0}; static void* httpThread(void* arg) {
int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); SHttpModule* http = (SHttpModule*)arg;
setThreadName("http-cli-send-thread");
uv_run(http->loop, UV_RUN_DEFAULT);
return NULL;
}
static void httpDestroyMsg(SHttpMsg* msg) {
if (msg == NULL) return;
taosMemoryFree(msg->server);
taosMemoryFree(msg->cont);
taosMemoryFree(msg);
}
static void httpAsyncCb(uv_async_t* handle) {
SAsyncItem* item = handle->data;
SHttpModule* http = item->pThrd;
SHttpMsg *msg = NULL, *quitMsg = NULL;
queue wq;
taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
msg = QUEUE_DATA(h, SHttpMsg, q);
if (msg->quit) {
quitMsg = msg;
} else {
httpHandleReq(msg);
}
}
if (quitMsg) httpHandleQuit(quitMsg);
}
static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) {
taosMemoryFree(cli->wbuf[0].base);
taosMemoryFree(cli->wbuf[1].base);
taosMemoryFree(cli->wbuf);
taosMemoryFree(cli->rbuf);
taosMemoryFree(cli->addr);
taosMemoryFree(cli);
}
static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) {
SHttpClient* cli = handle->data;
destroyHttpClient(cli);
}
static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SHttpClient* cli = handle->data;
buf->base = cli->rbuf;
buf->len = HTTP_RECV_BUF_SIZE;
}
static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
SHttpClient* cli = handle->data;
if (nread < 0) {
tError("http-report recv error:%s", uv_err_name(nread));
} else {
tTrace("http-report succ to recv %d bytes", (int32_t)nread);
}
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
}
static void clientSentCb(uv_write_t* req, int32_t status) {
SHttpClient* cli = req->data;
if (status != 0) {
tError("http-report failed to send data, reason: %s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
return;
} else {
tTrace("http-report succ to send data");
}
status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb);
if (status != 0) {
tError("http-report failed to recv data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
}
}
static void clientConnCb(uv_connect_t* req, int32_t status) {
SHttpClient* cli = req->data;
if (status != 0) {
tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
return;
}
status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
if (0 != status) {
tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
}
}
int32_t httpSendQuit() {
SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg));
msg->quit = 1;
SHttpModule* load = atomic_load_ptr(&thttp);
if (load == NULL) {
httpDestroyMsg(msg);
tError("http-report already released");
return -1;
} else {
msg->http = load;
}
transAsyncSend(load->asyncPool, &(msg->q));
return 0;
}
static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag) {
SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
msg->server = strdup(server);
msg->port = port;
msg->cont = taosMemoryMalloc(contLen);
memcpy(msg->cont, pCont, contLen);
msg->len = contLen;
msg->flag = flag;
msg->quit = 0;
SHttpModule* load = atomic_load_ptr(&thttp);
if (load == NULL) {
httpDestroyMsg(msg);
tError("http-report already released");
return -1;
} else {
msg->http = load;
transAsyncSend(load->asyncPool, &(msg->q));
}
return 0;
}
static void httpDestroyClientCb(uv_handle_t* handle) {
SHttpClient* http = handle->data;
destroyHttpClient(http);
}
static void httpWalkCb(uv_handle_t* handle, void* arg) {
// impl later
if (!uv_is_closing(handle)) {
uv_handle_type type = uv_handle_get_type(handle);
if (uv_handle_get_type(handle) == UV_TCP) {
uv_close(handle, httpDestroyClientCb);
} else {
uv_close(handle, NULL);
}
}
return;
}
static void httpHandleQuit(SHttpMsg* msg) {
SHttpModule* http = msg->http;
taosMemoryFree(msg);
uv_walk(http->loop, httpWalkCb, NULL);
}
static void httpHandleReq(SHttpMsg* msg) {
SHttpModule* http = msg->http;
struct sockaddr_in dest = {0};
if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) {
goto END;
}
if (msg->flag == HTTP_GZIP) {
int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len);
if (dstLen > 0) {
msg->len = dstLen;
} else {
msg->flag = HTTP_FLAT;
}
if (dstLen < 0) {
goto END;
}
}
int32_t len = 2048;
char* header = taosMemoryCalloc(1, len);
int32_t headLen = taosBuildHttpHeader(msg->server, msg->len, header, len, msg->flag);
if (headLen < 0) {
taosMemoryFree(header);
goto END;
}
uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t));
wb[0] = uv_buf_init((char*)header, headLen); // stack var wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var
wb[1] = uv_buf_init((char*)pCont, contLen); // heap var wb[1] = uv_buf_init((char*)msg->cont, msg->len); // heap var
SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient));
cli->conn.data = cli; cli->conn.data = cli;
@ -243,41 +376,71 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
cli->req.data = cli; cli->req.data = cli;
cli->wbuf = wb; cli->wbuf = wb;
cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE);
cli->addr = tstrdup(server); cli->addr = msg->server;
cli->port = port; cli->port = msg->port;
cli->dest = dest;
taosMemoryFree(msg);
uv_tcp_init(http->loop, &cli->tcp);
uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t));
int err = uv_loop_init(loop);
if (err != 0) {
uError("http-report failed to init uv_loop, reason: %s", uv_strerror(err));
taosMemoryFree(loop);
terrno = TAOS_SYSTEM_ERROR(err);
destroyHttpClient(cli);
return terrno;
}
uv_tcp_init(loop, &cli->tcp);
// set up timeout to avoid stuck; // set up timeout to avoid stuck;
int32_t fd = taosCreateSocketWithTimeout(5); int32_t fd = taosCreateSocketWithTimeout(5);
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
if (ret != 0) { if (ret != 0) {
uError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
destroyHttpClient(cli); destroyHttpClient(cli);
uv_stop(loop); return;
terrno = TAOS_SYSTEM_ERROR(ret);
} else {
ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb);
if (ret != 0) {
uError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr,
cli->port);
destroyHttpClient(cli);
uv_stop(loop);
terrno = TAOS_SYSTEM_ERROR(ret);
}
} }
uv_run(loop, UV_RUN_DEFAULT); ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb);
uv_loop_close(loop); if (ret != 0) {
taosMemoryFree(loop); tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr,
return terrno; cli->port);
destroyHttpClient(cli);
}
return;
END:
tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port);
httpDestroyMsg(msg);
}
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
taosThreadOnce(&transHttpInit, transHttpEnvInit);
return taosSendHttpReportImpl(server, port, pCont, contLen, flag);
}
static void transHttpEnvInit() {
SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule));
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
uv_loop_init(http->loop);
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
if (err != 0) {
taosMemoryFree(http->loop);
taosMemoryFree(http);
http = NULL;
}
atomic_store_ptr(&thttp, http);
}
void transHttpEnvDestroy() {
SHttpModule* load = atomic_load_ptr(&thttp);
if (load == NULL) {
return;
}
httpSendQuit();
taosThreadJoin(load->thread, NULL);
TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsg);
transAsyncPoolDestroy(load->asyncPool);
uv_loop_close(load->loop);
taosMemoryFree(load->loop);
taosMemoryFree(load);
atomic_store_ptr(&thttp, NULL);
} }

View File

@ -172,6 +172,8 @@ int32_t rpcInit() {
} }
void rpcCleanup(void) { void rpcCleanup(void) {
transCleanup(); transCleanup();
transHttpEnvDestroy();
return; return;
} }

View File

@ -187,17 +187,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \ snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
} while (0) } while (0)
#define CONN_HOST_THREAD_IDX1(idx, exh, refId, pThrd) \
do { \
if (exh == NULL) { \
idx = -1; \
} else { \
ASYNC_CHECK_HANDLE((exh), refId); \
pThrd = (SCliThrd*)(exh)->pThrd; \
} \
} while (0)
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
@ -217,6 +207,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
tDebug("msg found, %" PRIu64 "", ahandle); \ tDebug("msg found, %" PRIu64 "", ahandle); \
} \ } \
} while (0) } while (0)
#define CONN_GET_NEXT_SENDMSG(conn) \ #define CONN_GET_NEXT_SENDMSG(conn) \
do { \ do { \
int i = 0; \ int i = 0; \
@ -231,21 +222,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
} \ } \
} while (0) } while (0)
#define CONN_HANDLE_THREAD_QUIT(thrd) \
do { \
if (thrd->quit) { \
return; \
} \
} while (0)
#define CONN_HANDLE_BROKEN(conn) \
do { \
if (conn->broken) { \
cliHandleExcept(conn); \
return; \
} \
} while (0)
#define CONN_SET_PERSIST_BY_APP(conn) \ #define CONN_SET_PERSIST_BY_APP(conn) \
do { \ do { \
if (conn->status == ConnNormal) { \ if (conn->status == ConnNormal) { \

View File

@ -57,7 +57,7 @@ static void taosIncRsetCount(SRefSet *pSet);
static void taosDecRsetCount(SRefSet *pSet); static void taosDecRsetCount(SRefSet *pSet);
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove); static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove);
int32_t taosOpenRef(int32_t max, void (*fp)(void *)) { int32_t taosOpenRef(int32_t max, RefFp fp) {
SRefNode **nodeList; SRefNode **nodeList;
SRefSet *pSet; SRefSet *pSet;
int64_t *lockedBy; int64_t *lockedBy;