fix possible memroy link while process show message
This commit is contained in:
parent
14243057b4
commit
4315758ef3
|
@ -24,14 +24,6 @@ extern "C" {
|
|||
int32_t mgmtInitProfile();
|
||||
void mgmtCleanUpProfile();
|
||||
|
||||
bool mgmtCheckQhandle(uint64_t qhandle);
|
||||
void mgmtSaveQhandle(void *qhandle);
|
||||
void mgmtFreeQhandle(void *qhandle);
|
||||
|
||||
void * mgmtMallocQueuedMsg(SRpcMsg *rpcMsg);
|
||||
void * mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg);
|
||||
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -34,6 +34,14 @@ void mgmtAddToShellQueue(SQueuedMsg *queuedMsg);
|
|||
void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg);
|
||||
void mgmtSendSimpleResp(void *thandle, int32_t code);
|
||||
|
||||
bool mgmtCheckQhandle(uint64_t qhandle);
|
||||
void *mgmtSaveQhandle(void *qhandle, int32_t size);
|
||||
void mgmtFreeQhandle(void *qhandle, bool forceRemove);
|
||||
|
||||
void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg);
|
||||
void *mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg);
|
||||
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -36,7 +36,7 @@
|
|||
#include "mgmtTable.h"
|
||||
#include "mgmtShell.h"
|
||||
|
||||
void *tsMgmtTmr = NULL;
|
||||
extern void *tsMgmtTmr;
|
||||
static bool tsMgmtIsRunning = false;
|
||||
|
||||
int32_t mgmtStartSystem() {
|
||||
|
@ -51,12 +51,6 @@ int32_t mgmtStartSystem() {
|
|||
mkdir(tsMnodeDir, 0755);
|
||||
}
|
||||
|
||||
tsMgmtTmr = taosTmrInit((tsMaxShellConns) * 3, 200, 3600000, "MND");
|
||||
if (tsMgmtTmr == NULL) {
|
||||
mError("failed to init timer");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mgmtInitAccts() < 0) {
|
||||
mError("failed to init accts");
|
||||
return -1;
|
||||
|
|
|
@ -561,17 +561,6 @@ int32_t mgmtKillConnection(char *qidstr, void *pConn) {
|
|||
return TSDB_CODE_INVALID_CONNECTION;
|
||||
}
|
||||
|
||||
bool mgmtCheckQhandle(uint64_t qhandle) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void mgmtSaveQhandle(void *qhandle) {
|
||||
mTrace("qhandle:%p is allocated", qhandle);
|
||||
}
|
||||
|
||||
void mgmtFreeQhandle(void *qhandle) {
|
||||
mTrace("qhandle:%p is freed", qhandle);
|
||||
}
|
||||
|
||||
int mgmtGetConns(SShowObj *pShow, void *pConn) {
|
||||
// SAcctObj * pAcct = pConn->pAcct;
|
||||
|
@ -771,52 +760,3 @@ int32_t mgmtInitProfile() {
|
|||
|
||||
void mgmtCleanUpProfile() {
|
||||
}
|
||||
|
||||
void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) {
|
||||
bool usePublicIp = false;
|
||||
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle, &usePublicIp);
|
||||
if (pUser == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SQueuedMsg *pMsg = calloc(1, sizeof(SQueuedMsg));
|
||||
pMsg->thandle = rpcMsg->handle;
|
||||
pMsg->msgType = rpcMsg->msgType;
|
||||
pMsg->contLen = rpcMsg->contLen;
|
||||
pMsg->pCont = rpcMsg->pCont;
|
||||
pMsg->pUser = pUser;
|
||||
pMsg->usePublicIp = usePublicIp;
|
||||
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
if (pMsg->pUser) mgmtDecUserRef(pMsg->pUser);
|
||||
if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb);
|
||||
if (pMsg->pVgroup) mgmtDecVgroupRef(pMsg->pVgroup);
|
||||
if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable);
|
||||
if (pMsg->pAcct) mgmtDecAcctRef(pMsg->pAcct);
|
||||
if (pMsg->pDnode) mgmtDecDnodeRef(pMsg->pDnode);
|
||||
free(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) {
|
||||
SQueuedMsg *pDestMsg = calloc(1, sizeof(SQueuedMsg));
|
||||
|
||||
pDestMsg->thandle = pSrcMsg->thandle;
|
||||
pDestMsg->msgType = pSrcMsg->msgType;
|
||||
pDestMsg->pCont = pSrcMsg->pCont;
|
||||
pDestMsg->contLen = pSrcMsg->contLen;
|
||||
pDestMsg->retry = pSrcMsg->retry;
|
||||
pDestMsg->maxRetry= pSrcMsg->maxRetry;
|
||||
pDestMsg->pUser = pSrcMsg->pUser;
|
||||
pDestMsg->usePublicIp = pSrcMsg->usePublicIp;
|
||||
|
||||
pSrcMsg->pCont = NULL;
|
||||
pSrcMsg->pUser = NULL;
|
||||
|
||||
return pDestMsg;
|
||||
}
|
|
@ -23,6 +23,7 @@
|
|||
#include "ttimer.h"
|
||||
#include "tgrant.h"
|
||||
#include "tglobal.h"
|
||||
#include "tcache.h"
|
||||
#include "dnode.h"
|
||||
#include "mgmtDef.h"
|
||||
#include "mgmtLog.h"
|
||||
|
@ -50,10 +51,11 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *queuedMsg);
|
|||
static void mgmtProcessConnectMsg(SQueuedMsg *queuedMsg);
|
||||
static void mgmtProcessUseMsg(SQueuedMsg *queuedMsg);
|
||||
|
||||
extern void *tsMgmtTmr;
|
||||
void *tsMgmtTmr;
|
||||
static void *tsMgmtShellRpc = NULL;
|
||||
static void *tsMgmtTranQhandle = NULL;
|
||||
static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SQueuedMsg *) = {0};
|
||||
static void *tsQhandleCache = NULL;
|
||||
static SShowMetaFp tsMgmtShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0};
|
||||
static SShowRetrieveFp tsMgmtShowRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0};
|
||||
|
||||
|
@ -64,7 +66,9 @@ int32_t mgmtInitShell() {
|
|||
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mgmtProcessConnectMsg);
|
||||
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mgmtProcessUseMsg);
|
||||
|
||||
tsMgmtTmr = taosTmrInit((tsMaxShellConns) * 3, 200, 3600000, "MND");
|
||||
tsMgmtTranQhandle = taosInitScheduler(tsMaxShellConns, 1, "mnodeT");
|
||||
tsQhandleCache = taosCacheInit(tsMgmtTmr, 2);
|
||||
|
||||
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 4.0;
|
||||
if (numOfThreads < 1) {
|
||||
|
@ -102,6 +106,12 @@ void mgmtCleanUpShell() {
|
|||
tsMgmtShellRpc = NULL;
|
||||
mPrint("server connection to shell is closed");
|
||||
}
|
||||
|
||||
if (tsQhandleCache) {
|
||||
taosCacheEmpty(tsQhandleCache);
|
||||
taosCacheCleanup(tsQhandleCache);
|
||||
tsQhandleCache = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void mgmtAddShellMsgHandle(uint8_t showType, void (*fp)(SQueuedMsg *queuedMsg)) {
|
||||
|
@ -233,14 +243,15 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
|
|||
return;
|
||||
}
|
||||
|
||||
SShowObj *pShow = (SShowObj *) calloc(1, sizeof(SShowObj) + htons(pShowMsg->payloadLen));
|
||||
int32_t showObjSize = sizeof(SShowObj) + htons(pShowMsg->payloadLen);
|
||||
SShowObj *pShow = (SShowObj *) calloc(1, showObjSize);
|
||||
pShow->signature = pShow;
|
||||
pShow->type = pShowMsg->type;
|
||||
pShow->payloadLen = htons(pShowMsg->payloadLen);
|
||||
strcpy(pShow->db, pShowMsg->db);
|
||||
memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen);
|
||||
|
||||
mgmtSaveQhandle(pShow);
|
||||
pShow = mgmtSaveQhandle(pShow, showObjSize);
|
||||
pShowRsp->qhandle = htobe64((uint64_t) pShow);
|
||||
|
||||
mTrace("show:%p, type:%s, start to get meta", pShow, mgmtGetShowTypeStr(pShowMsg->type));
|
||||
|
@ -255,10 +266,10 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
|
|||
rpcSendResponse(&rpcRsp);
|
||||
} else {
|
||||
mError("show:%p, type:%s, failed to get meta, reason:%s", pShow, mgmtGetShowTypeStr(pShowMsg->type), tstrerror(code));
|
||||
mgmtFreeQhandle(pShow);
|
||||
mgmtFreeQhandle(pShow, false);
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->thandle,
|
||||
.code = code
|
||||
.handle = pMsg->thandle,
|
||||
.code = code
|
||||
};
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
@ -284,26 +295,20 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) {
|
|||
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
|
||||
mTrace("show:%p, type:%s, retrieve data", pShow, mgmtGetShowTypeStr(pShow->type));
|
||||
|
||||
if (!mgmtCheckQhandle(pRetrieve->qhandle)) {
|
||||
mError("pShow:%p, query memory is corrupted", pShow);
|
||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MEMORY_CORRUPTED);
|
||||
return;
|
||||
} else {
|
||||
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||
rowsToRead = pShow->numOfRows - pShow->numOfReads;
|
||||
}
|
||||
|
||||
/* return no more than 100 meters in one round trip */
|
||||
if (rowsToRead > 100) rowsToRead = 100;
|
||||
|
||||
/*
|
||||
* the actual number of table may be larger than the value of pShow->numOfRows, if a query is
|
||||
* issued during a continuous create table operation. Therefore, rowToRead may be less than 0.
|
||||
*/
|
||||
if (rowsToRead < 0) rowsToRead = 0;
|
||||
size = pShow->rowSize * rowsToRead;
|
||||
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||
rowsToRead = pShow->numOfRows - pShow->numOfReads;
|
||||
}
|
||||
|
||||
/* return no more than 100 meters in one round trip */
|
||||
if (rowsToRead > 100) rowsToRead = 100;
|
||||
|
||||
/*
|
||||
* the actual number of table may be larger than the value of pShow->numOfRows, if a query is
|
||||
* issued during a continuous create table operation. Therefore, rowToRead may be less than 0.
|
||||
*/
|
||||
if (rowsToRead < 0) rowsToRead = 0;
|
||||
size = pShow->rowSize * rowsToRead;
|
||||
|
||||
size += 100;
|
||||
SRetrieveTableRsp *pRsp = rpcMallocCont(size);
|
||||
|
||||
|
@ -313,6 +318,7 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) {
|
|||
|
||||
if (rowsRead < 0) { // TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
rpcFreeCont(pRsp);
|
||||
mgmtFreeQhandle(pShow, false);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -329,7 +335,9 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) {
|
|||
rpcSendResponse(&rpcRsp);
|
||||
|
||||
if (rowsToRead == 0) {
|
||||
mgmtFreeQhandle(pShow);
|
||||
mgmtFreeQhandle(pShow, true);
|
||||
} else {
|
||||
mgmtFreeQhandle(pShow, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -511,3 +519,82 @@ void mgmtSendSimpleResp(void *thandle, int32_t code) {
|
|||
};
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
bool mgmtCheckQhandle(uint64_t qhandle) {
|
||||
void *pSaved = taosCacheAcquireByData(tsQhandleCache, (void *)qhandle);
|
||||
if (pSaved == (void *)qhandle) {
|
||||
mTrace("qhandle:%p is retrived", qhandle);
|
||||
return true;
|
||||
} else {
|
||||
mTrace("qhandle:%p is already freed", qhandle);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void* mgmtSaveQhandle(void *qhandle, int32_t size) {
|
||||
if (tsQhandleCache != NULL) {
|
||||
char key[24];
|
||||
sprintf(key, "show:%p", qhandle);
|
||||
void *newQhandle = taosCachePut(tsQhandleCache, key, qhandle, size, 60);
|
||||
free(qhandle);
|
||||
|
||||
mTrace("qhandle:%p is saved", newQhandle);
|
||||
return newQhandle;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void mgmtFreeQhandle(void *qhandle, bool forceRemove) {
|
||||
mTrace("qhandle:%p is freed", qhandle);
|
||||
taosCacheRelease(tsQhandleCache, &qhandle, forceRemove);
|
||||
}
|
||||
|
||||
void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) {
|
||||
bool usePublicIp = false;
|
||||
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle, &usePublicIp);
|
||||
if (pUser == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SQueuedMsg *pMsg = calloc(1, sizeof(SQueuedMsg));
|
||||
pMsg->thandle = rpcMsg->handle;
|
||||
pMsg->msgType = rpcMsg->msgType;
|
||||
pMsg->contLen = rpcMsg->contLen;
|
||||
pMsg->pCont = rpcMsg->pCont;
|
||||
pMsg->pUser = pUser;
|
||||
pMsg->usePublicIp = usePublicIp;
|
||||
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
if (pMsg->pUser) mgmtDecUserRef(pMsg->pUser);
|
||||
if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb);
|
||||
if (pMsg->pVgroup) mgmtDecVgroupRef(pMsg->pVgroup);
|
||||
if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable);
|
||||
if (pMsg->pAcct) mgmtDecAcctRef(pMsg->pAcct);
|
||||
if (pMsg->pDnode) mgmtDecDnodeRef(pMsg->pDnode);
|
||||
free(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) {
|
||||
SQueuedMsg *pDestMsg = calloc(1, sizeof(SQueuedMsg));
|
||||
|
||||
pDestMsg->thandle = pSrcMsg->thandle;
|
||||
pDestMsg->msgType = pSrcMsg->msgType;
|
||||
pDestMsg->pCont = pSrcMsg->pCont;
|
||||
pDestMsg->contLen = pSrcMsg->contLen;
|
||||
pDestMsg->retry = pSrcMsg->retry;
|
||||
pDestMsg->maxRetry= pSrcMsg->maxRetry;
|
||||
pDestMsg->pUser = pSrcMsg->pUser;
|
||||
pDestMsg->usePublicIp = pSrcMsg->usePublicIp;
|
||||
|
||||
pSrcMsg->pCont = NULL;
|
||||
pSrcMsg->pUser = NULL;
|
||||
|
||||
return pDestMsg;
|
||||
}
|
Loading…
Reference in New Issue