Merge pull request #27907 from taosdata/enh/opt-transport

Enh: RPC transport optimization
This commit is contained in:
Shengliang Guan 2024-10-22 09:56:48 +08:00 committed by GitHub
commit 4527e96d2b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 3661 additions and 2672 deletions

View File

@ -55,7 +55,7 @@ extern char tsEncryptAlgorithm[];
extern char tsEncryptScope[];
extern EEncryptAlgor tsiEncryptAlgorithm;
extern EEncryptScope tsiEncryptScope;
//extern char tsAuthCode[];
// extern char tsAuthCode[];
extern char tsEncryptKey[];
// common
@ -71,6 +71,8 @@ extern int32_t tsTagFilterResCacheSize;
// queue & threads
extern int32_t tsNumOfRpcThreads;
extern int32_t tsNumOfRpcSessions;
extern int32_t tsShareConnLimit;
extern int32_t tsReadTimeout;
extern int32_t tsTimeToGetAvailableConn;
extern int32_t tsKeepAliveIdle;
extern int32_t tsNumOfCommitThreads;
@ -264,8 +266,8 @@ extern bool tsExperimental;
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc);
int32_t taosReadDataFolder(const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs);
int32_t taosReadDataFolder(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl,
SArray *pArgs);
int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs,
bool tsc);
void taosCleanupCfg();

View File

@ -983,6 +983,7 @@ typedef struct SEpSet {
SEp eps[TSDB_MAX_REPLICA];
} SEpSet;
int32_t tEncodeSEpSet(SEncoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SDecoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);

View File

@ -329,6 +329,7 @@
TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_TASK_NOTIFY, "task-notify", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_TASK_RELEASE, "task-release", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_SCH_MSG)

View File

@ -63,6 +63,10 @@ typedef struct SRpcHandleInfo {
int8_t forbiddenIp;
int8_t notFreeAhandle;
int8_t compressed;
int64_t seqNum; // msg seq
int64_t qId; // queryId Get from client, other req's qId = -1;
int32_t refIdMgt;
int32_t msgType;
} SRpcHandleInfo;
typedef struct SRpcMsg {
@ -124,8 +128,12 @@ typedef struct SRpcInit {
int32_t connLimitLock;
int32_t timeToGetConn;
int8_t supportBatch; // 0: no batch, 1. batch
int32_t batchSize;
int32_t shareConnLimit;
int8_t shareConn; // 0: no share, 1. share
int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait
int8_t startReadTimer;
int64_t readTimeout; // s
void *parent;
} SRpcInit;
@ -144,6 +152,7 @@ typedef struct {
SHashObj *args;
SRpcBrokenlinkVal brokenVal;
void (*freeFunc)(const void *arg);
int64_t st;
} SRpcCtx;
int32_t rpcInit();

View File

@ -130,12 +130,13 @@ int taosSetAutoDelFile(char *path);
bool lastErrorIsFileNotExist();
#ifdef BUILD_WITH_RAND_ERR
#define STUB_RAND_NETWORK_ERR(status) \
#define STUB_RAND_NETWORK_ERR(ret) \
do { \
if (tsEnableRandErr && (tsRandErrScope & RAND_ERR_NETWORK)) { \
uint32_t r = taosRand() % tsRandErrDivisor; \
if ((r + 1) <= tsRandErrChance) { \
status = TSDB_CODE_RPC_NETWORK_UNAVAIL; \
ret = TSDB_CODE_RPC_NETWORK_UNAVAIL; \
uError("random network error: %s, %s", tstrerror(ret), __func__); \
} \
} \
while (0)

View File

@ -137,6 +137,7 @@ int32_t taosShutDownSocketRDWR(TdSocketPtr pSocket);
int32_t taosShutDownSocketServerRDWR(TdSocketServerPtr pSocketServer);
int32_t taosSetNonblocking(TdSocketPtr pSocket, int32_t on);
int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t optlen);
int32_t taosSetSockOpt2(int32_t fd);
int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t *optlen);
int32_t taosWriteMsg(TdSocketPtr pSocket, void *ptr, int32_t nbytes);
int32_t taosReadMsg(TdSocketPtr pSocket, void *ptr, int32_t nbytes);
@ -159,7 +160,7 @@ TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, st
int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, int *addrLen);
int32_t taosBlockSIGPIPE();
int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t* ip);
int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t *ip);
int32_t taosGetFqdn(char *);
void tinet_ntoa(char *ipstr, uint32_t ip);
uint32_t ip2uint(const char *const ip_addr);

View File

@ -94,9 +94,9 @@ int32_t taosGetErrSize();
#define TSDB_CODE_HTTP_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0025)
#define TSDB_CODE_RPC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0026)
#define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027)
#define TSDB_CODE_RPC_ASYNC_IN_PROCESS TAOS_DEF_ERROR_CODE(0, 0x0028)
#define TSDB_CODE_RPC_NO_STATE TAOS_DEF_ERROR_CODE(0, 0x0029)
#define TSDB_CODE_RPC_STATE_DROPED TAOS_DEF_ERROR_CODE(0, 0x002A)
//common & util
#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) //

View File

