From a91f0736fa7d2be3123652d49e326ae2063098de Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sun, 5 Apr 2020 22:05:28 +0800 Subject: [PATCH 1/2] [TD-98] update the hash functions for rpc module --- src/rpc/src/rpcMain.c | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 8dbbb97a1a..64f265ce6e 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -24,7 +24,6 @@ #include "lz4.h" #include "taoserror.h" #include "tsocket.h" -#include "shash.h" #include "taosmsg.h" #include "rpcUdp.h" #include "rpcCache.h" @@ -263,7 +262,6 @@ void *rpcOpen(SRpcInit *pInit) { } if (pRpc->connType == TAOS_CONN_SERVER) { -// pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString); pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true); if (pRpc->hash == NULL) { tError("%s failed to init string hash", pRpc->label); @@ -298,7 +296,6 @@ void rpcClose(void *param) { } } -// taosCleanUpStrHash(pRpc->hash); taosHashCleanup(pRpc->hash); taosTmrCleanUp(pRpc->tmrCtrl); taosIdPoolCleanUp(pRpc->idPool); @@ -535,8 +532,7 @@ static void rpcCloseConn(void *thandle) { if ( pRpc->connType == TAOS_CONN_SERVER) { char hashstr[40] = {0}; size_t size = sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); -// taosDeleteStrHash(pRpc->hash, hashstr); -// taosHashRemove(pRpc->hash, hashstr, size); + taosHashRemove(pRpc->hash, hashstr, size); rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg pConn->pRspMsg = NULL; @@ -588,7 +584,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { size_t size = sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); // check if it is already allocated -// SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size)); if (ppConn) pConn = *ppConn; if (pConn) return pConn; @@ -621,7 +616,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { pConn->localPort = (pRpc->localPort + pRpc->index); } -// taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u", From fd7220693f4d6c3f9703fbbfb50d202f9449d162 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sun, 5 Apr 2020 22:45:37 +0800 Subject: [PATCH 2/2] [TD-98] fix insertion caused client crash, and memory leak during insertion. --- src/client/src/tscSubquery.c | 3 +++ src/client/src/tscUtil.c | 30 ++++++++++++++++-------------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 039cc966f0..b93ec0b6d3 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1512,6 +1512,9 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) { tscTrace("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); + tfree(pState); + tfree(pSupporter); + // release data block data pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 0294120ccb..61b7ab4ec5 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -587,7 +587,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { assert(pCmd->numOfClause == 1); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - // set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache + // set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) { strcpy(pTableMetaInfo->name, pDataBlock->tableId); taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false); @@ -599,7 +599,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { /* * the submit message consists of : [RPC header|message body|digest] - * the dataBlock only includes the RPC Header buffer and actual submit messsage body, space for digest needs + * the dataBlock only includes the RPC Header buffer and actual submit message body, space for digest needs * additional space. */ int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + 100); @@ -1277,7 +1277,7 @@ void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo) { void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableuid, bool deepcopy) { - if (src == NULL) { + if (src == NULL || src->numOfExprs == 0) { return; } @@ -1983,22 +1983,24 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void return NULL; } - memcpy(&pNew->cmd, pCmd, sizeof(SSqlCmd)); + SSqlCmd* pnCmd = &pNew->cmd; + memcpy(pnCmd, pCmd, sizeof(SSqlCmd)); + + pnCmd->command = cmd; + pnCmd->payload = NULL; + pnCmd->allocSize = 0; - pNew->cmd.command = cmd; - pNew->cmd.payload = NULL; - pNew->cmd.allocSize = 0; + pnCmd->pQueryInfo = NULL; + pnCmd->numOfClause = 0; + pnCmd->clauseIndex = 0; + pnCmd->pDataBlocks = NULL; - pNew->cmd.pQueryInfo = NULL; - pNew->cmd.numOfClause = 0; - pNew->cmd.clauseIndex = 0; - - if (tscAddSubqueryInfo(&pNew->cmd) != TSDB_CODE_SUCCESS) { + if (tscAddSubqueryInfo(pnCmd) != TSDB_CODE_SUCCESS) { tscFreeSqlObj(pNew); return NULL; } - SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(pnCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); memcpy(pNewQueryInfo, pQueryInfo, sizeof(SQueryInfo)); @@ -2018,7 +2020,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t)); } - if (tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) { + if (tscAllocPayload(pnCmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) { tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pTableMetaInfo->vnodeIndex); tscFreeSqlObj(pNew); return NULL;