From ce77983ff256d958726d47582b5fbc69075fe369 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 9 May 2022 11:48:40 +0800 Subject: [PATCH 1/6] fix(rpc): fix invalide fqdn --- source/os/src/osDir.c | 50 +++++++++++++-------------- source/os/src/osSocket.c | 74 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 92 insertions(+), 32 deletions(-) diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index 19e4defafc..abdbf07c1a 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -26,7 +26,6 @@ typedef struct TdDirEntry { WIN32_FIND_DATA findFileData; } TdDirEntry; - typedef struct TdDir { TdDirEntry dirEntry; HANDLE hFind; @@ -59,7 +58,7 @@ void wordfree(wordexp_t *pwordexp) {} #include typedef struct dirent dirent; -typedef struct DIR TdDir; +typedef struct DIR TdDir; typedef struct dirent TdDirEntry; #endif @@ -78,14 +77,14 @@ void taosRemoveDir(const char *dirname) { taosRemoveDir(filename); } else { (void)taosRemoveFile(filename); - //printf("file:%s is removed\n", filename); + // printf("file:%s is removed\n", filename); } } taosCloseDir(&pDir); rmdir(dirname); - //printf("dir:%s is removed\n", dirname); + // printf("dir:%s is removed\n", dirname); return; } @@ -102,8 +101,8 @@ int32_t taosMkDir(const char *dirname) { int32_t taosMulMkDir(const char *dirname) { if (dirname == NULL) return -1; - char *temp = strdup(dirname); - char *pos = temp; + char * temp = strdup(dirname); + char * pos = temp; int32_t code = 0; if (strncmp(temp, "/", 1) == 0) { @@ -111,8 +110,8 @@ int32_t taosMulMkDir(const char *dirname) { } else if (strncmp(temp, "./", 2) == 0) { pos += 2; } - - for ( ; *pos != '\0'; pos++) { + + for (; *pos != '\0'; pos++) { if (*pos == '/') { *pos = '\0'; code = mkdir(temp, 0755); @@ -123,7 +122,7 @@ int32_t taosMulMkDir(const char *dirname) { *pos = '/'; } } - + if (*(pos - 1) != '/') { code = mkdir(temp, 0755); if (code < 0 && errno != EEXIST) { @@ -145,7 +144,7 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays) { TdDirPtr pDir = taosOpenDir(dirname); if (pDir == NULL) return; - int64_t sec = taosGetTimestampSec(); + int64_t sec = taosGetTimestampSec(); TdDirEntryPtr de = NULL; while ((de = taosReadDir(pDir)) != NULL) { @@ -173,9 +172,9 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays) { int32_t days = (int32_t)(TABS(sec - fileSec) / 86400 + 1); if (days > keepDays) { (void)taosRemoveFile(filename); - //printf("file:%s is removed, days:%d keepDays:%d", filename, days, keepDays); + // printf("file:%s is removed, days:%d keepDays:%d", filename, days, keepDays); } else { - //printf("file:%s won't be removed, days:%d keepDays:%d", filename, days, keepDays); + // printf("file:%s won't be removed, days:%d keepDays:%d", filename, days, keepDays); } } } @@ -187,7 +186,7 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays) { int32_t taosExpandDir(const char *dirname, char *outname, int32_t maxlen) { wordexp_t full_path; if (0 != wordexp(dirname, &full_path, 0)) { - //printf("failed to expand path:%s since %s", dirname, strerror(errno)); + // printf("failed to expand path:%s since %s", dirname, strerror(errno)); wordfree(&full_path); return -1; } @@ -228,9 +227,9 @@ bool taosIsDir(const char *dirname) { return false; } -char* taosDirName(char *name) { +char *taosDirName(char *name) { #ifdef WINDOWS - char Drive1[MAX_PATH], Dir1[MAX_PATH]; + char Drive1[MAX_PATH], Dir1[MAX_PATH]; _splitpath(name, Drive1, Dir1, NULL, NULL); size_t dirNameLen = strlen(Drive1) + strlen(Dir1); if (dirNameLen > 0) { @@ -242,13 +241,13 @@ char* taosDirName(char *name) { #endif } -char* taosDirEntryBaseName(char *name) { +char *taosDirEntryBaseName(char *name) { #ifdef WINDOWS char Filename1[MAX_PATH], Ext1[MAX_PATH]; _splitpath(name, NULL, NULL, Filename1, Ext1); return name + (strlen(name) - strlen(Filename1) - strlen(Ext1)); #else - return (char*)basename(name); + return (char *)basename(name); #endif } @@ -258,8 +257,8 @@ TdDirPtr taosOpenDir(const char *dirname) { } #ifdef WINDOWS - char szFind[MAX_PATH]; //这是要找的 - HANDLE hFind; + char szFind[MAX_PATH]; //这是要找的 + HANDLE hFind; TdDirPtr pDir = taosMemoryMalloc(sizeof(TdDir)); @@ -275,7 +274,6 @@ TdDirPtr taosOpenDir(const char *dirname) { #else return (TdDirPtr)opendir(dirname); #endif - } TdDirEntryPtr taosReadDir(TdDirPtr pDir) { @@ -286,9 +284,9 @@ TdDirEntryPtr taosReadDir(TdDirPtr pDir) { if (!FindNextFile(pDir->hFind, &(pDir->dirEntry.findFileData))) { return NULL; } - return (TdDirEntryPtr)&(pDir->dirEntry.findFileData); + return (TdDirEntryPtr) & (pDir->dirEntry.findFileData); #else - return (TdDirEntryPtr)readdir((DIR*)pDir); + return (TdDirEntryPtr)readdir((DIR *)pDir); #endif } @@ -299,18 +297,18 @@ bool taosDirEntryIsDir(TdDirEntryPtr pDirEntry) { #ifdef WINDOWS return (pDirEntry->findFileData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) != 0; #else - return (((dirent*)pDirEntry)->d_type & DT_DIR) != 0; + return (((dirent *)pDirEntry)->d_type & DT_DIR) != 0; #endif } -char* taosGetDirEntryName(TdDirEntryPtr pDirEntry) { +char *taosGetDirEntryName(TdDirEntryPtr pDirEntry) { if (pDirEntry == NULL) { return NULL; } #ifdef WINDOWS return pDirEntry->findFileData.cFileName; #else - return ((dirent*)pDirEntry)->d_name; + return ((dirent *)pDirEntry)->d_name; #endif } @@ -324,7 +322,7 @@ int32_t taosCloseDir(TdDirPtr *ppDir) { *ppDir = NULL; return 0; #else - closedir((DIR*)*ppDir); + closedir((DIR *)*ppDir); *ppDir = NULL; return 0; #endif diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index f3da490f36..c2be74a0a7 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -28,6 +28,7 @@ #else #include #include +#include #include #include #include @@ -638,6 +639,72 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) { return 0; } +int taosGetLocalIp(const char *eth, char *ip) { +#if defined(WINDOWS) + // DO NOTHAING + return 0; +#else + int fd; + struct ifreq ifr; + struct sockaddr_in sin; + + fd = socket(AF_INET, SOCK_DGRAM, 0); + if (-1 == fd) { + return -1; + } + strncpy(ifr.ifr_name, eth, IFNAMSIZ); + ifr.ifr_name[IFNAMSIZ - 1] = 0; + + if (ioctl(fd, SIOCGIFADDR, &ifr) < 0) { + taosCloseSocketNoCheck1(fd); + return -1; + } + memcpy(&sin, &ifr.ifr_addr, sizeof(sin)); + snprintf(ip, 64, "%s", inet_ntoa(sin.sin_addr)); + taosCloseSocketNoCheck1(fd); +#endif + return 0; +} +int taosValidIp(uint32_t ip) { +#if defined(WINDOWS) + // DO NOTHAING + return 0; +#else + int ret = -1; + int fd; + + struct ifconf ifconf; + + char buf[512] = {0}; + ifconf.ifc_len = 512; + ifconf.ifc_buf = buf; + + if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + return -1; + } + + ioctl(fd, SIOCGIFCONF, &ifconf); + struct ifreq *ifreq = (struct ifreq *)ifconf.ifc_buf; + for (int i = (ifconf.ifc_len / sizeof(struct ifreq)); i > 0; i--) { + char ip_str[64] = {0}; + if (ifreq->ifr_flags == AF_INET) { + ret = taosGetLocalIp(ifreq->ifr_name, ip_str); + if (ret != 0) { + break; + } + ifreq++; + } + if (ip == (uint32_t)taosInetAddr(ip_str)) { + ret = 0; + break; + } + } + taosCloseSocketNoCheck1(fd); + return ret; +#endif + return 0; +} + bool taosValidIpAndPort(uint32_t ip, uint16_t port) { struct sockaddr_in serverAdd; SocketFd fd; @@ -677,13 +744,8 @@ bool taosValidIpAndPort(uint32_t ip, uint16_t port) { taosCloseSocket(&pSocket); return false; } - if (listen(pSocket->fd, 1024) < 0) { - // printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno)); - taosCloseSocket(&pSocket); - return false; - } taosCloseSocket(&pSocket); - return true; + return 0 == taosValidIp(ip) ? true : false; } TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { struct sockaddr_in serverAdd; From 97f00c0e8a38da7cf0e47be81dc2106b03291228 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 9 May 2022 12:08:51 +0800 Subject: [PATCH 2/6] fix(rpc): fix duplicat port error --- source/os/src/osSocket.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index c2be74a0a7..9e29f44ed3 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -692,12 +692,13 @@ int taosValidIp(uint32_t ip) { if (ret != 0) { break; } + ret = -1; + if (ip == (uint32_t)taosInetAddr(ip_str)) { + ret = 0; + break; + } ifreq++; } - if (ip == (uint32_t)taosInetAddr(ip_str)) { - ret = 0; - break; - } } taosCloseSocketNoCheck1(fd); return ret; From e000addc2c63b89d95c0c0816148a2080c9510e8 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 9 May 2022 13:34:26 +0800 Subject: [PATCH 3/6] fix(query): fix cast function regression brought by other changes --- source/libs/parser/src/parAstCreater.c | 6 ++---- source/libs/scalar/src/sclfunc.c | 4 ++++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index e8ac562072..04ef854b5c 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -363,10 +363,8 @@ SNode* createCastFunctionNode(SAstCreateContext* pCxt, SNode* pExpr, SDataType d CHECK_OUT_OF_MEM(func); strcpy(func->functionName, "cast"); func->node.resType = dt; - if (TSDB_DATA_TYPE_VARCHAR == dt.type) { - func->node.resType.bytes = func->node.resType.bytes + VARSTR_HEADER_SIZE; - } else if (TSDB_DATA_TYPE_NCHAR == dt.type) { - func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; + if (TSDB_DATA_TYPE_NCHAR == dt.type) { + func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE; } nodesListMakeAppend(&func->pParameterList, pExpr); return (SNode*)func; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 5fa8eb0c9a..5127ae534a 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -709,6 +709,10 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp int16_t outputType = GET_PARAM_TYPE(&pOutput[0]); int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]); + if (IS_VAR_DATA_TYPE(outputType)) { + outputLen += VARSTR_HEADER_SIZE; + } + char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1); char *output = outputBuf; From b4c7118a51b296a65e5fc1d2c6fd10671de00fef Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 9 May 2022 15:13:43 +0800 Subject: [PATCH 4/6] fix(query): fix cast ncahr bug --- source/libs/scalar/src/sclfunc.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 5fa8eb0c9a..0161323e37 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -822,7 +822,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp varDataSetLen(output, len); } //for constant conversion, need to set proper length of pOutput description - if (len < outputLen - VARSTR_HEADER_SIZE) { + if (len < outputLen) { pOutput->columnData->info.bytes = len; } break; From b3df34ef08e33cf7daf12879e2c48b5d3e5eec3b Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 9 May 2022 15:16:47 +0800 Subject: [PATCH 5/6] Revert "fix(query): fix cast function regression brought by other changes" This reverts commit e000addc2c63b89d95c0c0816148a2080c9510e8. --- source/libs/parser/src/parAstCreater.c | 6 ++++-- source/libs/scalar/src/sclfunc.c | 4 ---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 04ef854b5c..e8ac562072 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -363,8 +363,10 @@ SNode* createCastFunctionNode(SAstCreateContext* pCxt, SNode* pExpr, SDataType d CHECK_OUT_OF_MEM(func); strcpy(func->functionName, "cast"); func->node.resType = dt; - if (TSDB_DATA_TYPE_NCHAR == dt.type) { - func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE; + if (TSDB_DATA_TYPE_VARCHAR == dt.type) { + func->node.resType.bytes = func->node.resType.bytes + VARSTR_HEADER_SIZE; + } else if (TSDB_DATA_TYPE_NCHAR == dt.type) { + func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; } nodesListMakeAppend(&func->pParameterList, pExpr); return (SNode*)func; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index a727d80170..0161323e37 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -709,10 +709,6 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp int16_t outputType = GET_PARAM_TYPE(&pOutput[0]); int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]); - if (IS_VAR_DATA_TYPE(outputType)) { - outputLen += VARSTR_HEADER_SIZE; - } - char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1); char *output = outputBuf; From 39bf90c0daa23ff6aba14dc71330c981ff48aaa9 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 9 May 2022 15:17:33 +0800 Subject: [PATCH 6/6] fix(sync): fix memory leak --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 6 ++++++ source/libs/sync/src/syncCommit.c | 2 ++ 2 files changed, 8 insertions(+) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 11afe33335..2baa8b8942 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -198,6 +198,9 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO rsp.refId = pMsg->rpcMsg.refId; tmsgSendRsp(&rsp); } + + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); } } @@ -211,6 +214,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf // todo SRpcMsg *pRsp = NULL; (void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); + + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); } } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 8b54b0a090..5febb9a14c 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -72,6 +72,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { "pSyncNode->pRaftStore->currentTerm:%lu", pEntry->term, pSyncNode->pRaftStore->currentTerm); } + + syncEntryDestory(pEntry); } }