Merge pull request #7073 from taosdata/hotfix/td-5633

[TD-5633]<fix>:fix memory leak of intermediate buf of block_dist
This commit is contained in:
Haojun Liao 2021-08-02 10:45:46 +08:00 committed by GitHub
commit 699e28bc91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 52 additions and 20 deletions

View File

@ -484,7 +484,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSm
size_t tagIndex = taosArrayGetSize(schema->tags) - 1; size_t tagIndex = taosArrayGetSize(schema->tags) - 1;
taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex)); taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex));
} }
tscDebug("SML:0x%"PRIx64 "load table meta succeed. %s, columns number: %d, tag number: %d, precision: %d", tscDebug("SML:0x%"PRIx64 " load table meta succeed. table name: %s, columns number: %d, tag number: %d, precision: %d",
info->id, tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision); info->id, tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision);
free(tableMeta); tableMeta = NULL; free(tableMeta); tableMeta = NULL;
return code; return code;

View File

@ -337,11 +337,16 @@ int tscSendMsgToServer(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
SRpcMsg* rpcMsg = pSchedMsg->ahandle;
SRpcEpSet* pEpSet = pSchedMsg->thandle;
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pSql == NULL) { if (pSql == NULL) {
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return; return;
} }
@ -359,6 +364,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
taosRemoveRef(tscObjRef, handle); taosRemoveRef(tscObjRef, handle);
taosReleaseRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return; return;
} }
@ -370,6 +377,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
taosRemoveRef(tscObjRef, handle); taosRemoveRef(tscObjRef, handle);
taosReleaseRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return; return;
} }
@ -425,6 +434,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return; return;
} }
} }
@ -432,7 +443,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
} }
pRes->rspLen = 0; pRes->rspLen = 0;
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscDebug("0x%"PRIx64" query is cancelled, code:%s", pSql->self, tstrerror(pRes->code)); tscDebug("0x%"PRIx64" query is cancelled, code:%s", pSql->self, tstrerror(pRes->code));
} else { } else {
@ -481,7 +492,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen); tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
} }
} }
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) { if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
} }
@ -502,6 +513,29 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
taosReleaseRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doProcessMsgFromServer;
SRpcMsg* rpcMsgCopy = calloc(1, sizeof(SRpcMsg));
memcpy(rpcMsgCopy, rpcMsg, sizeof(struct SRpcMsg));
schedMsg.ahandle = (void*)rpcMsgCopy;
SRpcEpSet* pEpSetCopy = NULL;
if (pEpSet != NULL) {
pEpSetCopy = calloc(1, sizeof(SRpcEpSet));
memcpy(pEpSetCopy, pEpSet, sizeof(SRpcEpSet));
}
schedMsg.thandle = (void*)pEpSetCopy;
schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
} }
int doBuildAndSendMsg(SSqlObj *pSql) { int doBuildAndSendMsg(SSqlObj *pSql) {

View File

@ -4132,6 +4132,11 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) {
pDist->rowSize = (uint16_t)pCtx->param[0].i64; pDist->rowSize = (uint16_t)pCtx->param[0].i64;
generateBlockDistResult(pDist, pCtx->pOutput); generateBlockDistResult(pDist, pCtx->pOutput);
if (pDist->dataBlockInfos != NULL) {
taosArrayDestroy(pDist->dataBlockInfos);
pDist->dataBlockInfos = NULL;
}
// cannot set the numOfIteratedElems again since it is set during previous iteration // cannot set the numOfIteratedElems again since it is set during previous iteration
pResInfo->numOfRes = 1; pResInfo->numOfRes = 1;
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;

View File

@ -16,7 +16,6 @@ from util.log import tdLog
from util.cases import tdCases from util.cases import tdCases
from util.sql import tdSql from util.sql import tdSql
from util.dnodes import tdDnodes from util.dnodes import tdDnodes
from multiprocessing import Process
import subprocess import subprocess
class TDTestCase: class TDTestCase:
@ -28,16 +27,6 @@ class TDTestCase:
self.tables = 10 self.tables = 10
self.rows = 1000 self.rows = 1000
def updateMetadata(self):
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
self.config = tdDnodes.getSimCfgPath()
self.conn = taos.connect(host = self.host, user = self.user, password = self.password, config = self.config)
self.cursor = self.conn.cursor()
self.cursor.execute("alter table db.tb add column col2 int")
print("alter table done")
def deleteTableAndRecreate(self): def deleteTableAndRecreate(self):
self.config = tdDnodes.getSimCfgPath() self.config = tdDnodes.getSimCfgPath()
@ -68,11 +57,15 @@ class TDTestCase:
tdSql.query("select * from tb") tdSql.query("select * from tb")
tdSql.checkRows(1) tdSql.checkRows(1)
p = Process(target=self.updateMetadata, args=()) self.config = tdDnodes.getSimCfgPath()
p.start() command = ["taos", "-c", self.config, "-s", "alter table db.tb add column col2 int;"]
p.join() print("alter table db.tb add column col2 int;")
p.terminate() result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
if result.returncode == 0:
print("success:", result)
else:
print("error:", result)
tdSql.execute("insert into tb(ts, col1, col2) values(%d, 1, 2)" % (self.ts + 2)) tdSql.execute("insert into tb(ts, col1, col2) values(%d, 1, 2)" % (self.ts + 2))
print("==============step2") print("==============step2")