From 0eb138cd1d7cd191f1ab4ad89b0a96963d2bc3b7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Sep 2020 15:04:05 +0800 Subject: [PATCH 01/38] [td-1583] --- src/client/src/tscSQLParser.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index b9d4cc13d1..0d75e60c28 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -6205,6 +6205,10 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { } tVariant* pTableItem1 = &pQuerySql->from->a[i + 1].pVar; + if (pTableItem1->nType != TSDB_DATA_TYPE_BINARY) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg11); + } + SStrToken aliasName = {.z = pTableItem1->pz, .n = pTableItem1->nLen, .type = TK_STRING}; if (tscValidateName(&aliasName) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg11); From 535816bc5aeea021ec11f66970733ff86c0ccf4d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Sep 2020 17:45:57 +0800 Subject: [PATCH 02/38] [td-1596] --- src/os/src/detail/osTime.c | 2 +- src/query/src/qExecutor.c | 32 ++++++++++++----- tests/script/general/parser/topbot.sim | 49 ++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 10 deletions(-) diff --git a/src/os/src/detail/osTime.c b/src/os/src/detail/osTime.c index bd4dc24554..b78627f46f 100644 --- a/src/os/src/detail/osTime.c +++ b/src/os/src/detail/osTime.c @@ -481,7 +481,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio start = (int64_t)(mktime(&tm) * TSDB_TICK_PER_SECOND(precision)); } else { int64_t delta = t - pInterval->interval; - int32_t factor = delta > 0 ? 1 : -1; + int32_t factor = (delta >= 0) ? 1 : -1; start = (delta / pInterval->sliding + factor) * pInterval->sliding; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index f2d324e376..84a5e25ab7 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2225,10 +2225,11 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) { return false; } -int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) { +int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo * pWindowResInfo, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) { SQuery *pQuery = pRuntimeEnv->pQuery; - *status = 0; + *status = BLK_DATA_NO_NEEDED; + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf > 0) { *status = BLK_DATA_ALL_NEEDED; } else { // check if this data block is required to load @@ -2240,12 +2241,26 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, } if ((*status) != BLK_DATA_ALL_NEEDED) { + // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, + // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer + if (QUERY_IS_INTERVAL_QUERY(pQuery)) { + bool hasTimeWindow = false; + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + + TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey; + + STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery); + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo->tid, &win, masterScan, &hasTimeWindow) != + TSDB_CODE_SUCCESS) { + // todo handle error in set result for timewindow + } + } + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base; int32_t functionId = pSqlFunc->functionId; int32_t colId = pSqlFunc->colInfo.colId; - (*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId); if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { break; @@ -2476,7 +2491,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SArray * pDataBlock = NULL; uint32_t status = 0; - int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); + int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); if (ret != TSDB_CODE_SUCCESS) { break; } @@ -4667,18 +4682,17 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo); } - SDataStatis *pStatis = NULL; - SArray * pDataBlock = NULL; uint32_t status = 0; + SDataStatis *pStatis = NULL; + SArray *pDataBlock = NULL; - int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); + int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pQuery->current->windowResInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); if (ret != TSDB_CODE_SUCCESS) { break; } if (status == BLK_DATA_DISCARD) { - pQuery->current->lastKey = - QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step; + pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step : blockInfo.window.skey + step; continue; } diff --git a/tests/script/general/parser/topbot.sim b/tests/script/general/parser/topbot.sim index 5c575b6163..c2b41888d7 100644 --- a/tests/script/general/parser/topbot.sim +++ b/tests/script/general/parser/topbot.sim @@ -213,4 +213,53 @@ if $data01 != 5195.000000000 then return -1 endi +print =======================>td-1596 +sql create table t2(ts timestamp, k int) +sql insert into t2 values('2020-1-2 1:1:1', 1); +sql insert into t2 values('2020-2-2 1:1:1', 1); + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s start +sql connect +sleep 1000 + +sql use db +sql select count(*), first(ts), last(ts) from t2 interval(1d); +if $rows != 2 then + return -1 +endi + +if $data00 != @20-01-02 00:00:00.000@ then + print expect 20-01-02 00:00:00.000, actual: $data00 + return -1 +endi + +if $data10 != @20-02-02 00:00:00.000@ then + return -1 +endi + +if $data01 != 1 then + return -1 +endi + +if $data11 != 1 then + return -1 +endi + +if $data02 != @20-01-02 01:01:01.000@ then + return -1 +endi + +if $data12 != @20-02-02 01:01:01.000@ then + return -1 +endi + +if $data03 != @20-01-02 01:01:01.000@ then + return -1 +endi + +if $data13 != @20-02-02 01:01:01.000@ then + return -1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file From 2b909bc151775cc56cfe683c0da160433e608b20 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Sep 2020 18:54:12 +0800 Subject: [PATCH 03/38] [td-1598] --- src/client/src/tscSQLParser.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 0d75e60c28..bb20f09c50 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -827,12 +827,12 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa SStrToken t = {0}; getCurrentDBName(pSql, &t); if (t.n == 0) { - invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); - } - - code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL); - if (code != 0) { - invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + } else { + code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL); + if (code != 0) { + invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + } } } From 901803abe64cf9b222733c9b9a400333d8804e80 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Sep 2020 14:11:38 +0800 Subject: [PATCH 04/38] [td-1604] --- src/client/src/tscAsync.c | 2 +- src/client/src/tscParseInsert.c | 44 ++++++++++++++++----------------- src/client/src/tscPrepare.c | 6 ----- src/inc/taoserror.h | 2 +- 4 files changed, 23 insertions(+), 31 deletions(-) diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index c5d622e245..400613595b 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -509,7 +509,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { goto _error; } - if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) { + if (pCmd->insertType == TSDB_QUERY_TYPE_STMT_INSERT) { STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); code = tscGetTableMeta(pSql, pTableMetaInfo); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 6fd97c09e9..16ff0139b4 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -406,7 +406,7 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start return TSDB_CODE_SUCCESS; } -int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error, +int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, SSqlCmd* pCmd, int16_t timePrec, int32_t *code, char *tmpTokenBuf) { int32_t index = 0; SStrToken sToken = {0}; @@ -426,12 +426,17 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ *str += index; if (sToken.type == TK_QUESTION) { + if (pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) { + *code = tscSQLSyntaxErrMsg(pCmd->payload, "? only allowed in binding insertion", *str); + return -1; + } + uint32_t offset = (uint32_t)(start - pDataBlocks->pData); if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) { continue; } - strcpy(error, "client out of memory"); + strcpy(pCmd->payload, "client out of memory"); *code = TSDB_CODE_TSC_OUT_OF_MEMORY; return -1; } @@ -439,8 +444,7 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ int16_t type = sToken.type; if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL && type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || (sToken.n == 0) || (type == TK_RP)) { - tscSQLSyntaxErrMsg(error, "invalid data or symbol", sToken.z); - *code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR; + *code = tscSQLSyntaxErrMsg(pCmd->payload, "invalid data or symbol", sToken.z); return -1; } @@ -470,14 +474,14 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ } bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX); - int32_t ret = tsParseOneColumnData(pSchema, &sToken, start, error, str, isPrimaryKey, timePrec); + int32_t ret = tsParseOneColumnData(pSchema, &sToken, start, pCmd->payload, str, isPrimaryKey, timePrec); if (ret != TSDB_CODE_SUCCESS) { *code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR; return -1; // NOTE: here 0 mean error! } if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) { - tscInvalidSQLErrMsg(error, "client time/server time can not be mixed up", sToken.z); + tscInvalidSQLErrMsg(pCmd->payload, "client time/server time can not be mixed up", sToken.z); *code = TSDB_CODE_TSC_INVALID_TIME_STAMP; return -1; } @@ -522,7 +526,7 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) { } int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMeta, int maxRows, - SParsedDataColInfo *spd, char *error, int32_t *code, char *tmpTokenBuf) { + SParsedDataColInfo *spd, SSqlCmd* pCmd, int32_t *code, char *tmpTokenBuf) { int32_t index = 0; SStrToken sToken; @@ -534,8 +538,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe int32_t precision = tinfo.precision; if (spd->hasVal[0] == false) { - strcpy(error, "primary timestamp column can not be null"); - *code = TSDB_CODE_TSC_INVALID_SQL; + *code = tscInvalidSQLErrMsg(pCmd->payload, "primary timestamp column can not be null", *str); return -1; } @@ -547,17 +550,17 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe *str += index; if (numOfRows >= maxRows || pDataBlock->size + tinfo.rowSize >= pDataBlock->nAllocSize) { int32_t tSize; - int32_t retcode = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize); - if (retcode != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client - strcpy(error, "client out of memory"); - *code = retcode; + *code = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize); + if (*code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client + strcpy(pCmd->payload, "client out of memory"); return -1; } + ASSERT(tSize > maxRows); maxRows = tSize; } - int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, error, precision, code, tmpTokenBuf); + int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, pCmd, precision, code, tmpTokenBuf); if (len <= 0) { // error message has been set in tsParseOneRowData return -1; } @@ -568,7 +571,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe sToken = tStrGetToken(*str, &index, false, 0, NULL); *str += index; if (sToken.n == 0 || sToken.type != TK_RP) { - tscSQLSyntaxErrMsg(error, ") expected", *str); + tscSQLSyntaxErrMsg(pCmd->payload, ") expected", *str); *code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR; return -1; } @@ -577,7 +580,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe } if (numOfRows <= 0) { - strcpy(error, "no any data points"); + strcpy(pCmd->payload, "no any data points"); *code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR; return -1; } else { @@ -704,7 +707,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st return TSDB_CODE_TSC_OUT_OF_MEMORY; } - int32_t numOfRows = tsParseValues(str, dataBuf, pTableMeta, maxNumOfRows, spd, pCmd->payload, &code, tmpTokenBuf); + int32_t numOfRows = tsParseValues(str, dataBuf, pTableMeta, maxNumOfRows, spd, pCmd, &code, tmpTokenBuf); free(tmpTokenBuf); if (numOfRows <= 0) { return code; @@ -724,10 +727,6 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st dataBuf->vgId = pTableMeta->vgroupInfo.vgId; dataBuf->numOfTables = 1; - /* - * the value of pRes->numOfRows does not affect the true result of AFFECTED ROWS, - * which is actually returned from server. - */ *totalNum += numOfRows; return TSDB_CODE_SUCCESS; } @@ -1459,8 +1458,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { char *lineptr = line; strtolower(line, line); - int32_t len = - tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tokenBuf); + int32_t len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd, tinfo.precision, &code, tokenBuf); if (len <= 0 || pTableDataBlock->numOfParams > 0) { pSql->res.code = code; break; diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index cdbd8685df..b3bb4566be 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -43,10 +43,6 @@ typedef struct SNormalStmt { tVariant* params; } SNormalStmt; -//typedef struct SInsertStmt { -// -//} SInsertStmt; - typedef struct STscStmt { bool isInsert; STscObj* taos; @@ -54,7 +50,6 @@ typedef struct STscStmt { SNormalStmt normal; } STscStmt; - static int normalStmtAddPart(SNormalStmt* stmt, bool isParam, char* str, uint32_t len) { uint16_t size = stmt->numParts + 1; if (size > stmt->sizeParts) { @@ -514,7 +509,6 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { SSqlObj* pSql = pStmt->pSql; size_t sqlLen = strlen(sql); - //doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; pSql->param = (void*) pSql; diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 67e2d43c98..6987647fe4 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -98,7 +98,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ACTION_IN_PROGRESS, 0, 0x0212, "Action in TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, 0, 0x0213, "Disconnected from service") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, 0, 0x0214, "No write permission") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, 0, 0x0215, "Connection killed") -TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, 0, 0x0216, "Syntax errr in SQL") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, 0, 0x0216, "Syntax error in SQL") // mnode TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, 0, 0x0300, "Message not processed") From 8195149c72e084c3c5353893e0fd08374c26ed2c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Sep 2020 14:12:39 +0800 Subject: [PATCH 05/38] [td-225]. --- src/client/src/tscServer.c | 4 ++-- src/client/src/tscSql.c | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 3cbb0d936e..ac4bc5b066 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -259,8 +259,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { return; } - pSql->pRpcCtx = NULL; // clear the rpcCtx - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", @@ -389,6 +387,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } rpcFreeCont(rpcMsg->pCont); + } int doProcessSql(SSqlObj *pSql) { @@ -475,6 +474,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; if (pSub->pRpcCtx != NULL) { rpcCancelRequest(pSub->pRpcCtx); + pSub->pRpcCtx = NULL; } tscQueueAsyncRes(pSub); // async res? not other functions? diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 430a762321..acc93530e3 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -698,6 +698,7 @@ void taos_stop_query(TAOS_RES *res) { tscKillSTableQuery(pSql); } else { if (pSql->cmd.command < TSDB_SQL_LOCAL) { + assert(pSql->pRpcCtx != NULL); rpcCancelRequest(pSql->pRpcCtx); } } From df32088b5dc8353d36c12ad855988c92c3d00f31 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Sep 2020 14:30:08 +0800 Subject: [PATCH 06/38] [td-1604]. --- tests/script/general/parser/where.sim | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/script/general/parser/where.sim b/tests/script/general/parser/where.sim index f9fd919bd6..bfbeddf93a 100644 --- a/tests/script/general/parser/where.sim +++ b/tests/script/general/parser/where.sim @@ -234,6 +234,13 @@ if $data11 != @19-01-01 09:10:00.000@ then endi sql create table tb_where_NULL (ts timestamp, c1 float, c2 binary(10)) + +print ===================>td-1604 +sql_error insert into tb_where_NULL values(?, ?, ?) +sql_error insert into tb_where_NULL values(now, 1, ?) +sql_error insert into tb_where_NULL values(?, 1, '') +sql_error insert into tb_where_NULL values(now, ?, '12') + sql insert into tb_where_NULL values ('2019-01-01 09:00:00.000', 1, 'val1') sql insert into tb_where_NULL values ('2019-01-01 09:00:01.000', NULL, NULL) sql insert into tb_where_NULL values ('2019-01-01 09:00:02.000', 2, 'val2') @@ -330,4 +337,5 @@ if $rows != 0 then return -1 endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file From 0dcef5b340d48e7aa977c1914edd5bff52b5d3fa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Sep 2020 15:04:47 +0800 Subject: [PATCH 07/38] [td-1603] --- src/client/src/tscSQLParser.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index bb20f09c50..2c26275262 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -817,17 +817,16 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa oldName = strdup(pTableMetaInfo->name); } - if (hasSpecifyDB(pzTableName)) { - // db has been specified in sql string so we ignore current db path + if (hasSpecifyDB(pzTableName)) { // db has been specified in sql string so we ignore current db path code = setObjFullName(pTableMetaInfo->name, getAccountId(pSql), NULL, pzTableName, NULL); if (code != 0) { invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - } else { // get current DB name first, then set it into path + } else { // get current DB name first, and then set it into path SStrToken t = {0}; getCurrentDBName(pSql, &t); - if (t.n == 0) { - code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + if (t.n == 0) { // current database not available or not specified + code = TSDB_CODE_MND_DB_NOT_SELECTED; } else { code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL); if (code != 0) { From b2dd094ffaf01f9b8a6596db4e12f5ae65adb1d0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Sep 2020 15:13:33 +0800 Subject: [PATCH 08/38] [td-1603] --- src/client/src/tscSQLParser.c | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 2c26275262..c51ea148f3 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -805,17 +805,13 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableName, SSqlObj* pSql) { const char* msg1 = "name too long"; - const char* msg2 = "current database or database name invalid"; SSqlCmd* pCmd = &pSql->cmd; int32_t code = TSDB_CODE_SUCCESS; // backup the old name in pTableMetaInfo - size_t size = strlen(pTableMetaInfo->name); - char* oldName = NULL; - if (size > 0) { - oldName = strdup(pTableMetaInfo->name); - } + char oldName[TSDB_TABLE_FNAME_LEN] = {0}; + tstrncpy(oldName, pTableMetaInfo->name, tListLen(oldName)); if (hasSpecifyDB(pzTableName)) { // db has been specified in sql string so we ignore current db path code = setObjFullName(pTableMetaInfo->name, getAccountId(pSql), NULL, pzTableName, NULL); @@ -836,23 +832,17 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa } if (code != TSDB_CODE_SUCCESS) { - taosTFree(oldName); return code; } /* - * the old name exists and is not equalled to the new name. Release the metermeta/metricmeta + * the old name exists and is not equalled to the new name. Release the table meta * that are corresponding to the old name for the new table name. */ - if (size > 0) { - if (strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) { - tscClearTableMetaInfo(pTableMetaInfo, false); - } - } else { - assert(pTableMetaInfo->pTableMeta == NULL); + if (strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) { + tscClearTableMetaInfo(pTableMetaInfo, false); } - taosTFree(oldName); return TSDB_CODE_SUCCESS; } From 9cb7efeec1129ba90e090080f1da459c31f1062f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Sep 2020 15:41:14 +0800 Subject: [PATCH 09/38] [td-1603] --- src/client/src/tscSQLParser.c | 66 +++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 30 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index c51ea148f3..b0a68d1bb7 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -233,8 +233,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } else if (pInfo->type == TSDB_SQL_DROP_TABLE) { assert(pInfo->pDCLInfo->nTokens == 1); - code = tscSetTableFullName(pTableMetaInfo, pzName, pSql); - if(code != TSDB_CODE_SUCCESS) { + if((code = tscSetTableFullName(pTableMetaInfo, pzName, pSql)) != TSDB_CODE_SUCCESS) { return code; } } else if (pInfo->type == TSDB_SQL_DROP_DNODE) { @@ -362,8 +361,9 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } // additional msg has been attached already - if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + code = tscSetTableFullName(pTableMetaInfo, pToken, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } return tscGetTableMeta(pSql, pTableMetaInfo); @@ -381,14 +381,15 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } - if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + code = tscSetTableFullName(pTableMetaInfo, pToken, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } + return tscGetTableMeta(pSql, pTableMetaInfo); } case TSDB_SQL_SHOW_CREATE_DATABASE: { const char* msg1 = "invalid database name"; - const char* msg2 = "table name is too long"; SStrToken* pToken = &pInfo->pDCLInfo->a[0]; if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) { @@ -397,11 +398,9 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (pToken->n > TSDB_DB_NAME_LEN) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); - } - return TSDB_CODE_SUCCESS; - } + + return tscSetTableFullName(pTableMetaInfo, pToken, pSql); + } case TSDB_SQL_CFG_DNODE: { const char* msg2 = "invalid configure options or values, such as resetlog / debugFlag 135 / balance 'vnode:2-dnode:2' / monitor 1 "; const char* msg3 = "invalid dnode ep"; @@ -4555,6 +4554,8 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { const char* msg18 = "primary timestamp column cannot be dropped"; const char* msg19 = "invalid new tag name"; + int32_t code = TSDB_CODE_SUCCESS; + SSqlCmd* pCmd = &pSql->cmd; SAlterTableSQL* pAlterSQL = pInfo->pAlterInfo; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); @@ -4565,13 +4566,14 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSetTableFullName(pTableMetaInfo, &(pAlterSQL->name), pSql) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + code = tscSetTableFullName(pTableMetaInfo, &(pAlterSQL->name), pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } - int32_t ret = tscGetTableMeta(pSql, pTableMetaInfo); - if (ret != TSDB_CODE_SUCCESS) { - return ret; + code = tscGetTableMeta(pSql, pTableMetaInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; } STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; @@ -5869,8 +5871,9 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSetTableFullName(pTableMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + int32_t code = tscSetTableFullName(pTableMetaInfo, pzTableName, pSql); + if(code != TSDB_CODE_SUCCESS) { + return code; } if (!validateTableColumnInfo(pFieldList, pCmd) || @@ -5924,15 +5927,16 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSetTableFullName(pStableMeterMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + int32_t code = tscSetTableFullName(pStableMeterMetaInfo, pToken, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } // get meter meta from mnode tstrncpy(pCreateTable->usingInfo.tagdata.name, pStableMeterMetaInfo->name, sizeof(pCreateTable->usingInfo.tagdata.name)); tVariantList* pList = pInfo->pCreateTableInfo->usingInfo.pTagVals; - int32_t code = tscGetTableMeta(pSql, pStableMeterMetaInfo); + code = tscGetTableMeta(pSql, pStableMeterMetaInfo); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -6009,7 +6013,6 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { const char* msg1 = "invalid table name"; - const char* msg2 = "table name too long"; const char* msg3 = "fill only available for interval query"; const char* msg4 = "fill option not supported in stream computing"; const char* msg5 = "sql too long"; // todo ADD support @@ -6041,11 +6044,12 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSetTableFullName(pTableMetaInfo, &srcToken, pSql) != TSDB_CODE_SUCCESS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + int32_t code = tscSetTableFullName(pTableMetaInfo, &srcToken, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } - int32_t code = tscGetTableMeta(pSql, pTableMetaInfo); + code = tscGetTableMeta(pSql, pTableMetaInfo); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -6072,8 +6076,9 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { } // set the created table[stream] name - if (tscSetTableFullName(pTableMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + code = tscSetTableFullName(pTableMetaInfo, pzTableName, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } if (pQuerySql->selectToken.n > TSDB_MAX_SAVED_SQL_LEN) { @@ -6189,8 +6194,9 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo, i/2); SStrToken t = {.type = TSDB_DATA_TYPE_BINARY, .n = pTableItem->nLen, .z = pTableItem->pz}; - if (tscSetTableFullName(pTableMetaInfo1, &t, pSql) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + code = tscSetTableFullName(pTableMetaInfo1, &t, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } tVariant* pTableItem1 = &pQuerySql->from->a[i + 1].pVar; From 284e159361b809521898789e28001cc4bcf53881 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Sep 2020 15:53:43 +0800 Subject: [PATCH 10/38] [td-1603] --- src/client/src/tscSQLParser.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index b0a68d1bb7..30a25a0285 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -838,7 +838,7 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa * the old name exists and is not equalled to the new name. Release the table meta * that are corresponding to the old name for the new table name. */ - if (strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) { + if (strlen(oldName) > 0 && strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) { tscClearTableMetaInfo(pTableMetaInfo, false); } @@ -6122,7 +6122,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { assert(pQuerySql != NULL && (pQuerySql->from == NULL || pQuerySql->from->nExpr > 0)); const char* msg0 = "invalid table name"; - //const char* msg1 = "table name too long"; const char* msg2 = "point interpolation query needs timestamp"; const char* msg5 = "fill only available for interval query"; const char* msg6 = "start(end) time of query range required or time range too large"; From 100121251f28267d3d72cc02a77329fe6037dd4e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 27 Sep 2020 15:11:46 +0800 Subject: [PATCH 11/38] [td-1619] --- src/tsdb/src/tsdbRead.c | 42 +++++++++++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index a3bc0de272..d3f4747a96 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -697,22 +697,41 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); if (pCheckInfo->pDataCols == NULL) { - tsdbError("%p failed to malloc buf, %p", pQueryHandle, pQueryHandle->qinfo); + tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return terrno; + goto _error; } } STSchema* pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj); - tdInitDataCols(pCheckInfo->pDataCols, pSchema); - tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema); - tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema); + int32_t code = tdInitDataCols(pCheckInfo->pDataCols, pSchema); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _error; + } + + code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %p", pQueryHandle, pQueryHandle->qinfo); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _error; + } + + code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %p", pQueryHandle, pQueryHandle->qinfo); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _error; + } int16_t* colIds = pQueryHandle->defaultLoadColumn->pData; int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pQueryHandle))); if (ret != TSDB_CODE_SUCCESS) { - return terrno; + int32_t c = terrno; + assert(c != TSDB_CODE_SUCCESS); + goto _error; } SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; @@ -729,10 +748,16 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p int64_t elapsedTime = (taosGetTimestampUs() - st); pQueryHandle->cost.blockLoadTime += elapsedTime; - tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64" , rows:%d, elapsed time:%"PRId64 " us, %p", + tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, %p", pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pQueryHandle->qinfo); - return TSDB_CODE_SUCCESS; + +_error: + pBlock->numOfRows = 0; + + tsdbError("%p error occurs in loading file block, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, %p", + pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pQueryHandle->qinfo); + return terrno; } static int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo); @@ -1241,6 +1266,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* cur->pos >= 0 && cur->pos < pBlock->numOfRows); TSKEY* tsArray = pCols->cols[0].pData; + assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast); // for search the endPos, so the order needs to reverse int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; From c63ec824f3ef5d05ea187ddcd5ed05b994e6203f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 27 Sep 2020 15:44:02 +0800 Subject: [PATCH 12/38] [td-1603] add new error code at client side --- src/client/src/tscLocal.c | 8 ++++---- src/client/src/tscSQLParser.c | 2 +- src/inc/taoserror.h | 2 ++ 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 030b033653..590c2829d0 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -430,7 +430,7 @@ static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) { SSqlObj* pSql = builder->pInterSql; if (row == NULL) { - return TSDB_CODE_MND_INVALID_TABLE_NAME; + return TSDB_CODE_TSC_INVALID_TABLE_NAME; } int32_t* lengths = taos_fetch_lengths(pSql); @@ -458,7 +458,7 @@ static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) { } if (0 == strlen(result)) { - return TSDB_CODE_MND_INVALID_TABLE_NAME; + return TSDB_CODE_TSC_INVALID_TABLE_NAME; } return TSDB_CODE_SUCCESS; } @@ -554,7 +554,7 @@ int32_t tscRebuildCreateTableStatement(void *param,char *result) { static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) { TAOS_ROW row = tscFetchRow(builder); if (row == NULL) { - return TSDB_CODE_MND_DB_NOT_SELECTED; + return TSDB_CODE_TSC_DB_NOT_SELECTED; } const char *showColumns[] = {"REPLICA", "QUORUM", "DAYS", "KEEP", "BLOCKS", NULL}; @@ -586,7 +586,7 @@ static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) { } while (row != NULL); if (0 == strlen(result)) { - return TSDB_CODE_MND_DB_NOT_SELECTED; + return TSDB_CODE_TSC_DB_NOT_SELECTED; } return TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 30a25a0285..66e239074d 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -821,7 +821,7 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa SStrToken t = {0}; getCurrentDBName(pSql, &t); if (t.n == 0) { // current database not available or not specified - code = TSDB_CODE_MND_DB_NOT_SELECTED; + code = TSDB_CODE_TSC_DB_NOT_SELECTED; } else { code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL); if (code != 0) { diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 6987647fe4..781b5897a1 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -99,6 +99,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, 0, 0x0213, "Disconnect TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, 0, 0x0214, "No write permission") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, 0, 0x0215, "Connection killed") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, 0, 0x0216, "Syntax error in SQL") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, 0, 0x0217, "Database not specified or available") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, 0, 0x0218, "Table does not exist") // mnode TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, 0, 0x0300, "Message not processed") From 26e708bc0b2dca0a3819b4a78124da9f52cd5322 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 28 Sep 2020 10:17:53 +0800 Subject: [PATCH 13/38] [td-1300] --- src/client/src/tscServer.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ac4bc5b066..74acc56bba 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -53,7 +53,10 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); SRpcEpSet* pEpSet = &pSql->epSet; - pEpSet->inUse = 0; + + // Issue the query to one of the vnode among a vgroup randomly. + // change the inUse property would not affect the isUse attribute of STableMeta + pEpSet->inUse = rand() % pVgroupInfo->numOfEps; // apply the FQDN string length check here bool hasFqdn = false; From db66a5da57d6f013e6fb5b60182129bf255cea5e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 28 Sep 2020 22:57:06 +0800 Subject: [PATCH 14/38] [td-225] --- src/client/inc/tsclient.h | 1 - src/os/inc/osTime.h | 7 ++++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index ea42b0f6a3..ea49cabc4e 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -229,7 +229,6 @@ typedef struct SQueryInfo { // TODO refactor STimeWindow window; // query time window SInterval interval; - int32_t tz; // query client timezone SSqlGroupbyExpr groupbyExpr; // group by tags info SArray * colList; // SArray diff --git a/src/os/inc/osTime.h b/src/os/inc/osTime.h index 99f7586f72..6b209219c6 100644 --- a/src/os/inc/osTime.h +++ b/src/os/inc/osTime.h @@ -63,9 +63,10 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) { typedef struct SInterval { - char intervalUnit; - char slidingUnit; - char offsetUnit; + int32_t tz; // query client timezone + char intervalUnit; + char slidingUnit; + char offsetUnit; int64_t interval; int64_t sliding; int64_t offset; From 2d5f36851a858674c46e529ef251673407a235b7 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Wed, 30 Sep 2020 01:21:22 +0000 Subject: [PATCH 15/38] TD-1632 --- src/rpc/src/rpcMain.c | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index f0b8c996c5..6a5d3b079a 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -195,7 +195,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendReqHead(SRpcConn *pConn); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext); static void rpcProcessConnError(void *param, void *id); static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); @@ -885,13 +885,14 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_SUCCESS; } -static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { +static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) { int32_t sid; SRpcConn *pConn = NULL; SRpcHead *pHead = (SRpcHead *)pRecv->msg; sid = htonl(pHead->destId); + *ppContext = NULL; if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); @@ -945,6 +946,17 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); } else { terrno = rpcProcessRspHead(pConn, pHead); + if (terrno == 0) { + SRpcReqContext *pContext = pConn->pContext; + *ppContext = pContext; + pConn->pContext = NULL; + pConn->pReqMsg = NULL; + + // for UDP, port may be changed by server, the port in epSet shall be used for cache + if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); + } + } } } @@ -1009,7 +1021,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { } terrno = 0; - pConn = rpcProcessMsgHead(pRpc, pRecv); + SRpcReqContext *pContext; + pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) { tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, @@ -1029,7 +1042,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); } } else { // msg is passed to app only parsing is ok - rpcProcessIncomingMsg(pConn, pHead); + rpcProcessIncomingMsg(pConn, pHead, pContext); } } @@ -1060,7 +1073,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { rpcFreeCont(pContext->pCont); } -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { SRpcInfo *pRpc = pConn->pRpc; SRpcMsg rpcMsg; @@ -1089,15 +1102,10 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { } } else { // it's a response - SRpcReqContext *pContext = pConn->pContext; rpcMsg.handle = pContext; - pConn->pContext = NULL; - pConn->pReqMsg = NULL; // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); - } else { + if (pHead->code == TSDB_CODE_RPC_TOO_SLOW) { rpcCloseConn(pConn); } From a3a95b60c3fbfd9aecbfbb32be079ed8a6010028 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 30 Sep 2020 21:58:05 +0800 Subject: [PATCH 16/38] [td-1603] add client side defined error code. --- tests/pytest/crash_gen.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 1ea19dfac3..fee355eef9 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -1516,6 +1516,8 @@ class Task(): if errno in [ 0x05, # TSDB_CODE_RPC_NOT_READY # 0x200, # invalid SQL, TODO: re-examine with TD-934 + 0x217, # "db not selected", client side defined error code + 0x218, # "Table does not exist" client side defined error code 0x360, 0x362, 0x369, # tag already exists 0x36A, 0x36B, 0x36D, From 1f0f57cf383ffa0050c58d18f05e658ede4fa72e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 30 Sep 2020 23:50:46 +0800 Subject: [PATCH 17/38] [td-1283] --- src/client/src/tscServer.c | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ad13502e57..abef727040 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -147,12 +147,13 @@ void tscPrintMgmtEp() { void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { STscObj *pObj = (STscObj *)param; if (pObj == NULL) return; + if (pObj != pObj->signature) { tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature); return; } - SSqlObj *pSql = pObj->pHb; + SSqlObj *pSql = tres; SSqlRes *pRes = &pSql->res; if (code == 0) { @@ -173,10 +174,17 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); } } else { - tscDebug("heart beat failed, code:%s", tstrerror(code)); + tscDebug("heartbeat failed, code:%s", tstrerror(code)); } - taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); + if (pObj->pHb != NULL) { + int64_t waitingDuring = tsShellActivityTimer * 500; + tscDebug("%p start heartbeat in %"PRId64"ms", pSql, waitingDuring); + + taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer); + } else { + tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj); + } } void tscProcessActivityTimer(void *handle, void *tmrId) { From 7bf8c4489d8f71784bd30894acade1d451af7a26 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 1 Oct 2020 15:36:13 +0800 Subject: [PATCH 18/38] [td-225] fix invalid read of hb --- src/client/src/tscServer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index abef727040..8a645d5761 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -261,6 +261,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { SSqlCmd *pCmd = &pSql->cmd; assert(*pSql->self == pSql); + pSql->pRpcCtx = NULL; if (pObj->signature != pObj) { tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); @@ -398,7 +399,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } rpcFreeCont(rpcMsg->pCont); - } int doProcessSql(SSqlObj *pSql) { From 0e5cc6e46ad71bd94577bf9fe9aae9fc66c49b0b Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 1 Oct 2020 14:40:56 +0000 Subject: [PATCH 19/38] TD-1645 --- src/rpc/src/rpcTcp.c | 51 +++++++++++++++++++++++++++++--------------- src/rpc/src/rpcUdp.c | 12 ++++++++--- 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 2a3facdb36..97a3dad1eb 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -62,7 +62,7 @@ typedef struct { char label[TSDB_LABEL_LEN]; int numOfThreads; void * shandle; - SThreadObj *pThreadObj; + SThreadObj **pThreadObj; pthread_t thread; } SServerObj; @@ -90,7 +90,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread tstrncpy(pServerObj->label, label, sizeof(pServerObj->label)); pServerObj->numOfThreads = numOfThreads; - pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads); + pServerObj->pThreadObj = (SThreadObj **)calloc(sizeof(SThreadObj *), numOfThreads); if (pServerObj->pThreadObj == NULL) { tError("TCP:%s no enough memory", label); terrno = TAOS_SYSTEM_ERROR(errno); @@ -104,19 +104,28 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); // initialize parameters in case it may encounter error later - pThreadObj = pServerObj->pThreadObj; for (int i = 0; i < numOfThreads; ++i) { + pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), 1); + if (pThreadObj == NULL) { + tError("TCP:%s no enough memory", label); + terrno = TAOS_SYSTEM_ERROR(errno); + for (int j=0; jpThreadObj[j]); + free(pServerObj->pThreadObj); + free(pServerObj); + return NULL; + } + + pServerObj->pThreadObj[i] = pThreadObj; pThreadObj->pollFd = -1; taosResetPthread(&pThreadObj->thread); pThreadObj->processData = fp; tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); pThreadObj->shandle = shandle; - pThreadObj++; } // initialize mutex, thread, fd which may fail - pThreadObj = pServerObj->pThreadObj; for (int i = 0; i < numOfThreads; ++i) { + pThreadObj = pServerObj->pThreadObj[i]; code = pthread_mutex_init(&(pThreadObj->mutex), NULL); if (code < 0) { tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); @@ -137,7 +146,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } pThreadObj->threadId = i; - pThreadObj++; } pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); @@ -166,6 +174,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { pThreadObj->stop = true; eventfd_t fd = -1; + if (pThreadObj->thread == pthread_self()) { + pthread_detach(pthread_self()); + return; + } + if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) { // signal the thread to stop, try graceful method first, // and use pthread_cancel when failed @@ -184,14 +197,7 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { } if (taosCheckPthreadValid(pThreadObj->thread)) pthread_join(pThreadObj->thread, NULL); - if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd); if (fd != -1) taosCloseSocket(fd); - - while (pThreadObj->pHead) { - SFdObj *pFdObj = pThreadObj->pHead; - pThreadObj->pHead = pFdObj->next; - taosFreeFdObj(pFdObj); - } } void taosStopTcpServer(void *handle) { @@ -210,7 +216,7 @@ void taosCleanUpTcpServer(void *handle) { if (pServerObj == NULL) return; for (int i = 0; i < pServerObj->numOfThreads; ++i) { - pThreadObj = pServerObj->pThreadObj + i; + pThreadObj = pServerObj->pThreadObj[i]; taosStopTcpThread(pThreadObj); pthread_mutex_destroy(&(pThreadObj->mutex)); } @@ -249,7 +255,7 @@ static void *taosAcceptTcpConnection(void *arg) { taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); // pick up the thread to handle this connection - pThreadObj = pServerObj->pThreadObj + threadId; + pThreadObj = pServerObj->pThreadObj[threadId]; SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd); if (pFdObj) { @@ -329,8 +335,6 @@ void taosCleanUpTcpClient(void *chandle) { taosStopTcpThread(pThreadObj); tDebug ("%s TCP client is cleaned up", pThreadObj->label); - - taosTFree(pThreadObj); } void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { @@ -503,8 +507,21 @@ static void *taosProcessTcpData(void *param) { pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); } + + if (pThreadObj->stop) break; } + if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd); + + while (pThreadObj->pHead) { + SFdObj *pFdObj = pThreadObj->pHead; + pThreadObj->pHead = pFdObj->next; + taosFreeFdObj(pFdObj); + } + + tDebug("%s TCP thread exits ...", pThreadObj->label); + taosTFree(pThreadObj); + return NULL; } diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 4ea47582b9..250c4c38f2 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -140,15 +140,18 @@ void taosStopUdpConnection(void *handle) { pConn = pSet->udpConn + i; if (pConn->fd >=0) shutdown(pConn->fd, SHUT_RDWR); if (pConn->fd >=0) taosCloseSocket(pConn->fd); + pConn->fd = -1; } for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; if (taosCheckPthreadValid(pConn->thread)) { - pthread_join(pConn->thread, NULL); + if (pConn->thread == pthread_self()) { + pthread_detach(pthread_self()); + } else { + pthread_join(pConn->thread, NULL); + } } - taosTFree(pConn->buffer); - // tTrace("%s UDP thread is closed, index:%d", pConn->label, i); } tDebug("%s UDP is stopped", pSet->label); @@ -230,6 +233,9 @@ static void *taosRecvUdpData(void *param) { (*(pConn->processData))(&recvInfo); } + taosTFree(pConn->buffer); + tDebug("%s UDP recv thread exits", pConn->label); + return NULL; } From 30284cbe59f37a234c8ad61fe7c16f49c1648bf8 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 1 Oct 2020 14:48:10 +0000 Subject: [PATCH 20/38] TD-1645 --- src/sync/src/taosTcpPool.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/taosTcpPool.c index eda822b1ec..539cfb64da 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/taosTcpPool.c @@ -219,7 +219,10 @@ static void *taosProcessTcpData(void *param) { continue; } } + } + + if (pThread->stop) break; } uDebug("%p TCP epoll thread exits", pThread); From f949b732f5051ad95893c5ea994907362e9dcb5f Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 1 Oct 2020 14:55:00 +0000 Subject: [PATCH 21/38] TD-1645 --- src/rpc/src/rpcTcp.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 97a3dad1eb..2119824b3d 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -205,7 +205,14 @@ void taosStopTcpServer(void *handle) { if (pServerObj == NULL) return; if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD); - if (taosCheckPthreadValid(pServerObj->thread)) pthread_join(pServerObj->thread, NULL); + + if (taosCheckPthreadValid(pServerObj->thread)) { + if (pServerObj->thread == pthread_self()) { + pthread_detach(pthread_self()); + } else { + pthread_join(pServerObj->thread, NULL); + } + } tDebug("%s TCP server is stopped", pServerObj->label); } From ccc32feb221e237b84af621ebe07b665f9e7a43f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 2 Oct 2020 08:44:33 +0800 Subject: [PATCH 22/38] revert from TD-1632 --- src/rpc/src/rpcMain.c | 30 +++++++++++------------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 6a5d3b079a..f0b8c996c5 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -195,7 +195,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendReqHead(SRpcConn *pConn); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext); +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); static void rpcProcessConnError(void *param, void *id); static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); @@ -885,14 +885,13 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_SUCCESS; } -static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) { +static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { int32_t sid; SRpcConn *pConn = NULL; SRpcHead *pHead = (SRpcHead *)pRecv->msg; sid = htonl(pHead->destId); - *ppContext = NULL; if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); @@ -946,17 +945,6 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); } else { terrno = rpcProcessRspHead(pConn, pHead); - if (terrno == 0) { - SRpcReqContext *pContext = pConn->pContext; - *ppContext = pContext; - pConn->pContext = NULL; - pConn->pReqMsg = NULL; - - // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); - } - } } } @@ -1021,8 +1009,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { } terrno = 0; - SRpcReqContext *pContext; - pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); + pConn = rpcProcessMsgHead(pRpc, pRecv); if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) { tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, @@ -1042,7 +1029,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); } } else { // msg is passed to app only parsing is ok - rpcProcessIncomingMsg(pConn, pHead, pContext); + rpcProcessIncomingMsg(pConn, pHead); } } @@ -1073,7 +1060,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { rpcFreeCont(pContext->pCont); } -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { SRpcInfo *pRpc = pConn->pRpc; SRpcMsg rpcMsg; @@ -1102,10 +1089,15 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte } } else { // it's a response + SRpcReqContext *pContext = pConn->pContext; rpcMsg.handle = pContext; + pConn->pContext = NULL; + pConn->pReqMsg = NULL; // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code == TSDB_CODE_RPC_TOO_SLOW) { + if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); + } else { rpcCloseConn(pConn); } From 8fed6ecaa12940f2ea840693b582d9a6ffe735a8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Oct 2020 11:11:07 +0800 Subject: [PATCH 23/38] [td-225] fix compiler error on windows --- src/client/src/tscServer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8a645d5761..e84217b38b 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -178,7 +178,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { } if (pObj->pHb != NULL) { - int64_t waitingDuring = tsShellActivityTimer * 500; + int32_t waitingDuring = tsShellActivityTimer * 500; tscDebug("%p start heartbeat in %"PRId64"ms", pSql, waitingDuring); taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer); From ef1dc64667eb711961d43a24bec0ae2c22b547f7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Oct 2020 11:12:46 +0800 Subject: [PATCH 24/38] [td-225] fix compiler error on windows --- src/client/src/tscServer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ad677c2194..d3c0201169 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -179,7 +179,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pObj->pHb != NULL) { int32_t waitingDuring = tsShellActivityTimer * 500; - tscDebug("%p start heartbeat in %"PRId64"ms", pSql, waitingDuring); + tscDebug("%p start heartbeat in %dms", pSql, waitingDuring); taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer); } else { From baf580ee8b21846676360b5feeca9a53cba0e75e Mon Sep 17 00:00:00 2001 From: slguan Date: Fri, 2 Oct 2020 12:59:15 +0800 Subject: [PATCH 25/38] TD-1632 revert --- src/rpc/src/rpcMain.c | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index f0b8c996c5..6a5d3b079a 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -195,7 +195,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendReqHead(SRpcConn *pConn); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext); static void rpcProcessConnError(void *param, void *id); static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); @@ -885,13 +885,14 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_SUCCESS; } -static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { +static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) { int32_t sid; SRpcConn *pConn = NULL; SRpcHead *pHead = (SRpcHead *)pRecv->msg; sid = htonl(pHead->destId); + *ppContext = NULL; if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); @@ -945,6 +946,17 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); } else { terrno = rpcProcessRspHead(pConn, pHead); + if (terrno == 0) { + SRpcReqContext *pContext = pConn->pContext; + *ppContext = pContext; + pConn->pContext = NULL; + pConn->pReqMsg = NULL; + + // for UDP, port may be changed by server, the port in epSet shall be used for cache + if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); + } + } } } @@ -1009,7 +1021,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { } terrno = 0; - pConn = rpcProcessMsgHead(pRpc, pRecv); + SRpcReqContext *pContext; + pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) { tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, @@ -1029,7 +1042,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); } } else { // msg is passed to app only parsing is ok - rpcProcessIncomingMsg(pConn, pHead); + rpcProcessIncomingMsg(pConn, pHead, pContext); } } @@ -1060,7 +1073,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { rpcFreeCont(pContext->pCont); } -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { SRpcInfo *pRpc = pConn->pRpc; SRpcMsg rpcMsg; @@ -1089,15 +1102,10 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { } } else { // it's a response - SRpcReqContext *pContext = pConn->pContext; rpcMsg.handle = pContext; - pConn->pContext = NULL; - pConn->pReqMsg = NULL; // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); - } else { + if (pHead->code == TSDB_CODE_RPC_TOO_SLOW) { rpcCloseConn(pConn); } From 38dc4f4d161f23653a2e69b199637a6f8f4723c9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 2 Oct 2020 13:31:15 +0800 Subject: [PATCH 26/38] TD-1645 fix compile error in windotws --- src/rpc/src/rpcTcp.c | 4 ++-- src/rpc/src/rpcUdp.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 2119824b3d..aacdede543 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -174,7 +174,7 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { pThreadObj->stop = true; eventfd_t fd = -1; - if (pThreadObj->thread == pthread_self()) { + if (taosComparePthread(pThreadObj->thread, pthread_self())) { pthread_detach(pthread_self()); return; } @@ -207,7 +207,7 @@ void taosStopTcpServer(void *handle) { if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD); if (taosCheckPthreadValid(pServerObj->thread)) { - if (pServerObj->thread == pthread_self()) { + if (taosComparePthread(pServerObj->thread, pthread_self())) { pthread_detach(pthread_self()); } else { pthread_join(pServerObj->thread, NULL); diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 250c4c38f2..4fd0318ae6 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -146,7 +146,7 @@ void taosStopUdpConnection(void *handle) { for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; if (taosCheckPthreadValid(pConn->thread)) { - if (pConn->thread == pthread_self()) { + if (taosComparePthread(pConn->thread, pthread_self())) { pthread_detach(pthread_self()); } else { pthread_join(pConn->thread, NULL); From c522e902fae8a7afe6d328731a008cab57fb04da Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 2 Oct 2020 05:40:37 +0000 Subject: [PATCH 27/38] scripts --- tests/script/general/http/restful_full.sim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/general/http/restful_full.sim b/tests/script/general/http/restful_full.sim index a02140a419..8d2f1a7c00 100644 --- a/tests/script/general/http/restful_full.sim +++ b/tests/script/general/http/restful_full.sim @@ -119,7 +119,7 @@ endi system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' used1' 127.0.0.1:7111/rest/sql print 17-> $system_content -if $system_content != @{"status":"error","code":534,"desc":"Syntax errr in SQL"}@ then +if $system_content != @{"status":"error","code":534,"desc":"Syntax error in SQL"}@ then return -1 endi From a8b8ecb20e38e23a59b99924e0c701adda458084 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 2 Oct 2020 14:56:54 +0000 Subject: [PATCH 28/38] TD-1645 --- src/connector/go | 2 +- src/rpc/src/rpcTcp.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector/go b/src/connector/go index 567b7b12f3..8c58c512b6 160000 --- a/src/connector/go +++ b/src/connector/go @@ -1 +1 @@ -Subproject commit 567b7b12f3fd2775c718d284beffc8c38dd6c219 +Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766 diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index aacdede543..46555b3647 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -196,7 +196,7 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { } } - if (taosCheckPthreadValid(pThreadObj->thread)) pthread_join(pThreadObj->thread, NULL); + if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) pthread_join(pThreadObj->thread, NULL); if (fd != -1) taosCloseSocket(fd); } From 09d2c5c31b9816b9f3a70b5e69024a9fb5abba60 Mon Sep 17 00:00:00 2001 From: Bison 'goldeagle' Fan Date: Sat, 3 Oct 2020 10:57:24 +0800 Subject: [PATCH 29/38] upd: README.md with submodule contents of go & grafana connectors --- README.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 522fc0ebc1..36436dd549 100644 --- a/README.md +++ b/README.md @@ -83,12 +83,18 @@ sudo dnf install -y maven ## Get the source codes -- github: +First of all, you may clone the source codes from github: ```bash git clone https://github.com/taosdata/TDengine.git cd TDengine ``` +The connectors for go & grafana have been moved to separated repositories, +so you should run this command in the TDengine directory to install them: +```bash +git submodule update --init --recursive +``` + ## Build TDengine ### On Linux platform From c608fba88ed0c69d19551bb64de77994f8d971e3 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sat, 3 Oct 2020 04:30:02 +0000 Subject: [PATCH 30/38] TD-1645 --- src/rpc/src/rpcTcp.c | 7 +++++-- src/rpc/src/rpcUdp.c | 12 +++--------- src/sync/src/taosTcpPool.c | 2 +- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 46555b3647..06b5d39d61 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -196,7 +196,10 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { } } - if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) pthread_join(pThreadObj->thread, NULL); + if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) { + pthread_join(pThreadObj->thread, NULL); + } + if (fd != -1) taosCloseSocket(fd); } @@ -225,7 +228,6 @@ void taosCleanUpTcpServer(void *handle) { for (int i = 0; i < pServerObj->numOfThreads; ++i) { pThreadObj = pServerObj->pThreadObj[i]; taosStopTcpThread(pThreadObj); - pthread_mutex_destroy(&(pThreadObj->mutex)); } tDebug("%s TCP server is cleaned up", pServerObj->label); @@ -526,6 +528,7 @@ static void *taosProcessTcpData(void *param) { taosFreeFdObj(pFdObj); } + pthread_mutex_destroy(&(pThreadObj->mutex)); tDebug("%s TCP thread exits ...", pThreadObj->label); taosTFree(pThreadObj); diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 4fd0318ae6..4ea47582b9 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -140,18 +140,15 @@ void taosStopUdpConnection(void *handle) { pConn = pSet->udpConn + i; if (pConn->fd >=0) shutdown(pConn->fd, SHUT_RDWR); if (pConn->fd >=0) taosCloseSocket(pConn->fd); - pConn->fd = -1; } for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; if (taosCheckPthreadValid(pConn->thread)) { - if (taosComparePthread(pConn->thread, pthread_self())) { - pthread_detach(pthread_self()); - } else { - pthread_join(pConn->thread, NULL); - } + pthread_join(pConn->thread, NULL); } + taosTFree(pConn->buffer); + // tTrace("%s UDP thread is closed, index:%d", pConn->label, i); } tDebug("%s UDP is stopped", pSet->label); @@ -233,9 +230,6 @@ static void *taosRecvUdpData(void *param) { (*(pConn->processData))(&recvInfo); } - taosTFree(pConn->buffer); - tDebug("%s UDP recv thread exits", pConn->label); - return NULL; } diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/taosTcpPool.c index 539cfb64da..6a210a136f 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/taosTcpPool.c @@ -324,5 +324,5 @@ static void taosStopPoolThread(SThreadObj *pThread) { } pthread_join(thread, NULL); - taosClose(fd); + if (fd >= 0) taosClose(fd); } From 7367b31275d0ad6930725bf21c0707807f7ef3e3 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 6 Oct 2020 06:20:07 +0000 Subject: [PATCH 31/38] TD-1645 --- src/rpc/src/rpcTcp.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 06b5d39d61..20b03b34e3 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -342,8 +342,8 @@ void taosCleanUpTcpClient(void *chandle) { SThreadObj *pThreadObj = chandle; if (pThreadObj == NULL) return; + tDebug ("%s TCP client will be cleaned up", pThreadObj->label); taosStopTcpThread(pThreadObj); - tDebug ("%s TCP client is cleaned up", pThreadObj->label); } void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { @@ -378,7 +378,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin void taosCloseTcpConnection(void *chandle) { SFdObj *pFdObj = chandle; - if (pFdObj == NULL) return; + if (pFdObj == NULL || pFdObj->signature != pFdObj) return; SThreadObj *pThreadObj = pFdObj->pThreadObj; tDebug("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj); @@ -391,7 +391,7 @@ void taosCloseTcpConnection(void *chandle) { int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { SFdObj *pFdObj = chandle; - if (chandle == NULL) return -1; + if (pFdObj == NULL || pFdObj->signature != pFdObj) return -1; return taosWriteMsg(pFdObj->fd, data, len); } From 12687355c07b28c21e4ef628482a1a26858ac351 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 6 Oct 2020 11:46:33 +0000 Subject: [PATCH 32/38] TD-1632 --- src/connector/go | 2 +- src/rpc/src/rpcMain.c | 40 +++++++++++++++++++--------------------- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/src/connector/go b/src/connector/go index 567b7b12f3..8c58c512b6 160000 --- a/src/connector/go +++ b/src/connector/go @@ -1 +1 @@ -Subproject commit 567b7b12f3fd2775c718d284beffc8c38dd6c219 +Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766 diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 6a5d3b079a..b86b95b858 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -881,6 +881,20 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { pConn->outType = 0; pConn->pReqMsg = NULL; pConn->reqMsgLen = 0; + SRpcReqContext *pContext = pConn->pContext; + + if (pHead->code == TSDB_CODE_RPC_REDIRECT) { + if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SRpcEpSet)) { + // if EpSet is not included in the msg, treat it as NOT_READY + pHead->code = TSDB_CODE_RPC_NOT_READY; + } else { + pContext->redirect++; + if (pContext->redirect > TSDB_MAX_REPLICA) { + pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + tWarn("%s, too many redirects, quit", pConn->info); + } + } + } return TSDB_CODE_SUCCESS; } @@ -950,12 +964,6 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont SRpcReqContext *pContext = pConn->pContext; *ppContext = pContext; pConn->pContext = NULL; - pConn->pReqMsg = NULL; - - // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); - } } } } @@ -1083,9 +1091,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte rpcMsg.pCont = pHead->content; rpcMsg.msgType = pHead->msgType; rpcMsg.code = pHead->code; - rpcMsg.ahandle = pConn->ahandle; if ( rpcIsReq(pHead->msgType) ) { + rpcMsg.ahandle = pConn->ahandle; if (rpcMsg.contLen > 0) { rpcMsg.handle = pConn; rpcAddRef(pRpc); // add the refCount for requests @@ -1103,25 +1111,15 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte } else { // it's a response rpcMsg.handle = pContext; + rpcMsg.ahandle = pContext->ahandle; // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code == TSDB_CODE_RPC_TOO_SLOW) { + if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); + } else { rpcCloseConn(pConn); } - if (pHead->code == TSDB_CODE_RPC_REDIRECT) { - if (rpcMsg.contLen < sizeof(SRpcEpSet)) { - // if EpSet is not included in the msg, treat it as NOT_READY - pHead->code = TSDB_CODE_RPC_NOT_READY; - } else { - pContext->redirect++; - if (pContext->redirect > TSDB_MAX_REPLICA) { - pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - tWarn("%s, too many redirects, quit", pConn->info); - } - } - } - if (pHead->code == TSDB_CODE_RPC_REDIRECT) { pContext->numOfTry = 0; SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content; From 756aa48077f6dd6a441f465b1d0521f30971e4ff Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Wed, 7 Oct 2020 02:00:38 +0000 Subject: [PATCH 33/38] TD-1632 --- src/rpc/src/rpcMain.c | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index b86b95b858..030788eb7b 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -819,9 +819,18 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED; } + if (rpcContLenFromMsg(pHead->msgLen) <= 0) { + tDebug("%s, message body is empty, ignore", pConn->info); + return TSDB_CODE_RPC_APP_ERROR; + } + pConn->inTranId = pHead->tranId; pConn->inType = pHead->msgType; + // start the progress timer to monitor the response from server app + if (pConn->connType != RPC_CONN_TCPS) + pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl); + return 0; } @@ -960,11 +969,10 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); } else { terrno = rpcProcessRspHead(pConn, pHead); - if (terrno == 0) { - SRpcReqContext *pContext = pConn->pContext; - *ppContext = pContext; - pConn->pContext = NULL; - } + + SRpcReqContext *pContext = pConn->pContext; + *ppContext = pContext; + pConn->pContext = NULL; } } @@ -1094,20 +1102,11 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte if ( rpcIsReq(pHead->msgType) ) { rpcMsg.ahandle = pConn->ahandle; - if (rpcMsg.contLen > 0) { - rpcMsg.handle = pConn; - rpcAddRef(pRpc); // add the refCount for requests + rpcMsg.handle = pConn; + rpcAddRef(pRpc); // add the refCount for requests - // start the progress timer to monitor the response from server app - if (pConn->connType != RPC_CONN_TCPS) - pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl); - - // notify the server app - (*(pRpc->cfp))(&rpcMsg, NULL); - } else { - tDebug("%s, message body is empty, ignore", pConn->info); - rpcFreeCont(rpcMsg.pCont); - } + // notify the server app + (*(pRpc->cfp))(&rpcMsg, NULL); } else { // it's a response rpcMsg.handle = pContext; From 75767af7aa7e073f27a6cc50550dd6c14554c039 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Wed, 7 Oct 2020 02:08:40 +0000 Subject: [PATCH 34/38] TD-1632 --- src/rpc/src/rpcMain.c | 8 ++++---- src/rpc/src/rpcTcp.c | 2 +- src/rpc/src/rpcUdp.c | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 030788eb7b..fea8d104ca 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -323,7 +323,7 @@ void *rpcMallocCont(int contLen) { tError("failed to malloc msg, size:%d", size); return NULL; } else { - tDebug("malloc msg: %p", start); + tTrace("malloc mem: %p", start); } return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); @@ -333,7 +333,7 @@ void rpcFreeCont(void *cont) { if (cont) { char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext); free(temp); - tDebug("free mem: %p", temp); + tTrace("free mem: %p", temp); } } @@ -553,7 +553,7 @@ static void rpcFreeMsg(void *msg) { if ( msg ) { char *temp = (char *)msg - sizeof(SRpcReqContext); free(temp); - tDebug("free msg: %p", temp); + tTrace("free mem: %p", temp); } } @@ -1450,7 +1450,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { pNewHead->msgLen = rpcMsgLenFromCont(origLen); rpcFreeMsg(pHead); // free the compressed message buffer pHead = pNewHead; - //tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen); + tTrace("decomp malloc mem: %p", temp); } else { tError("failed to allocate memory to decompress msg, contLen:%d", contLen); } diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 20b03b34e3..dd9e7684e0 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -438,7 +438,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); return -1; } else { - tDebug("TCP malloc mem: %p", buffer); + tTrace("TCP malloc mem: %p", buffer); } msg = buffer + tsRpcOverhead; diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 4ea47582b9..6f65304661 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -214,7 +214,7 @@ static void *taosRecvUdpData(void *param) { tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen); continue; } else { - tDebug("UDP malloc mem: %p", tmsg); + tTrace("UDP malloc mem: %p", tmsg); } tmsg += tsRpcOverhead; // overhead for SRpcReqContext From 3047456325ee7f93398b63196993f14514a688ee Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Wed, 7 Oct 2020 09:41:11 +0000 Subject: [PATCH 35/38] TD-1632 --- src/rpc/src/rpcMain.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index fea8d104ca..414d37d8b8 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -969,10 +969,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); } else { terrno = rpcProcessRspHead(pConn, pHead); - - SRpcReqContext *pContext = pConn->pContext; - *ppContext = pContext; - pConn->pContext = NULL; + *ppContext = pConn->pContext; } } From b358ecf29bda74fe953dccad168c35509900bd37 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Wed, 7 Oct 2020 23:03:12 +0800 Subject: [PATCH 36/38] [TD-1651] feature: add lgtm code analyzer support. --- .lgtm.yml | 402 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 402 insertions(+) create mode 100644 .lgtm.yml diff --git a/.lgtm.yml b/.lgtm.yml new file mode 100644 index 0000000000..fbcedead43 --- /dev/null +++ b/.lgtm.yml @@ -0,0 +1,402 @@ +########################################################################################## +# Customize file classifications. # +# Results from files under any classifier will be excluded from LGTM # +# statistics. # +########################################################################################## + +########################################################################################## +# Use the `path_classifiers` block to define changes to the default classification of # +# files. # +########################################################################################## + +path_classifiers: + # docs: + # Identify the top-level file called `generate_javadoc.py` as documentation-related. + test: + # Override LGTM's default classification of test files by excluding all files. + - exclude: / + # Classify all files in the top-level directories tests/ and testsuites/ as test code. + - tests + # - testsuites + # Classify all files with suffix `.test` as test code. + # Note: use only forward slash / as a path separator. + # Use ** to indicate an arbitrary parent path. + # Use * to indicate any sequence of characters excluding /. + # Always enclose the expression in double quotes if it includes *. + # - "**/*.test" + # Refine the classifications above by excluding files in test/util/. + # - exclude: test/util + # The default behavior is to tag all files created during the + # build as `generated`. Results are hidden for generated code. You can tag + # further files as being generated by adding them to the `generated` section. + generated: + # Exclude all `*.c` files under the `ui/` directory from classification as + # generated code. + # - exclude: ui/**/*.c + # By default, all files not checked into the repository are considered to be + # 'generated'. + # The default behavior is to tag library code as `library`. Results are hidden + # for library code. You can tag further files as being library code by adding them + # to the `library` section. + library: + - exclude: deps/ + # The default behavior is to tag template files as `template`. Results are hidden + # for template files. You can tag further files as being template files by adding + # them to the `template` section. + template: + #- exclude: path/to/template/code/**/*.c + # Define your own category, for example: 'some_custom_category'. + some_custom_category: + # Classify all files in the top-level directory tools/ (or the top-level file + # called tools). + # - tools + +######################################################################################### +# Use the `queries` block to change the default display of query results. # +######################################################################################### + + # queries: + # Start by hiding the results of all queries. + # - exclude: "*" + # Then include all queries tagged 'security' and 'correctness', and with a severity of + # 'error'. + # - include: + # tags: + # - "security" + # - "correctness" + # severity: "error" + # Specifically hide the results of two queries. + # - exclude: cpp/use-of-goto + # - exclude: java/equals-on-unrelated-types + # Refine by including the `java/command-line-injection` query. + # - include: java/command-line-injection + +######################################################################################### +# Define changes to the default code extraction process. # +# Each block configures the extraction of a single language, and modifies actions in a # +# named step. Every named step includes automatic default actions, # +# except for the 'prepare' step. The steps are performed in the following sequence: # +# prepare # +# after_prepare # +# configure (C/C++ only) # +# python_setup (Python only) # +# before_index # +# index # +########################################################################################## + +######################################################################################### +# Environment variables available to the steps: # +######################################################################################### + +# LGTM_SRC +# The root of the source tree. +# LGTM_WORKSPACE +# An existing (initially empty) folder outside the source tree. +# Used for temporary download and setup commands. + +######################################################################################### +# Use the extraction block to define changes to the default code extraction process # +# for one or more languages. The settings for each language are defined in a child # +# block, with one or more steps. # +######################################################################################### + +extraction: + # Define settings for C/C++ analysis + ##################################### + cpp: + # The `prepare` step exists for customization on LGTM.com only. + prepare: + # # The `packages` section is valid for LGTM.com only. It names Ubuntu packages to + # # be installed. + packages: + - cmake + # Add an `after-prepare` step if you need to run commands after the prepare step. + # Each command should be listed on a separate line. + # This step is useful for C/C++ analysis where you want to prepare the environment + # for the `configure` step without changing the default behavior for that step. + # after_prepare: + #- export GNU_MAKE=make + #- export GIT=true + # The `configure` step generates build configuration files which the `index` step + # then uses to build the codebase. + configure: + command: + - mkdir build + - cd build + - cmake .. + # - ./prepare_deps + # Optional step. You should add a `before_index` step if you need to run commands + # before the `index` step. + # before_index: + # - export BOOST_DIR=$LGTM_SRC/boost + # - export GTEST_DIR=$LGTM_SRC/googletest + # - export HUNSPELL_DIR=$LGTM_SRC/hunspell + # - export CRYPTOPP_DIR=$LGTM_SRC/cryptopp + # The `index` step builds the code and extracts information during the build + # process. + index: + # Override the autobuild process by specifying a list of custom build commands + # to use instead. + build_command: + - cd build + - make + # - $GNU_MAKE -j2 -s + # Specify that all project or solution files should be used for extraction. + # Default: false. + # all_solutions: true + # Specify a list of one or more project or solution files for extraction. + # Default: LGTM chooses the file closest to the root of the repository (this may + # fail if there are multiple candidates). + # solution: + # - myProject.sln + # Specify MSBuild settings + # msbuild: + # Specify a list of additional arguments to MSBuild. Default: empty. + # arguments: /p:Platform=x64 /p:Configuration=Release + # Specify the MSBuild configuration to use, for example, debug or release. + # Default: read from the solution file or files. + # configuration: + # Specify the platform to target, for example: x86, x64, or Any CPU. + # Default: read from the solution file or files. + # platform: + # Specify the MSBuild target. Default: rebuild. + # target: + # Specify whether or not to perform a NuGet restore for extraction. Default: true. + # nuget_restore: false + # Specify a version of Microsoft Visual Studio to use for MSBuild or any custom + # build commands (build_command). For example: + # 10 for Visual Studio 2010 + # 12 for Visual Studio 2012 + # 14 for Visual Studio 2015 + # 15 for Visual Studio 2017 + # Default: read from project files. + # vstools_version: 10 + + # Define settings for C# analysis + ################################## + # csharp: + # The `prepare` step exists for customization on LGTM.com only. + # prepare: + # packages: + # - example_package + # Add an `after-prepare` step if you need to run commands after the `prepare` step. + # Each command should be listed on a separate line. + # after_prepare: + # - export PATH=$LGTM_WORKSPACE/tools:$PATH + # The `index` step builds the code and extracts information during the build + # process. + #index: + # Specify that all project or solution files should be used for extraction. + # Default: false. + # all_solutions: true + # Specify a list of one or more project or solution files for extraction. + # Default: LGTM chooses the file closest to the root of the repository (this may + # fail if there are multiple candidates). + # solution: + # - myProject.sln + # Override the autobuild process by specifying a list of custom build commands + # to use instead. + # build_command: + # - ./example-compile-all.sh + # By default, LGTM analyzes the code by building it. You can override this, + # and tell LGTM not to build the code. Beware that this can lead + # to less accurate results. + # buildless: true + # Specify .NET Core settings. + # dotnet: + # Specify additional arguments to `dotnet build`. + # Default: empty. + # arguments: "example_arg" + # Specify the version of .NET Core SDK to use. + # Default: The version installed on the build machine. + # version: 2.1 + # Specify MSBuild settings. + # msbuild: + # Specify a list of additional arguments to MSBuild. Default: empty. + # arguments: /P:WarningLevel=2 + # Specify the MSBuild configuration to use, for example, debug or release. + # Default: read from the solution file or files. + # configuration: release + # Specify the platform to target, for example: x86, x64, or Any CPU. + # Default: read from the solution file or files. + # platform: x86 + # Specify the MSBuild target. Default: rebuild. + # target: notest + # Specify whether or not to perform a NuGet restore for extraction. Default: true. + # nuget_restore: false + # Specify a version of Microsoft Visual Studio to use for MSBuild or any custom + # build commands (build_command). For example: + # 10 for Visual Studio 2010 + # 12 for Visual Studio 2012 + # 14 for Visual Studio 2015 + # 15 for Visual Studio 2017 + # Default: read from project files + # vstools_version: 10 + # Specify additional options for the extractor, + # for example --fast to perform a faster extraction that produces a smaller + # database. + # extractor: "--fast" + + # Define settings for Go analysis + ################################## + # go: + # The `prepare` step exists for customization on LGTM.com only. + # prepare: + # packages: + # - example_package + # Add an `after-prepare` step if you need to run commands after the `prepare` step. + # Each command should be listed on a separate line. + # after_prepare: + # - export PATH=$LGTM_WORKSPACE/tools:$PATH + # The `index` step builds the code and extracts information during the build + # process. + # index: + # Override the autobuild process by specifying a list of custom build commands + # to use instead. + # build_command: + # - ./compile-all.sh + + # Define settings for Java analysis + #################################### + # java: + # The `prepare` step exists for customization on LGTM.com only. + # prepare: + # packages: + # - example_package + # Add an `after-prepare` step if you need to run commands after the prepare step. + # Each command should be listed on a separate line. + # after_prepare: + # - export PATH=$LGTM_WORKSPACE/tools:$PATH + # The `index` step extracts information from the files in the codebase. + # index: + # Specify Gradle settings. + # gradle: + # Specify the required Gradle version. + # Default: determined automatically. + # version: 4.4 + # Override the autobuild process by specifying a list of custom build commands + # to use instead. + # build_command: ./compile-all.sh + # Specify the Java version required to build the project. + # java_version: 11 + # Specify whether to extract Java .properties files + # Default: false + # properties_files: true + # Specify Maven settings. + # maven: + # Specify the path (absolute or relative) of a Maven settings file to use. + # Default: Maven uses a settings file in the default location, if it exists. + # settings_file: /opt/share/settings.xml + # Specify the path of a Maven toolchains file. + # Default: Maven uses a toolchains file in the default location, if it exists. + # toolchains_file: /opt/share/toolchains.xml + # Specify the required Maven version. + # Default: the Maven version is determined automatically, where feasible. + # version: 3.5.2 + # Specify how XML files should be extracted: + # all = extract all XML files. + # default = only extract XML files named `AndroidManifest.xml`, `pom.xml`, and `web.xml`. + # disabled = do not extract any XML files. + # xml_mode: all + + # Define settings for JavaScript analysis + ########################################## + # javascript: + # The `prepare` step exists for customization on LGTM.com only. + # prepare: + # packages: + # - example_package + # Add an `after-prepare` step if you need to run commands after the prepare step. + # Each command should be listed on a separate line. + # after_prepare: + # - export PATH=$LGTM_WORKSPACE/tools:$PATH + # The `index` step extracts information from the files in the codebase. + # index: + # Specify a list of files and folders to extract. + # Default: The project root directory. + # include: + # - src/js + # Specify a list of files and folders to exclude from extraction. + # exclude: + # - thirdparty/lib + # You can add additional file types for LGTM to extract, by mapping file + # extensions (including the leading dot) to file types. The usual + # include/exclude patterns apply, so, for example, `.jsm` files under + # `thirdparty/lib` will not be extracted. + # filetypes: + # ".jsm": "js" + # ".tmpl": "html" + # Specify a list of glob patterns to include/exclude files from extraction; this + # is applied on top of the include/exclude paths from above; patterns are + # processed in the same way as for path classifiers above. + # Default: include all files with known extensions (such as .js, .ts and .html), + # but exclude files ending in `-min.js` or `.min.js` and folders named `node_modules` + # or `bower_components` + # filters: + # exclude any *.ts files anywhere. + # - exclude: "**/*.ts" + # but include *.ts files under src/js/typescript. + # - include: "src/js/typescript/**/*.ts" + # Specify how TypeScript files should be extracted: + # none = exclude all TypeScript files. + # basic = extract syntactic information from TypeScript files. + # full = extract syntactic and type information from TypeScript files. + # Default: full. + # typescript: basic + # By default, LGTM doesn't extract any XML files. You can override this by + # using the `xml_mode` property and setting it to `all`. + # xml_mode: all + + # Define settings for Python analysis + ###################################### + # python: + # # The `prepare` step exists for customization on LGTM.com only. + # # prepare: + # # # The `packages` section is valid for LGTM.com only. It names packages to + # # # be installed. + # # packages: libpng-dev + # # This step is useful for Python analysis where you want to prepare the + # # environment for the `python_setup` step without changing the default behavior + # # for that step. + # after_prepare: + # - export PATH=$LGTM_WORKSPACE/tools:$PATH + # # This sets up the Python interpreter and virtual environment, ready for the + # # `index` step to extract the codebase. + # python_setup: + # # Specify packages that should NOT be installed despite being mentioned in the + # # requirements.txt file. + # # Default: no package marked for exclusion. + # exclude_requirements: + # - pywin32 + # # Specify a list of pip packages to install. + # # If any of these packages cannot be installed, the extraction will fail. + # requirements: + # - Pillow + # # Specify a list of requirements text files to use to set up the environment, + # # or false for none. Default: any requirements.txt, test-requirements.txt, + # # and similarly named files identified in the codebase are used. + # requirements_files: + # - required-packages.txt + # # Specify a setup.py file to use to set up the environment, or false for none. + # # Default: any setup.py files identified in the codebase are used in preference + # # to any requirements text files. + # setup_py: new-setup.py + # # Override the version of the Python interpreter used for setup and extraction + # # Default: Python 3. + # version: 2 + # # Optional step. You should add a `before_index` step if you need to run commands + # # before the `index` step. + # before_index: + # - antlr4 -Dlanguage=Python3 Grammar.g4 + # # The `index` step extracts information from the files in the codebase. + # index: + # # Specify a list of files and folders to exclude from extraction. + # # Default: Git submodules and Subversion externals. + # exclude: + # - legacy-implementation + # - thirdparty/libs + # filters: + # - exclude: "**/documentation/examples/snippets/*.py" + # - include: "**/documentation/examples/test_application/*" + # include: + # - example/to/include From 8e2bb2c1e4ff1d8a94faed6d38071764de6efcbc Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Thu, 8 Oct 2020 11:05:02 +0800 Subject: [PATCH 37/38] fix td-1249 --- src/dnode/src/dnodeVRead.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index d66ebf9772..fb4ffcdafa 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -187,6 +187,7 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { } void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) { + rpcFreeCont(pRead->rpcMsg.pCont); vnodeRelease(pVnode); return; } From 42e7be57a4cc03ad71a3351f410d5b87cb1e6041 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 8 Oct 2020 14:52:02 +0800 Subject: [PATCH 38/38] TD-1652 --- src/dnode/src/dnodeMgmt.c | 20 ++++++++++++++++++-- src/dnode/src/dnodeModule.c | 23 ++++++++++------------- src/inc/dnode.h | 2 +- src/mnode/src/mnodeSdb.c | 7 +------ tests/script/unique/mnode/mgmt23.sim | 2 +- 5 files changed, 31 insertions(+), 23 deletions(-) diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 8f87a45fb7..8f2b687dc4 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -468,8 +468,24 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) { if (!mnodeIsRunning()) { if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) { - dInfo("mnode index:%d %s:%u should work as master", i, pEpSet->fqdn[i], pEpSet->port[i]); - sdbUpdateSync(); + dInfo("mnode index:%d %s:%u should work as mnode", i, pEpSet->fqdn[i], pEpSet->port[i]); + bool find = false; + for (int i = 0; i < tsDMnodeInfos.nodeNum; ++i) { + if (tsDMnodeInfos.nodeInfos[i].nodeId == dnodeGetDnodeId()) { + dInfo("localEp found in mnode infos"); + find = true; + break; + } + } + + if (!find) { + dInfo("localEp not found in mnode infos, will set into mnode infos"); + tstrncpy(tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeEp, tsLocalEp, TSDB_EP_LEN); + tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeId = dnodeGetDnodeId(); + tsDMnodeInfos.nodeNum++; + } + + dnodeStartMnode(); } } } diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index 18a293d415..ba7cdf2664 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -146,19 +146,16 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) { } } -bool dnodeCheckMnodeStarting() { - if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) return false; - - SDMMnodeInfos *mnodes = dnodeGetMnodeInfos(); - for (int32_t i = 0; i < mnodes->nodeNum; ++i) { - SDMMnodeInfo *node = &mnodes->nodeInfos[i]; - if (node->nodeId == dnodeGetDnodeId()) { - uint32_t moduleStatus = tsModuleStatus | (1 << TSDB_MOD_MNODE); - dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus); - dnodeProcessModuleStatus(moduleStatus); - return true; - } +bool dnodeStartMnode() { + if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) { + dDebug("mnode module is already started, module status:%d", tsModuleStatus); + return false; } - return false; + uint32_t moduleStatus = tsModuleStatus | (1 << TSDB_MOD_MNODE); + dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus); + dnodeProcessModuleStatus(moduleStatus); + + sdbUpdateSync(); + return true; } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index fda9c1c1dd..017241c4f8 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -43,7 +43,7 @@ void dnodeGetMnodeEpSetForPeer(void *epSet); void dnodeGetMnodeEpSetForShell(void *epSet); void * dnodeGetMnodeInfos(); int32_t dnodeGetDnodeId(); -bool dnodeCheckMnodeStarting(); +bool dnodeStartMnode(); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 4c672eb557..7654536122 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -91,7 +91,6 @@ typedef struct { } SSdbWriteWorkerPool; extern void * tsMnodeTmr; -static void * tsUpdateSyncTmr; static SSdbObject tsSdbObj = {0}; static taos_qset tsSdbWriteQset; static taos_qall tsSdbWriteQall; @@ -298,16 +297,12 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { taosFreeQitem(pOper); } -static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(); } - void sdbUpdateSync() { if (!mnodeIsRunning()) { mDebug("mnode not start yet, update sync info later"); - if (dnodeCheckMnodeStarting()) { - taosTmrReset(sdbUpdateSyncTmrFp, 1000, NULL, tsMnodeTmr, &tsUpdateSyncTmr); - } return; } + mDebug("update sync info in sdb"); SSyncCfg syncCfg = {0}; diff --git a/tests/script/unique/mnode/mgmt23.sim b/tests/script/unique/mnode/mgmt23.sim index 4851872860..d1820ef8c6 100644 --- a/tests/script/unique/mnode/mgmt23.sim +++ b/tests/script/unique/mnode/mgmt23.sim @@ -65,7 +65,7 @@ endi print ============== step4 sql drop dnode $hostname2 -sleep 16000 +sleep 10000 sql show mnodes $dnode1Role = $data2_1