From 2fdb4f917cabb1e4fcf17ac1435e8015cd38e728 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 29 Jul 2021 11:23:07 +0800 Subject: [PATCH 1/7] [TD-5235]:offload msg processing from rpc thread to tsc scheduler --- src/client/src/tscParseLineProtocol.c | 2 +- src/client/src/tscServer.c | 50 +++++++++++++++++++++++---- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index 3613bad534..c1596ac087 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -481,7 +481,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSm size_t tagIndex = taosArrayGetSize(schema->tags) - 1; taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex)); } - tscDebug("SML:0x%"PRIx64 "load table meta succeed. %s, columns number: %d, tag number: %d, precision: %d", + tscDebug("SML:0x%"PRIx64 " load table meta succeed. table name: %s, columns number: %d, tag number: %d, precision: %d", info->id, tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision); free(tableMeta); tableMeta = NULL; return code; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index eaf397529b..53e36ddf83 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -337,11 +337,16 @@ int tscSendMsgToServer(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { +static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) { + SRpcMsg* rpcMsg = pSchedMsg->ahandle; + SRpcEpSet* pEpSet = pSchedMsg->thandle; + TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); if (pSql == NULL) { rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); + free(pEpSet); return; } @@ -359,17 +364,21 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { taosRemoveRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); + free(pEpSet); return; } SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", - pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature); + pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature); taosRemoveRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); + free(pEpSet); return; } @@ -393,13 +402,13 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { // single table query error need to be handled here. if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || // change the retry procedure - rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) || + rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) || rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || // change the retry procedure rpcMsg->code == TSDB_CODE_APP_NOT_READY)) { if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) && - !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) { + !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) { // do nothing in case of super table subquery } else { pSql->retry += 1; @@ -422,6 +431,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); + free(pEpSet); return; } } @@ -429,7 +440,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } pRes->rspLen = 0; - + if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { tscDebug("0x%"PRIx64" query is cancelled, code:%s", pSql->self, tstrerror(pRes->code)); } else { @@ -473,12 +484,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { pRes->numOfRows += pMsg->affectedRows; tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command], - tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen); + tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen); } else { tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen); } } - + if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) { rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); } @@ -499,6 +510,31 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); + free(pEpSet); +} + +void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { + SSchedMsg schedMsg = {0}; + + schedMsg.fp = doProcessMsgFromServer; + + SRpcMsg* rpcMsgCopy = calloc(1, sizeof(SRpcMsg)); + memcpy(rpcMsgCopy, rpcMsg, sizeof(struct SRpcMsg)); + rpcMsgCopy->pCont = rpcMallocCont(rpcMsg->contLen); + memcpy(rpcMsgCopy->pCont, rpcMsg->pCont, rpcMsg->contLen); + schedMsg.ahandle = (void*)rpcMsgCopy; + + SRpcEpSet* pEpSetCopy = NULL; + if (pEpSet != NULL) { + pEpSetCopy = calloc(1, sizeof(SRpcEpSet)); + memcpy(pEpSetCopy, pEpSet, sizeof(SRpcEpSet)); + } + + schedMsg.thandle = (void*)pEpSetCopy; + schedMsg.msg = NULL; + + taosScheduleTask(tscQhandle, &schedMsg); } int doBuildAndSendMsg(SSqlObj *pSql) { From c4edb203dc633bd2122217fc2d47bb8fe52b8f94 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 29 Jul 2021 15:28:24 +0800 Subject: [PATCH 2/7] [TD-5235]:try not to copy the content of rpc msg --- src/client/src/tscServer.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 53e36ddf83..6246a3839c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -521,8 +521,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { SRpcMsg* rpcMsgCopy = calloc(1, sizeof(SRpcMsg)); memcpy(rpcMsgCopy, rpcMsg, sizeof(struct SRpcMsg)); - rpcMsgCopy->pCont = rpcMallocCont(rpcMsg->contLen); - memcpy(rpcMsgCopy->pCont, rpcMsg->pCont, rpcMsg->contLen); schedMsg.ahandle = (void*)rpcMsgCopy; SRpcEpSet* pEpSetCopy = NULL; From 1dff9533fc63b093c66737f555d8d78022010ffc Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 29 Jul 2021 16:41:30 +0800 Subject: [PATCH 3/7] [TD-5633]:fix memory leak of intermediate result buf --- src/query/src/qAggMain.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index d96b260b13..1ed63be070 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -4132,6 +4132,11 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) { pDist->rowSize = (uint16_t)pCtx->param[0].i64; generateBlockDistResult(pDist, pCtx->pOutput); + if (pDist->dataBlockInfos != NULL) { + taosArrayDestroy(pDist->dataBlockInfos); + pDist->dataBlockInfos = NULL; + } + // cannot set the numOfIteratedElems again since it is set during previous iteration pResInfo->numOfRes = 1; pResInfo->hasResult = DATA_SET_FLAG; From 7032e5a00b5a7c25194433b0e86b8a662105cfb8 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 29 Jul 2021 11:23:07 +0800 Subject: [PATCH 4/7] [TD-5235]:offload msg processing from rpc thread to tsc scheduler --- src/client/src/tscParseLineProtocol.c | 2 +- src/client/src/tscServer.c | 50 +++++++++++++++++++++++---- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index c6cdee6f1f..7d2823a42e 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -484,7 +484,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSm size_t tagIndex = taosArrayGetSize(schema->tags) - 1; taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex)); } - tscDebug("SML:0x%"PRIx64 "load table meta succeed. %s, columns number: %d, tag number: %d, precision: %d", + tscDebug("SML:0x%"PRIx64 " load table meta succeed. table name: %s, columns number: %d, tag number: %d, precision: %d", info->id, tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision); free(tableMeta); tableMeta = NULL; return code; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index cebabfc024..963b831c1d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -337,11 +337,16 @@ int tscSendMsgToServer(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { +static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) { + SRpcMsg* rpcMsg = pSchedMsg->ahandle; + SRpcEpSet* pEpSet = pSchedMsg->thandle; + TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); if (pSql == NULL) { rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); + free(pEpSet); return; } @@ -359,17 +364,21 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { taosRemoveRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); + free(pEpSet); return; } SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", - pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature); + pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature); taosRemoveRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); + free(pEpSet); return; } @@ -393,13 +402,13 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { // single table query error need to be handled here. if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || // change the retry procedure - rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) || + rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) || rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || // change the retry procedure rpcMsg->code == TSDB_CODE_APP_NOT_READY)) { if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) && - !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) { + !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) { // do nothing in case of super table subquery } else { pSql->retry += 1; @@ -422,6 +431,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); + free(pEpSet); return; } } @@ -429,7 +440,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } pRes->rspLen = 0; - + if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { tscDebug("0x%"PRIx64" query is cancelled, code:%s", pSql->self, tstrerror(pRes->code)); } else { @@ -473,12 +484,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { pRes->numOfRows += pMsg->affectedRows; tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command], - tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen); + tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen); } else { tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen); } } - + if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) { rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); } @@ -499,6 +510,31 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); + free(pEpSet); +} + +void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { + SSchedMsg schedMsg = {0}; + + schedMsg.fp = doProcessMsgFromServer; + + SRpcMsg* rpcMsgCopy = calloc(1, sizeof(SRpcMsg)); + memcpy(rpcMsgCopy, rpcMsg, sizeof(struct SRpcMsg)); + rpcMsgCopy->pCont = rpcMallocCont(rpcMsg->contLen); + memcpy(rpcMsgCopy->pCont, rpcMsg->pCont, rpcMsg->contLen); + schedMsg.ahandle = (void*)rpcMsgCopy; + + SRpcEpSet* pEpSetCopy = NULL; + if (pEpSet != NULL) { + pEpSetCopy = calloc(1, sizeof(SRpcEpSet)); + memcpy(pEpSetCopy, pEpSet, sizeof(SRpcEpSet)); + } + + schedMsg.thandle = (void*)pEpSetCopy; + schedMsg.msg = NULL; + + taosScheduleTask(tscQhandle, &schedMsg); } int doBuildAndSendMsg(SSqlObj *pSql) { From babde623be0cfa2b291661e53d6cbecea39bdcd9 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 29 Jul 2021 15:28:24 +0800 Subject: [PATCH 5/7] [TD-5235]:try not to copy the content of rpc msg --- src/client/src/tscServer.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 963b831c1d..434b798fc6 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -521,8 +521,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { SRpcMsg* rpcMsgCopy = calloc(1, sizeof(SRpcMsg)); memcpy(rpcMsgCopy, rpcMsg, sizeof(struct SRpcMsg)); - rpcMsgCopy->pCont = rpcMallocCont(rpcMsg->contLen); - memcpy(rpcMsgCopy->pCont, rpcMsg->pCont, rpcMsg->contLen); schedMsg.ahandle = (void*)rpcMsgCopy; SRpcEpSet* pEpSetCopy = NULL; From f385b1900223004c27c2c4b68839ac0d04f09d3e Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 29 Jul 2021 16:41:30 +0800 Subject: [PATCH 6/7] [TD-5633]:fix memory leak of intermediate result buf --- src/query/src/qAggMain.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index d96b260b13..1ed63be070 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -4132,6 +4132,11 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) { pDist->rowSize = (uint16_t)pCtx->param[0].i64; generateBlockDistResult(pDist, pCtx->pOutput); + if (pDist->dataBlockInfos != NULL) { + taosArrayDestroy(pDist->dataBlockInfos); + pDist->dataBlockInfos = NULL; + } + // cannot set the numOfIteratedElems again since it is set during previous iteration pResInfo->numOfRes = 1; pResInfo->hasResult = DATA_SET_FLAG; From 754b7b42418dce80c6c056c1f9fefc80891f3ebc Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 30 Jul 2021 11:12:04 +0800 Subject: [PATCH 7/7] [TD-5659]:start taos processing with subprocess instead of internal function --- tests/pytest/insert/metadataUpdate.py | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/tests/pytest/insert/metadataUpdate.py b/tests/pytest/insert/metadataUpdate.py index 1a960a20e6..f996a707ff 100644 --- a/tests/pytest/insert/metadataUpdate.py +++ b/tests/pytest/insert/metadataUpdate.py @@ -16,7 +16,6 @@ from util.log import tdLog from util.cases import tdCases from util.sql import tdSql from util.dnodes import tdDnodes -from multiprocessing import Process import subprocess class TDTestCase: @@ -28,16 +27,6 @@ class TDTestCase: self.tables = 10 self.rows = 1000 - def updateMetadata(self): - self.host = "127.0.0.1" - self.user = "root" - self.password = "taosdata" - self.config = tdDnodes.getSimCfgPath() - - self.conn = taos.connect(host = self.host, user = self.user, password = self.password, config = self.config) - self.cursor = self.conn.cursor() - self.cursor.execute("alter table db.tb add column col2 int") - print("alter table done") def deleteTableAndRecreate(self): self.config = tdDnodes.getSimCfgPath() @@ -68,11 +57,15 @@ class TDTestCase: tdSql.query("select * from tb") tdSql.checkRows(1) - p = Process(target=self.updateMetadata, args=()) - p.start() - p.join() - p.terminate() - + self.config = tdDnodes.getSimCfgPath() + command = ["taos", "-c", self.config, "-s", "alter table db.tb add column col2 int;"] + print("alter table db.tb add column col2 int;") + result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") + if result.returncode == 0: + print("success:", result) + else: + print("error:", result) + tdSql.execute("insert into tb(ts, col1, col2) values(%d, 1, 2)" % (self.ts + 2)) print("==============step2")