From aa143beda43a9e96204558a362c8259f132bef93 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 6 Jun 2022 19:27:51 +0800 Subject: [PATCH 1/4] fix(query): fix invalid free. --- source/client/inc/clientInt.h | 3 ++- source/client/src/clientEnv.c | 1 - source/client/src/clientImpl.c | 24 ++++++++++++------ source/client/src/clientMain.c | 1 - source/client/src/clientMsgHandler.c | 35 ++++++++++++++------------ source/libs/parser/src/parTranslater.c | 1 - 6 files changed, 37 insertions(+), 28 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 9a029b0fea..ba552cc7cb 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -266,7 +266,8 @@ extern SAppInfo appInfo; extern int32_t clientReqRefPool; extern int32_t clientConnRefPool; -extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); +__async_send_cb_fn_t getMsgRspHandle(int32_t msgType); + int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index c210a7ed39..75e1884360 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -278,7 +278,6 @@ void taos_init_imp(void) { return; } - initMsgHandleFp(); initQueryModuleMsgHandle(); rpcInit(); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 85f6e85190..2ddeb7f9d4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -535,19 +535,20 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) { return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res); } -int32_t handleExecRes(SRequestObj* pRequest) { +int32_t handleQueryExecRsp(SRequestObj* pRequest) { if (NULL == pRequest->body.resInfo.execRes.res) { return TSDB_CODE_SUCCESS; } - int32_t code = 0; SCatalog* pCatalog = NULL; - code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + SAppInstInfo* pAppInfo = getAppInfo(pRequest); + + int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog); if (code) { return code; } - SEpSet epset = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); + SEpSet epset = getEpSet_s(&pAppInfo->mgmtEp); SQueryExecRes* pRes = &pRequest->body.resInfo.execRes; switch (pRes->msgType) { @@ -565,8 +566,9 @@ int32_t handleExecRes(SRequestObj* pRequest) { break; } default: - tscError("invalid exec result for request type %d", pRequest->type); - return TSDB_CODE_APP_ERROR; + tscError("0x%"PRIx64", invalid exec result for request type %d, reqId:0x%"PRIx64, pRequest->self, + pRequest->type, pRequest->requestId); + code = TSDB_CODE_APP_ERROR; } return code; @@ -585,6 +587,12 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { return; } + if (code == TSDB_CODE_SUCCESS) { + code = handleQueryExecRsp(pRequest); + ASSERT(pRequest->code == TSDB_CODE_SUCCESS); + pRequest->code = code; + } + if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) { removeMeta(pTscObj, pRequest->tableList); } @@ -623,7 +631,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue qDestroyQuery(pQuery); } - handleExecRes(pRequest); + handleQueryExecRsp(pRequest); if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { pRequest->code = terrno; @@ -884,7 +892,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { pMsgSendInfo->requestObjRefId = pRequest->self; pMsgSendInfo->requestId = pRequest->requestId; - pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; + pMsgSendInfo->fp = getMsgRspHandle(pMsgSendInfo->msgType); pMsgSendInfo->param = pRequest; SConnectReq connectReq = {0}; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index a15f9f17f7..960cda394a 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -748,7 +748,6 @@ void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) { pRequest->body.queryFp(pRequest->body.param, pRequest, code); } - static void fetchCallback(void* pResult, void* param, int32_t code) { SRequestObj* pRequest = (SRequestObj*) param; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index cdac217084..14e2798d3b 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -21,8 +21,6 @@ #include "tdef.h" #include "tname.h" -int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); - static void setErrno(SRequestObj* pRequest, int32_t code) { pRequest->code = code; terrno = code; @@ -107,10 +105,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) { assert(pRequest != NULL); pMsgSendInfo->msgInfo = pRequest->body.requestMsg; - - pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL) - ? genericRspCallback - : handleRequestRspFp[TMSG_INDEX(pRequest->type)]; + pMsgSendInfo->fp = getMsgRspHandle(pRequest->type); return pMsgSendInfo; } @@ -209,7 +204,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } -int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t processCreateSTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { assert(pMsg != NULL && param != NULL); SRequestObj* pRequest = param; @@ -285,13 +280,21 @@ int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) { return code; } - -// todo refactor: this arraylist is too large -void initMsgHandleFp() { - handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp; - handleRequestRspFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = processAlterStbRsp; +__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) { + switch (msgType) { + case TDMT_MND_CONNECT: + return processConnectRsp; + case TDMT_MND_CREATE_DB: + return processCreateDbRsp; + case TDMT_MND_USE_DB: + return processUseDbRsp; + case TDMT_MND_CREATE_STB: + return processCreateSTableRsp; + case TDMT_MND_DROP_DB: + return processDropDbRsp; + case TDMT_MND_ALTER_STB: + return processAlterStbRsp; + default: + return genericRspCallback; + } } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f1892d0040..6334d079c8 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2903,7 +2903,6 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, } } - taosArrayDestroy(dbCfg.pRetensions); return code; } From 2bd16c4978ea24a626a972c37971b75542e6a9e6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 6 Jun 2022 19:58:14 +0800 Subject: [PATCH 2/4] refactor: update output of log. --- source/client/src/clientImpl.c | 4 ---- source/client/src/clientMain.c | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 2ddeb7f9d4..c71c7d3487 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -580,7 +580,6 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { STscObj* pTscObj = pRequest->pTscObj; if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) { - // todo do nothing in clear value in request tscDebug("0x%"PRIx64" client retry to handle the error, code:%s, reqId:0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); pRequest->prevCode = code; doAsyncQuery(pRequest, true); @@ -696,9 +695,6 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) { if (TSDB_CODE_SUCCESS == code) { schedulerAsyncExecJob(pAppInfo->pTransporter, pNodeList, pRequest->body.pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, schedulerExecCb, pRequest); - // if (NULL != pRes) { - // code = validateSversion(pRequest, pRes); - // } } //todo not to be released here diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 960cda394a..d192b65bf0 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -634,7 +634,7 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) { // return to app directly taosMemoryFree(pWrapper); - tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:%" PRIx64, pRequest->self, tstrerror(code), + tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); pRequest->code = code; pRequest->body.queryFp(pRequest->body.param, pRequest, code); From b650daa33413418c2fcc283d2efc064f6b61e1f5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 6 Jun 2022 20:00:11 +0800 Subject: [PATCH 3/4] refactor: do some internal refactor. --- source/client/inc/clientInt.h | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index ba552cc7cb..0630704d93 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -45,7 +45,7 @@ extern "C" { #define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define HEARTBEAT_INTERVAL 1500 // ms -#define SYNC_ON_TOP_OF_ASYNC 1 +#define SYNC_ON_TOP_OF_ASYNC 0 enum { RES_TYPE__QUERY = 1, @@ -268,7 +268,6 @@ extern int32_t clientConnRefPool; __async_send_cb_fn_t getMsgRspHandle(int32_t msgType); -int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); void* createTscObj(const char* user, const char* auth, const char* db, int32_t connType, SAppInstInfo* pAppInfo); @@ -294,8 +293,6 @@ void* openTransporter(const char* user, const char* auth, int32_t numOfThreads); bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType); void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); -void initMsgHandleFp(); - TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, uint16_t port, int connType); From a59f40dde248c215c7b28a591e8387a70ca7aca9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 6 Jun 2022 20:48:16 +0800 Subject: [PATCH 4/4] fix(query): check array length before generating the retention string . --- source/dnode/mnode/impl/src/mndDb.c | 30 +++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 1ce1a2590a..245089ddb7 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1497,20 +1497,26 @@ char *buildRetension(SArray *pRetension) { int64_t v1 = getValOfDiffPrecision(p->freqUnit, p->freq); int64_t v2 = getValOfDiffPrecision(p->keepUnit, p->keep); - len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c,", v1, p->freqUnit, v2, p->keepUnit); - - p = taosArrayGet(pRetension, 1); - - v1 = getValOfDiffPrecision(p->freqUnit, p->freq); - v2 = getValOfDiffPrecision(p->keepUnit, p->keep); - len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c,", v1, p->freqUnit, v2, p->keepUnit); - - p = taosArrayGet(pRetension, 2); - - v1 = getValOfDiffPrecision(p->freqUnit, p->freq); - v2 = getValOfDiffPrecision(p->keepUnit, p->keep); len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit); + if (size > 1) { + len += sprintf(p1 + len, ","); + p = taosArrayGet(pRetension, 1); + + v1 = getValOfDiffPrecision(p->freqUnit, p->freq); + v2 = getValOfDiffPrecision(p->keepUnit, p->keep); + len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit); + } + + if (size > 2) { + len += sprintf(p1 + len, ","); + p = taosArrayGet(pRetension, 2); + + v1 = getValOfDiffPrecision(p->freqUnit, p->freq); + v2 = getValOfDiffPrecision(p->keepUnit, p->keep); + len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit); + } + varDataSetLen(p1, len); return p1; }