@ -506,7 +506,7 @@ typedef enum ELogicConditionType {
#ifdef WINDOWS
#define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections.
#else
#define TSDB_MAX_RPC_THREADS 50
#define TSDB_MAX_RPC_THREADS 20
#endif
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type

View File

@ -370,7 +370,10 @@ int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread,
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 1000);
rpcInit.connLimitNum = connLimitNum;
rpcInit.shareConnLimit = tsShareConnLimit;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.startReadTimer = 1;
rpcInit.readTimeout = tsReadTimeout;
int32_t code = taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
if (TSDB_CODE_SUCCESS != code) {

View File

@ -410,7 +410,6 @@ int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
// int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
if (code) {
doRequestCallback(pRequest, code);
@ -1921,7 +1920,7 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
return NULL;
}
//TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
// TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
// const char* db, int dbLen, uint16_t port) {
// char ipStr[TSDB_EP_LEN] = {0};
// char dbStr[TSDB_DB_NAME_LEN] = {0};
@ -1933,7 +1932,7 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
// tstrncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
// tstrncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
// return taos_connect(ipStr, userStr, passStr, dbStr, port);
//}
// }
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
@ -2301,7 +2300,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
(void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
(void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
(void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
(*((char*)jsonInnerData) == 1) ? "true" : "false");
varDataSetLen(dst, strlen(varDataVal(dst)));
} else {
tscError("doConvertJson error: invalid type:%d", jsonInnerType);
@ -2570,6 +2570,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.readTimeout = tsReadTimeout;
if (TSDB_CODE_SUCCESS != taosVersionStrToInt(version, &(rpcInit.compatibilityVer))) {
tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
goto _OVER;

View File

@ -56,6 +56,8 @@ int32_t tsShellActivityTimer = 3; // second
// queue & threads
int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 30000;
int32_t tsShareConnLimit = 8;
int32_t tsReadTimeout = 900;
int32_t tsTimeToGetAvailableConn = 500000;
int32_t tsKeepAliveIdle = 60;
@ -184,7 +186,7 @@ int32_t tsSlowLogThreshold = 10; // seconds
int32_t tsSlowLogThresholdTest = INT32_MAX; // seconds
char tsSlowLogExceptDb[TSDB_DB_NAME_LEN] = ""; // seconds
int32_t tsSlowLogScope = SLOW_LOG_TYPE_QUERY;
char* tsSlowLogScopeString = "query";
char *tsSlowLogScopeString = "query";
int32_t tsSlowLogMaxLen = 4096;
int32_t tsTimeSeriesThreshold = 50;
bool tsMultiResultFunctionStarReturnTags = false;
@ -322,7 +324,6 @@ int32_t tsMaxTsmaNum = 3;
int32_t tsMaxTsmaCalcDelay = 600;
int64_t tsmaDataDeleteMark = 1000 * 60 * 60 * 24; // in ms, default to 1d
#define TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, pName) \
if ((pItem = cfgGetItem(pCfg, pName)) == NULL) { \
TAOS_RETURN(TSDB_CODE_CFG_NOT_FOUND); \
@ -361,7 +362,7 @@ static int32_t taosSplitS3Cfg(SConfig *pCfg, const char *name, char gVarible[TSD
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, name);
char *strDup = NULL;
if ((strDup = taosStrdup(pItem->str))== NULL){
if ((strDup = taosStrdup(pItem->str)) == NULL) {
code = terrno;
goto _exit;
}
@ -450,7 +451,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
struct SConfig *taosGetCfg() { return tsCfg; }
struct SConfig *taosGetCfg() {
return tsCfg;
}
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
char *apolloUrl) {
@ -572,7 +575,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(
cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, CFG_SCOPE_BOTH, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 4, CFG_SCOPE_CLIENT, CFG_DYN_ENT_CLIENT));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "queryTableNotExistAsEmpty", tsQueryTbNotExistAsEmpty, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(
cfgAddBool(pCfg, "queryTableNotExistAsEmpty", tsQueryTbNotExistAsEmpty, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableQueryHb", tsEnableQueryHb, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableScience", tsEnableScience, CFG_SCOPE_CLIENT, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
@ -600,16 +604,23 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(
cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "randErrorChance", tsRandErrChance, 0, 10000, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(
cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorScope", tsRandErrScope, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH));
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, TSDB_MAX_RPC_THREADS);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, CFG_SCOPE_BOTH, CFG_DYN_NONE));
tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 100000);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, CFG_SCOPE_BOTH, CFG_DYN_NONE));
tsShareConnLimit = TRANGE(tsShareConnLimit, 1, 512);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "shareConnLimit", tsShareConnLimit, 1, 512, CFG_SCOPE_BOTH, CFG_DYN_NONE));
tsReadTimeout = TRANGE(tsReadTimeout, 64, 24 * 3600 * 7);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "readTimeout", tsReadTimeout, 64, 24 * 3600 * 7, CFG_SCOPE_BOTH, CFG_DYN_NONE));
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 10000000);
TAOS_CHECK_RETURN(
cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, CFG_SCOPE_BOTH, CFG_DYN_NONE));
@ -865,8 +876,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem = cfgGetItem(pCfg, "numOfRpcThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfRpcThreads = numOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, TSDB_MAX_RPC_THREADS);
pItem->i32 = tsNumOfRpcThreads;
pItem->stype = stype;
}
@ -878,6 +888,20 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype;
}
pItem = cfgGetItem(pCfg, "shareConnLimit");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsShareConnLimit = TRANGE(tsShareConnLimit, 1, 512);
pItem->i32 = tsShareConnLimit;
pItem->stype = stype;
}
pItem = cfgGetItem(pCfg, "readTimeout");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsReadTimeout = TRANGE(tsReadTimeout, 64, 24 * 3600 * 7);
pItem->i32 = tsReadTimeout;
pItem->stype = stype;
}
pItem = cfgGetItem(pCfg, "timeToGetAvailableConn");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
@ -1083,9 +1107,9 @@ int32_t taosSetSlowLogScope(char *pScopeStr, int32_t *pScope) {
int32_t slowScope = 0;
char* scope = NULL;
char *scope = NULL;
char *tmp = NULL;
while((scope = strsep(&pScopeStr, "|")) != NULL){
while ((scope = strsep(&pScopeStr, "|")) != NULL) {
taosMemoryFreeClear(tmp);
tmp = taosStrdup(scope);
if (tmp == NULL) {
@ -1252,6 +1276,12 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "numOfRpcSessions");
tsNumOfRpcSessions = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "shareConnLimit");
tsShareConnLimit = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "readTimeout");
tsReadTimeout = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "timeToGetAvailableConn");
tsTimeToGetAvailableConn = pItem->i32;
@ -1353,6 +1383,12 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "numOfRpcSessions");
tsNumOfRpcSessions = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "shareConnLimit");
tsShareConnLimit = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "readTimeout");
tsReadTimeout = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "timeToGetAvailableConn");
tsTimeToGetAvailableConn = pItem->i32;
@ -1724,7 +1760,7 @@ int32_t taosReadDataFolder(const char *cfgDir, const char **envCmd, const char *
TAOS_CHECK_GOTO(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE), NULL, _exit);
TAOS_CHECK_GOTO(cfgAddInt32(pCfg, "debugFlag", dDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER), NULL, _exit);
TAOS_CHECK_GOTO(cfgAddInt32(pCfg, "dDebugFlag", dDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) ,NULL, _exit);
TAOS_CHECK_GOTO(cfgAddInt32(pCfg, "dDebugFlag", dDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER), NULL, _exit);
if ((code = taosLoadCfg(pCfg, envCmd, cfgDir, envFile, apolloUrl)) != 0) {
(void)printf("failed to load cfg since %s\n", tstrerror(code));
@ -2053,8 +2089,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
}
case 'f': {
if (strcasecmp("fqdn", name) == 0) {
SConfigItem* pFqdnItem = cfgGetItem(pCfg, "fqdn");
SConfigItem* pServerPortItem = cfgGetItem(pCfg, "serverPort");
SConfigItem *pFqdnItem = cfgGetItem(pCfg, "fqdn");
SConfigItem *pServerPortItem = cfgGetItem(pCfg, "serverPort");
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
if (pFqdnItem == NULL || pServerPortItem == NULL || pFirstEpItem == NULL) {
uError("failed to get fqdn or serverPort or firstEp from cfg");
@ -2109,8 +2145,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
}
case 'l': {
if (strcasecmp("locale", name) == 0) {
SConfigItem* pLocaleItem = cfgGetItem(pCfg, "locale");
SConfigItem* pCharsetItem = cfgGetItem(pCfg, "charset");
SConfigItem *pLocaleItem = cfgGetItem(pCfg, "locale");
SConfigItem *pCharsetItem = cfgGetItem(pCfg, "charset");
if (pLocaleItem == NULL || pCharsetItem == NULL) {
uError("failed to get locale or charset from cfg");
code = TSDB_CODE_CFG_NOT_FOUND;

View File

@ -74,7 +74,7 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
SRpcMsg rpcMsg = {.pCont = pHead,
.contLen = contLen,
.msgType = TDMT_MND_RETRIEVE_IP_WHITE,
.info.ahandle = (void *)0x9527,
.info.ahandle = 0,
.info.notFreeAhandle = 1,
.info.refId = 0,
.info.noResp = 0,
@ -249,7 +249,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SRpcMsg rpcMsg = {.pCont = pHead,
.contLen = contLen,
.msgType = TDMT_MND_STATUS,
.info.ahandle = (void *)0x9527,
.info.ahandle = 0,
.info.notFreeAhandle = 1,
.info.refId = 0,
.info.noResp = 0,
@ -322,7 +322,7 @@ void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
SRpcMsg rpcMsg = {.pCont = pHead,
.contLen = contLen,
.msgType = TDMT_MND_NOTIFY,
.info.ahandle = (void *)0x9527,
.info.ahandle = 0,
.info.notFreeAhandle = 1,
.info.refId = 0,
.info.noResp = 1,

View File

@ -267,7 +267,7 @@ _OVER:
}
if (IsReq(pRpc)) {
SRpcMsg rsp = {.code = code, .info = pRpc->info};
SRpcMsg rsp = {.code = code, .info = pRpc->info, .msgType = pRpc->msgType + 1};
if (code == TSDB_CODE_MNODE_NOT_FOUND) {
dmBuildMnodeRedirectRsp(pDnode, &rsp);
}
@ -418,9 +418,12 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.connLimitNum = connLimitNum;
rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.shareConnLimit = tsShareConnLimit * 2;
rpcInit.shareConn = 1;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.notWaitAvaliableConn = 0;
rpcInit.startReadTimer = 1;
rpcInit.readTimeout = tsReadTimeout;
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
dError("failed to convert version string:%s to int", version);
@ -466,8 +469,10 @@ int32_t dmInitStatusClient(SDnode *pDnode) {
rpcInit.connLimitNum = connLimitNum;
rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.shareConnLimit = tsShareConnLimit * 2;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.startReadTimer = 0;
rpcInit.readTimeout = 0;
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
dError("failed to convert version string:%s to int", version);
@ -514,8 +519,11 @@ int32_t dmInitSyncClient(SDnode *pDnode) {
rpcInit.connLimitNum = connLimitNum;
rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.shareConnLimit = tsShareConnLimit * 8;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
rpcInit.startReadTimer = 1;
rpcInit.readTimeout = tsReadTimeout;
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
dError("failed to convert version string:%s to int", version);
}
@ -569,6 +577,7 @@ int32_t dmInitServer(SDnode *pDnode) {
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.parent = pDnode;
rpcInit.compressSize = tsCompressMsgSize;
rpcInit.shareConnLimit = tsShareConnLimit * 16;
if (taosVersionStrToInt(version, &(rpcInit.compatibilityVer)) != 0) {
dError("failed to convert version string:%s to int", version);

View File

@ -237,7 +237,7 @@ static void mndPullupGrant(SMnode *pMnode) {
.pCont = pReq,
.contLen = contLen,
.info.notFreeAhandle = 1,
.info.ahandle = (void *)0x9527};
.info.ahandle = 0};
// TODO check return value
if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);

View File

@ -30,7 +30,7 @@ void mndPostProcessQueryMsg(SRpcMsg *pMsg) {
(void)qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg);
}
int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo* pInfo) {
int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo *pInfo) {
int32_t code = -1;
SMnode *pMnode = pMsg->info.node;
@ -67,13 +67,12 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo* pInfo) {
return code;
}
static FORCE_INLINE void mnodeFreeSBatchRspMsg(void* p) {
static FORCE_INLINE void mnodeFreeSBatchRspMsg(void *p) {
if (NULL == p) {
return;
}
SBatchRspMsg* pRsp = (SBatchRspMsg*)p;
SBatchRspMsg *pRsp = (SBatchRspMsg *)p;
rpcFreeCont(pRsp->msg);
}
@ -108,7 +107,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
}
for (int32_t i = 0; i < msgNum; ++i) {
SBatchMsg* req = taosArrayGet(batchReq.pMsgs, i);
SBatchMsg *req = taosArrayGet(batchReq.pMsgs, i);
reqMsg.msgType = req->msgType;
reqMsg.pCont = req->msg;

View File

@ -226,7 +226,7 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
}
if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
@ -530,6 +530,16 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
int32_t index = pWrapper->sourceIndex;
SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
if (pRpcHandle != NULL) {
int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
if (ret != 0) {
qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
}
*pRpcHandle = -1;
}
if (!pSourceDataInfo) {
return terrno;
}

View File

@ -504,6 +504,10 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
code);
// called if drop task rsp received code
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error
if (pMsg->handle == NULL) {
qError("sch handle is NULL, may be already released and mem lea");
}
if (pMsg) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);

View File

@ -5079,8 +5079,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
}
char content[256] = {0};
nBytes = tsnprintf(content, sizeof(content), META_ON_S3_FORMATE, p->pCurrent, p->curChkpId, p->pManifest, p->curChkpId,
"processVer", processId);
nBytes = tsnprintf(content, sizeof(content), META_ON_S3_FORMATE, p->pCurrent, p->curChkpId, p->pManifest,
p->curChkpId, "processVer", processId);
if (nBytes <= 0 || nBytes >= sizeof(content)) {
code = TSDB_CODE_OUT_OF_RANGE;
stError("chkp failed to format meta file: %s, reason: invalid msg", dstDir);

View File

@ -114,6 +114,7 @@ typedef struct SExHandle {
void* handle;
int64_t refId;
void* pThrd;
int8_t pThrdIdx;
queue q;
int8_t inited;
SRWLatch latch;
@ -135,57 +136,33 @@ typedef struct SCvtAddr {
bool cvt;
} SCvtAddr;
typedef struct {
SEpSet epSet; // ip list provided by app
SEpSet origEpSet;
void* ahandle; // handle provided by app
tmsg_t msgType; // message type
int8_t connType; // connection type cli/srv
STransCtx appCtx; //
STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
STransSyncMsg* pSyncMsg; // for syncchronous with timeout API
int64_t syncMsgRef;
SCvtAddr cvtAddr;
int32_t retryMinInterval;
int32_t retryMaxInterval;
int32_t retryStepFactor;
int64_t retryMaxTimeout;
int64_t retryInitTimestamp;
int64_t retryNextInterval;
bool retryInit;
int32_t retryStep;
int8_t epsetRetryCnt;
int32_t retryCode;
void* task;
int hThrdIdx;
} STransConnCtx;
#pragma pack(push, 1)
typedef struct {
int8_t inUse;
int8_t numOfEps;
SEp eps[];
} SReqEpSet;
#define TRANS_VER 2
typedef struct {
char version : 4; // RPC version
char comp : 2; // compression algorithm, 0:no compression 1:lz4
char noResp : 2; // noResp bits, 0: resp, 1: resp
char persist : 2; // persist handle,0: no persit, 1: persist handle
char release : 2;
char withUserInfo : 2; // 0: sent user info or not
char secured : 2;
char spi : 2;
char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset
uint64_t timestamp;
char user[TSDB_UNI_LEN];
int32_t compatibilityVer;
uint32_t magicNum;
STraceId traceId;
uint64_t ahandle; // ahandle assigned by client
int64_t qid;
uint32_t code; // del later
uint32_t msgType;
int32_t msgLen;
int64_t seqNum;
uint8_t content[0]; // message body starts from here
} STransMsgHead;
@ -206,6 +183,35 @@ typedef struct {
#pragma pack(pop)
int32_t transCreateReqEpsetFromUserEpset(const SEpSet* pEpset, SReqEpSet** pReqEpSet);
int32_t transCreateUserEpsetFromReqEpset(const SReqEpSet* pReqEpSet, SEpSet* pEpSet);
int32_t transValidReqEpset(SReqEpSet* pReqEpSet);
typedef struct {
SReqEpSet* epSet; // ip list provided by app
SReqEpSet* origEpSet;
void* ahandle; // handle provided by app
tmsg_t msgType; // message type
STransCtx userCtx; //
STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
STransSyncMsg* pSyncMsg; // for syncchronous with timeout API
int64_t syncMsgRef;
SCvtAddr* pCvtAddr;
int64_t retryInitTimestamp;
int64_t retryNextInterval;
int64_t retryMaxTimeout;
int32_t retryMinInterval;
int32_t retryMaxInterval;
int32_t retryStepFactor;
int32_t retryStep;
int32_t retryCode;
int8_t retryInit;
int8_t epsetRetryCnt;
} SReqCtx;
typedef enum { Normal, Quit, Release, Register, Update, FreeById } STransMsgType;
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
@ -272,24 +278,24 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool);
} \
} while (0)
#define ASYNC_CHECK_HANDLE(exh1, id) \
#define ASYNC_CHECK_HANDLE(idMgt, id, exh1) \
do { \
if (id > 0) { \
SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), id); \
if (exh2 == NULL || id != exh2->refId) { \
tDebug("ref:%" PRId64 " already released", id); \
code = terrno; \
SExHandle* exh2 = transAcquireExHandle(idMgt, id); \
if (exh2 == NULL || exh1 != exh2 || (exh2 != NULL && exh2->refId != id)) { \
tError("handle not match, exh1:%p, exh2:%p, refId:%"PRId64"", exh1, exh2, id); \
code = TSDB_CODE_INVALID_MSG; \
goto _return1; \
} \
} else { \
tDebug("invalid handle to release"); \
tError("invalid handle to release"); \
goto _return2; \
} \
} while (0)
int32_t transInitBuffer(SConnBuffer* buf);
int32_t transClearBuffer(SConnBuffer* buf);
int32_t transDestroyBuffer(SConnBuffer* buf);
void transDestroyBuffer(SConnBuffer* buf);
int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
bool transReadComplete(SConnBuffer* connBuf);
int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf);
@ -301,29 +307,30 @@ void transRefSrvHandle(void* handle);
void transUnrefSrvHandle(void* handle);
void transRefCliHandle(void* handle);
void transUnrefCliHandle(void* handle);
int32_t transUnrefCliHandle(void* handle);
int32_t transGetRefCount(void* handle);
int32_t transReleaseCliHandle(void* handle);
int32_t transReleaseSrvHandle(void* handle);
int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
int32_t transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
int32_t transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated,
int32_t transSendRequest(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* pCtx);
int32_t transSendRecv(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp);
int32_t transSendRecvWithTimeout(void* pInit, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated,
int32_t timeoutMs);
int32_t transSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId);
int32_t transFreeConnById(void* shandle, int64_t transpointId);
int32_t transSendRequestWithId(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId);
int32_t transFreeConnById(void* pInit, int64_t transpointId);
int32_t transSendResponse(const STransMsg* msg);
int32_t transRegisterMsg(const STransMsg* msg);
int32_t transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
int32_t transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func);
int32_t transSetDefaultAddr(void* pInit, const char* ip, const char* fqdn);
int32_t transSetIpWhiteList(void* pInit, void* arg, FilteFunc* func);
int32_t transSockInfo2Str(struct sockaddr* sockname, char* dst);
void transSockInfo2Str(struct sockaddr* sockname, char* dst);
int32_t transAllocHandle(int64_t* refId);
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* pInit);
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* pInit);
void transCloseClient(void* arg);
void transCloseServer(void* arg);
@ -336,33 +343,29 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key);
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType);
// request list
typedef struct STransReq {
queue q;
uv_write_t wreq;
} STransReq;
void transReqQueueInit(queue* q);
void* transReqQueuePush(queue* q);
void* transReqQueueRemove(void* arg);
void transReqQueueClear(queue* q);
typedef struct SWriteReq {
queue node; // req queue node
void* conn;
} SWriteReq;
// queue sending msgs
typedef struct {
SArray* q;
void (*freeFunc)(const void* arg);
queue node;
void (*freeFunc)(void* arg);
int32_t size;
} STransQueue;
/*
* init queue
* note: queue'size is small, default 1
*/
int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg));
int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(void* arg));
/*
* put arg into queue
* if queue'size > 1, return false; else return true
*/
bool transQueuePush(STransQueue* queue, void* arg);
void transQueuePush(STransQueue* queue, void* arg);
/*
* the size of queue
*/
@ -375,10 +378,25 @@ void* transQueuePop(STransQueue* queue);
* get ith from queue
*/
void* transQueueGet(STransQueue* queue, int i);
/*
* head elm from queue
*/
void* tranQueueHead(STransQueue* q);
/*
* remove all match elm from queue
*/
void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg), void* arg, void* dst, int32_t size);
/*
* rm ith from queue
*/
void* transQueueRm(STransQueue* queue, int i);
/*
* remove el from queue
*/
void transQueueRemove(STransQueue* q, void* e);
/*
* queue empty or not
*/
@ -418,9 +436,9 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg));
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
void transDQCancel(SDelayQueue* queue, SDelayTask* task);
bool transEpSetIsEqual(SEpSet* a, SEpSet* b);
bool transReqEpsetIsEqual(SReqEpSet* a, SReqEpSet* b);
bool transEpSetIsEqual2(SEpSet* a, SEpSet* b);
bool transCompareReqAndUserEpset(SReqEpSet* a, SEpSet* b);
/*
* init global func
*/
@ -432,14 +450,14 @@ void transPrintEpSet(SEpSet* pEpSet);
void transFreeMsg(void* msg);
int32_t transCompressMsg(char* msg, int32_t len);
int32_t transDecompressMsg(char** msg, int32_t len);
int32_t transDecompressMsg(char** msg, int32_t* len);
int32_t transOpenRefMgt(int size, void (*func)(void*));
void transCloseRefMgt(int32_t refMgt);
int64_t transAddExHandle(int32_t refMgt, void* p);
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId);
void transRemoveExHandle(int32_t refMgt, int64_t refId);
void* transAcquireExHandle(int32_t refMgt, int64_t refId);
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId);
void transReleaseExHandle(int32_t refMgt, int64_t refId);
void transDestroyExHandle(void* handle);
int32_t transGetRefMgt();
@ -465,6 +483,33 @@ int32_t subnetDebugInfoToBuf(SubnetUtils* pUtils, char* buf);
int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf);
int32_t transUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf);
enum { REQ_STATUS_INIT = 0, REQ_STATUS_PROCESSING };
#if defined(WINDOWS) || defined(DARWIN)
#define BUFFER_LIMIT 1
#define STATE_BUFFER_LIMIT 1
#else
#define BUFFER_LIMIT 4
#define STATE_BUFFER_LIMIT 8
#endif
#define HEAP_MISS_HIT_LIMIT 100000
#define READ_TIMEOUT 100000
typedef struct {
queue node; // queue for write
queue q; // queue for reqs
uv_write_t wreq;
void* arg;
} SWReqsWrapper;
int32_t initWQ(queue* wq);
void destroyWQ(queue* wq);
uv_write_t* allocWReqFromWQ(queue* wq, void* arg);
void freeWReqToWQ(queue* wq, SWReqsWrapper* w);
int32_t transSetReadOption(uv_handle_t* handle);
#ifdef __cplusplus
}
#endif

