[td-1830]
This commit is contained in:
parent
b736211317
commit
6ee9d124e6
|
@ -150,7 +150,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
|||
if (pObj == NULL) return;
|
||||
|
||||
if (pObj != pObj->signature) {
|
||||
tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
|
||||
tscError("heartbeat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -175,12 +175,12 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
|||
if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
|
||||
}
|
||||
} else {
|
||||
tscDebug("heartbeat failed, code:%s", tstrerror(code));
|
||||
tscDebug("%p heartbeat failed, code:%s", pObj->pHb, tstrerror(code));
|
||||
}
|
||||
|
||||
if (pObj->pHb != NULL) {
|
||||
int32_t waitingDuring = tsShellActivityTimer * 500;
|
||||
tscDebug("%p start heartbeat in %dms", pSql, waitingDuring);
|
||||
tscDebug("%p send heartbeat in %dms", pSql, waitingDuring);
|
||||
|
||||
taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer);
|
||||
} else {
|
||||
|
@ -1639,11 +1639,14 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
|
||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
|
||||
pthread_mutex_unlock(&pObj->mutex);
|
||||
tscError("%p failed to malloc for heartbeat msg", pSql);
|
||||
tscError("%p failed to create heartbeat msg", pSql);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
// TODO the expired hb and client can not be identified by server till now.
|
||||
SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
|
||||
tstrncpy(pHeartbeat->clientVer, version, tListLen(pHeartbeat->clientVer));
|
||||
|
||||
pHeartbeat->numOfQueries = numOfQueries;
|
||||
pHeartbeat->numOfStreams = numOfStreams;
|
||||
|
||||
|
@ -1996,10 +1999,11 @@ static void createHBObj(STscObj* pObj) {
|
|||
}
|
||||
|
||||
int tscProcessConnectRsp(SSqlObj *pSql) {
|
||||
char temp[TSDB_TABLE_FNAME_LEN * 2];
|
||||
STscObj *pObj = pSql->pTscObj;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
|
||||
char temp[TSDB_TABLE_FNAME_LEN * 2] = {0};
|
||||
|
||||
SCMConnectRsp *pConnect = (SCMConnectRsp *)pRes->pRsp;
|
||||
tstrncpy(pObj->acctId, pConnect->acctId, sizeof(pObj->acctId)); // copy acctId from response
|
||||
int32_t len = sprintf(temp, "%s%s%s", pObj->acctId, TS_PATH_DELIMITER, pObj->db);
|
||||
|
@ -2018,6 +2022,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
|
|||
pObj->connId = htonl(pConnect->connId);
|
||||
|
||||
createHBObj(pObj);
|
||||
|
||||
//launch a timer to send heartbeat to maintain the connection and send status to mnode
|
||||
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -781,6 +781,7 @@ typedef struct {
|
|||
} SStreamDesc;
|
||||
|
||||
typedef struct {
|
||||
char clientVer[TSDB_VERSION_LEN];
|
||||
uint32_t connId;
|
||||
int32_t pid;
|
||||
int32_t numOfQueries;
|
||||
|
|
|
@ -232,12 +232,16 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
|
||||
SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp));
|
||||
if (pHBRsp == NULL) {
|
||||
SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *) rpcMallocCont(sizeof(SCMHeartBeatRsp));
|
||||
if (pRsp == NULL) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SCMHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont;
|
||||
if (taosCheckVersion(pHBMsg->clientVer, version, 3) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_VERSION; // todo change the error code
|
||||
}
|
||||
|
||||
SRpcConnInfo connInfo = {0};
|
||||
rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo);
|
||||
|
||||
|
@ -251,33 +255,33 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
|
|||
if (pConn == NULL) {
|
||||
// do not close existing links, otherwise
|
||||
// mError("failed to create connId, close connect");
|
||||
// pHBRsp->killConnection = 1;
|
||||
// pRsp->killConnection = 1;
|
||||
} else {
|
||||
pHBRsp->connId = htonl(pConn->connId);
|
||||
pRsp->connId = htonl(pConn->connId);
|
||||
mnodeSaveQueryStreamList(pConn, pHBMsg);
|
||||
|
||||
if (pConn->killed != 0) {
|
||||
pHBRsp->killConnection = 1;
|
||||
pRsp->killConnection = 1;
|
||||
}
|
||||
|
||||
if (pConn->streamId != 0) {
|
||||
pHBRsp->streamId = htonl(pConn->streamId);
|
||||
pRsp->streamId = htonl(pConn->streamId);
|
||||
pConn->streamId = 0;
|
||||
}
|
||||
|
||||
if (pConn->queryId != 0) {
|
||||
pHBRsp->queryId = htonl(pConn->queryId);
|
||||
pRsp->queryId = htonl(pConn->queryId);
|
||||
pConn->queryId = 0;
|
||||
}
|
||||
}
|
||||
|
||||
pHBRsp->onlineDnodes = htonl(mnodeGetOnlineDnodesNum());
|
||||
pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum());
|
||||
mnodeGetMnodeEpSetForShell(&pHBRsp->epSet);
|
||||
pRsp->onlineDnodes = htonl(mnodeGetOnlineDnodesNum());
|
||||
pRsp->totalDnodes = htonl(mnodeGetDnodesNum());
|
||||
mnodeGetMnodeEpSetForShell(&pRsp->epSet);
|
||||
|
||||
pMsg->rpcRsp.rsp = pHBRsp;
|
||||
pMsg->rpcRsp.rsp = pRsp;
|
||||
pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp);
|
||||
|
||||
|
||||
mnodeReleaseConn(pConn);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -326,6 +326,7 @@ int32_t taosHexStrToByteArray(char hexstr[], char bytes[]) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// TODO move to comm module
|
||||
bool taosGetVersionNumber(char *versionStr, int *versionNubmer) {
|
||||
if (versionStr == NULL || versionNubmer == NULL) {
|
||||
return false;
|
||||
|
|
Loading…
Reference in New Issue