From 25a4645f08968d9fac1f3a579e1be964d266babf Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 19 Apr 2022 14:18:58 +0800 Subject: [PATCH 01/12] fix mem leak --- source/client/src/clientHb.c | 38 ++++++++++++++++++++++++--- source/libs/scheduler/src/scheduler.c | 11 ++++++++ 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index a6678b2ec0..6159da9cb1 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -630,10 +630,29 @@ void appHbMgrCleanup(void) { int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); for (int i = 0; i < sz; i++) { SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i); + + void *pIter = taosHashIterate(pTarget->activeInfo, NULL); + while (pIter != NULL) { + SClientHbReq *pOneReq = pIter; + hbFreeReq(pOneReq); + taosHashCleanup(pOneReq->info); + pIter = taosHashIterate(pTarget->activeInfo, pIter); + } taosHashCleanup(pTarget->activeInfo); pTarget->activeInfo = NULL; + + + pIter = taosHashIterate(pTarget->connInfo, NULL); + while (pIter != NULL) { + SHbConnInfo *info = pIter; + taosMemoryFree(info->param); + pIter = taosHashIterate(pTarget->connInfo, pIter); + } taosHashCleanup(pTarget->connInfo); pTarget->connInfo = NULL; + + taosMemoryFree(pTarget->key); + taosMemoryFree(pTarget); } } @@ -716,12 +735,23 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in } void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { - int32_t code = 0; - code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); - code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); - if (code) { + SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + if (pReq) { + hbFreeReq(pReq); + taosHashCleanup(pReq->info); + taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + } + + SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); + if (info) { + taosMemoryFree(info->param); + taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); + } + + if (NULL == pReq || NULL == info) { return; } + atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 6cc435fee4..c0b3ae7055 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -2749,4 +2749,15 @@ void schedulerDestroy(void) { taosCloseRef(schMgmt.jobRef); schMgmt.jobRef = 0; } + + if (schMgmt.hbConnections) { + void *pIter = taosHashIterate(schMgmt.hbConnections, NULL); + while (pIter != NULL) { + SSchHbTrans *hb = pIter; + schFreeRpcCtx(&hb->rpcCtx); + pIter = taosHashIterate(schMgmt.hbConnections, pIter); + } + taosHashCleanup(schMgmt.hbConnections); + schMgmt.hbConnections = NULL; + } } From 5f24bf5bf9a89fc6bd8a379a1ca1737c41b79d35 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 19 Apr 2022 17:29:04 +0800 Subject: [PATCH 02/12] enh(rpc): add rpc retry --- source/libs/transport/src/transComm.c | 2 +- source/libs/transport/src/transSrv.c | 33 +++++++++++++++------------ 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 03b97b6fa1..9e53811fd3 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -195,7 +195,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) void transDestroyAsyncPool(SAsyncPool* pool) { for (int i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); - uv_close((uv_handle_t*)async, NULL); + // uv_close((uv_handle_t*)async, NULL); SAsyncItem* item = async->data; taosThreadMutexDestroy(&item->mtx); taosMemoryFree(item); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 50ce0bddcd..2d37dfa37c 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -146,7 +146,8 @@ static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd); -static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, uvHandleRegister}; +static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, + uvHandleRegister}; static void uvDestroyConn(uv_handle_t* handle); @@ -209,12 +210,13 @@ static void uvHandleReq(SSrvConn* pConn) { } if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); - tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen); + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), + taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), + ntohs(pConn->locaddr.sin_port), transMsg.contLen); } else { - tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, TMSG_INFO(transMsg.msgType), - taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), - transMsg.contLen, pHead->noResp); + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, + TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp); // no ref here } @@ -354,8 +356,9 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { char* msg = (char*)pHead; int32_t len = transMsgLenFromCont(pMsg->contLen); - tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), + taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), + ntohs(pConn->locaddr.sin_port)); pHead->msgLen = htonl(len); wb->base = msg; @@ -685,9 +688,9 @@ static void uvDestroyConn(uv_handle_t* handle) { if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { tTrace("work thread quit"); - // uv_walk(thrd->loop, uvWalkCb, NULL); - uv_loop_close(thrd->loop); - uv_stop(thrd->loop); + uv_walk(thrd->loop, uvWalkCb, NULL); + // uv_loop_close(thrd->loop); + // uv_stop(thrd->loop); } } @@ -749,9 +752,9 @@ End: void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { thrd->quit = true; if (QUEUE_IS_EMPTY(&thrd->conn)) { - // uv_walk(thrd->loop, uvWalkCb, NULL); - uv_loop_close(thrd->loop); - uv_stop(thrd->loop); + uv_walk(thrd->loop, uvWalkCb, NULL); + // uv_loop_close(thrd->loop); + // uv_stop(thrd->loop); } else { destroyAllConn(thrd); } @@ -802,7 +805,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { return; } taosThreadJoin(pThrd->thread, NULL); - // MAKE_VALGRIND_HAPPY(pThrd->loop); + MAKE_VALGRIND_HAPPY(pThrd->loop); transDestroyAsyncPool(pThrd->asyncPool); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); From b1225382683eb116b1c139a781dccd3319cf2c74 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Apr 2022 18:06:59 +0800 Subject: [PATCH 03/12] refactor: do some internal refactor. --- source/libs/function/src/builtinsimpl.c | 4 ++-- source/libs/parser/src/parInsert.c | 5 +---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 7699219f52..3d796515a0 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -901,7 +901,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { + if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { memcpy(buf, data, bytes); *(TSKEY*)(buf + bytes) = cts; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); @@ -919,7 +919,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { + if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { memcpy(buf, data, bytes); *(TSKEY*)(buf + bytes) = cts; pResInfo->numOfRes = 1; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 1e8b732b6a..da759bc15a 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -888,10 +888,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { TSKEY tsKey = TD_ROW_KEY(row); - if (checkTimestamp(pDataBlocks, (const char*)&tsKey) != TSDB_CODE_SUCCESS) { - buildSyntaxErrMsg(&pCxt->msg, "client time/server time can not be mixed up", sToken.z); - return TSDB_CODE_TSC_INVALID_TIME_STAMP; - } + checkTimestamp(pDataBlocks, (const char*)&tsKey); } } From 11028921b6e83e61c28585434b8f67c1e0d2a9d6 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 19 Apr 2022 18:16:29 +0800 Subject: [PATCH 04/12] refactor(query): refactor cast function TD-14837 TD-14843 TD-14904 --- include/util/taoserror.h | 1 + source/libs/function/src/builtins.c | 15 ++++++++++++--- source/libs/scalar/src/scalar.c | 3 +++ source/libs/scalar/src/sclfunc.c | 12 ++++++------ 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index c5b477343d..6c5e006671 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -617,6 +617,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_FUNC_FUNTION_ERROR TAOS_DEF_ERROR_CODE(0, 0x2800) #define TSDB_CODE_FUNC_FUNTION_PARA_NUM TAOS_DEF_ERROR_CODE(0, 0x2801) #define TSDB_CODE_FUNC_FUNTION_PARA_TYPE TAOS_DEF_ERROR_CODE(0, 0x2802) +#define TSDB_CODE_FUNC_FUNTION_PARA_VALUE TAOS_DEF_ERROR_CODE(0, 0x2803) #ifdef __cplusplus } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 78dec26be5..5ba0cee53e 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -28,13 +28,17 @@ static int32_t buildFuncErrMsg(char* pErrBuf, int32_t len, int32_t errCode, cons } static int32_t invaildFuncParaNumErrMsg(char* pErrBuf, int32_t len, const char* pFuncName) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_NUM, "Invalid number of arguments : %s", pFuncName); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_NUM, "Invalid number of parameters : %s", pFuncName); } static int32_t invaildFuncParaTypeErrMsg(char* pErrBuf, int32_t len, const char* pFuncName) { return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_TYPE, "Inconsistent datatypes : %s", pFuncName); } +static int32_t invaildFuncParaValueErrMsg(char* pErrBuf, int32_t len, const char* pFuncName) { + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid parameter value : %s", pFuncName); +} + // There is only one parameter of numeric type, and the return type is parameter type static int32_t translateInOutNum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (1 != LIST_LENGTH(pFunc->pParameterList)) { @@ -317,10 +321,15 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; // The function return type has been set during syntax parsing uint8_t para2Type = pFunc->node.resType.type; - if ((TSDB_DATA_TYPE_JSON == para1Type || TSDB_DATA_TYPE_BLOB == para1Type || TSDB_DATA_TYPE_MEDIUMBLOB == para1Type) || - (TSDB_DATA_TYPE_JSON == para2Type || TSDB_DATA_TYPE_BLOB == para2Type || TSDB_DATA_TYPE_MEDIUMBLOB == para2Type)) { + if (para2Type != TSDB_DATA_TYPE_BIGINT && para2Type != TSDB_DATA_TYPE_UBIGINT && + para2Type != TSDB_DATA_TYPE_VARCHAR && para2Type != TSDB_DATA_TYPE_NCHAR && + para2Type != TSDB_DATA_TYPE_TIMESTAMP) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } + int32_t para2Bytes = pFunc->node.resType.bytes; + if (para2Bytes < 0) { //negative value or overflow + return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 0432ae1df8..e6de165ab5 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -444,6 +444,9 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { res->node.resType.type = TSDB_DATA_TYPE_NULL; } else { res->node.resType = node->node.resType; + if (res->node.resType.type == TSDB_DATA_TYPE_NCHAR) { + res->node.resType.bytes *= TSDB_NCHAR_SIZE; + } int32_t type = output.columnData->info.type; if (IS_VAR_DATA_TYPE(type)) { res->datum.p = output.columnData->pData; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index e7ff0bde91..995d38e9b7 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -646,13 +646,13 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int16_t inputType = pInput[0].columnData->info.type; int16_t outputType = pOutput[0].columnData->info.type; - if (outputType != TSDB_DATA_TYPE_BIGINT && outputType != TSDB_DATA_TYPE_UBIGINT && - outputType != TSDB_DATA_TYPE_VARCHAR && outputType != TSDB_DATA_TYPE_NCHAR && - outputType != TSDB_DATA_TYPE_TIMESTAMP) { - return TSDB_CODE_FAILED; - } int64_t outputLen = pOutput[0].columnData->info.bytes; + if (IS_VAR_DATA_TYPE(outputType)) { + int32_t factor = (TSDB_DATA_TYPE_NCHAR == outputType) ? TSDB_NCHAR_SIZE : 1; + outputLen = outputLen * factor + VARSTR_HEADER_SIZE; + } + char *input = NULL; char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1); char *output = outputBuf; @@ -723,7 +723,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } else if (inputType == TSDB_DATA_TYPE_BINARY) { int32_t len = sprintf(varDataVal(output), "%.*s", (int32_t)(outputLen - VARSTR_HEADER_SIZE), varDataVal(input)); varDataSetLen(output, len); - } else if (inputType == TSDB_DATA_TYPE_BINARY || inputType == TSDB_DATA_TYPE_NCHAR) { + } else if (inputType == TSDB_DATA_TYPE_NCHAR) { //not support return TSDB_CODE_FAILED; } else { From 482afcf2880413d903495080ceb43564aef9feda Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 19 Apr 2022 18:16:29 +0800 Subject: [PATCH 05/12] refactor(query): refactor cast function TD-14837 TD-14843 TD-14904 --- source/libs/scalar/src/scalar.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index e6de165ab5..0432ae1df8 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -444,9 +444,6 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { res->node.resType.type = TSDB_DATA_TYPE_NULL; } else { res->node.resType = node->node.resType; - if (res->node.resType.type == TSDB_DATA_TYPE_NCHAR) { - res->node.resType.bytes *= TSDB_NCHAR_SIZE; - } int32_t type = output.columnData->info.type; if (IS_VAR_DATA_TYPE(type)) { res->datum.p = output.columnData->pData; From f9947be46f603b323763c815bc445cb33e02146c Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 19 Apr 2022 18:16:29 +0800 Subject: [PATCH 06/12] refactor(query): refactor cast function TD-14837 TD-14843 TD-14904 --- source/libs/function/src/builtins.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 5ba0cee53e..4403ff1540 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -32,7 +32,7 @@ static int32_t invaildFuncParaNumErrMsg(char* pErrBuf, int32_t len, const char* } static int32_t invaildFuncParaTypeErrMsg(char* pErrBuf, int32_t len, const char* pFuncName) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_TYPE, "Inconsistent datatypes : %s", pFuncName); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_TYPE, "Invalid datatypes : %s", pFuncName); } static int32_t invaildFuncParaValueErrMsg(char* pErrBuf, int32_t len, const char* pFuncName) { @@ -327,7 +327,7 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } int32_t para2Bytes = pFunc->node.resType.bytes; - if (para2Bytes < 0) { //negative value or overflow + if (para2Bytes <= 0) { //non-positive value or overflow return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); } return TSDB_CODE_SUCCESS; From f25a72e5c3402162de0f0990b2ef0454a295d25a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Apr 2022 18:45:28 +0800 Subject: [PATCH 07/12] fix(query): fix the crash caused by the complex having clause in which an scalar function is nested with an aggregate function in group by query. --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executorimpl.c | 20 ++++++++++++++------ source/libs/executor/src/groupoperator.c | 2 +- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 230ccf6ace..4c9ed78769 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -490,6 +490,7 @@ typedef struct SGroupbyOperatorInfo { SExprInfo* pScalarExprInfo; int32_t numOfScalarExpr; // the number of scalar expression in group operator SqlFunctionCtx* pScalarFuncCtx; + int32_t* rowCellInfoOffset; // offset value for each row result cell info } SGroupbyOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 87ecdde3ab..cd129b54fb 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1173,6 +1173,9 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* setPseudoOutputColInfo(pResult, pCtx, pPseudoList); pResult->info.groupId = pSrcBlock->info.groupId; + // if the source equals to the destination, it is to create a new column as the result of scalar function or some operators. + bool createNewColModel = (pResult == pSrcBlock); + int32_t numOfRows = 0; for (int32_t k = 0; k < numOfOutput; ++k) { @@ -1181,7 +1184,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); - if (pResult->info.rows > 0) { + if (pResult->info.rows > 0 && !createNewColModel) { colDataMergeCol(pColInfoData, pResult->info.rows, pfCtx->input.pData[0], pfCtx->input.numOfRows); } else { colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows); @@ -1191,7 +1194,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) { SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); - int32_t offset = pResult->info.rows; + int32_t offset = createNewColModel? 0: pResult->info.rows; for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) { colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType); } @@ -1207,7 +1210,8 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* SScalarParam dest = {.columnData = &idata}; scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); - colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows); + int32_t startOffset = createNewColModel? 0:pResult->info.rows; + colDataMergeCol(pResColData, startOffset, &idata, dest.numOfRows); numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); @@ -1224,7 +1228,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pfCtx->fpSet.init(&pCtx[k], pResInfo); pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); - pfCtx->offset = pResult->info.rows; // set the start offset + pfCtx->offset = createNewColModel? 0:pResult->info.rows; // set the start offset // set the timestamp(_rowts) output buffer if (taosArrayGetSize(pPseudoList) > 0) { @@ -1242,7 +1246,9 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* SScalarParam dest = {.columnData = &idata}; scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); - colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows); + + int32_t startOffset = createNewColModel? 0:pResult->info.rows; + colDataMergeCol(pResColData, startOffset, &idata, dest.numOfRows); numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); @@ -1252,7 +1258,9 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* } } - pResult->info.rows += numOfRows; + if (!createNewColModel) { + pResult->info.rows += numOfRows; + } } void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs, diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 8018a8dd31..d610880e30 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -341,7 +341,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pInfo->pScalarExprInfo = pScalarExprInfo; pInfo->numOfScalarExpr = numOfScalarExpr; - pInfo->pScalarFuncCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); + pInfo->pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset); int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); if (code != TSDB_CODE_SUCCESS) { From 6d8c06efd50d966374c29fb4e54035ad2f80d29c Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 19 Apr 2022 19:16:15 +0800 Subject: [PATCH 08/12] fix(query): fix timezone function with trailing characters --- source/libs/scalar/src/sclfunc.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index e7ff0bde91..db1808b767 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1232,6 +1232,7 @@ int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p if (inputNum != 1) { return TSDB_CODE_FAILED; } + pOutput->columnData->info.bytes = pInput->columnData->info.bytes; colDataAppend(pOutput->columnData, pOutput->numOfRows, (char *)colDataGetData(pInput->columnData, 0), false); return TSDB_CODE_SUCCESS; } From 43aecff7ea3c6c0c87937a2c61fd1dd0775618b5 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 19 Apr 2022 19:16:15 +0800 Subject: [PATCH 09/12] fix(query): fix timezone function with trailing characters --- source/libs/function/src/builtins.c | 5 +++-- source/libs/scalar/src/sclfunc.c | 1 - 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 78dec26be5..726f9d0993 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -139,8 +139,9 @@ static int32_t translateTimePseudoColumn(SFunctionNode* pFunc, char* pErrBuf, in } static int32_t translateTimezone(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - // pseudo column do not need to check parameters - pFunc->node.resType = (SDataType){.bytes = TD_TIMEZONE_LEN, .type = TSDB_DATA_TYPE_BINARY}; + SExprNode* pPara1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0); + pFunc->node.resType = (SDataType){.bytes = pPara1->resType.bytes, .type = pPara1->resType.type}; + //pFunc->node.resType = (SDataType){.bytes = TD_TIMEZONE_LEN, .type = TSDB_DATA_TYPE_BINARY}; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index db1808b767..e7ff0bde91 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1232,7 +1232,6 @@ int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p if (inputNum != 1) { return TSDB_CODE_FAILED; } - pOutput->columnData->info.bytes = pInput->columnData->info.bytes; colDataAppend(pOutput->columnData, pOutput->numOfRows, (char *)colDataGetData(pInput->columnData, 0), false); return TSDB_CODE_SUCCESS; } From f53db3d3b97d7ca5ad9e00124574d795c5a98cc3 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 19 Apr 2022 20:40:45 +0800 Subject: [PATCH 10/12] fix(query): fix cast function NULL value handling logic --- source/libs/scalar/src/sclfunc.c | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 995d38e9b7..baa99b7c1e 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -653,20 +653,15 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp outputLen = outputLen * factor + VARSTR_HEADER_SIZE; } - char *input = NULL; char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1); char *output = outputBuf; - if (IS_VAR_DATA_TYPE(inputType)) { - input = pInput[0].columnData->pData + pInput[0].columnData->varmeta.offset[0]; - } else { - input = pInput[0].columnData->pData; - } for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { if (colDataIsNull_s(pInput[0].columnData, i)) { colDataAppendNULL(pOutput->columnData, i); continue; } + char *input = colDataGetData(pInput[0].columnData, i); switch(outputType) { case TSDB_DATA_TYPE_BIGINT: { @@ -776,11 +771,6 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } colDataAppend(pOutput->columnData, i, output, false); - if (IS_VAR_DATA_TYPE(inputType)) { - input += varDataTLen(input); - } else { - input += tDataTypes[inputType].bytes; - } if (IS_VAR_DATA_TYPE(outputType)) { output += varDataTLen(output); } else { From a6e69b9704f40bfc5af05ee278373503b5c9dfe4 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 19 Apr 2022 21:04:02 +0800 Subject: [PATCH 11/12] trow: SKVRow read logic --- include/common/trow.h | 28 ++++++++++++++++++++++++-- source/dnode/vnode/src/tsdb/tsdbRead.c | 17 +++++++++++----- 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/include/common/trow.h b/include/common/trow.h index 963542fb31..862df43fc2 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -651,6 +651,8 @@ static int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { TD_ROW_SET_INFO(pBuilder->pBuf, 0); TD_ROW_SET_TYPE(pBuilder->pBuf, pBuilder->rowType); + TASSERT(pBuilder->nBitmaps > 0 && pBuilder->flen > 0); + uint32_t len = 0; switch (pBuilder->rowType) { case TD_ROW_TP: @@ -1165,6 +1167,18 @@ static FORCE_INLINE int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, in return TSDB_CODE_SUCCESS; } +/** + * @brief + * + * @param pRow + * @param colId + * @param colType + * @param flen + * @param offset + * @param colIdx start from 0 + * @param pVal + * @return FORCE_INLINE + */ static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset, col_id_t colIdx, SCellVal *pVal) { if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { @@ -1172,10 +1186,20 @@ static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t return true; } void *pBitmap = tdGetBitmapAddrTp(pRow, flen); - tdGetTpRowValOfCol(pVal, pRow, pBitmap, colType, offset - sizeof(TSKEY), colIdx - 1); + tdGetTpRowValOfCol(pVal, pRow, pBitmap, colType, offset - sizeof(TSKEY), colIdx); return true; } +/** + * @brief + * + * @param pRow + * @param colId + * @param offset + * @param colIdx start from 0 + * @param pVal + * @return FORCE_INLINE + */ static FORCE_INLINE bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t offset, col_id_t colIdx, SCellVal *pVal) { if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { @@ -1183,7 +1207,7 @@ static FORCE_INLINE bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t o return true; } void *pBitmap = tdGetBitmapAddrKv(pRow, tdRowGetNCols(pRow)); - tdGetKvRowValOfCol(pVal, pRow, pBitmap, offset, colIdx - 1); + tdGetKvRowValOfCol(pVal, pRow, pBitmap, offset, colIdx); return true; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 2ad34d1561..4686d97ec8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1561,12 +1561,19 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit if (isChosenRowDataRow) { colId = pSchema->columns[chosen_itr].colId; offset = pSchema->columns[chosen_itr].offset; - tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr, &sVal); + // TODO: use STSRowIter + tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr - 1, &sVal); } else { - SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr); - colId = pColIdx->colId; - offset = pColIdx->offset; - tdSKvRowGetVal(row, colId, offset, chosen_itr, &sVal); + // TODO: use STSRowIter + if (chosen_itr == 0) { + colId = PRIMARYKEY_TIMESTAMP_COL_ID; + tdSKvRowGetVal(row, PRIMARYKEY_TIMESTAMP_COL_ID, -1, -1, &sVal); + } else { + SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr - 1); + colId = pColIdx->colId; + offset = pColIdx->offset; + tdSKvRowGetVal(row, colId, offset, chosen_itr - 1, &sVal); + } } if (colId == pColInfo->info.colId) { From f0c0bc923c0fbffbe1861168b65a10593a5e4e43 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 19 Apr 2022 23:57:07 +0800 Subject: [PATCH 12/12] enh(rpc): fix mem leak --- source/libs/transport/src/transCli.c | 43 ++++++++---- source/libs/transport/src/transSrv.c | 97 ++++++++++++---------------- 2 files changed, 73 insertions(+), 67 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9dc12a3dec..8eb1a3ee7d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -129,6 +129,15 @@ static void transDestroyConnCtx(STransConnCtx* ctx); static SCliThrdObj* createThrdObj(); static void destroyThrdObj(SCliThrdObj* pThrd); +static void cliWalkCb(uv_handle_t* handle, void* arg); + +#define CLI_RELEASE_UV(loop) \ + do { \ + uv_walk(loop, cliWalkCb, NULL); \ + uv_run(loop, UV_RUN_DEFAULT); \ + uv_loop_close(loop); \ + } while (0); + // snprintf may cause performance problem #define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \ do { \ @@ -212,8 +221,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd); } \ } while (0) -#define CONN_NO_PERSIST_BY_APP(conn) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) -#define CONN_RELEASE_BY_SERVER(conn) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) +#define CONN_NO_PERSIST_BY_APP(conn) \ + (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) +#define CONN_RELEASE_BY_SERVER(conn) \ + (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) #define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) @@ -288,8 +299,9 @@ void cliHandleResp(SCliConn* conn) { tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } - tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType), - taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); + tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, + TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), + taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); conn->secured = pHead->secured; @@ -355,10 +367,12 @@ void cliHandleExcept(SCliConn* pConn) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); - tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, TMSG_INFO(transMsg.msgType)); + tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, + TMSG_INFO(transMsg.msgType)); if (transMsg.ahandle == NULL) { transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType)); - tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle); + tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, + transMsg.ahandle); } } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; @@ -631,8 +645,9 @@ void cliSend(SCliConn* pConn) { pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), - taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, + TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); if (pHead->persist == 1) { CONN_SET_PERSIST_BY_APP(pConn); @@ -671,9 +686,11 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { destroyCmsg(pMsg); destroyConnPool(pThrd->pool); uv_timer_stop(&pThrd->timer); + uv_walk(pThrd->loop, cliWalkCb, NULL); + pThrd->quit = true; - uv_stop(pThrd->loop); + // uv_stop(pThrd->loop); } static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = pMsg->msg.handle; @@ -786,7 +803,6 @@ static void* cliWorkThread(void* arg) { SCliThrdObj* pThrd = (SCliThrdObj*)arg; setThreadName("trans-cli-work"); uv_run(pThrd->loop, UV_RUN_DEFAULT); - return NULL; } @@ -851,8 +867,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { if (pThrd == NULL) { return; } - uv_stop(pThrd->loop); taosThreadJoin(pThrd->thread, NULL); + CLI_RELEASE_UV(pThrd->loop); taosThreadMutexDestroy(&pThrd->msgMtx); transDestroyAsyncPool(pThrd->asyncPool); @@ -874,6 +890,11 @@ void cliSendQuit(SCliThrdObj* thrd) { msg->type = Quit; transSendAsync(thrd->asyncPool, &msg->q); } +void cliWalkCb(uv_handle_t* handle, void* arg) { + if (!uv_is_closing(handle)) { + uv_close(handle, NULL); + } +} int cliRBChoseIdx(STrans* pTransInst) { int64_t index = pTransInst->index; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 2d37dfa37c..158d599bdf 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -93,27 +93,6 @@ typedef struct SServerObj { static const char* notify = "a"; -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - conn->status = ConnRelease; \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - tTrace("server conn %p received release request", conn); \ - \ - STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ - SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ - srvMsg->msg = tmsg; \ - srvMsg->type = Release; \ - srvMsg->pConn = conn; \ - if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ - return; \ - } \ - uvStartSendRespInternal(srvMsg); \ - return; \ - } \ - } while (0) - static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); @@ -125,11 +104,8 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) static void uvWorkerAsyncCb(uv_async_t* handle); static void uvAcceptAsyncCb(uv_async_t* handle); static void uvShutDownCb(uv_shutdown_t* req, int status); - -static void uvFreeCb(uv_handle_t* handle) { - // - taosMemoryFree(handle); -} +static void uvWalkCb(uv_handle_t* handle, void* arg); +static void uvFreeCb(uv_handle_t* handle); static void uvStartSendRespInternal(SSrvMsg* smsg); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); @@ -159,6 +135,34 @@ static void* transAcceptThread(void* arg); static bool addHandleToWorkloop(void* arg); static bool addHandleToAcceptloop(void* arg); +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + tTrace("server conn %p received release request", conn); \ + \ + STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ + SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ + srvMsg->msg = tmsg; \ + srvMsg->type = Release; \ + srvMsg->pConn = conn; \ + if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ + return; \ + } \ + uvStartSendRespInternal(srvMsg); \ + return; \ + } \ + } while (0) + +#define SRV_RELEASE_UV(loop) \ + do { \ + uv_walk(loop, uvWalkCb, NULL); \ + uv_run(loop, UV_RUN_DEFAULT); \ + uv_loop_close(loop); \ + } while (0); + void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SSrvConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; @@ -370,7 +374,7 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) { uvPrepareSendData(smsg, &wb); SSrvConn* pConn = smsg->pConn; - uv_timer_stop(&pConn->pTimer); + // uv_timer_stop(&pConn->pTimer); uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); } static void uvStartSendResp(SSrvMsg* smsg) { @@ -439,36 +443,17 @@ void uvWorkerAsyncCb(uv_async_t* handle) { static void uvWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { uv_close(handle, NULL); - // uv_unref(handle); - tDebug("handle: %p -----test----", handle); } } -#define MAKE_VALGRIND_HAPPY(loop) \ - do { \ - uv_walk(loop, uvWalkCb, NULL); \ - uv_run(loop, UV_RUN_DEFAULT); \ - uv_loop_close(loop); \ - } while (0); +static void uvFreeCb(uv_handle_t* handle) { + // + taosMemoryFree(handle); +} static void uvAcceptAsyncCb(uv_async_t* async) { SServerObj* srv = async->data; tDebug("close server port %d", srv->port); uv_walk(srv->loop, uvWalkCb, NULL); - // uv_close((uv_handle_t*)async, NULL); - // uv_close((uv_handle_t*)&srv->server, NULL); - // uv_stop(srv->loop); - // uv_print_all_handles(srv->loop, stderr); - // int ref = uv_loop_alive(srv->loop); - // assert(ref == 0); - // tError("active size %d", ref); - // uv_stop(srv->loop); - // uv_run(srv->loop, UV_RUN_DEFAULT); - // fprintf(stderr, "------------------------------------"); - // uv_print_all_handles(srv->loop, stderr); - - // int ret = uv_loop_close(srv->loop); - // tError("(loop)->active_reqs.count: %d, ret: %d", (srv->loop)->active_reqs.count, ret); - // assert(ret == 0); } static void uvShutDownCb(uv_shutdown_t* req, int status) { @@ -535,8 +520,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { pConn->pTransInst = pThrd->pTransInst; /* init conn timer*/ - uv_timer_init(pThrd->loop, &pConn->pTimer); - pConn->pTimer.data = pConn; + // uv_timer_init(pThrd->loop, &pConn->pTimer); + // pConn->pTimer.data = pConn; pConn->hostThrd = pThrd; @@ -680,11 +665,11 @@ static void uvDestroyConn(uv_handle_t* handle) { SWorkThrdObj* thrd = conn->hostThrd; tDebug("server conn %p destroy", conn); - uv_timer_stop(&conn->pTimer); + // uv_timer_stop(&conn->pTimer); transQueueDestroy(&conn->srvMsgs); QUEUE_REMOVE(&conn->queue); taosMemoryFree(conn->pTcp); - // taosMemoryFree(conn); + taosMemoryFree(conn); if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { tTrace("work thread quit"); @@ -805,7 +790,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { return; } taosThreadJoin(pThrd->thread, NULL); - MAKE_VALGRIND_HAPPY(pThrd->loop); + SRV_RELEASE_UV(pThrd->loop); transDestroyAsyncPool(pThrd->asyncPool); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); @@ -825,7 +810,7 @@ void transCloseServer(void* arg) { uv_async_send(srv->pAcceptAsync); taosThreadJoin(srv->thread, NULL); - MAKE_VALGRIND_HAPPY(srv->loop); + SRV_RELEASE_UV(srv->loop); for (int i = 0; i < srv->numOfThreads; i++) { sendQuitToWorkThrd(srv->pThreadObj[i]);