From b120f8ce71bf0134fa3069d60414c51f1943908e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 9 Oct 2020 13:21:28 +0800 Subject: [PATCH 01/18] [td-1655] --- src/client/src/tscProfile.c | 1 + src/inc/taosdef.h | 2 +- src/inc/taosmsg.h | 1 + src/mnode/src/mnodeProfile.c | 36 +++++++++++++++++++++--------------- 4 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index f2f997a594..bcd52d3a42 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -242,6 +242,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { pQdesc->stime = htobe64(pSql->stime); pQdesc->queryId = htonl(pSql->queryId); pQdesc->useconds = htobe64(pSql->res.useconds); + pQdesc->qHandle = htobe64(pSql->res.qhandle); pHeartbeat->numOfQueries++; pQdesc++; diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 25814a748e..0ece9c9862 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -282,7 +282,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf #define TSDB_SHELL_VNODE_BITS 24 #define TSDB_SHELL_SID_MASK 0xFF #define TSDB_HTTP_TOKEN_LEN 20 -#define TSDB_SHOW_SQL_LEN 64 +#define TSDB_SHOW_SQL_LEN 512 #define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_MQTT_HOSTNAME_LEN 64 diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 50b31a86cc..9127158473 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -740,6 +740,7 @@ typedef struct { uint32_t queryId; int64_t useconds; int64_t stime; + uint64_t qHandle; } SQueryDesc; typedef struct { diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 26f38dde03..8e0dc2f182 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -24,15 +24,9 @@ #include "mnode.h" #include "mnodeDef.h" #include "mnodeInt.h" -#include "mnodeAcct.h" -#include "mnodeDnode.h" -#include "mnodeDb.h" -#include "mnodeMnode.h" #include "mnodeProfile.h" #include "mnodeShow.h" -#include "mnodeTable.h" #include "mnodeUser.h" -#include "mnodeVgroup.h" #include "mnodeWrite.h" #define CONN_KEEP_TIME (tsShellActivityTimer * 3) @@ -299,7 +293,7 @@ static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC pShow->bytes[cols] = QUERY_ID_SIZE + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "queryId"); + strcpy(pSchema[cols].name, "query_id"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -315,9 +309,15 @@ static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; + pShow->bytes[cols] = 24; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "qhandle"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "created time"); + strcpy(pSchema[cols].name, "created_time"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -352,7 +352,7 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v SConnObj *pConnObj = NULL; int32_t cols = 0; char * pWrite; - char ipStr[TSDB_IPv4ADDR_LEN + 6]; + char str[TSDB_IPv4ADDR_LEN + 6] = {0}; while (numOfRows < rows) { pShow->pIter = mnodeGetNextConn(pShow->pIter, &pConnObj); @@ -362,9 +362,9 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v SQueryDesc *pDesc = pConnObj->pQueries + i; cols = 0; - snprintf(ipStr, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->queryId)); + snprintf(str, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->queryId)); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -372,8 +372,15 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); + snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]); + cols++; + + char handleBuf[24] = {0}; + snprintf(handleBuf, tListLen(handleBuf), "%p", (void*)htobe64(pDesc->qHandle)); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->bytes[cols]); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -393,8 +400,7 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v } pShow->numOfReads += numOfRows; - const int32_t NUM_OF_COLUMNS = 6; - mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + mnodeVacuumResult(data, cols, numOfRows, rows, pShow); return numOfRows; } From acdc22ee11ad22e9562fd6069b72e2033ff696bc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 9 Oct 2020 13:44:16 +0800 Subject: [PATCH 02/18] [td-1613] --- src/mnode/src/mnodeTable.c | 56 ++++++++++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 1bc328800e..850aebd186 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -63,27 +63,27 @@ static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *mnodeMsg); +static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg); static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg); -static int32_t mnodeProcessDropTableMsg(SMnodeMsg *mnodeMsg); +static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg); static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg); static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg); static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg); -static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *mnodeMsg); -static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *mnodeMsg); -static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *mnodeMsg); +static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg); +static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg); +static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg); -static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *mnodeMsg); +static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg); static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg); static int32_t mnodeGetChildTableMeta(SMnodeMsg *pMsg); static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg); -static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *mnodeMsg); +static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg); static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg); static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName); @@ -2543,6 +2543,25 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; + pShow->bytes[cols] = 8; // table uid + pSchema[cols].type = TSDB_DATA_TYPE_BIGINT; + strcpy(pSchema[cols].name, "uid"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "tid"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "vgId"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); pShow->numOfColumns = cols; @@ -2568,6 +2587,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows return 0; } + int32_t cols = 0; int32_t numOfRows = 0; SChildTableObj *pTable = NULL; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; @@ -2608,8 +2628,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows continue; } - int32_t cols = 0; - + cols = 0; char *pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; STR_WITH_MAXSIZE_TO_VARSTR(pWrite, tableName, pShow->bytes[cols]); @@ -2638,14 +2657,29 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows cols++; + // uid + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t*) pWrite = pTable->uid; + cols++; + + + // tid + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t*) pWrite = pTable->sid; + cols++; + + //vgid + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t*) pWrite = pTable->vgId; + cols++; + numOfRows++; mnodeDecTableRef(pTable); } pShow->numOfReads += numOfRows; - const int32_t NUM_OF_COLUMNS = 4; - mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + mnodeVacuumResult(data, cols, numOfRows, rows, pShow); mnodeDecDbRef(pDb); free(pattern); From 0060a73e94b29a0a34ae890f9c7b4e283cb8f8cd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 9 Oct 2020 17:59:34 +0800 Subject: [PATCH 03/18] [td-1613] --- src/mnode/src/mnodeDb.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 8558350950..a06152080e 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -760,6 +760,8 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void } pShow->numOfReads += numOfRows; + mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + mnodeDecUserRef(pUser); return numOfRows; } From f3250e62bd572a15318453e599d7287b872792e1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 10 Oct 2020 09:45:18 +0800 Subject: [PATCH 04/18] [td-1660] --- src/client/src/tscServer.c | 8 +++++++ src/inc/taosdef.h | 6 +++-- src/inc/taosmsg.h | 4 ++++ src/mnode/inc/mnodeProfile.h | 4 +++- src/mnode/src/mnodeProfile.c | 40 ++++++++++++++++++++++++++++------ src/mnode/src/mnodeShow.c | 8 ++++--- src/os/inc/osSemphone.h | 2 ++ src/os/src/detail/osSemphone.c | 26 ++++++++++++++++++++++ 8 files changed, 85 insertions(+), 13 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d3c0201169..de0c8d127d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1494,6 +1494,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload; + // TODO refactor full_name char *db; // ugly code to move the space db = strstr(pObj->db, TS_PATH_DELIMITER); db = (db == NULL) ? pObj->db : db + 1; @@ -1501,6 +1502,9 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tstrncpy(pConnect->clientVersion, version, sizeof(pConnect->clientVersion)); tstrncpy(pConnect->msgVersion, "", sizeof(pConnect->msgVersion)); + pConnect->pid = htonl(taosGetPId()); + taosGetCurrentAPPName(pConnect->appName, NULL); + return TSDB_CODE_SUCCESS; } @@ -1653,6 +1657,10 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload; pHeartbeat->numOfQueries = numOfQueries; pHeartbeat->numOfStreams = numOfStreams; + + pHeartbeat->pid = htonl(taosGetPId()); + taosGetCurrentAPPName(pHeartbeat->appName, NULL); + int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj); pthread_mutex_unlock(&pObj->mutex); diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 0ece9c9862..01a4ed32f1 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -253,8 +253,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf #define TSDB_COL_NAME_LEN 65 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE -#define TSDB_MAX_SQL_SHOW_LEN 256 -#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024U) // sql length should be less than 8mb +#define TSDB_MAX_SQL_SHOW_LEN 512 +#define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 8mb + +#define TSDB_APPNAME_LEN TSDB_UNI_LEN #define TSDB_MAX_BYTES_PER_ROW 16384 #define TSDB_MAX_TAGS_LEN 16384 diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 9127158473..1f1f9690e2 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -305,6 +305,8 @@ typedef struct { char clientVersion[TSDB_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN]; char db[TSDB_TABLE_FNAME_LEN]; + char appName[TSDB_APPNAME_LEN]; + int32_t pid; } SCMConnectMsg; typedef struct { @@ -756,8 +758,10 @@ typedef struct { typedef struct { uint32_t connId; + int32_t pid; int32_t numOfQueries; int32_t numOfStreams; + char appName[TSDB_APPNAME_LEN]; char pData[]; } SCMHeartBeatMsg; diff --git a/src/mnode/inc/mnodeProfile.h b/src/mnode/inc/mnodeProfile.h index e39496ec9c..1e5b1c0f9c 100644 --- a/src/mnode/inc/mnodeProfile.h +++ b/src/mnode/inc/mnodeProfile.h @@ -23,6 +23,8 @@ extern "C" { typedef struct { char user[TSDB_USER_LEN]; + char appName[TSDB_APPNAME_LEN]; // app name that invokes taosc + uint32_t pid; // pid of app that invokes taosc int8_t killed; uint16_t port; uint32_t ip; @@ -40,7 +42,7 @@ typedef struct { int32_t mnodeInitProfile(); void mnodeCleanupProfile(); -SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port); +SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port, int32_t pid, const char* app); SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port); void mnodeReleaseConn(SConnObj *pConn); int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg); diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 8e0dc2f182..25ca526d7e 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -72,7 +72,7 @@ void mnodeCleanupProfile() { } } -SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { +SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port, int32_t pid, const char* app) { #if 0 int32_t connSize = taosHashGetSize(tsMnodeConnCache->pHashTable); if (connSize > tsMaxShellConns) { @@ -90,10 +90,13 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { .ip = ip, .port = port, .connId = connId, - .stime = taosGetTimestampMs() + .stime = taosGetTimestampMs(), + .pid = pid, }; - tstrncpy(connObj.user, user, sizeof(connObj.user)); + tstrncpy(connObj.user, user, tListLen(connObj.user)); + tstrncpy(connObj.appName, app, tListLen(connObj.appName)); + connObj.lastAccess = connObj.stime; SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME * 1000); @@ -177,6 +180,20 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; + // app name + pShow->bytes[cols] = TSDB_APPNAME_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "app_name"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + // app pid + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "pid"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "ip:port"); @@ -185,13 +202,13 @@ static int32_t mnodeGetConnsMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "login time"); + strcpy(pSchema[cols].name, "login_time"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; pShow->bytes[cols] = 8; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "last access"); + strcpy(pSchema[cols].name, "last_access"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -230,6 +247,16 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]); cols++; + // app name + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->appName, pShow->bytes[cols]); + cols++; + + // app pid + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t*)pWrite = pConnObj->pid; + cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); @@ -248,8 +275,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi } pShow->numOfReads += numOfRows; - const int32_t NUM_OF_COLUMNS = 5; - mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + mnodeVacuumResult(data, cols, numOfRows, rows, pShow); return numOfRows; } diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index e587758e46..80909e99ae 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -186,7 +186,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { rowsToRead = pShow->numOfRows - pShow->numOfReads; } - /* return no more than 100 meters in one round trip */ + /* return no more than 100 tables in one round trip */ if (rowsToRead > 100) rowsToRead = 100; /* @@ -244,7 +244,8 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { int32_t connId = htonl(pHBMsg->connId); SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort); if (pConn == NULL) { - pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); + pHBMsg->pid = htonl(pHBMsg->pid); + pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pHBMsg->pid, pHBMsg->appName); } if (pConn == NULL) { @@ -325,7 +326,8 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { goto connect_over; } - SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort); + pConnectMsg->pid = htonl(pConnectMsg->pid); + SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pConnectMsg->pid, pConnectMsg->appName); if (pConn == NULL) { code = terrno; } else { diff --git a/src/os/inc/osSemphone.h b/src/os/inc/osSemphone.h index 4280b458a6..a71e74e97f 100644 --- a/src/os/inc/osSemphone.h +++ b/src/os/inc/osSemphone.h @@ -33,6 +33,8 @@ bool taosCheckPthreadValid(pthread_t thread); int64_t taosGetPthreadId(); void taosResetPthread(pthread_t *thread); bool taosComparePthread(pthread_t first, pthread_t second); +int32_t taosGetPId(); +int32_t taosGetCurrentAPPName(char *name, int32_t* len); #ifdef __cplusplus } diff --git a/src/os/src/detail/osSemphone.c b/src/os/src/detail/osSemphone.c index b91888845e..9eb8c18a40 100644 --- a/src/os/src/detail/osSemphone.c +++ b/src/os/src/detail/osSemphone.c @@ -34,5 +34,31 @@ bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; } int64_t taosGetPthreadId() { return (int64_t)pthread_self(); } void taosResetPthread(pthread_t *thread) { *thread = 0; } bool taosComparePthread(pthread_t first, pthread_t second) { return first == second; } +int32_t taosGetPId() { return getpid(); } + +int32_t taosGetCurrentAPPName(char *name, int32_t* len) { + const char* self = "/proc/self/exe"; + char path[PATH_MAX] = {0}; + + if (readlink(self, path, PATH_MAX) <= 0) { + return -1; + } + + path[PATH_MAX - 1] = 0; + char* end = strrchr(path, '/'); + if (end == NULL) { + return -1; + } + + ++end; + + strcpy(name, end); + + if (len != NULL) { + *len = strlen(name); + } + + return 0; +} #endif \ No newline at end of file From df2d6ea1c0b6a9cb557a6e8fdf61a08497fa853e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 10 Oct 2020 13:46:33 +0800 Subject: [PATCH 05/18] [td-1637] --- src/client/src/tscSql.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index ac7081ba70..f37a641eba 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -459,6 +459,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { SSqlRes *pRes = &pSql->res; if (pRes->qhandle == 0 || + pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) { return NULL; @@ -696,9 +697,8 @@ void taos_stop_query(TAOS_RES *res) { if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { assert(pSql->pRpcCtx == NULL); tscKillSTableQuery(pSql); - } else { - if (pSql->cmd.command < TSDB_SQL_LOCAL) { - assert(pSql->pRpcCtx != NULL); + } else { // TODO multithreads bug + if (pSql->cmd.command < TSDB_SQL_LOCAL && pSql->pRpcCtx != NULL) { rpcCancelRequest(pSql->pRpcCtx); } } From da9bf993bdab43c2c2f4a791fe702b26f460ef59 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 10 Oct 2020 13:48:31 +0800 Subject: [PATCH 06/18] [td-1641] --- src/client/inc/tscUtil.h | 2 -- src/client/src/tscServer.c | 22 +++++++++++++--------- src/client/src/tscUtil.c | 7 ------- 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 0323434a99..3aff1b8ef3 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -207,8 +207,6 @@ void tscTagCondRelease(STagCond* pCond); void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo); -void tscSetFreeHeatBeat(STscObj* pObj); -bool tscShouldFreeHeartBeat(SSqlObj* pHb); bool tscShouldBeFreed(SSqlObj* pSql); STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index de0c8d127d..5ec5de315e 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -198,15 +198,19 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { return; } - if (tscShouldFreeHeartBeat(pHB)) { - tscDebug("%p free HB object and release connection", pHB); - pObj->pHb = 0; - taos_free_result(pHB); - } else { - int32_t code = tscProcessSql(pHB); - if (code != TSDB_CODE_SUCCESS) { - tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); - } + void** p = taosCacheAcquireByKey(tscObjCache, &pHB->self, sizeof(TSDB_CACHE_PTR_TYPE)); + if (p == NULL) { + tscWarn("%p HB object has been released already", pHB); + return; + } + + assert(*pHB->self == pHB); + + int32_t code = tscProcessSql(pHB); + taosCacheRelease(tscObjCache, (void**) &p, false); + + if (code != TSDB_CODE_SUCCESS) { + tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); } } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d00b39e68b..f6bb97e52f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1516,13 +1516,6 @@ void tscSetFreeHeatBeat(STscObj* pObj) { pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; } -bool tscShouldFreeHeartBeat(SSqlObj* pHb) { - assert(pHb == pHb->signature); - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHb->cmd, 0); - return pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE; -} - /* * the following four kinds of SqlObj should not be freed * 1. SqlObj for stream computing From 47a0bc2c71afc93e747a498a91e851044974e32a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 10 Oct 2020 16:33:41 +0800 Subject: [PATCH 07/18] [td-1641] --- src/util/src/tcache.c | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 2637699adb..628f1c6c55 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -107,12 +107,14 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo free(pNode); } -static FORCE_INLINE void doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) { +static FORCE_INLINE STrashElem* doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) { if (pElem->pData->signature != (uint64_t) pElem->pData) { uWarn("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData); - return; + return NULL; } + STrashElem* next = pElem->next; + pCacheObj->numOfElemsInTrash--; if (pElem->prev) { pElem->prev->next = pElem->next; @@ -120,9 +122,15 @@ static FORCE_INLINE void doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem pCacheObj->pTrash = pElem->next; } - if (pElem->next) { - pElem->next->prev = pElem->prev; + if (next) { + next->prev = pElem->prev; } + + if (pCacheObj->numOfElemsInTrash == 0) { + assert(pCacheObj->pTrash == NULL); + } + + return next; } static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) { @@ -580,9 +588,7 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) { pCacheObj->numOfElemsInTrash - 1); STrashElem *p = pElem; - pElem = pElem->next; - - doRemoveElemInTrashcan(pCacheObj, p); + pElem = doRemoveElemInTrashcan(pCacheObj, p); doDestroyTrashcanElem(pCacheObj, p); } else { pElem = pElem->next; From 77202590114822ac437d219bf389c237c8e213cb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 10 Oct 2020 18:15:19 +0800 Subject: [PATCH 08/18] [td-1641] --- src/client/src/tscSql.c | 2 +- src/client/src/tscSubquery.c | 6 +++--- src/client/src/tscUtil.c | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index f37a641eba..7bc09a365a 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -559,7 +559,7 @@ int taos_select_db(TAOS *taos, const char *db) { } // send free message to vnode to free qhandle and corresponding resources in vnode -static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) { +static bool tscKillQueryInDnode(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 390c3ca92d..b6ac6d3173 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1516,9 +1516,9 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { SSqlObj *pParentSql = trsupport->pParentSql; assert(pSql == pParentSql->pSubs[index]); -// pParentSql->pSubs[index] = NULL; -// -// taos_free_result(pSql); + pParentSql->pSubs[index] = NULL; + + taos_free_result(pSql); taosTFree(trsupport->localBuffer); taosTFree(trsupport); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index f6bb97e52f..c8a3d2607a 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -415,7 +415,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { tscDebug("%p start to free sqlObj", pSql); - tscFreeSubobj(pSql); +// tscFreeSubobj(pSql); tscPartiallyFreeSqlObj(pSql); pSql->signature = NULL; From 97e69185f6162323d4907cc3d8f45bd841cccd8f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 10 Oct 2020 18:44:24 +0800 Subject: [PATCH 09/18] [td-1641] --- src/client/src/tscServer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 5ec5de315e..0b76bff3b1 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -198,7 +198,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { return; } - void** p = taosCacheAcquireByKey(tscObjCache, &pHB->self, sizeof(TSDB_CACHE_PTR_TYPE)); + void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE)); if (p == NULL) { tscWarn("%p HB object has been released already", pHB); return; From 83f8316e709c7ea39cd0d2cc1d3c0124a12ac39c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 10 Oct 2020 21:15:01 +0800 Subject: [PATCH 10/18] [td-1641] --- src/client/src/tscSubquery.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index b6ac6d3173..68ba3cd0bd 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1907,7 +1907,12 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) pParentObj->res.code = pSql->res.code; } + assert(pParentObj->pSubs[pSupporter->index] == tres); + + pParentObj->pSubs[pSupporter->index] = 0; + taos_free_result(tres); taosTFree(pSupporter); + if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { return; } From 6a88d306c8dd7c80f9974297a2ec6fd9501d430b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 10 Oct 2020 22:38:56 +0800 Subject: [PATCH 11/18] [td-1641] --- src/client/src/tscSubquery.c | 7 ------- src/client/src/tscUtil.c | 2 +- src/os/src/detail/osSysinfo.c | 4 ++-- src/util/src/tcache.c | 17 +++++++++-------- 4 files changed, 12 insertions(+), 18 deletions(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 68ba3cd0bd..0c57b94054 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1516,9 +1516,6 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { SSqlObj *pParentSql = trsupport->pParentSql; assert(pSql == pParentSql->pSubs[index]); - pParentSql->pSubs[index] = NULL; - - taos_free_result(pSql); taosTFree(trsupport->localBuffer); taosTFree(trsupport); } @@ -1907,10 +1904,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) pParentObj->res.code = pSql->res.code; } - assert(pParentObj->pSubs[pSupporter->index] == tres); - - pParentObj->pSubs[pSupporter->index] = 0; - taos_free_result(tres); taosTFree(pSupporter); if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index c8a3d2607a..f6bb97e52f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -415,7 +415,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { tscDebug("%p start to free sqlObj", pSql); -// tscFreeSubobj(pSql); + tscFreeSubobj(pSql); tscPartiallyFreeSqlObj(pSql); pSql->signature = NULL; diff --git a/src/os/src/detail/osSysinfo.c b/src/os/src/detail/osSysinfo.c index f6470fc3e1..8df671f9c8 100644 --- a/src/os/src/detail/osSysinfo.c +++ b/src/os/src/detail/osSysinfo.c @@ -203,7 +203,7 @@ static void taosGetSystemTimezone() { snprintf(tsTimezone, TSDB_TIMEZONE_LEN, "%s (%s, %s%02d00)", buf, tzname[daylight], tz >= 0 ? "+" : "-", abs(tz)); // cfg_timezone->cfgStatus = TAOS_CFG_CSTATUS_DEFAULT; - uInfo("timezone not configured, set to system default:%s", tsTimezone); + uWarn("timezone not configured, set to system default:%s", tsTimezone); } /* @@ -235,7 +235,7 @@ static void taosGetSystemLocale() { // get and set default locale strcpy(tsLocale, "en_US.UTF-8"); } else { tstrncpy(tsLocale, locale, TSDB_LOCALE_LEN); - uError("locale not configured, set to system default:%s", tsLocale); + uWarn("locale not configured, set to system default:%s", tsLocale); } } diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 628f1c6c55..2896481574 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -567,29 +567,30 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) { if (pCacheObj->numOfElemsInTrash == 0) { if (pCacheObj->pTrash != NULL) { + pCacheObj->pTrash = NULL; uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name, pCacheObj->numOfElemsInTrash); } - pCacheObj->pTrash = NULL; __cache_unlock(pCacheObj); return; } - STrashElem *pElem = pCacheObj->pTrash; + const char* stat[] = {"false", "true"}; + uDebug("cache:%s start to cleanup trashcan, numOfElem in trashcan:%d, free:%s", pCacheObj->name, + pCacheObj->numOfElemsInTrash, (force? stat[1]:stat[0])); + STrashElem *pElem = pCacheObj->pTrash; while (pElem) { T_REF_VAL_CHECK(pElem->pData); - if (pElem->next == pElem) { - pElem->next = NULL; - } + assert(pElem->next != pElem && pElem->prev != pElem); if (force || (T_REF_VAL_GET(pElem->pData) == 0)) { uDebug("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key, pElem->pData->data, pCacheObj->numOfElemsInTrash - 1); - STrashElem *p = pElem; - pElem = doRemoveElemInTrashcan(pCacheObj, p); - doDestroyTrashcanElem(pCacheObj, p); + doRemoveElemInTrashcan(pCacheObj, pElem); + doDestroyTrashcanElem(pCacheObj, pElem); + pElem = pCacheObj->pTrash; } else { pElem = pElem->next; } From 25ee3b480bb871723102e30f4cd96c72c8a39c27 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Oct 2020 11:18:11 +0800 Subject: [PATCH 12/18] [td-225] refactor client code --- src/client/inc/tscLocalMerge.h | 7 - src/client/inc/tscUtil.h | 1 - src/client/inc/tsclient.h | 13 +- src/client/src/tscAsync.c | 28 +--- src/client/src/tscLocalMerge.c | 2 +- src/client/src/tscSQLParser.c | 18 ++- src/client/src/tscServer.c | 24 ++-- src/client/src/tscSql.c | 15 ++- src/client/src/tscStream.c | 2 +- src/client/src/tscSubquery.c | 225 +++++++++++++++------------------ src/client/src/tscUtil.c | 18 +-- src/vnode/src/vnodeRead.c | 1 + 12 files changed, 172 insertions(+), 182 deletions(-) diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index 4e579b0729..5baa66a9e0 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -72,17 +72,10 @@ typedef struct SLocalReducer { bool orderPrjOnSTable; // projection query on stable } SLocalReducer; -typedef struct SSubqueryState { - int32_t numOfRemain; // the number of remain unfinished subquery - int32_t numOfTotal; // the number of total sub-queries - uint64_t numOfRetrievedRows; // total number of points in this query -} SSubqueryState; - typedef struct SRetrieveSupport { tExtMemBuffer ** pExtMemBuffer; // for build loser tree tOrderDescriptor *pOrderDescriptor; SColumnModel * pFinalColModel; // colModel for final result - SSubqueryState * pState; int32_t subqueryIndex; // index of current vnode in vnode list SSqlObj * pParentSql; tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 3aff1b8ef3..594226b1fc 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -66,7 +66,6 @@ typedef struct STidTags { #pragma pack(pop) typedef struct SJoinSupporter { - SSubqueryState* pState; SSqlObj* pObj; // parent SqlObj int32_t subqueryIndex; // index of sub query SInterval interval; diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 046bea2c27..b6ab3702c9 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -334,6 +334,12 @@ typedef struct STscObj { T_REF_DECLARE() } STscObj; +typedef struct SSubqueryState { + int32_t numOfRemain; // the number of remain unfinished subquery + int32_t numOfSub; // the number of total sub-queries + uint64_t numOfRetrievedRows; // total number of points in this query +} SSubqueryState; + typedef struct SSqlObj { void *signature; pthread_t owner; // owner of sql object, by which it is executed @@ -355,10 +361,11 @@ typedef struct SSqlObj { tsem_t rspSem; SSqlCmd cmd; SSqlRes res; - uint16_t numOfSubs; - struct SSqlObj **pSubs; - struct SSqlObj * prev, *next; + SSubqueryState subState; + struct SSqlObj **pSubs; + + struct SSqlObj *prev, *next; struct SSqlObj **self; } SSqlObj; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 400613595b..a76b77bb86 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -361,15 +361,6 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow); } -void tscProcessAsyncRes(SSchedMsg *pMsg) { - SSqlObj *pSql = (SSqlObj *)pMsg->ahandle; - SSqlRes *pRes = &pSql->res; - assert(pSql->fp != NULL && pSql->fetchFp != NULL); - - pSql->fp = pSql->fetchFp; - (*pSql->fp)(pSql->param, pSql, pRes->code); -} - // this function will be executed by queue task threads, so the terrno is not valid static void tscProcessAsyncError(SSchedMsg *pMsg) { void (*fp)() = pMsg->ahandle; @@ -393,22 +384,15 @@ void tscQueueAsyncRes(SSqlObj *pSql) { if (pSql == NULL || pSql->signature != pSql) { tscDebug("%p SqlObj is freed, not add into queue async res", pSql); return; - } else { - tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code)); } - SSchedMsg schedMsg = { 0 }; - schedMsg.fp = tscProcessAsyncRes; - schedMsg.ahandle = pSql; - schedMsg.thandle = (void *)1; - schedMsg.msg = NULL; - taosScheduleTask(tscQhandle, &schedMsg); -} + tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code)); -void tscProcessAsyncFree(SSchedMsg *pMsg) { - SSqlObj *pSql = (SSqlObj *)pMsg->ahandle; - tscDebug("%p sql is freed", pSql); - taos_free_result(pSql); + SSqlRes *pRes = &pSql->res; + assert(pSql->fp != NULL && pSql->fetchFp != NULL); + + pSql->fp = pSql->fetchFp; + (*pSql->fp)(pSql->param, pSql, pRes->code); } int tscSendMsgToServer(SSqlObj *pSql); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 19abbe32e5..16f208da98 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -639,7 +639,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->numOfSubs); + (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->subState.numOfSub); if (*pMemBuffer == NULL) { tscError("%p failed to allocate memory", pSql); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index b65d7b7112..b87882ba63 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1603,8 +1603,8 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t return TSDB_CODE_SUCCESS; } -static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc, char* aliasName, - int32_t resColIdx, SColumnIndex* pColIndex) { +static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc, + char* aliasName, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) { const char* msg1 = "not support column types"; int16_t type = 0; @@ -1650,8 +1650,13 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; tscColumnListInsert(pQueryInfo->colList, &index); + // if it is not in the final result, do not add it SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex); - insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, columnName, pExpr); + if (finalResult) { + insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, columnName, pExpr); + } else { + tscColumnListInsert(pQueryInfo->colList, &(ids.ids[0])); + } return TSDB_CODE_SUCCESS; } @@ -1926,7 +1931,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col for (int32_t j = 0; j < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++j) { index.columnIndex = j; - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex++, &index) != 0) { + if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex++, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } } @@ -1943,7 +1948,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col if ((index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) || (index.columnIndex < 0)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); } - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex + i, &index) != 0) { + + if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex + i, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -1980,7 +1986,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col for (int32_t i = 0; i < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++i) { SColumnIndex index = {.tableIndex = j, .columnIndex = i}; - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex, &index) != 0) { + if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 0b76bff3b1..5cddaa1c4d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -24,7 +24,6 @@ #include "tschemautil.h" #include "tsclient.h" #include "ttimer.h" -#include "tutil.h" #include "tlockfree.h" SRpcCorEpSet tscMgmtEpSet; @@ -478,20 +477,29 @@ void tscKillSTableQuery(SSqlObj *pSql) { pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - for (int i = 0; i < pSql->numOfSubs; ++i) { + for (int i = 0; i < pSql->subState.numOfSub; ++i) { // NOTE: pSub may have been released already here SSqlObj *pSub = pSql->pSubs[i]; if (pSub == NULL) { continue; } - pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - if (pSub->pRpcCtx != NULL) { - rpcCancelRequest(pSub->pRpcCtx); - pSub->pRpcCtx = NULL; + void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE)); + if (p == NULL) { + continue; } - tscQueueAsyncRes(pSub); // async res? not other functions? + SSqlObj* pSubObj = (SSqlObj*) (*p); + assert(pSubObj->self == (SSqlObj**) p); + + pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; + if (pSubObj->pRpcCtx != NULL) { + rpcCancelRequest(pSubObj->pRpcCtx); + pSubObj->pRpcCtx = NULL; + } + + tscQueueAsyncRes(pSubObj); // async res? not other functions? + taosCacheRelease(tscObjCache, (void**) &p, false); } tscDebug("%p super table query cancelled", pSql); @@ -1455,7 +1463,7 @@ int tscProcessLocalRetrieveRsp(SSqlObj *pSql) { int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; - SSqlCmd *pCmd = &pSql->cmd; + SSqlCmd* pCmd = &pSql->cmd; int32_t code = pRes->code; if (pRes->code != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 7bc09a365a..fa88c57fdd 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -523,7 +523,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { pRes->numOfClauseTotal = 0; pRes->rspType = 0; - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; taosTFree(pSql->pSubs); assert(pSql->fp == NULL); @@ -559,7 +559,7 @@ int taos_select_db(TAOS *taos, const char *db) { } // send free message to vnode to free qhandle and corresponding resources in vnode -static bool tscKillQueryInDnode(SSqlObj* pSql) { +static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; @@ -568,10 +568,18 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { } SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) { + if (pQueryInfo == NULL) { return true; } + if (pSql->pRpcCtx != NULL) { + rpcCancelRequest(pSql->pRpcCtx); + pSql->pRpcCtx = NULL; + return true; + } else { + pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; + } + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); tscRemoveFromSqlList(pSql); @@ -700,6 +708,7 @@ void taos_stop_query(TAOS_RES *res) { } else { // TODO multithreads bug if (pSql->cmd.command < TSDB_SQL_LOCAL && pSql->pRpcCtx != NULL) { rpcCancelRequest(pSql->pRpcCtx); + pSql->pRpcCtx = NULL; } } diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index d01ede279a..61614b56fb 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -274,7 +274,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), false); tscFreeSqlResult(pSql); taosTFree(pSql->pSubs); - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; taosTFree(pTableMetaInfo->vgroupList); tscSetNextLaunchTimer(pStream, pSql); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 0c57b94054..59b5404eea 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -26,7 +26,6 @@ #include "tscSubquery.h" typedef struct SInsertSupporter { - SSubqueryState* pState; SSqlObj* pSql; int32_t index; } SInsertSupporter; @@ -174,7 +173,6 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in } pSupporter->pObj = pSql; - pSupporter->pState = pState; pSupporter->subqueryIndex = index; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); @@ -250,7 +248,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { SJoinSupporter* pSupporter = NULL; //If the columns are not involved in the final select clause, the corresponding query will not be issued. - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { pSupporter = pSql->pSubs[i]->param; if (taosArrayGetSize(pSupporter->exprList) > 0) { ++numOfSub; @@ -260,16 +258,16 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { assert(numOfSub > 0); // scan all subquery, if one sub query has only ts, ignore it - tscDebug("%p start to launch secondary subqueries, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub); + tscDebug("%p start to launch secondary subqueries, total:%d, only:%d needs to query", pSql, pSql->subState.numOfSub, numOfSub); //the subqueries that do not actually launch the secondary query to virtual node is set as completed. - SSubqueryState* pState = pSupporter->pState; - pState->numOfTotal = pSql->numOfSubs; - pState->numOfRemain = numOfSub; - +// SSubqueryState* pState = pSupporter->pState; +// pState->numOfSub = pSql->subState.numOfSub; +// pSql->numOfRemain = numOfSub; + bool success = true; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj *pPrevSub = pSql->pSubs[i]; pSql->pSubs[i] = NULL; @@ -322,7 +320,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo)); SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); - assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); + assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); tscFieldInfoUpdateOffset(pNewQueryInfo); @@ -373,13 +371,13 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { if (!success) { pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql, - pSql->numOfSubs, pSql->res.code); + pSql->subState.numOfSub, pSql->res.code); freeJoinSubqueryObj(pSql); return pSql->res.code; } - for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == NULL) { continue; } @@ -391,17 +389,13 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { } void freeJoinSubqueryObj(SSqlObj* pSql) { - SSubqueryState* pState = NULL; - - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { continue; } SJoinSupporter* p = pSub->param; - pState = p->pState; - tscDestroyJoinSupporter(p); if (pSub->res.code == TSDB_CODE_SUCCESS) { @@ -409,14 +403,13 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { } } - taosTFree(pState); - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; } static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { - assert(pSupporter->pState->numOfRemain > 0); + assert(pSqlObj->subState.numOfRemain > 0); - if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) <= 0) { + if (atomic_sub_fetch_32(&pSqlObj->subState.numOfRemain, 1) <= 0) { tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code); freeJoinSubqueryObj(pSqlObj); } @@ -680,7 +673,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // no data exists in next vnode, mark the query completed // only when there is no subquery exits any more, proceeds to get the intersect of the tuple sets. - if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) { + if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { return; } @@ -716,10 +709,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0); tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2); - pSupporter->pState->numOfTotal = 2; - pSupporter->pState->numOfRemain = pSupporter->pState->numOfTotal; + pParentSql->subState.numOfSub = 2; + pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; - for (int32_t m = 0; m < pParentSql->numOfSubs; ++m) { + for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { SSqlObj* sub = pParentSql->pSubs[m]; issueTSCompQuery(sub, sub->param, pParentSql); } @@ -818,7 +811,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow return; } - if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) { + if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { return; } @@ -850,7 +843,6 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR SJoinSupporter* pSupporter = (SJoinSupporter*)param; SSqlObj* pParentSql = pSupporter->pObj; - SSubqueryState* pState = pSupporter->pState; SSqlObj* pSql = (SSqlObj*)tres; SSqlCmd* pCmd = &pSql->cmd; @@ -871,6 +863,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR pRes->numOfTotal += pRes->numOfRows; } + SSubqueryState* pState = &pParentSql->subState; if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); assert(pQueryInfo->numOfTables == 1); @@ -878,7 +871,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR // for projection query, need to try next vnode if current vnode is exhausted if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) { pState->numOfRemain = 1; - pState->numOfTotal = 1; + pState->numOfSub = 1; pSql->cmd.command = TSDB_SQL_SELECT; pSql->fp = tscJoinQueryCallback; @@ -888,12 +881,12 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR } } - if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { - tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfTotal); + if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { + tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pParentSql->subState.numOfRemain, pState->numOfSub); return; } - tscDebug("%p all %d secondary subqueries retrieval completed, code:%d", tres, pState->numOfTotal, pParentSql->res.code); + tscDebug("%p all %d secondary subqueries retrieval completed, code:%d", tres, pState->numOfSub, pParentSql->res.code); if (pParentSql->res.code != TSDB_CODE_SUCCESS) { freeJoinSubqueryObj(pParentSql); @@ -901,7 +894,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR } // update the records for each subquery in parent sql object. - for (int32_t i = 0; i < pParentSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pState->numOfSub; ++i) { if (pParentSql->pSubs[i] == NULL) { continue; } @@ -917,32 +910,26 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) { int32_t notInvolved = 0; SJoinSupporter* pSupporter = NULL; - SSubqueryState* pState = NULL; + SSubqueryState* pState = &pSql->subState; - for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == NULL) { notInvolved++; } else { pSupporter = (SJoinSupporter*)pSql->pSubs[i]->param; - pState = pSupporter->pState; } } - assert(pState != NULL); - if (pState != NULL) { - pState->numOfTotal = pSql->numOfSubs; - pState->numOfRemain = numOfFetch; - } - + pState->numOfRemain = numOfFetch; return pSupporter; } void tscFetchDatablockFromSubquery(SSqlObj* pSql) { - assert(pSql->numOfSubs >= 1); + assert(pSql->subState.numOfSub >= 1); int32_t numOfFetch = 0; bool hasData = true; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { // if the subquery is NULL, it does not involved in the final result generation SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { @@ -989,7 +976,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch); SJoinSupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch); - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSql1 = pSql->pSubs[i]; if (pSql1 == NULL) { continue; @@ -1124,7 +1111,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { } // wait for the other subqueries response from vnode - if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) { + if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { return; } @@ -1136,7 +1123,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { * data instead of returning to its invoker */ if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { - pSupporter->pState->numOfRemain = pSupporter->pState->numOfTotal; // reset the record value + pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; // reset the record value pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->cmd.command = TSDB_SQL_FETCH; @@ -1164,8 +1151,9 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter pSql->res.qhandle = 0x1; assert(pSql->res.numOfRows == 0); + int32_t index = 0; if (pSql->pSubs == NULL) { - pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES); + pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); if (pSql->pSubs == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -1176,8 +1164,8 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter return TSDB_CODE_TSC_OUT_OF_MEMORY; } - pSql->pSubs[pSql->numOfSubs++] = pNew; - assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal); + pSql->pSubs[index++] = pNew; + assert(index <= pSql->subState.numOfSub); if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); @@ -1221,7 +1209,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // return the tableId & tag - SColumnIndex index = {0}; + SColumnIndex colIndex = {0}; STagCond* pTagCond = &pSupporter->tagCond; assert(pTagCond->joinInfo.hasJoin); @@ -1234,7 +1222,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); for(int32_t i = 0; i < numOfTags; ++i) { if (pSchema[i].colId == tagColId) { - index.columnIndex = i; + colIndex.columnIndex = i; break; } } @@ -1251,18 +1239,18 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter // set get tags query type TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY); - tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s1, TSDB_COL_TAG); + tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &colIndex, &s1, TSDB_COL_TAG); size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); tscDebug( "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), " "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, tagIndex:%d, name:%s", pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo), - numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, index.columnIndex, pNewQueryInfo->pTableMetaInfo[0]->name); + numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, colIndex.columnIndex, pNewQueryInfo->pTableMetaInfo[0]->name); } else { SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; - SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL); + SColumnIndex colIndex = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; + tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL); // set the tags value for ts_comp function SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); @@ -1320,8 +1308,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { goto _error; } - pState->numOfTotal = pQueryInfo->numOfTables; - pState->numOfRemain = pState->numOfTotal; + pSql->subState.numOfSub = pQueryInfo->numOfTables; bool hasEmptySub = false; @@ -1354,10 +1341,10 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; (*pSql->fp)(pSql->param, pSql, 0); } else { - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSub = pSql->pSubs[i]; if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) { - pState->numOfRemain = i - 1; // the already sent reques will continue and do not go to the error process routine + pSql->subState.numOfRemain = i - 1; // the already sent request will continue and do not go to the error process routine break; } } @@ -1373,7 +1360,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { } static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) { - assert(numOfSubs <= pSql->numOfSubs && numOfSubs >= 0 && pState != NULL); + assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0 && pState != NULL); for(int32_t i = 0; i < numOfSubs; ++i) { SSqlObj* pSub = pSql->pSubs[i]; @@ -1411,8 +1398,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - pSql->numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups; - assert(pSql->numOfSubs > 0); + pSql->subState.numOfSub = pTableMetaInfo->vgroupList->numOfVgroups; + assert(pSql->subState.numOfSub > 0); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); if (ret != 0) { @@ -1422,28 +1409,26 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { return ret; } - pSql->pSubs = calloc(pSql->numOfSubs, POINTER_BYTES); + pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); - tscDebug("%p retrieved query data from %d vnode(s)", pSql, pSql->numOfSubs); + tscDebug("%p retrieved query data from %d vnode(s)", pSql, pSql->subState.numOfSub); SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); if (pSql->pSubs == NULL || pState == NULL) { taosTFree(pState); taosTFree(pSql->pSubs); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub); tscQueueAsyncRes(pSql); return ret; } - pState->numOfTotal = pSql->numOfSubs; - pState->numOfRemain = pSql->numOfSubs; - + pSql->subState.numOfRemain = pSql->subState.numOfSub; pRes->code = TSDB_CODE_SUCCESS; int32_t i = 0; - for (; i < pSql->numOfSubs; ++i) { + for (; i < pSql->subState.numOfSub; ++i) { SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport)); if (trs == NULL) { tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); @@ -1452,8 +1437,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { trs->pExtMemBuffer = pMemoryBuf; trs->pOrderDescriptor = pDesc; - trs->pState = pState; - + trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage)); if (trs->localBuffer == NULL) { tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); @@ -1461,8 +1445,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { break; } - trs->subqueryIndex = i; - trs->pParentSql = pSql; + trs->subqueryIndex = i; + trs->pParentSql = pSql; trs->pFinalColModel = pModel; SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL); @@ -1483,39 +1467,39 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tscDebug("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex); } - if (i < pSql->numOfSubs) { + if (i < pSql->subState.numOfSub) { tscError("%p failed to prepare subquery structure and launch subqueries", pSql); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub); doCleanupSubqueries(pSql, i, pState); return pRes->code; // free all allocated resource } if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { - tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->numOfSubs); + tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pSql->subState.numOfSub); doCleanupSubqueries(pSql, i, pState); return pRes->code; } - for(int32_t j = 0; j < pSql->numOfSubs; ++j) { + for(int32_t j = 0; j < pSql->subState.numOfSub; ++j) { SSqlObj* pSub = pSql->pSubs[j]; SRetrieveSupport* pSupport = pSub->param; tscDebug("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex); tscProcessSql(pSub); } - + return TSDB_CODE_SUCCESS; } static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) { tscDebug("%p start to free subquery obj", pSql); - int32_t index = trsupport->subqueryIndex; - SSqlObj *pParentSql = trsupport->pParentSql; +// int32_t index = trsupport->subqueryIndex; +// SSqlObj *pParentSql = trsupport->pParentSql; - assert(pSql == pParentSql->pSubs[index]); +// assert(pSql == pParentSql->pSubs[index]); taosTFree(trsupport->localBuffer); taosTFree(trsupport); } @@ -1578,9 +1562,14 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO int32_t subqueryIndex = trsupport->subqueryIndex; assert(pSql != NULL); - SSubqueryState* pState = trsupport->pState; - assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal); - + SSubqueryState* pState = &pParentSql->subState; + int32_t remain = pState->numOfRemain; + int32_t sub = pState->numOfSub; + UNUSED(remain); + UNUSED(sub); + + assert(pParentSql->subState.numOfRemain <= pState->numOfSub && pParentSql->subState.numOfRemain >= 0); + // retrieved in subquery failed. OR query cancelled in retrieve phase. if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) { @@ -1610,24 +1599,23 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO } } - int32_t remain = -1; - if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { + remain = -1; + if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) { tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, - pState->numOfTotal - remain); + pState->numOfSub - remain); tscFreeSubSqlObj(trsupport, pSql); return; } // all subqueries are failed - tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfTotal, + tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfSub, tstrerror(pParentSql->res.code)); // release allocated resource tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel, - pState->numOfTotal); + pState->numOfSub); - taosTFree(trsupport->pState); tscFreeSubSqlObj(trsupport, pSql); // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes @@ -1647,7 +1635,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p SSqlObj * pParentSql = trsupport->pParentSql; tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; - SSubqueryState* pState = trsupport->pState; + SSubqueryState* pState = &pParentSql->subState; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; @@ -1684,9 +1672,9 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p } int32_t remain = -1; - if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { + if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) { tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex, - pState->numOfTotal - remain); + pState->numOfSub - remain); tscFreeSubSqlObj(trsupport, pSql); return; @@ -1696,20 +1684,18 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; tscDebug("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pParentSql, - pState->numOfTotal, pState->numOfRetrievedRows); + pState->numOfSub, pState->numOfRetrievedRows); SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0); tscClearInterpInfo(pPQueryInfo); - tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, pParentSql); + tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, pParentSql); tscDebug("%p build loser tree completed", pParentSql); pParentSql->res.precision = pSql->res.precision; pParentSql->res.numOfRows = 0; pParentSql->res.row = 0; - // only free once - taosTFree(trsupport->pState); tscFreeSubSqlObj(trsupport, pSql); // set the command flag must be after the semaphore been correctly set. @@ -1730,8 +1716,8 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR assert(tres != NULL); SSqlObj *pSql = (SSqlObj *)tres; - SSubqueryState* pState = trsupport->pState; - assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal); + SSubqueryState* pState = &pParentSql->subState; + assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); SCMVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; @@ -1748,6 +1734,10 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { assert(numOfRows == taos_errno(pSql)); + if (numOfRows == TSDB_CODE_TSC_QUERY_CANCELLED) { + trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; + } + if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry); @@ -1819,7 +1809,7 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; - assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->numOfSubs); + assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->subState.numOfSub); // launch subquery for each vnode, so the subquery index equals to the vgroupIndex. STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index); @@ -1890,7 +1880,6 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { SInsertSupporter *pSupporter = (SInsertSupporter *)param; SSqlObj* pParentObj = pSupporter->pSql; - SSubqueryState* pState = pSupporter->pState; // record the total inserted rows if (numOfRows > 0) { @@ -1906,15 +1895,12 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) taosTFree(pSupporter); - if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { + if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) { return; } tscDebug("%p Async insertion completed, total inserted:%" PRId64, pParentObj, pParentObj->res.numOfRows); - // release data block data - taosTFree(pState); - // restore user defined fp pParentObj->fp = pParentObj->fetchFp; @@ -1935,7 +1921,7 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) { SSqlRes* pRes = &pSql->res; SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param; - assert(pSupporter->index < pSupporter->pState->numOfTotal); + assert(pSupporter->index < pSupporter->pSql->subState.numOfSub); STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); @@ -1952,33 +1938,29 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - pSql->numOfSubs = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks); - assert(pSql->numOfSubs > 0); + pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->pDataBlocks); + assert(pSql->subState.numOfSub > 0); pRes->code = TSDB_CODE_SUCCESS; // the number of already initialized subqueries int32_t numOfSub = 0; - SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); - pState->numOfTotal = pSql->numOfSubs; - pState->numOfRemain = pSql->numOfSubs; - - pSql->pSubs = calloc(pSql->numOfSubs, POINTER_BYTES); + pSql->subState.numOfRemain = pSql->subState.numOfSub; + pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); if (pSql->pSubs == NULL) { goto _error; } - tscDebug("%p submit data to %d vnode(s)", pSql, pSql->numOfSubs); + tscDebug("%p submit data to %d vnode(s)", pSql, pSql->subState.numOfSub); - while(numOfSub < pSql->numOfSubs) { + while(numOfSub < pSql->subState.numOfSub) { SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); if (pSupporter == NULL) { goto _error; } pSupporter->pSql = pSql; - pSupporter->pState = pState; pSupporter->index = numOfSub; SSqlObj *pNew = createSimpleSubObj(pSql, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT); @@ -2001,12 +1983,12 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { numOfSub++; } else { tscDebug("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql, numOfSub, - pSql->numOfSubs, tstrerror(pRes->code)); + pSql->subState.numOfSub, tstrerror(pRes->code)); goto _error; } } - if (numOfSub < pSql->numOfSubs) { + if (numOfSub < pSql->subState.numOfSub) { tscError("%p failed to prepare subObj structure and launch sub-insertion", pSql); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; @@ -2024,7 +2006,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; _error: - taosTFree(pState); return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -2046,7 +2027,7 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); int32_t numOfRes = INT32_MAX; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == NULL) { continue; } @@ -2236,7 +2217,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { bool allSubqueryExhausted = true; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == NULL) { continue; } @@ -2262,7 +2243,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { hasData = !allSubqueryExhausted; } else { // otherwise, in case inner join, if any subquery exhausted, query completed. - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == 0) { continue; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index f6bb97e52f..33c51c5571 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -360,26 +360,26 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { tscFreeSqlResult(pSql); taosTFree(pSql->pSubs); - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; pSql->self = 0; tscResetSqlCmdObj(pCmd, false); } -static UNUSED_FUNC void tscFreeSubobj(SSqlObj* pSql) { - if (pSql->numOfSubs == 0) { +static void tscFreeSubobj(SSqlObj* pSql) { + if (pSql->subState.numOfSub == 0) { return; } - tscDebug("%p start to free sub SqlObj, numOfSub:%d", pSql, pSql->numOfSubs); + tscDebug("%p start to free sub SqlObj, numOfSub:%d", pSql, pSql->subState.numOfSub); - for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { tscDebug("%p free sub SqlObj:%p, index:%d", pSql, pSql->pSubs[i], i); taos_free_result(pSql->pSubs[i]); pSql->pSubs[i] = NULL; } - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; } /** @@ -415,7 +415,9 @@ void tscFreeSqlObj(SSqlObj* pSql) { tscDebug("%p start to free sqlObj", pSql); + pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; tscFreeSubobj(pSql); + tscPartiallyFreeSqlObj(pSql); pSql->signature = NULL; @@ -2284,7 +2286,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { * * For super table join with projection query, if anyone of the subquery is exhausted, the query completed. */ - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; pCmd->command = TSDB_SQL_SELECT; tscResetForNextRetrieve(pRes); @@ -2316,7 +2318,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) { pRes->numOfTotal = num; taosTFree(pSql->pSubs); - pSql->numOfSubs = 0; + pSql->subState.numOfSub = 0; pSql->fp = fp; tscDebug("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause); diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 58e97075ac..a3ed080f24 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -261,6 +261,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); return code; } From 66ab0cd04c5e09d602c08d89c7e41c5c7fe9b2e4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Oct 2020 11:47:45 +0800 Subject: [PATCH 13/18] [td-225] --- src/os/src/windows/wSemphone.c | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/os/src/windows/wSemphone.c b/src/os/src/windows/wSemphone.c index ded7e41843..e72a6cb091 100644 --- a/src/os/src/windows/wSemphone.c +++ b/src/os/src/windows/wSemphone.c @@ -36,3 +36,19 @@ int64_t taosGetPthreadId() { bool taosComparePthread(pthread_t first, pthread_t second) { return first.p == second.p; } + +int32_t taosGetPId() { + return GetCurrentProcessId(); +} + +int32_t taosGetCurrentAPPName(char *name, int32_t* len) { + char filepath[1024] = {0}; + + GetModuleFileName(NULL, filepath, MAX_PATH); + *strrchr(filepath,'.') = '\0'; + strcpy(name, filepath); + + if (len != NULL) { + len = strlen(filepath); + } +} \ No newline at end of file From 3caf1ffc046e14c72dbd457f707fcf73b674a8fa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Oct 2020 11:53:16 +0800 Subject: [PATCH 14/18] [td-225] --- src/os/src/windows/wSemphone.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/os/src/windows/wSemphone.c b/src/os/src/windows/wSemphone.c index e72a6cb091..02e6247bba 100644 --- a/src/os/src/windows/wSemphone.c +++ b/src/os/src/windows/wSemphone.c @@ -49,6 +49,6 @@ int32_t taosGetCurrentAPPName(char *name, int32_t* len) { strcpy(name, filepath); if (len != NULL) { - len = strlen(filepath); + *len = strlen(filepath); } } \ No newline at end of file From d4a18dd68c9830cbb64edc21e818fc1b644479d0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Oct 2020 13:14:50 +0800 Subject: [PATCH 15/18] [td-225] --- src/os/src/windows/wSemphone.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/os/src/windows/wSemphone.c b/src/os/src/windows/wSemphone.c index 02e6247bba..c234265d28 100644 --- a/src/os/src/windows/wSemphone.c +++ b/src/os/src/windows/wSemphone.c @@ -49,6 +49,6 @@ int32_t taosGetCurrentAPPName(char *name, int32_t* len) { strcpy(name, filepath); if (len != NULL) { - *len = strlen(filepath); + *len = (int32_t) strlen(filepath); } } \ No newline at end of file From 49ae6c39e527f2d5410d7b843d7f357b0cab8a37 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Oct 2020 13:26:32 +0800 Subject: [PATCH 16/18] [td-225] --- src/os/src/windows/wSemphone.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/os/src/windows/wSemphone.c b/src/os/src/windows/wSemphone.c index c234265d28..1f723540f6 100644 --- a/src/os/src/windows/wSemphone.c +++ b/src/os/src/windows/wSemphone.c @@ -51,4 +51,6 @@ int32_t taosGetCurrentAPPName(char *name, int32_t* len) { if (len != NULL) { *len = (int32_t) strlen(filepath); } + + return 0; } \ No newline at end of file From daea8133ed87774f13f0caf7bc8d8a94eb3b5b16 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Oct 2020 18:22:07 +0800 Subject: [PATCH 17/18] [td-255] compact result for all show command. --- src/balance/src/balance.c | 1 + src/mnode/src/mnodeCluster.c | 3 ++- src/mnode/src/mnodeDb.c | 2 +- src/mnode/src/mnodeDnode.c | 6 ++++-- src/mnode/src/mnodeMnode.c | 2 +- src/mnode/src/mnodeProfile.c | 7 +++---- src/mnode/src/mnodeTable.c | 8 +++----- src/mnode/src/mnodeUser.c | 2 +- src/mnode/src/mnodeVgroup.c | 3 ++- 9 files changed, 18 insertions(+), 16 deletions(-) diff --git a/src/balance/src/balance.c b/src/balance/src/balance.c index 0d4da965e2..d3a2fb01fd 100644 --- a/src/balance/src/balance.c +++ b/src/balance/src/balance.c @@ -937,6 +937,7 @@ static int32_t balanceRetrieveScores(SShowObj *pShow, char *data, int32_t rows, mnodeDecDnodeRef(pDnode); } + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; } diff --git a/src/mnode/src/mnodeCluster.c b/src/mnode/src/mnodeCluster.c index 5d19b67a5e..35b6a67ab2 100644 --- a/src/mnode/src/mnodeCluster.c +++ b/src/mnode/src/mnodeCluster.c @@ -224,7 +224,8 @@ static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, mnodeDecClusterRef(pCluster); numOfRows++; } - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; } diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index a06152080e..37a626c6cc 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -760,7 +760,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void } pShow->numOfReads += numOfRows; - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); mnodeDecUserRef(pUser); return numOfRows; diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 4c777e4eed..33a869a353 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -790,6 +790,7 @@ static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, vo mnodeDecDnodeRef(pDnode); } + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; } @@ -891,8 +892,8 @@ int32_t mnodeRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pC mnodeDecDnodeRef(pDnode); } - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; } @@ -992,6 +993,7 @@ static int32_t mnodeRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, v } } + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; } @@ -1083,8 +1085,8 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo } else { numOfRows = 0; } - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; } diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index 68828ac24d..89b2f50b73 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -480,8 +480,8 @@ static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, vo mnodeDecMnodeRef(pMnode); } - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 25ca526d7e..fc76e3dce3 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -275,7 +275,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi } pShow->numOfReads += numOfRows; - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); return numOfRows; } @@ -426,7 +426,7 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v } pShow->numOfReads += numOfRows; - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); return numOfRows; } @@ -554,8 +554,7 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v } pShow->numOfReads += numOfRows; - const int32_t NUM_OF_COLUMNS = 8; - mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); return numOfRows; } diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 850aebd186..3160d7e8e9 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -1384,9 +1384,8 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, } pShow->numOfReads += numOfRows; - const int32_t NUM_OF_COLUMNS = 5; - mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); mnodeDecDbRef(pDb); return numOfRows; @@ -2679,7 +2678,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows pShow->numOfReads += numOfRows; - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); mnodeDecDbRef(pDb); free(pattern); @@ -2877,9 +2876,8 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro } pShow->numOfReads += numOfRows; - const int32_t NUM_OF_COLUMNS = 4; - mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); mnodeDecDbRef(pDb); return numOfRows; diff --git a/src/mnode/src/mnodeUser.c b/src/mnode/src/mnodeUser.c index 130b68646f..779e252548 100644 --- a/src/mnode/src/mnodeUser.c +++ b/src/mnode/src/mnodeUser.c @@ -385,8 +385,8 @@ static int32_t mnodeRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, voi numOfRows++; mnodeDecUserRef(pUser); } - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; return numOfRows; } diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 283132697d..1f4132544b 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -771,7 +771,8 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v mnodeDecVgroupRef(pVgroup); numOfRows++; } - mnodeVacuumResult(data, cols, numOfRows, rows, pShow); + + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; mnodeDecTableRef(pTable); From e66cb7d967e740ef2e1e1e558447aa1d393729bd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Oct 2020 23:26:20 +0800 Subject: [PATCH 18/18] [td-225] --- src/client/src/tscSubquery.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 59b5404eea..fbbd810a19 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -261,9 +261,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { tscDebug("%p start to launch secondary subqueries, total:%d, only:%d needs to query", pSql, pSql->subState.numOfSub, numOfSub); //the subqueries that do not actually launch the secondary query to virtual node is set as completed. -// SSubqueryState* pState = pSupporter->pState; -// pState->numOfSub = pSql->subState.numOfSub; -// pSql->numOfRemain = numOfSub; + SSubqueryState* pState = &pSql->subState; + pState->numOfRemain = pState->numOfSub; bool success = true; @@ -1151,7 +1150,6 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter pSql->res.qhandle = 0x1; assert(pSql->res.numOfRows == 0); - int32_t index = 0; if (pSql->pSubs == NULL) { pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); if (pSql->pSubs == NULL) { @@ -1164,8 +1162,8 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter return TSDB_CODE_TSC_OUT_OF_MEMORY; } - pSql->pSubs[index++] = pNew; - assert(index <= pSql->subState.numOfSub); + pSql->pSubs[pSql->subState.numOfRemain++] = pNew; + assert(pSql->subState.numOfRemain <= pSql->subState.numOfSub); if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);