Merge pull request #24409 from taosdata/opt/optMsgOnMnode

opt msg on mnd
This commit is contained in:
Haojun Liao 2024-01-12 09:36:37 +08:00 committed by GitHub
commit ea0ea09b0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 239 additions and 136 deletions

View File

@ -53,9 +53,12 @@ typedef struct {
void* mgmt; void* mgmt;
void* clientRpc; void* clientRpc;
void* serverRpc; void* serverRpc;
void* statusRpc;
void* syncRpc;
PutToQueueFp putToQueueFp; PutToQueueFp putToQueueFp;
GetQueueSizeFp qsizeFp; GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp; SendReqFp sendReqFp;
SendReqFp sendSyncReqFp;
SendRspFp sendRspFp; SendRspFp sendRspFp;
RegisterBrokenLinkArgFp registerBrokenLinkArgFp; RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
ReleaseHandleFp releaseHandleFp; ReleaseHandleFp releaseHandleFp;
@ -67,6 +70,7 @@ void tmsgSetDefault(const SMsgCb* msgcb);
int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg); int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg);
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype); int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg); int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
int32_t tmsgSendSyncReq(const SEpSet* epSet, SRpcMsg* pMsg);
void tmsgSendRsp(SRpcMsg* pMsg); void tmsgSendRsp(SRpcMsg* pMsg);
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg); void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg);
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type); void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type);

View File