View File

@ -32,8 +32,8 @@
extern "C" {
#endif
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* pInit);
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* pInit);
void taosCloseServer(void* arg);
void taosCloseClient(void* arg);
@ -70,12 +70,16 @@ typedef struct {
int32_t connLimitNum;
int8_t connLimitLock; // 0: no lock. 1. lock
int8_t supportBatch; // 0: no batch, 1: support batch
int32_t batchSize;
int32_t shareConnLimit;
int8_t optBatchFetch;
int32_t timeToGetConn;
int index;
void* parent;
void* tcphandle; // returned handle from TCP initialization
int64_t refId;
int8_t shareConn;
int8_t startReadTimer;
int64_t readTimeout;
TdThreadMutex mutex;
} SRpcInfo;

View File

@ -15,13 +15,13 @@
#include "transComm.h"
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOfThreads, void* fp, void* shandle) = {
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOfThreads, void* fp, void* pInit) = {
transInitServer, transInitClient};
void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle};
void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle};
void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, NULL};
int (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle};
@ -42,6 +42,8 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pRpc == NULL) {
TAOS_CHECK_GOTO(terrno, NULL, _end);
}
pRpc->startReadTimer = pInit->startReadTimer;
if (pInit->label) {
int len = strlen(pInit->label) > sizeof(pRpc->label) ? sizeof(pRpc->label) : strlen(pInit->label);
memcpy(pRpc->label, pInit->label, len);
@ -77,7 +79,15 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->connLimitLock = pInit->connLimitLock;
pRpc->supportBatch = pInit->supportBatch;
pRpc->batchSize = pInit->batchSize;
pRpc->shareConnLimit = pInit->shareConnLimit;
if (pRpc->shareConnLimit <= 0) {
pRpc->shareConnLimit = BUFFER_LIMIT;
}
pRpc->readTimeout = pInit->readTimeout;
if (pRpc->readTimeout < 0) {
pRpc->readTimeout = INT64_MAX;
}
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
if (pRpc->numOfThreads <= 0) {
@ -115,6 +125,8 @@ void* rpcOpen(const SRpcInit* pInit) {
int64_t refId = transAddExHandle(transGetInstMgt(), pRpc);
void* tmp = transAcquireExHandle(transGetInstMgt(), refId);
pRpc->refId = refId;
pRpc->shareConn = pInit->shareConn;
return (void*)refId;
_end:
taosMemoryFree(pRpc);
@ -127,9 +139,8 @@ void rpcClose(void* arg) {
if (arg == NULL) {
return;
}
TAOS_UNUSED(transRemoveExHandle(transGetInstMgt(), (int64_t)arg));
TAOS_UNUSED(transReleaseExHandle(transGetInstMgt(), (int64_t)arg));
transRemoveExHandle(transGetInstMgt(), (int64_t)arg);
transReleaseExHandle(transGetInstMgt(), (int64_t)arg);
tInfo("end to close rpc");
return;
}
@ -175,29 +186,29 @@ void* rpcReallocCont(void* ptr, int64_t contLen) {
return st + TRANS_MSG_OVERHEAD;
}
int32_t rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
return transSendRequest(shandle, pEpSet, pMsg, NULL);
int32_t rpcSendRequest(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
return transSendRequest(pInit, pEpSet, pMsg, NULL);
}
int32_t rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
int32_t rpcSendRequestWithCtx(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0 || pRid == NULL) {
return transSendRequest(shandle, pEpSet, pMsg, pCtx);
return transSendRequest(pInit, pEpSet, pMsg, pCtx);
} else {
return transSendRequestWithId(shandle, pEpSet, pMsg, pRid);
return transSendRequestWithId(pInit, pEpSet, pMsg, pRid);
}
}
int32_t rpcSendRequestWithId(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) {
return transSendRequestWithId(shandle, pEpSet, pReq, transpointId);
int32_t rpcSendRequestWithId(void* pInit, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) {
return transSendRequestWithId(pInit, pEpSet, pReq, transpointId);
}
int32_t rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
return transSendRecv(shandle, pEpSet, pMsg, pRsp);
int32_t rpcSendRecv(void* pInit, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
return transSendRecv(pInit, pEpSet, pMsg, pRsp);
}
int32_t rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated,
int32_t rpcSendRecvWithTimeout(void* pInit, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated,
int32_t timeoutMs) {
return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs);
return transSendRecvWithTimeout(pInit, pEpSet, pMsg, pRsp, epUpdated, timeoutMs);
}
int32_t rpcFreeConnById(void* shandle, int64_t connId) { return transFreeConnById(shandle, connId); }
int32_t rpcFreeConnById(void* pInit, int64_t connId) { return transFreeConnById(pInit, connId); }
int32_t rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); }

