From 1eb62ce38ca0653d5a09d50352a8a05c2a446bb3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 11 Jul 2020 15:51:06 +0800 Subject: [PATCH 01/14] [td-225] fix bugs in error handling. --- src/client/src/tscSubquery.c | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 6c3580bad4..dbb3c90a0e 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -550,7 +550,7 @@ static bool checkForDuplicateTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, return true; } -static void getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) { +static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) { tscDebug("%p all subqueries retrieve complete, do tags match", pParentSql); SJoinSupporter* p1 = pParentSql->pSubs[0]->param; @@ -568,10 +568,7 @@ static void getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParent *s2 = taosArrayInit(p2->num, p2->tagSize); if (!(checkForDuplicateTagVal(pQueryInfo, p1, pParentSql) && checkForDuplicateTagVal(pQueryInfo, p2, pParentSql))) { - freeJoinSubqueryObj(pParentSql); - pParentSql->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY; - tscQueueAsyncRes(pParentSql); - return; + return TSDB_CODE_QRY_DUP_JOIN_KEY; } int32_t i = 0, j = 0; @@ -594,6 +591,8 @@ static void getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParent i++; } } + + return TSDB_CODE_SUCCESS; } static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) { @@ -680,7 +679,14 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow } SArray *s1 = NULL, *s2 = NULL; - getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2); + int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2); + if (code != TSDB_CODE_SUCCESS) { + freeJoinSubqueryObj(pParentSql); + pParentSql->res.code = code; + tscQueueAsyncRes(pParentSql); + return; + } + if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return. tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql); freeJoinSubqueryObj(pParentSql); From 9e0ac1ebef3138007f239d02d07e767b69c02136 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 11 Jul 2020 17:01:35 +0800 Subject: [PATCH 02/14] [td-225] fix compiler error. --- src/client/inc/tsclient.h | 2 +- src/util/inc/tarray.h | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index f8ca288a13..be82eb64a8 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -415,7 +415,7 @@ int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); //void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column); static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) { - SFieldSupInfo* pInfo = TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex); + SFieldSupInfo* pInfo = (SFieldSupInfo*) TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex); assert(pInfo->pSqlExpr != NULL); int32_t type = pInfo->pSqlExpr->resType; diff --git a/src/util/inc/tarray.h b/src/util/inc/tarray.h index c05e2757d6..71838af150 100644 --- a/src/util/inc/tarray.h +++ b/src/util/inc/tarray.h @@ -23,14 +23,13 @@ extern "C" { #include "os.h" #define TARRAY_MIN_SIZE 8 -#define TARRAY_GET_ELEM(array, index) ((void*)((array)->pData + (index) * (array)->elemSize)) +#define TARRAY_GET_ELEM(array, index) ((void*)((char*)((array)->pData) + (index) * (array)->elemSize)) typedef struct SArray { size_t size; size_t capacity; size_t elemSize; - - void* pData; + void* pData; } SArray; /** From a2b9a4cad7d1ad24a13ffa830ab4dbf5ae34eb9b Mon Sep 17 00:00:00 2001 From: Hui Li Date: Sat, 11 Jul 2020 17:18:45 +0800 Subject: [PATCH 03/14] [reset vnode status from update to ready when process failed ] --- src/vnode/src/vnodeMain.c | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index d5221bae10..695a55d476 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -176,16 +176,28 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { pVnode->status = TAOS_VN_STATUS_UPDATING; int32_t code = vnodeSaveCfg(pVnodeCfg); - if (code != TSDB_CODE_SUCCESS) return code; + if (code != TSDB_CODE_SUCCESS) { + pVnode->status = TAOS_VN_STATUS_READY; + return code; + } code = vnodeReadCfg(pVnode); - if (code != TSDB_CODE_SUCCESS) return code; + if (code != TSDB_CODE_SUCCESS) { + pVnode->status = TAOS_VN_STATUS_READY; + return code; + } code = syncReconfig(pVnode->sync, &pVnode->syncCfg); - if (code != TSDB_CODE_SUCCESS) return code; + if (code != TSDB_CODE_SUCCESS) { + pVnode->status = TAOS_VN_STATUS_READY; + return code; + } code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); - if (code != TSDB_CODE_SUCCESS) return code; + if (code != TSDB_CODE_SUCCESS) { + pVnode->status = TAOS_VN_STATUS_READY; + return code; + } pVnode->status = TAOS_VN_STATUS_READY; vDebug("vgId:%d, vnode is altered", pVnode->vgId); From 46eae6c8dab55fd00c598391acee4126bfe80992 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 11 Jul 2020 09:41:00 +0000 Subject: [PATCH 04/14] [TD-904] change replica definition --- src/client/src/tscAsync.c | 2 +- src/client/src/tscPrepare.c | 2 +- src/client/src/tscSQLParser.c | 4 ++-- src/client/src/tscSql.c | 2 +- src/client/src/tscSub.c | 2 +- src/client/src/tscUtil.c | 4 ++-- src/common/src/tglobal.c | 6 +++--- src/inc/taosdef.h | 6 +++--- src/inc/taosmsg.h | 2 +- src/mnode/src/mnodeDb.c | 6 +++--- 10 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 41464aa660..ffe42de236 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -43,7 +43,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const pSql->signature = pSql; pSql->param = param; pSql->pTscObj = pObj; - pSql->maxRetry = TSDB_MAX_REPLICA_NUM; + pSql->maxRetry = TSDB_MAX_REPLICA; pSql->fp = fp; pSql->sqlstr = calloc(1, sqlLen + 1); diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 9f0d1a26ab..0cf69dfd46 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -497,7 +497,7 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { tsem_init(&pSql->rspSem, 0, 0); pSql->signature = pSql; pSql->pTscObj = pObj; - pSql->maxRetry = TSDB_MAX_REPLICA_NUM; + pSql->maxRetry = TSDB_MAX_REPLICA; pStmt->pSql = pSql; return pStmt; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 44d10ec2c4..ca6dfcee9e 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -5489,9 +5489,9 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) { } if (pCreate->replications != -1 && - (pCreate->replications < TSDB_MIN_REPLICA_NUM || pCreate->replications > TSDB_MAX_REPLICA_NUM)) { + (pCreate->replications < TSDB_MIN_DB_REPLICA_OPTION || pCreate->replications > TSDB_MAX_DB_REPLICA_OPTION)) { snprintf(msg, tListLen(msg), "invalid db option replications: %d valid range: [%d, %d]", pCreate->replications, - TSDB_MIN_REPLICA_NUM, TSDB_MAX_REPLICA_NUM); + TSDB_MIN_DB_REPLICA_OPTION, TSDB_MAX_DB_REPLICA_OPTION); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 6d75aef01f..73b93435ad 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -113,7 +113,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con pSql->pTscObj = pObj; pSql->signature = pSql; - pSql->maxRetry = TSDB_MAX_REPLICA_NUM; + pSql->maxRetry = TSDB_MAX_REPLICA; tsem_init(&pSql->rspSem, 0, 0); pObj->pDnodeConn = pDnodeConn; diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index f7d03bd787..2c5035c2ef 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -107,7 +107,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* pSql->signature = pSql; pSql->param = pSql; pSql->pTscObj = pObj; - pSql->maxRetry = TSDB_MAX_REPLICA_NUM; + pSql->maxRetry = TSDB_MAX_REPLICA; pSql->fp = asyncCallback; int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index c4641afbf3..eea5d60f64 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1649,7 +1649,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm pNew->fp = fp; pNew->param = param; - pNew->maxRetry = TSDB_MAX_REPLICA_NUM; + pNew->maxRetry = TSDB_MAX_REPLICA; pNew->sqlstr = strdup(pSql->sqlstr); if (pNew->sqlstr == NULL) { @@ -1804,7 +1804,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pNew->fp = fp; pNew->param = param; - pNew->maxRetry = TSDB_MAX_REPLICA_NUM; + pNew->maxRetry = TSDB_MAX_REPLICA; char* name = pTableMetaInfo->name; STableMetaInfo* pFinalInfo = NULL; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 40c1ee4280..a8ef3f8e8d 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -110,7 +110,7 @@ int16_t tsCommitTime = TSDB_DEFAULT_COMMIT_TIME; // seconds int32_t tsTimePrecision = TSDB_DEFAULT_PRECISION; int16_t tsCompression = TSDB_DEFAULT_COMP_LEVEL; int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL; -int32_t tsReplications = TSDB_DEFAULT_REPLICA_NUM; +int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION; int32_t tsMaxVgroupsPerDb = 0; int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES; // balance @@ -706,8 +706,8 @@ static void doInitGlobalConfig() { cfg.ptr = &tsReplications; cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; - cfg.minValue = TSDB_MIN_REPLICA_NUM; - cfg.maxValue = TSDB_MAX_REPLICA_NUM; + cfg.minValue = TSDB_MIN_DB_REPLICA_OPTION; + cfg.maxValue = TSDB_MAX_DB_REPLICA_OPTION; cfg.ptrLength = 0; cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 8e4d8c4253..eb3c3abc84 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -332,9 +332,9 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_MAX_WAL_LEVEL 2 #define TSDB_DEFAULT_WAL_LEVEL 1 -#define TSDB_MIN_REPLICA_NUM 1 -#define TSDB_MAX_REPLICA_NUM 3 -#define TSDB_DEFAULT_REPLICA_NUM 1 +#define TSDB_MIN_DB_REPLICA_OPTION 1 +#define TSDB_MAX_DB_REPLICA_OPTION 3 +#define TSDB_DEFAULT_DB_REPLICA_OPTION 1 #define TSDB_MAX_JOIN_TABLE_NUM 5 #define TSDB_MAX_UNION_CLAUSE 5 diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index f05392b10a..d96e9f68d5 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -646,7 +646,7 @@ typedef struct SCMSTableVgroupMsg { typedef struct { int32_t vgId; int8_t numOfIps; - SIpAddr ipAddr[TSDB_MAX_REPLICA_NUM]; + SIpAddr ipAddr[TSDB_MAX_REPLICA]; } SCMVgroupInfo; typedef struct { diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 497d64eabf..e8183e9089 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -287,9 +287,9 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) { return TSDB_CODE_MND_INVALID_DB_OPTION; } - if (pCfg->replications < TSDB_MIN_REPLICA_NUM || pCfg->replications > TSDB_MAX_REPLICA_NUM) { - mError("invalid db option replications:%d valid range: [%d, %d]", pCfg->replications, TSDB_MIN_REPLICA_NUM, - TSDB_MAX_REPLICA_NUM); + if (pCfg->replications < TSDB_MIN_DB_REPLICA_OPTION || pCfg->replications > TSDB_MAX_DB_REPLICA_OPTION) { + mError("invalid db option replications:%d valid range: [%d, %d]", pCfg->replications, TSDB_MIN_DB_REPLICA_OPTION, + TSDB_MAX_DB_REPLICA_OPTION); return TSDB_CODE_MND_INVALID_DB_OPTION; } From 798e7f2c99338f30d790a2a984e65e7e1a3ca2ca Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 11 Jul 2020 14:39:03 +0000 Subject: [PATCH 05/14] [TD-888] add protection while deplayed process mnode msg --- src/mnode/src/mnodeSdb.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 46678cb133..d3192a2460 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -393,7 +393,7 @@ void sdbCleanUp() { } void sdbIncRef(void *handle, void *pObj) { - if (pObj == NULL) return; + if (pObj == NULL || handle == NULL) return; SSdbTable *pTable = handle; int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos); @@ -402,7 +402,7 @@ void sdbIncRef(void *handle, void *pObj) { } void sdbDecRef(void *handle, void *pObj) { - if (pObj == NULL) return; + if (pObj == NULL || handle == NULL) return; SSdbTable *pTable = handle; int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos); From 46304e6bd847bc4832a5d2258468c24f066ccb66 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 11 Jul 2020 15:21:55 +0000 Subject: [PATCH 06/14] [TD-825] check http release contexet funciton --- src/plugins/http/inc/httpCode.h | 2 +- src/plugins/http/inc/httpInt.h | 3 +++ src/plugins/http/inc/httpServer.h | 2 +- src/plugins/http/src/httpHandle.c | 25 +++++++++---------------- src/plugins/http/src/httpServer.c | 21 ++++++++++++--------- 5 files changed, 26 insertions(+), 27 deletions(-) diff --git a/src/plugins/http/inc/httpCode.h b/src/plugins/http/inc/httpCode.h index 0235040139..08111260e9 100644 --- a/src/plugins/http/inc/httpCode.h +++ b/src/plugins/http/inc/httpCode.h @@ -105,7 +105,7 @@ #define HTTP_OP_VALUE_TYPE 79 //tgf -#define HTTP_TG_STABLE_NOT_EXIST 80 +#define HTTP_TG_STABLE_NOT_EXIST 80 extern char *httpMsg[]; diff --git a/src/plugins/http/inc/httpInt.h b/src/plugins/http/inc/httpInt.h index 8ca1c2ff11..ffd621be7a 100644 --- a/src/plugins/http/inc/httpInt.h +++ b/src/plugins/http/inc/httpInt.h @@ -61,6 +61,9 @@ #define HTTP_CHECK_BODY_CONTINUE 0 #define HTTP_CHECK_BODY_SUCCESS 1 +#define HTTP_READ_DATA_SUCCESS 0 +#define HTTP_READ_DATA_FAILED 1 + #define HTTP_WRITE_RETRY_TIMES 500 #define HTTP_WRITE_WAIT_TIME_MS 5 #define HTTP_EXPIRED_TIME 60000 diff --git a/src/plugins/http/inc/httpServer.h b/src/plugins/http/inc/httpServer.h index 04dadfe04c..508baa6112 100644 --- a/src/plugins/http/inc/httpServer.h +++ b/src/plugins/http/inc/httpServer.h @@ -23,6 +23,6 @@ void httpCleanUpConnect(); void *httpInitServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); void httpCleanUpServer(HttpServer *pServer); -bool httpReadDataImp(HttpContext *pContext); +int httpReadDataImp(HttpContext *pContext); #endif diff --git a/src/plugins/http/src/httpHandle.c b/src/plugins/http/src/httpHandle.c index 2c94f61950..407d19b307 100644 --- a/src/plugins/http/src/httpHandle.c +++ b/src/plugins/http/src/httpHandle.c @@ -60,6 +60,7 @@ bool httpParseURL(HttpContext* pContext) { char* pSeek; char* pEnd = strchr(pParser->pLast, ' '); if (pEnd == NULL) { + httpSendErrorResp(pContext, HTTP_UNSUPPORT_URL); return false; } @@ -275,14 +276,14 @@ bool httpParseChunkedBody(HttpContext* pContext, HttpParser* pParser, bool test) return true; } -bool httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) { +int httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) { bool parsedOk = httpParseChunkedBody(pContext, pParser, true); if (parsedOk) { httpParseChunkedBody(pContext, pParser, false); return HTTP_CHECK_BODY_SUCCESS; } else { httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, continue read", pContext, pContext->fd, pContext->ipstr); - if (!httpReadDataImp(pContext)) { + if (httpReadDataImp(pContext) != HTTP_READ_DATA_SUCCESS) { httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr); return HTTP_CHECK_BODY_ERROR; } else { @@ -296,7 +297,6 @@ int httpReadUnChunkedBody(HttpContext* pContext, HttpParser* pParser) { if (dataReadLen > pParser->data.len) { httpError("context:%p, fd:%d, ip:%s, un-chunked body length invalid, read size:%d dataReadLen:%d > pContext->data.len:%d", pContext, pContext->fd, pContext->ipstr, pContext->parser.bufsize, dataReadLen, pParser->data.len); - httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR); return HTTP_CHECK_BODY_ERROR; } else if (dataReadLen < pParser->data.len) { httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, read size:%d dataReadLen:%d < pContext->data.len:%d, continue read", @@ -358,20 +358,13 @@ bool httpParseRequest(HttpContext* pContext) { } int httpCheckReadCompleted(HttpContext* pContext) { - HttpParser *pParser = &pContext->parser; - if (pContext->httpChunked == HTTP_UNCUNKED) { - int ret = httpReadUnChunkedBody(pContext, pParser); - if (ret != HTTP_CHECK_BODY_SUCCESS) { - return ret; - } - } else { - int ret = httpReadChunkedBody(pContext, pParser); - if (ret != HTTP_CHECK_BODY_SUCCESS) { - return ret; - } - } + HttpParser* pParser = &pContext->parser; - return HTTP_CHECK_BODY_SUCCESS; + if (pContext->httpChunked == HTTP_UNCUNKED) { + return httpReadUnChunkedBody(pContext, pParser); + } else { + return httpReadChunkedBody(pContext, pParser); + } } bool httpDecodeRequest(HttpContext* pContext) { diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index dbe299cef7..177d447f10 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -69,7 +69,7 @@ void httpCleanUpConnect() { httpDebug("http server:%s is cleaned up", pServer->label); } -bool httpReadDataImp(HttpContext *pContext) { +int httpReadDataImp(HttpContext *pContext) { HttpParser *pParser = &pContext->parser; while (pParser->bufsize <= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) { @@ -85,8 +85,7 @@ bool httpReadDataImp(HttpContext *pContext) { } else { httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect", pContext, pContext->fd, pContext->ipstr, errno); - httpReleaseContext(pContext); - return false; + return HTTP_READ_DATA_FAILED; } } else { pParser->bufsize += nread; @@ -95,15 +94,13 @@ bool httpReadDataImp(HttpContext *pContext) { if (pParser->bufsize >= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) { httpError("context:%p, fd:%d, ip:%s, thread:%s, request big than:%d", pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, HTTP_BUFFER_SIZE); - httpSendErrorResp(pContext, HTTP_REQUSET_TOO_BIG); - httpNotifyContextClose(pContext); - return false; + return HTTP_REQUSET_TOO_BIG; } } pParser->buffer[pParser->bufsize] = 0; - return true; + return HTTP_READ_DATA_SUCCESS; } static bool httpDecompressData(HttpContext *pContext) { @@ -141,8 +138,14 @@ static bool httpReadData(HttpContext *pContext) { httpInitContext(pContext); } - if (!httpReadDataImp(pContext)) { - httpNotifyContextClose(pContext); + int32_t code = httpReadDataImp(pContext); + if (code != HTTP_READ_DATA_SUCCESS) { + if (code == HTTP_READ_DATA_FAILED) { + httpReleaseContext(pContext); + } else { + httpSendErrorResp(pContext, code); + httpNotifyContextClose(pContext); + } return false; } From 41d40592dc3798a8b9064dc93b4f1e5136a34f10 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 12 Jul 2020 14:21:02 +0800 Subject: [PATCH 07/14] add uid log while drop ctable --- src/mnode/src/mnodeTable.c | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 58eaecb20a..f4251e4f83 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -751,7 +751,9 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) { mInfo("app:%p:%p, table:%s, start to drop stable", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId); return mnodeProcessDropSuperTableMsg(pMsg); } else { - mInfo("app:%p:%p, table:%s, start to drop ctable", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId); + SChildTableObj *pCTable = (SChildTableObj *)pMsg->pTable; + mInfo("app:%p:%p, table:%s, start to drop ctable, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, + pDrop->tableId, pCTable->vgId, pCTable->sid, pCTable->uid); return mnodeProcessDropChildTableMsg(pMsg); } } @@ -1780,7 +1782,9 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); - mInfo("app:%p:%p, table:%s, send drop ctable msg", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId); + mInfo("app:%p:%p, table:%s, send drop ctable msg, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, + pDrop->tableId, pTable->vgId, pTable->sid, pTable->uid); + SRpcMsg rpcMsg = { .ahandle = pMsg, .pCont = pDrop, @@ -2187,12 +2191,15 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) { SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable; assert(pTable); - mInfo("app:%p:%p, table:%s, drop table rsp received, thandle:%p result:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg, - pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); + + mInfo("app:%p:%p, table:%s, drop table rsp received, vgId:%d sid:%d uid:%" PRIu64 ", thandle:%p result:%s", + mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid, + mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); if (rpcMsg->code != TSDB_CODE_SUCCESS) { - mError("app:%p:%p, table:%s, failed to drop in dnode, reason:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg, - pTable->info.tableId, tstrerror(rpcMsg->code)); + mError("app:%p:%p, table:%s, failed to drop in dnode, vgId:%d sid:%d uid:%" PRIu64 ", reason:%s", + mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid, + tstrerror(rpcMsg->code)); dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code); return; } From 9ead608f2cb3496212d07f80440c964aa30861f0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 12 Jul 2020 10:10:25 +0000 Subject: [PATCH 08/14] If the table is deleted by another thread during creation, stop creating and send drop msg to vnode --- src/mnode/inc/mnodeSdb.h | 1 + src/mnode/src/mnodeSdb.c | 8 ++++++++ src/mnode/src/mnodeTable.c | 17 ++++++++++++++--- 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index eec6d45e23..0c47f684f8 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -94,6 +94,7 @@ void sdbDecRef(void *thandle, void *pRow); int64_t sdbGetNumOfRows(void *handle); int32_t sdbGetId(void *handle); uint64_t sdbGetVersion(); +bool sdbCheckRowDeleted(void *thandle, void *pRow); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index d3192a2460..c8e4ae956c 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -661,6 +661,14 @@ int32_t sdbInsertRow(SSdbOper *pOper) { return TSDB_CODE_SUCCESS; } +bool sdbCheckRowDeleted(void *pTableInput, void *pRow) { + SSdbTable *pTable = pTableInput; + if (pTable == NULL) return false; + + int8_t *updateEnd = pRow + pTable->refCountPos - 1; + return (*updateEnd == 1); +} + int32_t sdbDeleteRow(SSdbOper *pOper) { SSdbTable *pTable = (SSdbTable *)pOper->table; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index f4251e4f83..8dac135885 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -72,7 +72,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg); static int32_t mnodeProcessDropTableMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg); static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg); -static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg); +static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn); static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg); static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *mnodeMsg); @@ -754,7 +754,7 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) { SChildTableObj *pCTable = (SChildTableObj *)pMsg->pTable; mInfo("app:%p:%p, table:%s, start to drop ctable, vgId:%d sid:%d uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId, pCTable->vgId, pCTable->sid, pCTable->uid); - return mnodeProcessDropChildTableMsg(pMsg); + return mnodeProcessDropChildTableMsg(pMsg, true); } } @@ -1758,7 +1758,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { } } -static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { +static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) { SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; if (pMsg->pVgroup == NULL) pMsg->pVgroup = mnodeGetVgroup(pTable->vgId); if (pMsg->pVgroup == NULL) { @@ -1793,6 +1793,8 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE }; + if (!needReturn) rpcMsg.ahandle = NULL; + dnodeSendMsgToDnode(&ipSet, &rpcMsg); return TSDB_CODE_MND_ACTION_IN_PROGRESS; @@ -2246,6 +2248,15 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable; assert(pTable); + // If the table is deleted by another thread during creation, stop creating and send drop msg to vnode + if (sdbCheckRowDeleted(tsChildTableSdb, pTable)) { + mDebug("app:%p:%p, table:%s, create table rsp received, but a deleting opertion incoming, vgId:%d sid:%d uid:%" PRIu64, + mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid); + mnodeProcessDropChildTableMsg(mnodeMsg, false); + dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); + return; + } + if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { SCMCreateTableMsg *pCreate = mnodeMsg->rpcMsg.pCont; if (pCreate->getMeta) { From 1111dc48cb5d9dfb5c5c64d9045f1d812bf083b0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 12 Jul 2020 18:43:16 +0800 Subject: [PATCH 09/14] return meta on table is already dropped --- src/mnode/src/mnodeTable.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 8dac135885..a3dc2b5adb 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -2253,8 +2253,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { mDebug("app:%p:%p, table:%s, create table rsp received, but a deleting opertion incoming, vgId:%d sid:%d uid:%" PRIu64, mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid); mnodeProcessDropChildTableMsg(mnodeMsg, false); - dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); - return; + rpcMsg->code = TSDB_CODE_SUCCESS; } if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { From 749246deb8c8c53f90f2a2397c23e55b07d58635 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 12 Jul 2020 21:16:05 +0800 Subject: [PATCH 10/14] definite lost while drop table in tsdb --- src/mnode/src/mnodeTable.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index a3dc2b5adb..6a47313717 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -376,7 +376,7 @@ static void mnodeCleanupChildTables() { } static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCtable) { - pStable->numOfTables++; + atomic_add_fetch_32(&pStable->numOfTables, 1); if (pStable->vgHash == NULL) { pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); @@ -390,7 +390,7 @@ static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCt } static void mnodeRemoveTableFromStable(SSuperTableObj *pStable, SChildTableObj *pCtable) { - pStable->numOfTables--; + atomic_sub_fetch_32(&pStable->numOfTables, 1); if (pStable->vgHash == NULL) return; @@ -889,7 +889,7 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR; SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; - if (pStable->numOfTables != 0) { + if (pStable->vgHash != NULL /*pStable->numOfTables != 0*/) { SHashMutableIterator *pIter = taosHashCreateIter(pStable->vgHash); while (taosHashIterNext(pIter)) { int32_t *pVgId = taosHashIterGet(pIter); From 6d5735030acd026582b299a42bf97fc32a4b92ae Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 12 Jul 2020 21:32:52 +0800 Subject: [PATCH 11/14] add some log --- src/mnode/src/mnodeTable.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 6a47313717..28661dc1ad 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -385,6 +385,8 @@ static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCt if (pStable->vgHash != NULL) { if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) { taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId)); + mDebug("table:%s, vgId:%d is put into stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId, + (int32_t)taosHashGetSize(pStable->vgHash)); } } } @@ -397,6 +399,8 @@ static void mnodeRemoveTableFromStable(SSuperTableObj *pStable, SChildTableObj * SVgObj *pVgroup = mnodeGetVgroup(pCtable->vgId); if (pVgroup == NULL) { taosHashRemove(pStable->vgHash, (char *)&pCtable->vgId, sizeof(pCtable->vgId)); + mDebug("table:%s, vgId:%d is remove from stable vgList, sizeOfVgList:%d", pStable->info.tableId, pCtable->vgId, + (int32_t)taosHashGetSize(pStable->vgHash)); } mnodeDecVgroupRef(pVgroup); } @@ -748,7 +752,9 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) { } if (pMsg->pTable->type == TSDB_SUPER_TABLE) { - mInfo("app:%p:%p, table:%s, start to drop stable", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId); + SSuperTableObj *pSTable = (SSuperTableObj *)pMsg->pTable; + mInfo("app:%p:%p, table:%s, start to drop stable, uid:%" PRIu64 ", numOfChildTables:%d, sizeOfVgList:%d", + pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId, pSTable->uid, pSTable->numOfTables, (int32_t)taosHashGetSize(pSTable->vgHash)); return mnodeProcessDropSuperTableMsg(pMsg); } else { SChildTableObj *pCTable = (SChildTableObj *)pMsg->pTable; @@ -801,7 +807,7 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) { assert(pTable); if (code == TSDB_CODE_SUCCESS) { - mLInfo("stable:%s, is created in sdb", pTable->info.tableId); + mLInfo("stable:%s, is created in sdb, uid:%" PRIu64, pTable->info.tableId, pTable->uid); } else { mError("app:%p:%p, stable:%s, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, tstrerror(code)); @@ -2118,7 +2124,7 @@ static void mnodeDropAllChildTablesInStable(SSuperTableObj *pStable) { int32_t numOfTables = 0; SChildTableObj *pTable = NULL; - mInfo("stable:%s, all child tables(%d) will dropped from sdb", pStable->info.tableId, numOfTables); + mInfo("stable:%s, all child tables:%d will dropped from sdb", pStable->info.tableId, pStable->numOfTables); while (1) { pIter = mnodeGetNextChildTable(pIter, &pTable); From 53f7a132aa677956bb84eba379bda6307787e9a4 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 12 Jul 2020 21:54:02 +0800 Subject: [PATCH 12/14] fix table memory leak temporarily --- src/inc/taoserror.h | 1 + src/tsdb/src/tsdbMeta.c | 41 ++++++++++++++++++++++++++++++++++------- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index d2bef9ea57..07b55b6acb 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -200,6 +200,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb inval TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "tsdb file already exists") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, 0, 0x0611, "tsdb need to reconfigure table") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, 0, 0x0612, "tsdb create table information") // query TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle") diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 84c1c8e7d1..6d65e1e743 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -57,8 +57,30 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { STable * super = NULL; STable * table = NULL; int newSuper = 0; + int tid = pCfg->tableId.tid; + STable * pTable = NULL; - STable *pTable = tsdbGetTableByUid(pMeta, pCfg->tableId.uid); + if (tid < 0 || tid >= pRepo->config.maxTables) { + tsdbError("vgId:%d failed to create table since invalid tid %d", REPO_ID(pRepo), tid); + terrno = TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO; + goto _err; + } + + if (pMeta->tables[tid] != NULL) { + if (TABLE_UID(pMeta->tables[tid]) == pCfg->tableId.uid) { + tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + TABLE_TID(pTable), TABLE_UID(pTable)); + return TSDB_CODE_TDB_TABLE_ALREADY_EXIST; + } else { + tsdbError("vgId:%d table %s at tid %d uid %" PRIu64 + " exists, replace it with new table, this can be not reasonable", + REPO_ID(pRepo), TABLE_CHAR_NAME(pMeta->tables[tid]), TABLE_TID(pMeta->tables[tid]), + TABLE_UID(pMeta->tables[tid])); + tsdbDropTable(pRepo, pMeta->tables[tid]->tableId); + } + } + + pTable = tsdbGetTableByUid(pMeta, pCfg->tableId.uid); if (pTable != NULL) { tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable)); @@ -72,10 +94,10 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { super = tsdbNewTable(pCfg, true); if (super == NULL) goto _err; } else { - // TODO - if (super->type != TSDB_SUPER_TABLE) return -1; - if (super->tableId.uid != pCfg->superUid) return -1; - // tsdbUpdateTable(pRepo, super, pCfg); + if (TABLE_TYPE(super) != TSDB_SUPER_TABLE || TABLE_UID(super) != pCfg->superUid) { + terrno = TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO; + goto _err; + } } } @@ -705,6 +727,9 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) { T_REF_INC(pTable); + tsdbDebug("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), + TABLE_UID(pTable)); + return pTable; _err: @@ -714,7 +739,9 @@ _err: static void tsdbFreeTable(STable *pTable) { if (pTable) { - if (pTable->name != NULL) tsdbDebug("table %s is destroyed", TABLE_CHAR_NAME(pTable)); + if (pTable->name != NULL) + tsdbDebug("table %s tid %d uid %" PRIu64 " is destroyed", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), + TABLE_UID(pTable)); tfree(TABLE_NAME(pTable)); if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) { for (int i = 0; i < TSDB_MAX_TABLE_SCHEMAS; i++) { @@ -782,7 +809,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo tsdbGetTableSchemaImpl(pTable, false, false, -1)); } - tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + tsdbDebug("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable)); return 0; From 89722e184280ae723e7deaf9876f7dbd8d2f427a Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Mon, 13 Jul 2020 09:52:37 +0800 Subject: [PATCH 13/14] fix td-902: memory leak --- src/query/src/qExecutor.c | 66 +++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index ed7a86d843..1d9068be16 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5627,17 +5627,23 @@ static void freeQInfo(SQInfo *pQInfo); static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols) { - SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); - if (pQInfo == NULL) { - return NULL; - } - - SQuery *pQuery = calloc(1, sizeof(SQuery)); - pQInfo->runtimeEnv.pQuery = pQuery; - int16_t numOfCols = pQueryMsg->numOfCols; int16_t numOfOutput = pQueryMsg->numOfOutput; + SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); + if (pQInfo == NULL) { + goto _cleanup_qinfo; + } + // to make sure third party won't overwrite this structure + pQInfo->signature = pQInfo; + pQInfo->tableGroupInfo = *pTableGroupInfo; + + SQuery *pQuery = calloc(1, sizeof(SQuery)); + if (pQuery == NULL) { + goto _cleanup_query; + } + pQInfo->runtimeEnv.pQuery = pQuery; + pQuery->numOfCols = numOfCols; pQuery->numOfOutput = numOfOutput; pQuery->limit.limit = pQueryMsg->limit; @@ -5651,6 +5657,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit; pQuery->fillType = pQueryMsg->fillType; pQuery->numOfTags = pQueryMsg->numOfTags; + pQuery->tagColList = pTagCols; // todo do not allocate ?? pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); @@ -5663,8 +5670,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, pQuery->colList[i].filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pQuery->colList[i].numOfFilters); } - pQuery->tagColList = pTagCols; - // calculate the result row size for (int16_t col = 0; col < numOfOutput; ++col) { assert(pExprs[col].bytes > 0); @@ -5709,10 +5714,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, memcpy(pQuery->fillVal, (char *)pQueryMsg->fillVal, pQuery->numOfOutput * sizeof(int64_t)); } - // to make sure third party won't overwrite this structure - pQInfo->signature = pQInfo; - - pQInfo->tableGroupInfo = *pTableGroupInfo; size_t numOfGroups = 0; if (pTableGroupInfo->pGroupList != NULL) { numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList); @@ -5775,6 +5776,21 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo); return pQInfo; +_cleanup_qinfo: + tsdbDestoryTableGroup(pTableGroupInfo); + +_cleanup_query: + taosArrayDestroy(pGroupbyExpr->columnInfo); + tfree(pGroupbyExpr); + tfree(pTagCols); + for (int32_t i = 0; i < numOfOutput; ++i) { + SExprInfo* pExprInfo = &pExprs[i]; + if (pExprInfo->pExpr != NULL) { + tExprTreeDestroy(&pExprInfo->pExpr, NULL); + } + } + tfree(pExprs); + _cleanup: freeQInfo(pQInfo); return NULL; @@ -5893,19 +5909,21 @@ static void freeQInfo(SQInfo *pQInfo) { } // todo refactor, extract method to destroytableDataInfo - int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo); - for (int32_t i = 0; i < numOfGroups; ++i) { - SArray *p = GET_TABLEGROUP(pQInfo, i); + if (pQInfo->tableqinfoGroupInfo.pGroupList != NULL) { + int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo); + for (int32_t i = 0; i < numOfGroups; ++i) { + SArray *p = GET_TABLEGROUP(pQInfo, i); - size_t num = taosArrayGetSize(p); - for(int32_t j = 0; j < num; ++j) { - STableQueryInfo* item = taosArrayGetP(p, j); - if (item != NULL) { - destroyTableQueryInfo(item, pQuery->numOfOutput); + size_t num = taosArrayGetSize(p); + for(int32_t j = 0; j < num; ++j) { + STableQueryInfo* item = taosArrayGetP(p, j); + if (item != NULL) { + destroyTableQueryInfo(item, pQuery->numOfOutput); + } } - } - taosArrayDestroy(p); + taosArrayDestroy(p); + } } tfree(pQInfo->pBuf); From 3c707138d738b073ec7bd17de192bd66b052190d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 13 Jul 2020 14:14:58 +0800 Subject: [PATCH 14/14] add some log --- src/dnode/src/dnodeVWrite.c | 3 ++- src/vnode/src/vnodeWrite.c | 2 ++ tests/script/sh/deploy.sh | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 546e8cecb9..4a69b525ea 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -232,8 +232,9 @@ static void *dnodeProcessWriteQueue(void *param) { pHead->msgType = pWrite->rpcMsg.msgType; pHead->version = 0; pHead->len = pWrite->contLen; - dDebug("%p, msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); + dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); } else { + dDebug("%p, wal msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); pHead = (SWalHead *)item; } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 5ed5e747f2..09e4b43ed3 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -184,6 +184,8 @@ int vnodeWriteToQueue(void *param, void *data, int type) { memcpy(pWal, pHead, size); atomic_add_fetch_32(&pVnode->refCount, 1); + vDebug("vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount); + taosWriteQitem(pVnode->wqueue, type, pWal); return 0; diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 8f9569e09d..5573db0739 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -110,7 +110,7 @@ echo "second ${HOSTNAME}:7200" >> $TAOS_CFG echo "serverPort ${NODE}" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG -echo "debugFlag 135" >> $TAOS_CFG +echo "debugFlag 131" >> $TAOS_CFG echo "mDebugFlag 135" >> $TAOS_CFG echo "sdbDebugFlag 135" >> $TAOS_CFG echo "dDebugFlag 135" >> $TAOS_CFG