@ -109,12 +109,12 @@ extern const int32_t TYPE_BYTES[21];
#define TSDB_INS_USER_STABLES_DBNAME_COLID 2 #define TSDB_INS_USER_STABLES_DBNAME_COLID 2
static const int64_t TICK_PER_SECOND[] = { static const int64_t TICK_PER_SECOND[] = {
1000LL, // MILLISECOND 1000LL, // MILLISECOND
1000000LL, // MICROSECOND 1000000LL, // MICROSECOND
1000000000LL, // NANOSECOND 1000000000LL, // NANOSECOND
0LL, // HOUR 0LL, // HOUR
0LL, // MINUTE 0LL, // MINUTE
1LL // SECOND 1LL // SECOND
}; };
#define TSDB_TICK_PER_SECOND(precision) \ #define TSDB_TICK_PER_SECOND(precision) \
@ -239,8 +239,8 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_SQL_SHOW_LEN 1024 #define TSDB_MAX_SQL_SHOW_LEN 1024
#define TSDB_MAX_ALLOWED_SQL_LEN (1 * 1024 * 1024u) // sql length should be less than 1mb #define TSDB_MAX_ALLOWED_SQL_LEN (1 * 1024 * 1024u) // sql length should be less than 1mb
#define TSDB_VIEW_NAME_LEN 193 #define TSDB_VIEW_NAME_LEN 193
#define TSDB_VIEW_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_VIEW_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_VIEW_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_VIEW_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN #define TSDB_APP_NAME_LEN TSDB_UNI_LEN
#define TSDB_TB_COMMENT_LEN 1025 #define TSDB_TB_COMMENT_LEN 1025
@ -260,7 +260,7 @@ typedef enum ELogicConditionType {
#define TSDB_PASSWORD_LEN 32 #define TSDB_PASSWORD_LEN 32
#define TSDB_USET_PASSWORD_LEN 129 #define TSDB_USET_PASSWORD_LEN 129
#define TSDB_VERSION_LEN 32 #define TSDB_VERSION_LEN 32
#define TSDB_LABEL_LEN 12 #define TSDB_LABEL_LEN 16
#define TSDB_JOB_STATUS_LEN 32 #define TSDB_JOB_STATUS_LEN 32
#define TSDB_CLUSTER_ID_LEN 40 #define TSDB_CLUSTER_ID_LEN 40
@ -288,7 +288,7 @@ typedef enum ELogicConditionType {
#define TSDB_ACTIVE_KEY_LEN 109 #define TSDB_ACTIVE_KEY_LEN 109
#define TSDB_CONN_ACTIVE_KEY_LEN 255 #define TSDB_CONN_ACTIVE_KEY_LEN 255
#define TSDB_DEFAULT_PKT_SIZE 65480 // same as RPC_MAX_UDP_SIZE #define TSDB_DEFAULT_PKT_SIZE 65480 // same as RPC_MAX_UDP_SIZE
#define TSDB_SNAP_DATA_PAYLOAD_SIZE (1 * 1024 * 1024) #define TSDB_SNAP_DATA_PAYLOAD_SIZE (1 * 1024 * 1024)
#define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE #define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE
@ -397,13 +397,13 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_STT_TRIGGER 1 #define TSDB_MAX_STT_TRIGGER 1
#define TSDB_DEFAULT_SST_TRIGGER 1 #define TSDB_DEFAULT_SST_TRIGGER 1
#endif #endif
#define TSDB_STT_TRIGGER_ARRAY_SIZE 16 // maximum of TSDB_MAX_STT_TRIGGER of TD_ENTERPRISE and TD_COMMUNITY #define TSDB_STT_TRIGGER_ARRAY_SIZE 16 // maximum of TSDB_MAX_STT_TRIGGER of TD_ENTERPRISE and TD_COMMUNITY
#define TSDB_MIN_HASH_PREFIX (2 - TSDB_TABLE_NAME_LEN) #define TSDB_MIN_HASH_PREFIX (2 - TSDB_TABLE_NAME_LEN)
#define TSDB_MAX_HASH_PREFIX (TSDB_TABLE_NAME_LEN - 2) #define TSDB_MAX_HASH_PREFIX (TSDB_TABLE_NAME_LEN - 2)
#define TSDB_DEFAULT_HASH_PREFIX 0 #define TSDB_DEFAULT_HASH_PREFIX 0
#define TSDB_MIN_HASH_SUFFIX (2 - TSDB_TABLE_NAME_LEN) #define TSDB_MIN_HASH_SUFFIX (2 - TSDB_TABLE_NAME_LEN)
#define TSDB_MAX_HASH_SUFFIX (TSDB_TABLE_NAME_LEN - 2) #define TSDB_MAX_HASH_SUFFIX (TSDB_TABLE_NAME_LEN - 2)
#define TSDB_DEFAULT_HASH_SUFFIX 0 #define TSDB_DEFAULT_HASH_SUFFIX 0
#define TSDB_DB_MIN_WAL_RETENTION_PERIOD -1 #define TSDB_DB_MIN_WAL_RETENTION_PERIOD -1
#define TSDB_REP_DEF_DB_WAL_RET_PERIOD 3600 #define TSDB_REP_DEF_DB_WAL_RET_PERIOD 3600

View File

@ -96,9 +96,9 @@ int32_t tsMonitorMaxLogs = 100;
bool tsMonitorComp = false; bool tsMonitorComp = false;
// audit // audit
bool tsEnableAudit = true; bool tsEnableAudit = true;
bool tsEnableAuditCreateTable = true; bool tsEnableAuditCreateTable = true;
int32_t tsAuditInterval = 5000; int32_t tsAuditInterval = 5000;
// telem // telem
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
@ -109,7 +109,7 @@ bool tsEnableTelem = true;
int32_t tsTelemInterval = 43200; int32_t tsTelemInterval = 43200;
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.tdengine.com"; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.tdengine.com";
uint16_t tsTelemPort = 80; uint16_t tsTelemPort = 80;
char * tsTelemUri = "/report"; char *tsTelemUri = "/report";
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
bool tsEnableCrashReport = false; bool tsEnableCrashReport = false;
@ -253,7 +253,7 @@ int32_t tsCompactPullupInterval = 10;
int32_t tsMqRebalanceInterval = 2; int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointInterval = 60; int32_t tsStreamCheckpointInterval = 60;
float tsSinkDataRate = 2.0; float tsSinkDataRate = 2.0;
int32_t tsStreamNodeCheckInterval = 15; int32_t tsStreamNodeCheckInterval = 16;
int32_t tsTtlUnit = 86400; int32_t tsTtlUnit = 86400;
int32_t tsTtlPushIntervalSec = 10; int32_t tsTtlPushIntervalSec = 10;
int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups
@ -283,7 +283,7 @@ int32_t tsS3BlockCacheSize = 16; // number of blocks
int32_t tsS3PageCacheSize = 4096; // number of pages int32_t tsS3PageCacheSize = 4096; // number of pages
int32_t tsS3UploadDelaySec = 60 * 60 * 24; int32_t tsS3UploadDelaySec = 60 * 60 * 24;
bool tsExperimental = true; bool tsExperimental = true;
#ifndef _STORAGE #ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) { int32_t taosSetTfsCfg(SConfig *pCfg) {
@ -695,8 +695,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH, CFG_DYN_ENT_SERVER) != 0) return -1;
@ -715,8 +714,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
0) 0)
return -1; return -1;
if (cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != if (cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER,
0) CFG_DYN_ENT_SERVER) != 0)
return -1; return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
0) 0)
@ -1382,7 +1381,7 @@ void taosCleanupCfg() {
typedef struct { typedef struct {
const char *optionName; const char *optionName;
void * optionVar; void *optionVar;
} OptionNameAndVar; } OptionNameAndVar;
static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize, SConfigItem *pItem, bool isDebugflag) { static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize, SConfigItem *pItem, bool isDebugflag) {
@ -1395,7 +1394,7 @@ static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize,
switch (pItem->dtype) { switch (pItem->dtype) {
case CFG_DTYPE_BOOL: { case CFG_DTYPE_BOOL: {
int32_t flag = pItem->i32; int32_t flag = pItem->i32;
bool * pVar = pOptions[d].optionVar; bool *pVar = pOptions[d].optionVar;
uInfo("%s set from %d to %d", optName, *pVar, flag); uInfo("%s set from %d to %d", optName, *pVar, flag);
*pVar = flag; *pVar = flag;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
@ -1470,40 +1469,38 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) {
{"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag}, {"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag},
}; };
static OptionNameAndVar options[] = { static OptionNameAndVar options[] = {{"audit", &tsEnableAudit},
{"audit", &tsEnableAudit}, {"asynclog", &tsAsyncLog},
{"asynclog", &tsAsyncLog}, {"disableStream", &tsDisableStream},
{"disableStream", &tsDisableStream}, {"enableWhiteList", &tsEnableWhiteList},
{"enableWhiteList", &tsEnableWhiteList}, {"telemetryReporting", &tsEnableTelem},
{"telemetryReporting", &tsEnableTelem}, {"monitor", &tsEnableMonitor},
{"monitor", &tsEnableMonitor},
{"mndSdbWriteDelta", &tsMndSdbWriteDelta}, {"mndSdbWriteDelta", &tsMndSdbWriteDelta},
{"minDiskFreeSize", &tsMinDiskFreeSize}, {"minDiskFreeSize", &tsMinDiskFreeSize},
{"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold}, {"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold},
{"checkpointInterval", &tsStreamCheckpointInterval}, {"checkpointInterval", &tsStreamCheckpointInterval},
{"keepAliveIdle", &tsKeepAliveIdle}, {"keepAliveIdle", &tsKeepAliveIdle},
{"logKeepDays", &tsLogKeepDays}, {"logKeepDays", &tsLogKeepDays},
{"maxStreamBackendCache", &tsMaxStreamBackendCache}, {"maxStreamBackendCache", &tsMaxStreamBackendCache},
{"mqRebalanceInterval", &tsMqRebalanceInterval}, {"mqRebalanceInterval", &tsMqRebalanceInterval},
{"numOfLogLines", &tsNumOfLogLines}, {"numOfLogLines", &tsNumOfLogLines},
{"queryRspPolicy", &tsQueryRspPolicy}, {"queryRspPolicy", &tsQueryRspPolicy},
{"timeseriesThreshold", &tsTimeSeriesThreshold}, {"timeseriesThreshold", &tsTimeSeriesThreshold},
{"tmqMaxTopicNum", &tmqMaxTopicNum}, {"tmqMaxTopicNum", &tmqMaxTopicNum},
{"transPullupInterval", &tsTransPullupInterval}, {"transPullupInterval", &tsTransPullupInterval},
{"compactPullupInterval", &tsCompactPullupInterval}, {"compactPullupInterval", &tsCompactPullupInterval},
{"trimVDbIntervalSec", &tsTrimVDbIntervalSec}, {"trimVDbIntervalSec", &tsTrimVDbIntervalSec},
{"ttlBatchDropNum", &tsTtlBatchDropNum}, {"ttlBatchDropNum", &tsTtlBatchDropNum},
{"ttlFlushThreshold", &tsTtlFlushThreshold}, {"ttlFlushThreshold", &tsTtlFlushThreshold},
{"ttlPushInterval", &tsTtlPushIntervalSec}, {"ttlPushInterval", &tsTtlPushIntervalSec},
//{"s3BlockSize", &tsS3BlockSize}, //{"s3BlockSize", &tsS3BlockSize},
{"s3BlockCacheSize", &tsS3BlockCacheSize}, {"s3BlockCacheSize", &tsS3BlockCacheSize},
{"s3PageCacheSize", &tsS3PageCacheSize}, {"s3PageCacheSize", &tsS3PageCacheSize},
{"s3UploadDelaySec", &tsS3UploadDelaySec}, {"s3UploadDelaySec", &tsS3UploadDelaySec},
{"supportVnodes", &tsNumOfSupportVnodes}, {"supportVnodes", &tsNumOfSupportVnodes},
{"experimental", &tsExperimental} {"experimental", &tsExperimental}};
};
if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) { if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) {
taosCfgSetOption(options, tListLen(options), pItem, false); taosCfgSetOption(options, tListLen(options), pItem, false);
@ -1697,36 +1694,34 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
{"cDebugFlag", &cDebugFlag}, {"dDebugFlag", &dDebugFlag}, {"fsDebugFlag", &fsDebugFlag}, {"cDebugFlag", &cDebugFlag}, {"dDebugFlag", &dDebugFlag}, {"fsDebugFlag", &fsDebugFlag},
{"idxDebugFlag", &idxDebugFlag}, {"jniDebugFlag", &jniDebugFlag}, {"qDebugFlag", &qDebugFlag}, {"idxDebugFlag", &idxDebugFlag}, {"jniDebugFlag", &jniDebugFlag}, {"qDebugFlag", &qDebugFlag},
{"rpcDebugFlag", &rpcDebugFlag}, {"smaDebugFlag", &smaDebugFlag}, {"tmrDebugFlag", &tmrDebugFlag}, {"rpcDebugFlag", &rpcDebugFlag}, {"smaDebugFlag", &smaDebugFlag}, {"tmrDebugFlag", &tmrDebugFlag},
{"uDebugFlag", &uDebugFlag}, {"simDebugFlag", &simDebugFlag}, {"uDebugFlag", &uDebugFlag}, {"simDebugFlag", &simDebugFlag},
}; };
static OptionNameAndVar options[] = { static OptionNameAndVar options[] = {{"asyncLog", &tsAsyncLog},
{"asyncLog", &tsAsyncLog}, {"assert", &tsAssert},
{"assert", &tsAssert}, {"compressMsgSize", &tsCompressMsgSize},
{"compressMsgSize", &tsCompressMsgSize}, {"countAlwaysReturnValue", &tsCountAlwaysReturnValue},
{"countAlwaysReturnValue", &tsCountAlwaysReturnValue}, {"crashReporting", &tsEnableCrashReport},
{"crashReporting", &tsEnableCrashReport}, {"enableCoreFile", &tsAsyncLog},
{"enableCoreFile", &tsAsyncLog}, {"enableQueryHb", &tsEnableQueryHb},
{"enableQueryHb", &tsEnableQueryHb}, {"keepColumnName", &tsKeepColumnName},
{"keepColumnName", &tsKeepColumnName}, {"keepAliveIdle", &tsKeepAliveIdle},
{"keepAliveIdle", &tsKeepAliveIdle}, {"logKeepDays", &tsLogKeepDays},
{"logKeepDays", &tsLogKeepDays}, {"maxInsertBatchRows", &tsMaxInsertBatchRows},
{"maxInsertBatchRows", &tsMaxInsertBatchRows}, {"maxRetryWaitTime", &tsMaxRetryWaitTime},
{"maxRetryWaitTime", &tsMaxRetryWaitTime}, {"minSlidingTime", &tsMinSlidingTime},
{"minSlidingTime", &tsMinSlidingTime}, {"minIntervalTime", &tsMinIntervalTime},
{"minIntervalTime", &tsMinIntervalTime}, {"numOfLogLines", &tsNumOfLogLines},
{"numOfLogLines", &tsNumOfLogLines}, {"querySmaOptimize", &tsQuerySmaOptimize},
{"querySmaOptimize", &tsQuerySmaOptimize}, {"queryPolicy", &tsQueryPolicy},
{"queryPolicy", &tsQueryPolicy}, {"queryPlannerTrace", &tsQueryPlannerTrace},
{"queryPlannerTrace", &tsQueryPlannerTrace}, {"queryNodeChunkSize", &tsQueryNodeChunkSize},
{"queryNodeChunkSize", &tsQueryNodeChunkSize}, {"queryUseNodeAllocator", &tsQueryUseNodeAllocator},
{"queryUseNodeAllocator", &tsQueryUseNodeAllocator}, {"smlDot2Underline", &tsSmlDot2Underline},
{"smlDot2Underline", &tsSmlDot2Underline}, {"shellActivityTimer", &tsShellActivityTimer},
{"shellActivityTimer", &tsShellActivityTimer}, {"slowLogThreshold", &tsSlowLogThreshold},
{"slowLogThreshold", &tsSlowLogThreshold}, {"useAdapter", &tsUseAdapter},
{"useAdapter", &tsUseAdapter}, {"experimental", &tsExperimental}};
{"experimental", &tsExperimental}
};
if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) { if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) {
taosCfgSetOption(options, tListLen(options), pItem, false); taosCfgSetOption(options, tListLen(options), pItem, false);
@ -1803,7 +1798,7 @@ void taosSetAllDebugFlag(int32_t flag) {
taosArrayClear(noNeedToSetVars); // reset array taosArrayClear(noNeedToSetVars); // reset array
uInfo("all debug flag are set to %d", flag); uInfo("all debug flag are set to %d", flag);
if (terrno == TSDB_CODE_CFG_NOT_FOUND) terrno = TSDB_CODE_SUCCESS; // ignore not exist if (terrno == TSDB_CODE_CFG_NOT_FOUND) terrno = TSDB_CODE_SUCCESS; // ignore not exist
} }
int8_t taosGranted() { return atomic_load_8(&tsGrant); } int8_t taosGranted() { return atomic_load_8(&tsGrant); }