File diff suppressed because it is too large Load Diff

View File

@ -15,7 +15,7 @@
#include "transComm.h"
#define BUFFER_CAP 4096
#define BUFFER_CAP 8 * 1024
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
@ -59,7 +59,7 @@ int32_t transCompressMsg(char* msg, int32_t len) {
taosMemoryFree(buf);
return ret;
}
int32_t transDecompressMsg(char** msg, int32_t len) {
int32_t transDecompressMsg(char** msg, int32_t* len) {
STransMsgHead* pHead = (STransMsgHead*)(*msg);
if (pHead->comp == 0) return 0;
@ -68,6 +68,7 @@ int32_t transDecompressMsg(char** msg, int32_t len) {
STransCompMsg* pComp = (STransCompMsg*)pCont;
int32_t oriLen = htonl(pComp->contLen);
int32_t tlen = *len;
char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
if (buf == NULL) {
return terrno;
@ -75,9 +76,10 @@ int32_t transDecompressMsg(char** msg, int32_t len) {
STransMsgHead* pNewHead = (STransMsgHead*)buf;
int32_t decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), (char*)pNewHead->content,
len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
tlen - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
*len = oriLen + sizeof(STransMsgHead);
pNewHead->msgLen = htonl(oriLen + sizeof(STransMsgHead));
taosMemoryFree(pHead);
@ -95,13 +97,12 @@ void transFreeMsg(void* msg) {
tTrace("rpc free cont:%p", (char*)msg - TRANS_MSG_OVERHEAD);
taosMemoryFree((char*)msg - sizeof(STransMsgHead));
}
int transSockInfo2Str(struct sockaddr* sockname, char* dst) {
void transSockInfo2Str(struct sockaddr* sockname, char* dst) {
struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
char buf[20] = {0};
int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
return r;
}
int32_t transInitBuffer(SConnBuffer* buf) {
buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
@ -116,10 +117,9 @@ int32_t transInitBuffer(SConnBuffer* buf) {
buf->invalid = 0;
return 0;
}
int32_t transDestroyBuffer(SConnBuffer* p) {
void transDestroyBuffer(SConnBuffer* p) {
taosMemoryFree(p->buf);
p->buf = NULL;
return 0;
}
int32_t transClearBuffer(SConnBuffer* buf) {
@ -184,7 +184,7 @@ int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
}
}
} else {
tError("failed to reset buffer, total:%d, len:%d, reason:%s", p->total, p->len, tstrerror(TSDB_CODE_INVALID_MSG));
tError("failed to reset buffer, total:%d, len:%d since %s", p->total, p->len, tstrerror(TSDB_CODE_INVALID_MSG));
return TSDB_CODE_INVALID_MSG;
}
return 0;
@ -281,7 +281,7 @@ int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAs
async->data = item;
err = uv_async_init(loop, async, cb);
if (err != 0) {
tError("failed to init async, reason:%s", uv_err_name(err));
tError("failed to init async since %s", uv_err_name(err));
code = TSDB_CODE_THIRDPARTY_ERROR;
break;
}
@ -333,14 +333,16 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
SAsyncItem* item = async->data;
if (taosThreadMutexLock(&item->mtx) != 0) {
tError("failed to lock mutex");
tError("failed to lock mutex since %s", tstrerror(terrno));
return terrno;
}
QUEUE_PUSH(&item->qmsg, q);
TAOS_UNUSED(taosThreadMutexUnlock(&item->mtx));
int ret = uv_async_send(async);
if (ret != 0) {
tError("failed to send async,reason:%s", uv_err_name(ret));
tError("failed to send async since %s", uv_err_name(ret));
return TSDB_CODE_THIRDPARTY_ERROR;
}
return 0;
@ -348,15 +350,17 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
void transCtxInit(STransCtx* ctx) {
// init transCtx
ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UINT), true, HASH_NO_LOCK);
ctx->args = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
}
void transCtxCleanup(STransCtx* ctx) {
if (ctx->args == NULL) {
if (ctx == NULL || ctx->args == NULL) {
return;
}
STransCtxVal* iter = taosHashIterate(ctx->args, NULL);
while (iter) {
int32_t* type = taosHashGetKey(iter, NULL);
tDebug("free msg type %s dump func", TMSG_INFO(*type));
ctx->freeFunc(iter->val);
iter = taosHashIterate(ctx->args, iter);
}
@ -385,7 +389,7 @@ void transCtxMerge(STransCtx* dst, STransCtx* src) {
int32_t code = taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal));
if (code != 0) {
tError("failed to put val to hash, reason:%s", tstrerror(code));
tError("failed to put val to hash since %s", tstrerror(code));
}
iter = taosHashIterate(src->args, iter);
}
@ -415,119 +419,92 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
return ret;
}
void transReqQueueInit(queue* q) {
// init req queue
QUEUE_INIT(q);
int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) {
QUEUE_INIT(&wq->node);
wq->freeFunc = (void (*)(void*))freeFunc;
wq->size = 0;
return 0;
}
void* transReqQueuePush(queue* q) {
STransReq* req = taosMemoryCalloc(1, sizeof(STransReq));
if (req == NULL) {
return NULL;
void transQueuePush(STransQueue* q, void* arg) {
queue* node = arg;
QUEUE_PUSH(&q->node, node);
q->size++;
}
void* transQueuePop(STransQueue* q) {
if (q->size == 0) return NULL;
queue* head = QUEUE_HEAD(&q->node);
QUEUE_REMOVE(head);
q->size--;
return head;
}
int32_t transQueueSize(STransQueue* q) { return q->size; }
void* transQueueGet(STransQueue* q, int idx) {
if (q->size == 0) return NULL;
while (idx-- > 0) {
queue* node = QUEUE_NEXT(&q->node);
if (node == &q->node) return NULL;
}
req->wreq.data = req;
QUEUE_PUSH(q, &req->q);
return &req->wreq;
return NULL;
}
void* transReqQueueRemove(void* arg) {
void* ret = NULL;
uv_write_t* wreq = arg;
STransReq* req = wreq ? wreq->data : NULL;
if (req == NULL) return NULL;
QUEUE_REMOVE(&req->q);
ret = wreq && wreq->handle ? wreq->handle->data : NULL;
taosMemoryFree(req);
return ret;
void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg), void* arg, void* dst, int32_t size) {
queue* d = dst;
queue* node = QUEUE_NEXT(&q->node);
while (node != &q->node) {
queue* next = QUEUE_NEXT(node);
if (filter && filter(node, arg)) {
QUEUE_REMOVE(node);
q->size--;
QUEUE_PUSH(d, node);
if (--size == 0) {
break;
}
}
node = next;
}
}
void transReqQueueClear(queue* q) {
while (!QUEUE_IS_EMPTY(q)) {
queue* h = QUEUE_HEAD(q);
void* tranQueueHead(STransQueue* q) {
if (q->size == 0) return NULL;
queue* head = QUEUE_HEAD(&q->node);
return head;
}
void* transQueueRm(STransQueue* q, int i) {
// if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
// return NULL;
// }
// if (i >= taosArrayGetSize(queue->q)) {
// return NULL;
// }
// void* ptr = taosArrayGetP(queue->q, i);
// taosArrayRemove(queue->q, i);
// return ptr;
return NULL;
}
void transQueueRemove(STransQueue* q, void* e) {
if (q->size == 0) return;
queue* node = e;
QUEUE_REMOVE(node);
q->size--;
}
bool transQueueEmpty(STransQueue* q) { return q->size == 0 ? true : false; }
void transQueueClear(STransQueue* q) {
while (!QUEUE_IS_EMPTY(&q->node)) {
queue* h = QUEUE_HEAD(&q->node);
QUEUE_REMOVE(h);
STransReq* req = QUEUE_DATA(h, STransReq, q);
taosMemoryFree(req);
if (q->freeFunc != NULL) (q->freeFunc)(h);
q->size--;
}
}
int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) {
queue->q = taosArrayInit(2, sizeof(void*));
if (queue->q == NULL) {
return terrno;
}
queue->freeFunc = (void (*)(const void*))freeFunc;
return 0;
}
bool transQueuePush(STransQueue* queue, void* arg) {
if (queue->q == NULL) {
return true;
}
if (taosArrayPush(queue->q, &arg) == NULL) {
return false;
}
if (taosArrayGetSize(queue->q) > 1) {
return false;
}
return true;
}
void* transQueuePop(STransQueue* queue) {
if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
return NULL;
}
void* ptr = taosArrayGetP(queue->q, 0);
taosArrayRemove(queue->q, 0);
return ptr;
}
int32_t transQueueSize(STransQueue* queue) {
if (queue->q == NULL) {
return 0;
}
return taosArrayGetSize(queue->q);
}
void* transQueueGet(STransQueue* queue, int i) {
if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
return NULL;
}
if (i >= taosArrayGetSize(queue->q)) {
return NULL;
}
void* ptr = taosArrayGetP(queue->q, i);
return ptr;
}
void* transQueueRm(STransQueue* queue, int i) {
if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
return NULL;
}
if (i >= taosArrayGetSize(queue->q)) {
return NULL;
}
void* ptr = taosArrayGetP(queue->q, i);
taosArrayRemove(queue->q, i);
return ptr;
}
bool transQueueEmpty(STransQueue* queue) {
if (queue->q == NULL) {
return true;
}
return taosArrayGetSize(queue->q) == 0;
}
void transQueueClear(STransQueue* queue) {
if (queue->freeFunc != NULL) {
for (int i = 0; i < taosArrayGetSize(queue->q); i++) {
void* p = taosArrayGetP(queue->q, i);
queue->freeFunc(p);
}
}
taosArrayClear(queue->q);
}
void transQueueDestroy(STransQueue* queue) {
transQueueClear(queue);
taosArrayDestroy(queue->q);
}
void transQueueDestroy(STransQueue* q) { transQueueClear(q); }
static FORCE_INLINE int32_t timeCompare(const HeapNode* a, const HeapNode* b) {
SDelayTask* arg1 = container_of(a, SDelayTask, node);
@ -690,7 +667,13 @@ void transPrintEpSet(SEpSet* pEpSet) {
len += tsnprintf(buf + len, sizeof(buf) - len, "}");
tTrace("%s, inUse:%d", buf, pEpSet->inUse);
}
bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
bool transReqEpsetIsEqual(SReqEpSet* a, SReqEpSet* b) {
if (a == NULL && b == NULL) {
return true;
} else if (a == NULL || b == NULL) {
return false;
}
if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) {
return false;
}
@ -701,7 +684,7 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
}
return true;
}
bool transEpSetIsEqual2(SEpSet* a, SEpSet* b) {
bool transCompareReqAndUserEpset(SReqEpSet* a, SEpSet* b) {
if (a->numOfEps != b->numOfEps) {
return false;
}
@ -757,28 +740,31 @@ int64_t transAddExHandle(int32_t refMgt, void* p) {
// acquire extern handle
return taosAddRef(refMgt, p);
}
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) {
void transRemoveExHandle(int32_t refMgt, int64_t refId) {
// acquire extern handle
return taosRemoveRef(refMgt, refId);
int32_t code = taosRemoveRef(refMgt, refId);
if (code != 0) {
tTrace("failed to remove %" PRId64 " from resetId:%d", refId, refMgt);
}
}
void* transAcquireExHandle(int32_t refMgt, int64_t refId) { // acquire extern handle
return (void*)taosAcquireRef(refMgt, refId);
}
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) {
void transReleaseExHandle(int32_t refMgt, int64_t refId) {
// release extern handle
return taosReleaseRef(refMgt, refId);
int32_t code = taosReleaseRef(refMgt, refId);
if (code != 0) {
tTrace("failed to release %" PRId64 " from resetId:%d", refId, refMgt);
}
}
void transDestroyExHandle(void* handle) {
if (handle == NULL) {
return;
}
SExHandle* eh = handle;
if (!QUEUE_IS_EMPTY(&eh->q)) {
tDebug("handle %p mem leak", handle);
}
tDebug("free exhandle %p", handle);
tDebug("trans destroy sid:%" PRId64 ", memory %p", eh->refId, handle);
taosMemoryFree(handle);
}
@ -841,7 +827,7 @@ int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) {
int32_t err = uv_inet_ntop(AF_INET, &addr, buf, 32);
if (err != 0) {
tError("failed to convert ip to string, reason:%s", uv_strerror(err));
tError("failed to convert ip to string since %s", uv_strerror(err));
return TSDB_CODE_THIRDPARTY_ERROR;
}
@ -890,3 +876,113 @@ int32_t transUtilSWhiteListToStr(SIpWhiteList* pList, char** ppBuf) {
// STUB_RAND_NETWORK_ERR(status)
// return status;
// }
int32_t initWQ(queue* wq) {
int32_t code = 0;
QUEUE_INIT(wq);
for (int i = 0; i < 4; i++) {
SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper));
if (w == NULL) {
TAOS_CHECK_GOTO(terrno, NULL, _exception);
}
w->wreq.data = w;
w->arg = NULL;
QUEUE_INIT(&w->node);
QUEUE_PUSH(wq, &w->q);
}
return 0;
_exception:
destroyWQ(wq);
return code;
}
void destroyWQ(queue* wq) {
while (!QUEUE_IS_EMPTY(wq)) {
queue* h = QUEUE_HEAD(wq);
QUEUE_REMOVE(h);
SWReqsWrapper* w = QUEUE_DATA(h, SWReqsWrapper, q);
taosMemoryFree(w);
}
}
uv_write_t* allocWReqFromWQ(queue* wq, void* arg) {
if (!QUEUE_IS_EMPTY(wq)) {
queue* node = QUEUE_HEAD(wq);
QUEUE_REMOVE(node);
SWReqsWrapper* w = QUEUE_DATA(node, SWReqsWrapper, q);
w->arg = arg;
QUEUE_INIT(&w->node);
return &w->wreq;
} else {
SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper));
if (w == NULL) {
return NULL;
}
w->wreq.data = w;
w->arg = arg;
QUEUE_INIT(&w->node);
return &w->wreq;
}
}
void freeWReqToWQ(queue* wq, SWReqsWrapper* w) {
QUEUE_INIT(&w->node);
QUEUE_PUSH(wq, &w->q);
}
int32_t transSetReadOption(uv_handle_t* handle) {
int32_t code = 0;
int32_t fd;
int ret = uv_fileno((uv_handle_t*)handle, &fd);
if (ret != 0) {
tWarn("failed to get fd since %s", uv_err_name(ret));
return TSDB_CODE_THIRDPARTY_ERROR;
}
code = taosSetSockOpt2(fd);
return code;
}
int32_t transCreateReqEpsetFromUserEpset(const SEpSet* pEpset, SReqEpSet** pReqEpSet) {
if (pEpset == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (pReqEpSet == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t size = sizeof(SReqEpSet) + sizeof(SEp) * pEpset->numOfEps;
SReqEpSet* pReq = (SReqEpSet*)taosMemoryCalloc(1, size);
if (pReq == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy((char*)pReq, (char*)pEpset, size);
// clear previous
taosMemoryFree(*pReqEpSet);
if (transValidReqEpset(pReq) != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
return TSDB_CODE_INVALID_PARA;
}
*pReqEpSet = pReq;
return TSDB_CODE_SUCCESS;
}
int32_t transCreateUserEpsetFromReqEpset(const SReqEpSet* pReqEpSet, SEpSet* pEpSet) {
if (pReqEpSet == NULL) {
return TSDB_CODE_INVALID_PARA;
}
memcpy((char*)pEpSet, (char*)pReqEpSet, sizeof(SReqEpSet) + sizeof(SEp) * pReqEpSet->numOfEps);
return TSDB_CODE_SUCCESS;
}
int32_t transValidReqEpset(SReqEpSet* pReqEpSet) {
if (pReqEpSet == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (pReqEpSet->numOfEps == 0 || pReqEpSet->numOfEps > TSDB_MAX_EP_NUM || pReqEpSet->inUse >= TSDB_MAX_EP_NUM) {
return TSDB_CODE_INVALID_PARA;
}
return TSDB_CODE_SUCCESS;
}

File diff suppressed because it is too large Load Diff

View File

@ -117,7 +117,7 @@ int main(int argc, char *argv[]) {
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.connLimitNum = 10;
rpcInit.connLimitLock = 1;
rpcInit.batchSize = 16 * 1024;
rpcInit.shareConnLimit = 16 * 1024;
rpcInit.supportBatch = 1;
rpcDebugFlag = 135;

View File

@ -398,7 +398,8 @@ HANDLE taosOpenFileNotStream(const char *path, int32_t tdFileOptions) {
DWORD dwError = GetLastError();
terrno = TAOS_SYSTEM_WINAPI_ERROR(dwError);
// LPVOID lpMsgBuf;
// FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL, dwError, 0, (LPTSTR)&lpMsgBuf, 0,
// FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL, dwError, 0, (LPTSTR)&lpMsgBuf,
// 0,
// NULL);
// printf("CreateFile failed with error %d: %s", dwError, (char *)lpMsgBuf);
// LocalFree(lpMsgBuf);
@ -983,7 +984,7 @@ int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, in
}
#ifdef _TD_DARWIN_64
if(lseek(pFileIn->fd, (int32_t)(*offset), 0) < 0) {
if (lseek(pFileIn->fd, (int32_t)(*offset), 0) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
@ -1469,7 +1470,7 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
while (!feof(pSrcFile->fp)) {
len = (int32_t)fread(data, 1, compressSize, pSrcFile->fp);
if (len > 0) {
if(gzwrite(dstFp, data, len) == 0) {
if (gzwrite(dstFp, data, len) == 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
ret = terrno;
goto cmp_end;

View File

@ -308,6 +308,7 @@ void *taosMemoryCalloc(int64_t num, int64_t size) {
uint32_t r = taosRand() % tsRandErrDivisor;
if ((r + 1) <= tsRandErrChance) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
uError("random memory error: %s, %s", tstrerror(terrno), __func__);
return NULL;
}
}

View File

@ -74,7 +74,7 @@ int32_t taosGetAppName(char* name, int32_t* len) {
int32_t tsem_wait(tsem_t* sem) {
DWORD ret = WaitForSingleObject(*sem, INFINITE);
if(ret == WAIT_OBJECT_0) {
if (ret == WAIT_OBJECT_0) {
return 0;
} else {
return TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
@ -140,7 +140,7 @@ int32_t tsem_wait(tsem_t *psem) {
int32_t tsem_timewait(tsem_t *psem, int64_t milis) {
if (psem == NULL || *psem == NULL) return -1;
dispatch_time_t time = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(milis * USEC_PER_SEC));
if(dispatch_semaphore_wait(*psem, time) == 0) {
if (dispatch_semaphore_wait(*psem, time) == 0) {
return 0;
} else {
return TSDB_CODE_TIMEOUT_ERROR;
@ -228,8 +228,8 @@ int32_t taosGetAppName(char* name, int32_t* len) {
return 0;
}
int32_t tsem_init(tsem_t *psem, int flags, unsigned int count) {
if(sem_init(psem, flags, count) == 0) {
int32_t tsem_init(tsem_t* psem, int flags, unsigned int count) {
if (sem_init(psem, flags, count) == 0) {
return 0;
} else {
return terrno = TAOS_SYSTEM_ERROR(errno);
@ -251,9 +251,9 @@ int32_t tsem_timewait(tsem_t* sem, int64_t ms) {
ts.tv_nsec %= 1000000000;
while ((ret = sem_timedwait(sem, &ts)) == -1) {
if(errno == EINTR) {
if (errno == EINTR) {
continue;
} else if(errno == ETIMEDOUT) {
} else if (errno == ETIMEDOUT) {
return TSDB_CODE_TIMEOUT_ERROR;
} else {
terrno = TAOS_SYSTEM_ERROR(errno);
@ -315,7 +315,7 @@ int32_t tsem_post(tsem_t* psem) {
}
}
int32_t tsem_destroy(tsem_t *sem) {
int32_t tsem_destroy(tsem_t* sem) {
if (sem_destroy(sem) == 0) {
return 0;
} else {
@ -323,7 +323,7 @@ int32_t tsem_destroy(tsem_t *sem) {
}
}
int tsem2_post(tsem2_t *sem) {
int tsem2_post(tsem2_t* sem) {
int32_t code = taosThreadMutexLock(&sem->mutex);
if (code) {
return code;

View File

@ -482,3 +482,18 @@ uint64_t taosNtoh64(uint64_t val) {
}
#endif
}
int32_t taosSetSockOpt2(int32_t fd) {
#if defined(WINDOWS) || defined(DARWIN)
return 0;
#else
int32_t ret = setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, (int[]){1}, sizeof(int));
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
} else {
return 0;
}
#endif
return 0;
}

View File

@ -59,6 +59,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy")
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MODULE_QUIT, "rpc module already quit")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_IN_PROCESS, "rpc async in process")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NO_STATE, "rpc no state")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_STATE_DROPED, "rpc state already dropped")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")

View File

@ -225,6 +225,7 @@ void destroyPriorityQueue(PriorityQueue* pq) {
static size_t pqParent(size_t i) { return (--i) >> 1; /* (i - 1) / 2 */ }
static size_t pqLeft(size_t i) { return (i << 1) | 1; /* i * 2 + 1 */ }
static size_t pqRight(size_t i) { return (++i) << 1; /* (i + 1) * 2 */ }
static void pqSwapPQNode(PriorityQueueNode* a, PriorityQueueNode* b) {
void* tmp = a->data;
a->data = b->data;

View File

@ -19,7 +19,7 @@
#include "tlog.h"
#include "tutil.h"
#define TSDB_REF_OBJECTS 50
#define TSDB_REF_OBJECTS 100
#define TSDB_REF_STATE_EMPTY 0
#define TSDB_REF_STATE_ACTIVE 1
#define TSDB_REF_STATE_DELETED 2
@ -56,7 +56,7 @@ static void taosLockList(int64_t *lockedBy);
static void taosUnlockList(int64_t *lockedBy);
static void taosIncRsetCount(SRefSet *pSet);
static void taosDecRsetCount(SRefSet *pSet);
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove, int32_t* isReleased);
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove, int32_t *isReleased);
int32_t taosOpenRef(int32_t max, RefFp fp) {
SRefNode **nodeList;
@ -254,7 +254,9 @@ void *taosAcquireRef(int32_t rsetId, int64_t rid) {
}
int32_t taosReleaseRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 0, NULL); }
int32_t taosReleaseRefEx(int32_t rsetId, int64_t rid, int32_t* isReleased) { return taosDecRefCount(rsetId, rid, 0, isReleased); }
int32_t taosReleaseRefEx(int32_t rsetId, int64_t rid, int32_t *isReleased) {
return taosDecRefCount(rsetId, rid, 0, isReleased);
}
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
void *taosIterateRef(int32_t rsetId, int64_t rid) {
@ -387,7 +389,7 @@ int32_t taosListRef() {
return num;
}
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove, int32_t* isReleased) {
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove, int32_t *isReleased) {
int32_t hash;
SRefSet *pSet;
SRefNode *pNode;

View File

@ -0,0 +1,55 @@
#include <gtest/gtest.h>
#include "taoserror.h"
#include "theap.h"
using namespace std;
typedef struct TNode {
int32_t data;
HeapNode node;
} TNodeMem;
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
int32_t heapCompare(const HeapNode* a, const HeapNode* b) {
TNodeMem *ta = container_of(a, TNodeMem, node);
TNodeMem *tb = container_of(b, TNodeMem, node);
if (ta->data > tb->data) {
return 0;
}
return 1;
}
TEST(TD_UTIL_HEAP_TEST, heapTest) {
Heap* heap = heapCreate(heapCompare);
ASSERT_TRUE(heap != NULL);
ASSERT_EQ(0, heapSize(heap));
int32_t limit = 10;
TNodeMem **pArr = (TNodeMem **)taosMemoryCalloc(100, sizeof(TNodeMem *));
for (int i = 0; i < 100; i++) {
TNodeMem *a = (TNodeMem *)taosMemoryCalloc(1, sizeof(TNodeMem));
a->data = i%limit;
heapInsert(heap, &a->node);
pArr[i] = a;
TNodeMem *b = (TNodeMem *)taosMemoryCalloc(1, sizeof(TNodeMem));
b->data = (limit - i)%limit;
heapInsert(heap, &b->node);
}
for (int i = 98; i < 100; i++) {
TNodeMem *p = pArr[i];
p->data = -100000;
}
HeapNode *node = heapMin(heap);
while (node != NULL) {
TNodeMem *data = container_of(node, TNodeMem, node);
heapRemove(heap, node);
printf("%d\t", data->data);
node = heapMin(heap);
}
heapDestroy(heap);
}

View File

@ -96,6 +96,7 @@ while $i < 5
sql drop index $sma
endw
#sleep 5000
sql drop stable $mtPrefix
sql select * from information_schema.ins_indexes
if $rows != 0 then