View File

@ -45,7 +45,7 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer}; SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer};
int32_t contLen = tSerializeRetrieveIpWhite(NULL, 0, &req); int32_t contLen = tSerializeRetrieveIpWhite(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen); void * pHead = rpcMallocCont(contLen);
tSerializeRetrieveIpWhite(pHead, contLen, &req); tSerializeRetrieveIpWhite(pHead, contLen, &req);
SRpcMsg rpcMsg = {.pCont = pHead, SRpcMsg rpcMsg = {.pCont = pHead,
@ -144,7 +144,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
req.ipWhiteVer = pMgmt->pData->ipWhiteVer; req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen); void * pHead = rpcMallocCont(contLen);
tSerializeSStatusReq(pHead, contLen, &req); tSerializeSStatusReq(pHead, contLen, &req);
tFreeSStatusReq(&req); tFreeSStatusReq(&req);
@ -161,7 +161,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SEpSet epSet = {0}; SEpSet epSet = {0};
int8_t epUpdated = 0; int8_t epUpdated = 0;
dmGetMnodeEpSet(pMgmt->pData, &epSet); dmGetMnodeEpSet(pMgmt->pData, &epSet);
rpcSendRecvWithTimeout(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, 5000); rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
dmRotateMnodeEpSet(pMgmt->pData); dmRotateMnodeEpSet(pMgmt->pData);
char tbuf[512]; char tbuf[512];
@ -189,7 +189,7 @@ void dmSendNotifyReq(SDnodeMgmt *pMgmt) {
req.pVloads = vinfo.pVloads; req.pVloads = vinfo.pVloads;
int32_t contLen = tSerializeSNotifyReq(NULL, 0, &req); int32_t contLen = tSerializeSNotifyReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen); void * pHead = rpcMallocCont(contLen);
tSerializeSNotifyReq(pHead, contLen, &req); tSerializeSNotifyReq(pHead, contLen, &req);
tFreeSNotifyReq(&req); tFreeSNotifyReq(&req);
@ -284,7 +284,7 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
SSDataBlock *dmBuildVariablesBlock(void) { SSDataBlock *dmBuildVariablesBlock(void) {
SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SSDataBlock * pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
size_t size = 0; size_t size = 0;
const SSysTableMeta *pMeta = NULL; const SSysTableMeta *pMeta = NULL;
getInfosDbMeta(&pMeta, &size); getInfosDbMeta(&pMeta, &size);

View File

@ -47,28 +47,29 @@ static void *dmStatusThreadFp(void *param) {
} }
SDmNotifyHandle dmNotifyHdl = {.state = 0}; SDmNotifyHandle dmNotifyHdl = {.state = 0};
static void *dmNotifyThreadFp(void *param) {
SDnodeMgmt *pMgmt = param;
setThreadName("dnode-notify");
if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) { static void *dmNotifyThreadFp(void *param) {
return NULL; SDnodeMgmt *pMgmt = param;
setThreadName("dnode-notify");
if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
return NULL;
} }
bool wait = true; bool wait = true;
while (1) { while (1) {
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
if (wait) tsem_wait(&dmNotifyHdl.sem); if (wait) tsem_wait(&dmNotifyHdl.sem);
atomic_store_8(&dmNotifyHdl.state, 1); atomic_store_8(&dmNotifyHdl.state, 1);
dmSendNotifyReq(pMgmt); dmSendNotifyReq(pMgmt);
if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) { if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
wait = true; wait = true;
continue; continue;
} }
wait = false; wait = false;
} }
return NULL; return NULL;
} }
static void *dmMonitorThreadFp(void *param) { static void *dmMonitorThreadFp(void *param) {

View File

@ -48,6 +48,8 @@ typedef struct {
typedef struct { typedef struct {
void *serverRpc; void *serverRpc;
void *clientRpc; void *clientRpc;
void *statusRpc;
void *syncRpc;
SDnodeHandle msgHandles[TDMT_MAX]; SDnodeHandle msgHandles[TDMT_MAX];
} SDnodeTrans; } SDnodeTrans;
@ -136,8 +138,10 @@ int32_t dmInitServer(SDnode *pDnode);
void dmCleanupServer(SDnode *pDnode); void dmCleanupServer(SDnode *pDnode);
int32_t dmInitClient(SDnode *pDnode); int32_t dmInitClient(SDnode *pDnode);
int32_t dmInitStatusClient(SDnode *pDnode); int32_t dmInitStatusClient(SDnode *pDnode);
int32_t dmInitSyncClient(SDnode *pDnode);
void dmCleanupClient(SDnode *pDnode); void dmCleanupClient(SDnode *pDnode);
void dmCleanupStatusClient(SDnode *pDnode); void dmCleanupStatusClient(SDnode *pDnode);
void dmCleanupSyncClient(SDnode *pDnode);
SMsgCb dmGetMsgcb(SDnode *pDnode); SMsgCb dmGetMsgcb(SDnode *pDnode);
#ifdef TD_MODULE_OPTIMIZE #ifdef TD_MODULE_OPTIMIZE
int32_t dmInitMsgHandle(SDnode *pDnode, SMgmtWrapper *wrappers); int32_t dmInitMsgHandle(SDnode *pDnode, SMgmtWrapper *wrappers);

View File

@ -94,6 +94,9 @@ int32_t dmInitDnode(SDnode *pDnode) {
indexInit(tsNumOfCommitThreads); indexInit(tsNumOfCommitThreads);
streamMetaInit(); streamMetaInit();
dmInitStatusClient(pDnode);
dmInitSyncClient(pDnode);
dmReportStartup("dnode-transport", "initialized"); dmReportStartup("dnode-transport", "initialized");
dDebug("dnode is created, ptr:%p", pDnode); dDebug("dnode is created, ptr:%p", pDnode);
code = 0; code = 0;
@ -115,7 +118,9 @@ void dmCleanupDnode(SDnode *pDnode) {
dmCleanupClient(pDnode); dmCleanupClient(pDnode);
dmCleanupStatusClient(pDnode); dmCleanupStatusClient(pDnode);
dmCleanupSyncClient(pDnode);
dmCleanupServer(pDnode); dmCleanupServer(pDnode);
dmClearVars(pDnode); dmClearVars(pDnode);
rpcCleanup(); rpcCleanup();
streamMetaCleanup(); streamMetaCleanup();

View File

@ -322,6 +322,23 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return 0; return 0;
} }
} }
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance();
if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
if (pDnode->status == DND_STAT_INIT) {
terrno = TSDB_CODE_APP_IS_STARTING;
} else {
terrno = TSDB_CODE_APP_IS_STOPPING;
}
dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), terrstr(), pMsg->info.handle);
return -1;
} else {
rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
return 0;
}
}
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { rpcRegisterBrokenLinkArg(pMsg); } static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { rpcRegisterBrokenLinkArg(pMsg); }
@ -346,8 +363,8 @@ int32_t dmInitClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
SRpcInit rpcInit = {0}; SRpcInit rpcInit = {0};
rpcInit.label = "DND-C"; rpcInit.label = "DNODE-CLI";
rpcInit.numOfThreads = tsNumOfRpcThreads; rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
@ -366,7 +383,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.failFastThreshold = 3; // failed threshold rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp; rpcInit.ffp = dmFailFastFp;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3); int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500); connLimitNum = TMIN(connLimitNum, 500);
@ -390,7 +407,7 @@ int32_t dmInitStatusClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
SRpcInit rpcInit = {0}; SRpcInit rpcInit = {0};
rpcInit.label = "DND-STATUS"; rpcInit.label = "DNODE-STA-CLI";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
@ -421,16 +438,61 @@ int32_t dmInitStatusClient(SDnode *pDnode) {
rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
// pTrans->statusClientRpc = rpcOpen(&rpcInit); pTrans->statusRpc = rpcOpen(&rpcInit);
// if (pTrans->statusClientRpc == NULL) { if (pTrans->statusRpc == NULL) {
// dError("failed to init dnode rpc status client"); dError("failed to init dnode rpc status client");
// return -1; return -1;
// } }
dDebug("dnode rpc status client is initialized"); dDebug("dnode rpc status client is initialized");
return 0; return 0;
} }
int32_t dmInitSyncClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
SRpcInit rpcInit = {0};
rpcInit.label = "DNODE-SYNC-CLI";
rpcInit.numOfThreads = tsNumOfRpcThreads / 2;
rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.parent = pDnode;
rpcInit.rfp = rpcRfp;
rpcInit.compressSize = tsCompressMsgSize;
rpcInit.retryMinInterval = tsRedirectPeriod;
rpcInit.retryStepFactor = tsRedirectFactor;
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
rpcInit.failFastInterval = 5000; // interval threshold(ms)
rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2;
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum;
rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
pTrans->syncRpc = rpcOpen(&rpcInit);
if (pTrans->syncRpc == NULL) {
dError("failed to init dnode rpc sync client");
return -1;
}
dDebug("dnode rpc sync client is initialized");
return 0;
}
void dmCleanupClient(SDnode *pDnode) { void dmCleanupClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
if (pTrans->clientRpc) { if (pTrans->clientRpc) {
@ -441,11 +503,19 @@ void dmCleanupClient(SDnode *pDnode) {
} }
void dmCleanupStatusClient(SDnode *pDnode) { void dmCleanupStatusClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
// if (pTrans->statusClientRpc) { if (pTrans->statusRpc) {
// rpcClose(pTrans->statusClientRpc); rpcClose(pTrans->statusRpc);
// pTrans->statusClientRpc = NULL; pTrans->statusRpc = NULL;
// dDebug("dnode rpc status client is closed"); dDebug("dnode rpc status client is closed");
// } }
}
void dmCleanupSyncClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
if (pTrans->syncRpc) {
rpcClose(pTrans->syncRpc);
pTrans->syncRpc = NULL;
dDebug("dnode rpc sync client is closed");
}
} }
int32_t dmInitServer(SDnode *pDnode) { int32_t dmInitServer(SDnode *pDnode) {
@ -486,7 +556,10 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) {
SMsgCb msgCb = { SMsgCb msgCb = {
.clientRpc = pDnode->trans.clientRpc, .clientRpc = pDnode->trans.clientRpc,
.serverRpc = pDnode->trans.serverRpc, .serverRpc = pDnode->trans.serverRpc,
.statusRpc = pDnode->trans.statusRpc,
.syncRpc = pDnode->trans.syncRpc,
.sendReqFp = dmSendReq, .sendReqFp = dmSendReq,
.sendSyncReqFp = dmSendSyncReq,
.sendRspFp = dmSendRsp, .sendRspFp = dmSendRsp,
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
.releaseHandleFp = dmReleaseHandle, .releaseHandleFp = dmReleaseHandle,

View File

@ -32,6 +32,10 @@ int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
terrno = TSDB_CODE_INVALID_PTR; terrno = TSDB_CODE_INVALID_PTR;
return -1; return -1;
} }
int32_t sendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
char *i642str(int64_t val) { char *i642str(int64_t val) {
static char str[24] = {0}; static char str[24] = {0};
@ -568,6 +572,7 @@ void mndDumpSdb() {
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.reportStartupFp = reportStartup; msgCb.reportStartupFp = reportStartup;
msgCb.sendReqFp = sendReq; msgCb.sendReqFp = sendReq;
msgCb.sendSyncReqFp = sendSyncReq;
msgCb.sendRspFp = sendRsp; msgCb.sendRspFp = sendRsp;
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
tmsgSetDefault(&msgCb); tmsgSetDefault(&msgCb);
@ -590,7 +595,7 @@ void mndDumpSdb() {
dumpTopic(pSdb, json); dumpTopic(pSdb, json);
dumpConsumer(pSdb, json); dumpConsumer(pSdb, json);
dumpSubscribe(pSdb, json); dumpSubscribe(pSdb, json);
// dumpOffset(pSdb, json); // dumpOffset(pSdb, json);
dumpStream(pSdb, json); dumpStream(pSdb, json);
dumpAcct(pSdb, json); dumpAcct(pSdb, json);
dumpAuth(pSdb, json); dumpAuth(pSdb, json);
@ -605,7 +610,7 @@ void mndDumpSdb() {
char *pCont = tjsonToString(json); char *pCont = tjsonToString(json);
int32_t contLen = strlen(pCont); int32_t contLen = strlen(pCont);
char file[] = "sdb.json"; char file[] = "sdb.json";
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC| TD_FILE_WRITE_THROUGH); TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to write %s since %s", file, terrstr()); mError("failed to write %s since %s", file, terrstr());

View File

@ -299,14 +299,14 @@ static bool mnodeIsNotLeader(SMnode *pMnode) {
} }
static int32_t minCronTime() { static int32_t minCronTime() {
int64_t min = INT64_MAX; int32_t min = INT32_MAX;
min = TMIN(min, tsTtlPushIntervalSec); min = TMIN(min, tsTtlPushIntervalSec);
min = TMIN(min, tsTrimVDbIntervalSec); min = TMIN(min, tsTrimVDbIntervalSec);
min = TMIN(min, tsTransPullupInterval); min = TMIN(min, tsTransPullupInterval);
min = TMIN(min, tsCompactPullupInterval); min = TMIN(min, tsCompactPullupInterval);
min = TMIN(min, tsMqRebalanceInterval); min = TMIN(min, tsMqRebalanceInterval);
min = TMIN(min, tsStreamCheckpointInterval); min = TMIN(min, tsStreamCheckpointInterval);
min = TMIN(min, 5); // checkpointRemain min = TMIN(min, 6); // checkpointRemain
min = TMIN(min, tsStreamNodeCheckInterval); min = TMIN(min, tsStreamNodeCheckInterval);
int64_t telemInt = TMIN(60, (tsTelemInterval - 1)); int64_t telemInt = TMIN(60, (tsTelemInterval - 1));
@ -386,7 +386,8 @@ static void *mndThreadFp(void *param) {
int64_t minCron = minCronTime(); int64_t minCron = minCronTime();
if (sec % minCron == 0 && mnodeIsNotLeader(pMnode)) { if (sec % minCron == 0 && mnodeIsNotLeader(pMnode)) {
// not leader, do nothing // not leader, do nothing
mTrace("timer not process since mnode is not leader, reason: %s", tstrerror(terrno)) terrno = 0; mTrace("timer not process since mnode is not leader, reason: %s", tstrerror(terrno));
terrno = 0;
continue; continue;
} }
mndDoTimerPullupTask(pMnode, sec); mndDoTimerPullupTask(pMnode, sec);

View File

@ -61,6 +61,7 @@ class MndTestTrans2 : public ::testing::Test {
static SMsgCb msgCb = {0}; static SMsgCb msgCb = {0};
msgCb.reportStartupFp = reportStartup; msgCb.reportStartupFp = reportStartup;
msgCb.sendReqFp = sendReq; msgCb.sendReqFp = sendReq;
msgCb.sendSyncReqFp = sendSyncReq;
msgCb.sendRspFp = sendRsp; msgCb.sendRspFp = sendRsp;
msgCb.queueFps[SYNC_QUEUE] = putToQueue; msgCb.queueFps[SYNC_QUEUE] = putToQueue;
msgCb.queueFps[WRITE_QUEUE] = putToQueue; msgCb.queueFps[WRITE_QUEUE] = putToQueue;

View File

@ -14,11 +14,11 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tq.h"
#include "sync.h" #include "sync.h"
#include "tq.h"
#include "tqCommon.h"
#include "tsdb.h" #include "tsdb.h"
#include "vnd.h" #include "vnd.h"
#include "tqCommon.h"
#define BATCH_ENABLE 0 #define BATCH_ENABLE 0
@ -411,7 +411,7 @@ static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
} }
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t code = tmsgSendReq(pEpSet, pMsg); int32_t code = tmsgSendSyncReq(pEpSet, pMsg);
if (code != 0) { if (code != 0) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL; pMsg->pCont = NULL;
@ -477,8 +477,8 @@ static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta
} }
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) { static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader); int32_t code = vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader);
return code; return code;
} }
@ -555,7 +555,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
walApplyVer(pVnode->pWal, commitIdx); walApplyVer(pVnode->pWal, commitIdx);
pVnode->restored = true; pVnode->restored = true;
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta; SStreamMeta *pMeta = pVnode->pTq->pStreamMeta;
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
if (pMeta->startInfo.tasksWillRestart) { if (pMeta->startInfo.tasksWillRestart) {

View File

@ -48,6 +48,14 @@ int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
} }
return code; return code;
} }
int32_t tmsgSendSyncReq(const SEpSet* epSet, SRpcMsg* pMsg) {
int32_t code = (*defaultMsgCb.sendSyncReqFp)(epSet, pMsg);
if (code != 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
return code;
}
void tmsgSendRsp(SRpcMsg* pMsg) { void tmsgSendRsp(SRpcMsg* pMsg) {
#if 1 #if 1

View File

@ -1907,7 +1907,12 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
static void* cliWorkThread(void* arg) { static void* cliWorkThread(void* arg) {
SCliThrd* pThrd = (SCliThrd*)arg; SCliThrd* pThrd = (SCliThrd*)arg;
pThrd->pid = taosGetSelfPthreadId(); pThrd->pid = taosGetSelfPthreadId();
setThreadName("trans-cli-work");
char threadName[TSDB_LABEL_LEN] = {0};
STrans* pInst = pThrd->pTransInst;
strtolower(threadName, pInst->label);
setThreadName(threadName);
uv_run(pThrd->loop, UV_RUN_DEFAULT); uv_run(pThrd->loop, UV_RUN_DEFAULT);
tDebug("thread quit-thread:%08" PRId64, pThrd->pid); tDebug("thread quit-thread:%08" PRId64, pThrd->pid);
@ -2701,6 +2706,7 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr
ret = TSDB_CODE_TIMEOUT_ERROR; ret = TSDB_CODE_TIMEOUT_ERROR;
} else { } else {
memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg)); memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg));
pSyncMsg->pRsp->pCont = NULL;
if (pSyncMsg->hasEpSet == 1) { if (pSyncMsg->hasEpSet == 1) {
epsetAssign(pEpSet, &pSyncMsg->epSet); epsetAssign(pEpSet, &pSyncMsg->epSet);
*epUpdated = 1; *epUpdated = 1;

View File

@ -667,7 +667,7 @@ void transDestroySyncMsg(void* msg) {
STransSyncMsg* pSyncMsg = msg; STransSyncMsg* pSyncMsg = msg;
tsem_destroy(pSyncMsg->pSem); tsem_destroy(pSyncMsg->pSem);
taosMemoryFree(pSyncMsg->pSem); taosMemoryFree(pSyncMsg->pSem);
transFreeMsg(pSyncMsg->pRsp->pCont);
taosMemoryFree(pSyncMsg->pRsp); taosMemoryFree(pSyncMsg->pRsp);
taosMemoryFree(pSyncMsg); taosMemoryFree(pSyncMsg);
} }