From 0eb138cd1d7cd191f1ab4ad89b0a96963d2bc3b7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Sep 2020 15:04:05 +0800 Subject: [PATCH 01/39] [td-1583] --- src/client/src/tscSQLParser.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index b9d4cc13d1..0d75e60c28 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -6205,6 +6205,10 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { } tVariant* pTableItem1 = &pQuerySql->from->a[i + 1].pVar; + if (pTableItem1->nType != TSDB_DATA_TYPE_BINARY) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg11); + } + SStrToken aliasName = {.z = pTableItem1->pz, .n = pTableItem1->nLen, .type = TK_STRING}; if (tscValidateName(&aliasName) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg11); From 535816bc5aeea021ec11f66970733ff86c0ccf4d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Sep 2020 17:45:57 +0800 Subject: [PATCH 02/39] [td-1596] --- src/os/src/detail/osTime.c | 2 +- src/query/src/qExecutor.c | 32 ++++++++++++----- tests/script/general/parser/topbot.sim | 49 ++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 10 deletions(-) diff --git a/src/os/src/detail/osTime.c b/src/os/src/detail/osTime.c index bd4dc24554..b78627f46f 100644 --- a/src/os/src/detail/osTime.c +++ b/src/os/src/detail/osTime.c @@ -481,7 +481,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio start = (int64_t)(mktime(&tm) * TSDB_TICK_PER_SECOND(precision)); } else { int64_t delta = t - pInterval->interval; - int32_t factor = delta > 0 ? 1 : -1; + int32_t factor = (delta >= 0) ? 1 : -1; start = (delta / pInterval->sliding + factor) * pInterval->sliding; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index f2d324e376..84a5e25ab7 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2225,10 +2225,11 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) { return false; } -int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) { +int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo * pWindowResInfo, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) { SQuery *pQuery = pRuntimeEnv->pQuery; - *status = 0; + *status = BLK_DATA_NO_NEEDED; + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf > 0) { *status = BLK_DATA_ALL_NEEDED; } else { // check if this data block is required to load @@ -2240,12 +2241,26 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, } if ((*status) != BLK_DATA_ALL_NEEDED) { + // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, + // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer + if (QUERY_IS_INTERVAL_QUERY(pQuery)) { + bool hasTimeWindow = false; + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + + TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey; + + STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery); + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo->tid, &win, masterScan, &hasTimeWindow) != + TSDB_CODE_SUCCESS) { + // todo handle error in set result for timewindow + } + } + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base; int32_t functionId = pSqlFunc->functionId; int32_t colId = pSqlFunc->colInfo.colId; - (*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId); if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { break; @@ -2476,7 +2491,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SArray * pDataBlock = NULL; uint32_t status = 0; - int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); + int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); if (ret != TSDB_CODE_SUCCESS) { break; } @@ -4667,18 +4682,17 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo); } - SDataStatis *pStatis = NULL; - SArray * pDataBlock = NULL; uint32_t status = 0; + SDataStatis *pStatis = NULL; + SArray *pDataBlock = NULL; - int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); + int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pQuery->current->windowResInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status); if (ret != TSDB_CODE_SUCCESS) { break; } if (status == BLK_DATA_DISCARD) { - pQuery->current->lastKey = - QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step; + pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step : blockInfo.window.skey + step; continue; } diff --git a/tests/script/general/parser/topbot.sim b/tests/script/general/parser/topbot.sim index 5c575b6163..c2b41888d7 100644 --- a/tests/script/general/parser/topbot.sim +++ b/tests/script/general/parser/topbot.sim @@ -213,4 +213,53 @@ if $data01 != 5195.000000000 then return -1 endi +print =======================>td-1596 +sql create table t2(ts timestamp, k int) +sql insert into t2 values('2020-1-2 1:1:1', 1); +sql insert into t2 values('2020-2-2 1:1:1', 1); + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s start +sql connect +sleep 1000 + +sql use db +sql select count(*), first(ts), last(ts) from t2 interval(1d); +if $rows != 2 then + return -1 +endi + +if $data00 != @20-01-02 00:00:00.000@ then + print expect 20-01-02 00:00:00.000, actual: $data00 + return -1 +endi + +if $data10 != @20-02-02 00:00:00.000@ then + return -1 +endi + +if $data01 != 1 then + return -1 +endi + +if $data11 != 1 then + return -1 +endi + +if $data02 != @20-01-02 01:01:01.000@ then + return -1 +endi + +if $data12 != @20-02-02 01:01:01.000@ then + return -1 +endi + +if $data03 != @20-01-02 01:01:01.000@ then + return -1 +endi + +if $data13 != @20-02-02 01:01:01.000@ then + return -1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file From 2b909bc151775cc56cfe683c0da160433e608b20 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Sep 2020 18:54:12 +0800 Subject: [PATCH 03/39] [td-1598] --- src/client/src/tscSQLParser.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 0d75e60c28..bb20f09c50 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -827,12 +827,12 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa SStrToken t = {0}; getCurrentDBName(pSql, &t); if (t.n == 0) { - invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); - } - - code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL); - if (code != 0) { - invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + } else { + code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL); + if (code != 0) { + invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + } } } From 901803abe64cf9b222733c9b9a400333d8804e80 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Sep 2020 14:11:38 +0800 Subject: [PATCH 04/39] [td-1604] --- src/client/src/tscAsync.c | 2 +- src/client/src/tscParseInsert.c | 44 ++++++++++++++++----------------- src/client/src/tscPrepare.c | 6 ----- src/inc/taoserror.h | 2 +- 4 files changed, 23 insertions(+), 31 deletions(-) diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index c5d622e245..400613595b 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -509,7 +509,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { goto _error; } - if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) { + if (pCmd->insertType == TSDB_QUERY_TYPE_STMT_INSERT) { STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); code = tscGetTableMeta(pSql, pTableMetaInfo); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 6fd97c09e9..16ff0139b4 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -406,7 +406,7 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start return TSDB_CODE_SUCCESS; } -int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error, +int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, SSqlCmd* pCmd, int16_t timePrec, int32_t *code, char *tmpTokenBuf) { int32_t index = 0; SStrToken sToken = {0}; @@ -426,12 +426,17 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ *str += index; if (sToken.type == TK_QUESTION) { + if (pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) { + *code = tscSQLSyntaxErrMsg(pCmd->payload, "? only allowed in binding insertion", *str); + return -1; + } + uint32_t offset = (uint32_t)(start - pDataBlocks->pData); if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) { continue; } - strcpy(error, "client out of memory"); + strcpy(pCmd->payload, "client out of memory"); *code = TSDB_CODE_TSC_OUT_OF_MEMORY; return -1; } @@ -439,8 +444,7 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ int16_t type = sToken.type; if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL && type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || (sToken.n == 0) || (type == TK_RP)) { - tscSQLSyntaxErrMsg(error, "invalid data or symbol", sToken.z); - *code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR; + *code = tscSQLSyntaxErrMsg(pCmd->payload, "invalid data or symbol", sToken.z); return -1; } @@ -470,14 +474,14 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[ } bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX); - int32_t ret = tsParseOneColumnData(pSchema, &sToken, start, error, str, isPrimaryKey, timePrec); + int32_t ret = tsParseOneColumnData(pSchema, &sToken, start, pCmd->payload, str, isPrimaryKey, timePrec); if (ret != TSDB_CODE_SUCCESS) { *code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR; return -1; // NOTE: here 0 mean error! } if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) { - tscInvalidSQLErrMsg(error, "client time/server time can not be mixed up", sToken.z); + tscInvalidSQLErrMsg(pCmd->payload, "client time/server time can not be mixed up", sToken.z); *code = TSDB_CODE_TSC_INVALID_TIME_STAMP; return -1; } @@ -522,7 +526,7 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) { } int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMeta, int maxRows, - SParsedDataColInfo *spd, char *error, int32_t *code, char *tmpTokenBuf) { + SParsedDataColInfo *spd, SSqlCmd* pCmd, int32_t *code, char *tmpTokenBuf) { int32_t index = 0; SStrToken sToken; @@ -534,8 +538,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe int32_t precision = tinfo.precision; if (spd->hasVal[0] == false) { - strcpy(error, "primary timestamp column can not be null"); - *code = TSDB_CODE_TSC_INVALID_SQL; + *code = tscInvalidSQLErrMsg(pCmd->payload, "primary timestamp column can not be null", *str); return -1; } @@ -547,17 +550,17 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe *str += index; if (numOfRows >= maxRows || pDataBlock->size + tinfo.rowSize >= pDataBlock->nAllocSize) { int32_t tSize; - int32_t retcode = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize); - if (retcode != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client - strcpy(error, "client out of memory"); - *code = retcode; + *code = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize); + if (*code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client + strcpy(pCmd->payload, "client out of memory"); return -1; } + ASSERT(tSize > maxRows); maxRows = tSize; } - int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, error, precision, code, tmpTokenBuf); + int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, pCmd, precision, code, tmpTokenBuf); if (len <= 0) { // error message has been set in tsParseOneRowData return -1; } @@ -568,7 +571,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe sToken = tStrGetToken(*str, &index, false, 0, NULL); *str += index; if (sToken.n == 0 || sToken.type != TK_RP) { - tscSQLSyntaxErrMsg(error, ") expected", *str); + tscSQLSyntaxErrMsg(pCmd->payload, ") expected", *str); *code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR; return -1; } @@ -577,7 +580,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe } if (numOfRows <= 0) { - strcpy(error, "no any data points"); + strcpy(pCmd->payload, "no any data points"); *code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR; return -1; } else { @@ -704,7 +707,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st return TSDB_CODE_TSC_OUT_OF_MEMORY; } - int32_t numOfRows = tsParseValues(str, dataBuf, pTableMeta, maxNumOfRows, spd, pCmd->payload, &code, tmpTokenBuf); + int32_t numOfRows = tsParseValues(str, dataBuf, pTableMeta, maxNumOfRows, spd, pCmd, &code, tmpTokenBuf); free(tmpTokenBuf); if (numOfRows <= 0) { return code; @@ -724,10 +727,6 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st dataBuf->vgId = pTableMeta->vgroupInfo.vgId; dataBuf->numOfTables = 1; - /* - * the value of pRes->numOfRows does not affect the true result of AFFECTED ROWS, - * which is actually returned from server. - */ *totalNum += numOfRows; return TSDB_CODE_SUCCESS; } @@ -1459,8 +1458,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { char *lineptr = line; strtolower(line, line); - int32_t len = - tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tokenBuf); + int32_t len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd, tinfo.precision, &code, tokenBuf); if (len <= 0 || pTableDataBlock->numOfParams > 0) { pSql->res.code = code; break; diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index cdbd8685df..b3bb4566be 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -43,10 +43,6 @@ typedef struct SNormalStmt { tVariant* params; } SNormalStmt; -//typedef struct SInsertStmt { -// -//} SInsertStmt; - typedef struct STscStmt { bool isInsert; STscObj* taos; @@ -54,7 +50,6 @@ typedef struct STscStmt { SNormalStmt normal; } STscStmt; - static int normalStmtAddPart(SNormalStmt* stmt, bool isParam, char* str, uint32_t len) { uint16_t size = stmt->numParts + 1; if (size > stmt->sizeParts) { @@ -514,7 +509,6 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { SSqlObj* pSql = pStmt->pSql; size_t sqlLen = strlen(sql); - //doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; pSql->param = (void*) pSql; diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 67e2d43c98..6987647fe4 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -98,7 +98,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ACTION_IN_PROGRESS, 0, 0x0212, "Action in TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, 0, 0x0213, "Disconnected from service") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, 0, 0x0214, "No write permission") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, 0, 0x0215, "Connection killed") -TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, 0, 0x0216, "Syntax errr in SQL") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, 0, 0x0216, "Syntax error in SQL") // mnode TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, 0, 0x0300, "Message not processed") From 8195149c72e084c3c5353893e0fd08374c26ed2c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Sep 2020 14:12:39 +0800 Subject: [PATCH 05/39] [td-225]. --- src/client/src/tscServer.c | 4 ++-- src/client/src/tscSql.c | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 3cbb0d936e..ac4bc5b066 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -259,8 +259,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { return; } - pSql->pRpcCtx = NULL; // clear the rpcCtx - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", @@ -389,6 +387,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } rpcFreeCont(rpcMsg->pCont); + } int doProcessSql(SSqlObj *pSql) { @@ -475,6 +474,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; if (pSub->pRpcCtx != NULL) { rpcCancelRequest(pSub->pRpcCtx); + pSub->pRpcCtx = NULL; } tscQueueAsyncRes(pSub); // async res? not other functions? diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 430a762321..acc93530e3 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -698,6 +698,7 @@ void taos_stop_query(TAOS_RES *res) { tscKillSTableQuery(pSql); } else { if (pSql->cmd.command < TSDB_SQL_LOCAL) { + assert(pSql->pRpcCtx != NULL); rpcCancelRequest(pSql->pRpcCtx); } } From df32088b5dc8353d36c12ad855988c92c3d00f31 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Sep 2020 14:30:08 +0800 Subject: [PATCH 06/39] [td-1604]. --- tests/script/general/parser/where.sim | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/script/general/parser/where.sim b/tests/script/general/parser/where.sim index f9fd919bd6..bfbeddf93a 100644 --- a/tests/script/general/parser/where.sim +++ b/tests/script/general/parser/where.sim @@ -234,6 +234,13 @@ if $data11 != @19-01-01 09:10:00.000@ then endi sql create table tb_where_NULL (ts timestamp, c1 float, c2 binary(10)) + +print ===================>td-1604 +sql_error insert into tb_where_NULL values(?, ?, ?) +sql_error insert into tb_where_NULL values(now, 1, ?) +sql_error insert into tb_where_NULL values(?, 1, '') +sql_error insert into tb_where_NULL values(now, ?, '12') + sql insert into tb_where_NULL values ('2019-01-01 09:00:00.000', 1, 'val1') sql insert into tb_where_NULL values ('2019-01-01 09:00:01.000', NULL, NULL) sql insert into tb_where_NULL values ('2019-01-01 09:00:02.000', 2, 'val2') @@ -330,4 +337,5 @@ if $rows != 0 then return -1 endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file From 0dcef5b340d48e7aa977c1914edd5bff52b5d3fa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Sep 2020 15:04:47 +0800 Subject: [PATCH 07/39] [td-1603] --- src/client/src/tscSQLParser.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index bb20f09c50..2c26275262 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -817,17 +817,16 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa oldName = strdup(pTableMetaInfo->name); } - if (hasSpecifyDB(pzTableName)) { - // db has been specified in sql string so we ignore current db path + if (hasSpecifyDB(pzTableName)) { // db has been specified in sql string so we ignore current db path code = setObjFullName(pTableMetaInfo->name, getAccountId(pSql), NULL, pzTableName, NULL); if (code != 0) { invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - } else { // get current DB name first, then set it into path + } else { // get current DB name first, and then set it into path SStrToken t = {0}; getCurrentDBName(pSql, &t); - if (t.n == 0) { - code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + if (t.n == 0) { // current database not available or not specified + code = TSDB_CODE_MND_DB_NOT_SELECTED; } else { code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL); if (code != 0) { From b2dd094ffaf01f9b8a6596db4e12f5ae65adb1d0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Sep 2020 15:13:33 +0800 Subject: [PATCH 08/39] [td-1603] --- src/client/src/tscSQLParser.c | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 2c26275262..c51ea148f3 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -805,17 +805,13 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableName, SSqlObj* pSql) { const char* msg1 = "name too long"; - const char* msg2 = "current database or database name invalid"; SSqlCmd* pCmd = &pSql->cmd; int32_t code = TSDB_CODE_SUCCESS; // backup the old name in pTableMetaInfo - size_t size = strlen(pTableMetaInfo->name); - char* oldName = NULL; - if (size > 0) { - oldName = strdup(pTableMetaInfo->name); - } + char oldName[TSDB_TABLE_FNAME_LEN] = {0}; + tstrncpy(oldName, pTableMetaInfo->name, tListLen(oldName)); if (hasSpecifyDB(pzTableName)) { // db has been specified in sql string so we ignore current db path code = setObjFullName(pTableMetaInfo->name, getAccountId(pSql), NULL, pzTableName, NULL); @@ -836,23 +832,17 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa } if (code != TSDB_CODE_SUCCESS) { - taosTFree(oldName); return code; } /* - * the old name exists and is not equalled to the new name. Release the metermeta/metricmeta + * the old name exists and is not equalled to the new name. Release the table meta * that are corresponding to the old name for the new table name. */ - if (size > 0) { - if (strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) { - tscClearTableMetaInfo(pTableMetaInfo, false); - } - } else { - assert(pTableMetaInfo->pTableMeta == NULL); + if (strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) { + tscClearTableMetaInfo(pTableMetaInfo, false); } - taosTFree(oldName); return TSDB_CODE_SUCCESS; } From 9cb7efeec1129ba90e090080f1da459c31f1062f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Sep 2020 15:41:14 +0800 Subject: [PATCH 09/39] [td-1603] --- src/client/src/tscSQLParser.c | 66 +++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 30 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index c51ea148f3..b0a68d1bb7 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -233,8 +233,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } else if (pInfo->type == TSDB_SQL_DROP_TABLE) { assert(pInfo->pDCLInfo->nTokens == 1); - code = tscSetTableFullName(pTableMetaInfo, pzName, pSql); - if(code != TSDB_CODE_SUCCESS) { + if((code = tscSetTableFullName(pTableMetaInfo, pzName, pSql)) != TSDB_CODE_SUCCESS) { return code; } } else if (pInfo->type == TSDB_SQL_DROP_DNODE) { @@ -362,8 +361,9 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } // additional msg has been attached already - if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + code = tscSetTableFullName(pTableMetaInfo, pToken, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } return tscGetTableMeta(pSql, pTableMetaInfo); @@ -381,14 +381,15 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } - if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + code = tscSetTableFullName(pTableMetaInfo, pToken, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } + return tscGetTableMeta(pSql, pTableMetaInfo); } case TSDB_SQL_SHOW_CREATE_DATABASE: { const char* msg1 = "invalid database name"; - const char* msg2 = "table name is too long"; SStrToken* pToken = &pInfo->pDCLInfo->a[0]; if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) { @@ -397,11 +398,9 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (pToken->n > TSDB_DB_NAME_LEN) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); - } - return TSDB_CODE_SUCCESS; - } + + return tscSetTableFullName(pTableMetaInfo, pToken, pSql); + } case TSDB_SQL_CFG_DNODE: { const char* msg2 = "invalid configure options or values, such as resetlog / debugFlag 135 / balance 'vnode:2-dnode:2' / monitor 1 "; const char* msg3 = "invalid dnode ep"; @@ -4555,6 +4554,8 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { const char* msg18 = "primary timestamp column cannot be dropped"; const char* msg19 = "invalid new tag name"; + int32_t code = TSDB_CODE_SUCCESS; + SSqlCmd* pCmd = &pSql->cmd; SAlterTableSQL* pAlterSQL = pInfo->pAlterInfo; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); @@ -4565,13 +4566,14 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSetTableFullName(pTableMetaInfo, &(pAlterSQL->name), pSql) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + code = tscSetTableFullName(pTableMetaInfo, &(pAlterSQL->name), pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } - int32_t ret = tscGetTableMeta(pSql, pTableMetaInfo); - if (ret != TSDB_CODE_SUCCESS) { - return ret; + code = tscGetTableMeta(pSql, pTableMetaInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; } STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; @@ -5869,8 +5871,9 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSetTableFullName(pTableMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + int32_t code = tscSetTableFullName(pTableMetaInfo, pzTableName, pSql); + if(code != TSDB_CODE_SUCCESS) { + return code; } if (!validateTableColumnInfo(pFieldList, pCmd) || @@ -5924,15 +5927,16 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSetTableFullName(pStableMeterMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + int32_t code = tscSetTableFullName(pStableMeterMetaInfo, pToken, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } // get meter meta from mnode tstrncpy(pCreateTable->usingInfo.tagdata.name, pStableMeterMetaInfo->name, sizeof(pCreateTable->usingInfo.tagdata.name)); tVariantList* pList = pInfo->pCreateTableInfo->usingInfo.pTagVals; - int32_t code = tscGetTableMeta(pSql, pStableMeterMetaInfo); + code = tscGetTableMeta(pSql, pStableMeterMetaInfo); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -6009,7 +6013,6 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) { int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { const char* msg1 = "invalid table name"; - const char* msg2 = "table name too long"; const char* msg3 = "fill only available for interval query"; const char* msg4 = "fill option not supported in stream computing"; const char* msg5 = "sql too long"; // todo ADD support @@ -6041,11 +6044,12 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSetTableFullName(pTableMetaInfo, &srcToken, pSql) != TSDB_CODE_SUCCESS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + int32_t code = tscSetTableFullName(pTableMetaInfo, &srcToken, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } - int32_t code = tscGetTableMeta(pSql, pTableMetaInfo); + code = tscGetTableMeta(pSql, pTableMetaInfo); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -6072,8 +6076,9 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { } // set the created table[stream] name - if (tscSetTableFullName(pTableMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + code = tscSetTableFullName(pTableMetaInfo, pzTableName, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } if (pQuerySql->selectToken.n > TSDB_MAX_SAVED_SQL_LEN) { @@ -6189,8 +6194,9 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo, i/2); SStrToken t = {.type = TSDB_DATA_TYPE_BINARY, .n = pTableItem->nLen, .z = pTableItem->pz}; - if (tscSetTableFullName(pTableMetaInfo1, &t, pSql) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; + code = tscSetTableFullName(pTableMetaInfo1, &t, pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; } tVariant* pTableItem1 = &pQuerySql->from->a[i + 1].pVar; From 284e159361b809521898789e28001cc4bcf53881 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Sep 2020 15:53:43 +0800 Subject: [PATCH 10/39] [td-1603] --- src/client/src/tscSQLParser.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index b0a68d1bb7..30a25a0285 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -838,7 +838,7 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa * the old name exists and is not equalled to the new name. Release the table meta * that are corresponding to the old name for the new table name. */ - if (strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) { + if (strlen(oldName) > 0 && strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) { tscClearTableMetaInfo(pTableMetaInfo, false); } @@ -6122,7 +6122,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { assert(pQuerySql != NULL && (pQuerySql->from == NULL || pQuerySql->from->nExpr > 0)); const char* msg0 = "invalid table name"; - //const char* msg1 = "table name too long"; const char* msg2 = "point interpolation query needs timestamp"; const char* msg5 = "fill only available for interval query"; const char* msg6 = "start(end) time of query range required or time range too large"; From 100121251f28267d3d72cc02a77329fe6037dd4e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 27 Sep 2020 15:11:46 +0800 Subject: [PATCH 11/39] [td-1619] --- src/tsdb/src/tsdbRead.c | 42 +++++++++++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index a3bc0de272..d3f4747a96 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -697,22 +697,41 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); if (pCheckInfo->pDataCols == NULL) { - tsdbError("%p failed to malloc buf, %p", pQueryHandle, pQueryHandle->qinfo); + tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return terrno; + goto _error; } } STSchema* pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj); - tdInitDataCols(pCheckInfo->pDataCols, pSchema); - tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema); - tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema); + int32_t code = tdInitDataCols(pCheckInfo->pDataCols, pSchema); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _error; + } + + code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %p", pQueryHandle, pQueryHandle->qinfo); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _error; + } + + code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %p", pQueryHandle, pQueryHandle->qinfo); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _error; + } int16_t* colIds = pQueryHandle->defaultLoadColumn->pData; int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pQueryHandle))); if (ret != TSDB_CODE_SUCCESS) { - return terrno; + int32_t c = terrno; + assert(c != TSDB_CODE_SUCCESS); + goto _error; } SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; @@ -729,10 +748,16 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p int64_t elapsedTime = (taosGetTimestampUs() - st); pQueryHandle->cost.blockLoadTime += elapsedTime; - tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64" , rows:%d, elapsed time:%"PRId64 " us, %p", + tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, %p", pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pQueryHandle->qinfo); - return TSDB_CODE_SUCCESS; + +_error: + pBlock->numOfRows = 0; + + tsdbError("%p error occurs in loading file block, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, %p", + pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pQueryHandle->qinfo); + return terrno; } static int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo); @@ -1241,6 +1266,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* cur->pos >= 0 && cur->pos < pBlock->numOfRows); TSKEY* tsArray = pCols->cols[0].pData; + assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast); // for search the endPos, so the order needs to reverse int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; From c63ec824f3ef5d05ea187ddcd5ed05b994e6203f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 27 Sep 2020 15:44:02 +0800 Subject: [PATCH 12/39] [td-1603] add new error code at client side --- src/client/src/tscLocal.c | 8 ++++---- src/client/src/tscSQLParser.c | 2 +- src/inc/taoserror.h | 2 ++ 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 030b033653..590c2829d0 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -430,7 +430,7 @@ static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) { SSqlObj* pSql = builder->pInterSql; if (row == NULL) { - return TSDB_CODE_MND_INVALID_TABLE_NAME; + return TSDB_CODE_TSC_INVALID_TABLE_NAME; } int32_t* lengths = taos_fetch_lengths(pSql); @@ -458,7 +458,7 @@ static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) { } if (0 == strlen(result)) { - return TSDB_CODE_MND_INVALID_TABLE_NAME; + return TSDB_CODE_TSC_INVALID_TABLE_NAME; } return TSDB_CODE_SUCCESS; } @@ -554,7 +554,7 @@ int32_t tscRebuildCreateTableStatement(void *param,char *result) { static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) { TAOS_ROW row = tscFetchRow(builder); if (row == NULL) { - return TSDB_CODE_MND_DB_NOT_SELECTED; + return TSDB_CODE_TSC_DB_NOT_SELECTED; } const char *showColumns[] = {"REPLICA", "QUORUM", "DAYS", "KEEP", "BLOCKS", NULL}; @@ -586,7 +586,7 @@ static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) { } while (row != NULL); if (0 == strlen(result)) { - return TSDB_CODE_MND_DB_NOT_SELECTED; + return TSDB_CODE_TSC_DB_NOT_SELECTED; } return TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 30a25a0285..66e239074d 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -821,7 +821,7 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa SStrToken t = {0}; getCurrentDBName(pSql, &t); if (t.n == 0) { // current database not available or not specified - code = TSDB_CODE_MND_DB_NOT_SELECTED; + code = TSDB_CODE_TSC_DB_NOT_SELECTED; } else { code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL); if (code != 0) { diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 6987647fe4..781b5897a1 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -99,6 +99,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, 0, 0x0213, "Disconnect TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, 0, 0x0214, "No write permission") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, 0, 0x0215, "Connection killed") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, 0, 0x0216, "Syntax error in SQL") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, 0, 0x0217, "Database not specified or available") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, 0, 0x0218, "Table does not exist") // mnode TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, 0, 0x0300, "Message not processed") From 26e708bc0b2dca0a3819b4a78124da9f52cd5322 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 28 Sep 2020 10:17:53 +0800 Subject: [PATCH 13/39] [td-1300] --- src/client/src/tscServer.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ac4bc5b066..74acc56bba 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -53,7 +53,10 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); SRpcEpSet* pEpSet = &pSql->epSet; - pEpSet->inUse = 0; + + // Issue the query to one of the vnode among a vgroup randomly. + // change the inUse property would not affect the isUse attribute of STableMeta + pEpSet->inUse = rand() % pVgroupInfo->numOfEps; // apply the FQDN string length check here bool hasFqdn = false; From db66a5da57d6f013e6fb5b60182129bf255cea5e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 28 Sep 2020 22:57:06 +0800 Subject: [PATCH 14/39] [td-225] --- src/client/inc/tsclient.h | 1 - src/os/inc/osTime.h | 7 ++++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index ea42b0f6a3..ea49cabc4e 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -229,7 +229,6 @@ typedef struct SQueryInfo { // TODO refactor STimeWindow window; // query time window SInterval interval; - int32_t tz; // query client timezone SSqlGroupbyExpr groupbyExpr; // group by tags info SArray * colList; // SArray diff --git a/src/os/inc/osTime.h b/src/os/inc/osTime.h index 99f7586f72..6b209219c6 100644 --- a/src/os/inc/osTime.h +++ b/src/os/inc/osTime.h @@ -63,9 +63,10 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) { typedef struct SInterval { - char intervalUnit; - char slidingUnit; - char offsetUnit; + int32_t tz; // query client timezone + char intervalUnit; + char slidingUnit; + char offsetUnit; int64_t interval; int64_t sliding; int64_t offset; From 2d5f36851a858674c46e529ef251673407a235b7 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Wed, 30 Sep 2020 01:21:22 +0000 Subject: [PATCH 15/39] TD-1632 --- src/rpc/src/rpcMain.c | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index f0b8c996c5..6a5d3b079a 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -195,7 +195,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendReqHead(SRpcConn *pConn); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext); static void rpcProcessConnError(void *param, void *id); static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); @@ -885,13 +885,14 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_SUCCESS; } -static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { +static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) { int32_t sid; SRpcConn *pConn = NULL; SRpcHead *pHead = (SRpcHead *)pRecv->msg; sid = htonl(pHead->destId); + *ppContext = NULL; if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); @@ -945,6 +946,17 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); } else { terrno = rpcProcessRspHead(pConn, pHead); + if (terrno == 0) { + SRpcReqContext *pContext = pConn->pContext; + *ppContext = pContext; + pConn->pContext = NULL; + pConn->pReqMsg = NULL; + + // for UDP, port may be changed by server, the port in epSet shall be used for cache + if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); + } + } } } @@ -1009,7 +1021,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { } terrno = 0; - pConn = rpcProcessMsgHead(pRpc, pRecv); + SRpcReqContext *pContext; + pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) { tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, @@ -1029,7 +1042,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); } } else { // msg is passed to app only parsing is ok - rpcProcessIncomingMsg(pConn, pHead); + rpcProcessIncomingMsg(pConn, pHead, pContext); } } @@ -1060,7 +1073,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { rpcFreeCont(pContext->pCont); } -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { SRpcInfo *pRpc = pConn->pRpc; SRpcMsg rpcMsg; @@ -1089,15 +1102,10 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { } } else { // it's a response - SRpcReqContext *pContext = pConn->pContext; rpcMsg.handle = pContext; - pConn->pContext = NULL; - pConn->pReqMsg = NULL; // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); - } else { + if (pHead->code == TSDB_CODE_RPC_TOO_SLOW) { rpcCloseConn(pConn); } From d8991f19137d7f78f1d5b25ccbceae32caafc96f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 30 Sep 2020 11:02:52 +0800 Subject: [PATCH 16/39] fix TD-1640 --- src/util/inc/tscompression.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/inc/tscompression.h b/src/util/inc/tscompression.h index 37d1e7b590..cca6d6e250 100644 --- a/src/util/inc/tscompression.h +++ b/src/util/inc/tscompression.h @@ -26,7 +26,7 @@ extern "C" { #define COMP_OVERFLOW_BYTES 2 #define BITS_PER_BYTE 8 // Masks -#define INT64MASK(_x) ((1ul << _x) - 1) +#define INT64MASK(_x) ((((uint64_t)1) << _x) - 1) #define INT32MASK(_x) (((uint32_t)1 << _x) - 1) #define INT8MASK(_x) (((uint8_t)1 << _x) - 1) // Compression algorithm From c9152cb35f25b5a282164acb4bc76cd9c0a1d7d0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 30 Sep 2020 04:08:11 +0000 Subject: [PATCH 17/39] minor changes --- src/inc/taosdef.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 115630d1a5..2896e91d93 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -73,7 +73,7 @@ extern const int32_t TYPE_BYTES[11]; #define TSDB_DATA_BOOL_NULL 0x02 #define TSDB_DATA_TINYINT_NULL 0x80 #define TSDB_DATA_SMALLINT_NULL 0x8000 -#define TSDB_DATA_INT_NULL 0x80000000 +#define TSDB_DATA_INT_NULL 0x80000000L #define TSDB_DATA_BIGINT_NULL 0x8000000000000000L #define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN @@ -304,7 +304,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf #define TSDB_MIN_VNODES 64 #define TSDB_MAX_VNODES 2048 #define TSDB_MIN_VNODES_PER_DB 2 -#define TSDB_MAX_VNODES_PER_DB 16 +#define TSDB_MAX_VNODES_PER_DB 32 #define TSDB_DNODE_ROLE_ANY 0 #define TSDB_DNODE_ROLE_MGMT 1 From 4b9a1813a5226f1097c53e6a7ab1e2a1a3fd5452 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 30 Sep 2020 04:12:13 +0000 Subject: [PATCH 18/39] minor changs --- src/inc/taosdef.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 2896e91d93..1667bb5de5 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -304,7 +304,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf #define TSDB_MIN_VNODES 64 #define TSDB_MAX_VNODES 2048 #define TSDB_MIN_VNODES_PER_DB 2 -#define TSDB_MAX_VNODES_PER_DB 32 +#define TSDB_MAX_VNODES_PER_DB 64 #define TSDB_DNODE_ROLE_ANY 0 #define TSDB_DNODE_ROLE_MGMT 1 From 70e97fec8a3b4abffb8753e3c7d31cb5ae514a8c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 30 Sep 2020 12:15:14 +0800 Subject: [PATCH 19/39] minor changes --- src/inc/taosdef.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 115630d1a5..343f8d2070 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -64,7 +64,7 @@ extern const int32_t TYPE_BYTES[11]; // TODO: replace and remove code below #define CHAR_BYTES sizeof(char) #define SHORT_BYTES sizeof(int16_t) -#define INT_BYTES sizeof(int) +#define INT_BYTES sizeof(int32_t) #define LONG_BYTES sizeof(int64_t) #define FLOAT_BYTES sizeof(float) #define DOUBLE_BYTES sizeof(double) @@ -73,7 +73,7 @@ extern const int32_t TYPE_BYTES[11]; #define TSDB_DATA_BOOL_NULL 0x02 #define TSDB_DATA_TINYINT_NULL 0x80 #define TSDB_DATA_SMALLINT_NULL 0x8000 -#define TSDB_DATA_INT_NULL 0x80000000 +#define TSDB_DATA_INT_NULL 0x80000000L #define TSDB_DATA_BIGINT_NULL 0x8000000000000000L #define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN From ff406d578a11b24bd1ebd8efabc0c3d06e524990 Mon Sep 17 00:00:00 2001 From: Yiqing Liu Date: Wed, 30 Sep 2020 15:24:22 +0800 Subject: [PATCH 20/39] create jenkinfiles --- Jenkinsfile | 70 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 Jenkinsfile diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 0000000000..53798c8db9 --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,70 @@ +pipeline { + agent any + stages { + stage('build TDengine') { + steps { + sh '''cd ${WORKSPACE} +export TZ=Asia/Harbin +date +rm -rf ${WORKSPACE}/debug +mkdir debug +cd debug +cmake .. > /dev/null +make > /dev/null +cd ${WORKSPACE}/debug''' + } + } + + stage('test_tsim') { + parallel { + stage('test') { + steps { + sh '''cd ${WORKSPACE}/tests +#./test-all.sh smoke +sudo ./test-all.sh full''' + } + } + + stage('test_crash_gen') { + steps { + sh '''cd ${WORKSPACE}/tests/pytest +sudo ./crash_gen.sh -a -p -t 4 -s 2000''' + } + } + + stage('test_valgrind') { + steps { + sh '''cd ${WORKSPACE}/tests/pytest +sudo ./valgrind-test.sh 2>&1 > mem-error-out.log +grep \'start to execute\\|ERROR SUMMARY\' mem-error-out.log|grep -v \'grep\'|uniq|tee uniq-mem-error-out.log + +for memError in `grep \'ERROR SUMMARY\' uniq-mem-error-out.log | awk \'{print $4}\'` +do + if [ -n "$memError" ]; then + if [ "$memError" -gt 12 ]; then + echo -e "${RED} ## Memory errors number valgrind reports is $memError.\\ + More than our threshold! ## ${NC}" + travis_terminate $memError + fi + fi +done + +grep \'start to execute\\|definitely lost:\' mem-error-out.log|grep -v \'grep\'|uniq|tee uniq-definitely-lost-out.log +for defiMemError in `grep \'definitely lost:\' uniq-definitely-lost-out.log | awk \'{print $7}\'` +do + if [ -n "$defiMemError" ]; then + if [ "$defiMemError" -gt 13 ]; then + echo -e "${RED} ## Memory errors number valgrind reports \\ + Definitely lost is $defiMemError. More than our threshold! ## ${NC}" + travis_terminate $defiMemError + fi + fi +done''' + } + } + + } + } + + } +} \ No newline at end of file From d442e2b43c000d813a760af6f045053d15d40ab7 Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Wed, 30 Sep 2020 15:26:10 +0800 Subject: [PATCH 21/39] update markdown for ALTER USER PRIVILEGE --- documentation20/webdocs/markdowndocs/administrator-ch.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/documentation20/webdocs/markdowndocs/administrator-ch.md b/documentation20/webdocs/markdowndocs/administrator-ch.md index d1ad107db6..cc1beb1042 100644 --- a/documentation20/webdocs/markdowndocs/administrator-ch.md +++ b/documentation20/webdocs/markdowndocs/administrator-ch.md @@ -250,10 +250,10 @@ ALTER USER PASS <'password'>; 修改用户密码, 为避免被转换为小写,密码需要用单引号引用,单引号为英文半角 ``` -ALTER USER PRIVILEDGE <'super'|'write'|'read'>; +ALTER USER PRIVILEDGE ; ``` -修改用户权限为:super/write/read。 为避免被转换为小写,密码需要用单引号引用,单引号为英文半角 +修改用户权限为:super/write/read,不需要添加单引号 ``` SHOW USERS; From 6437b7fac5b47bd46a1cfadb3eff8ef4545de730 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 30 Sep 2020 15:30:31 +0800 Subject: [PATCH 22/39] TD-1530 --- src/dnode/src/dnodeMgmt.c | 9 ++++++++- tests/script/jenkins/basic.txt | 4 ++-- tests/script/unique/mnode/mgmt23.sim | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index c968246a68..8f87a45fb7 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -464,7 +464,14 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) { dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse); for (int i = 0; i < pEpSet->numOfEps; ++i) { pEpSet->port[i] -= TSDB_PORT_DNODEDNODE; - dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]) + dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]); + + if (!mnodeIsRunning()) { + if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) { + dInfo("mnode index:%d %s:%u should work as master", i, pEpSet->fqdn[i], pEpSet->port[i]); + sdbUpdateSync(); + } + } } tsDMnodeEpSet = *pEpSet; diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 000d36c178..977ef452ab 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -303,8 +303,8 @@ cd ../../../debug; make ./test.sh -f unique/mnode/mgmt22.sim ./test.sh -f unique/mnode/mgmt23.sim ./test.sh -f unique/mnode/mgmt24.sim -#./test.sh -f unique/mnode/mgmt25.sim -#./test.sh -f unique/mnode/mgmt26.sim +./test.sh -f unique/mnode/mgmt25.sim +./test.sh -f unique/mnode/mgmt26.sim ./test.sh -f unique/mnode/mgmt33.sim ./test.sh -f unique/mnode/mgmt34.sim ./test.sh -f unique/mnode/mgmtr2.sim diff --git a/tests/script/unique/mnode/mgmt23.sim b/tests/script/unique/mnode/mgmt23.sim index 7e60ab908b..4851872860 100644 --- a/tests/script/unique/mnode/mgmt23.sim +++ b/tests/script/unique/mnode/mgmt23.sim @@ -65,7 +65,7 @@ endi print ============== step4 sql drop dnode $hostname2 -sleep 8000 +sleep 16000 sql show mnodes $dnode1Role = $data2_1 From a3a95b60c3fbfd9aecbfbb32be079ed8a6010028 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 30 Sep 2020 21:58:05 +0800 Subject: [PATCH 23/39] [td-1603] add client side defined error code. --- tests/pytest/crash_gen.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 1ea19dfac3..fee355eef9 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -1516,6 +1516,8 @@ class Task(): if errno in [ 0x05, # TSDB_CODE_RPC_NOT_READY # 0x200, # invalid SQL, TODO: re-examine with TD-934 + 0x217, # "db not selected", client side defined error code + 0x218, # "Table does not exist" client side defined error code 0x360, 0x362, 0x369, # tag already exists 0x36A, 0x36B, 0x36D, From 1f0f57cf383ffa0050c58d18f05e658ede4fa72e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 30 Sep 2020 23:50:46 +0800 Subject: [PATCH 24/39] [td-1283] --- src/client/src/tscServer.c | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ad13502e57..abef727040 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -147,12 +147,13 @@ void tscPrintMgmtEp() { void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { STscObj *pObj = (STscObj *)param; if (pObj == NULL) return; + if (pObj != pObj->signature) { tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature); return; } - SSqlObj *pSql = pObj->pHb; + SSqlObj *pSql = tres; SSqlRes *pRes = &pSql->res; if (code == 0) { @@ -173,10 +174,17 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); } } else { - tscDebug("heart beat failed, code:%s", tstrerror(code)); + tscDebug("heartbeat failed, code:%s", tstrerror(code)); } - taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); + if (pObj->pHb != NULL) { + int64_t waitingDuring = tsShellActivityTimer * 500; + tscDebug("%p start heartbeat in %"PRId64"ms", pSql, waitingDuring); + + taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer); + } else { + tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj); + } } void tscProcessActivityTimer(void *handle, void *tmrId) { From 7bf8c4489d8f71784bd30894acade1d451af7a26 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 1 Oct 2020 15:36:13 +0800 Subject: [PATCH 25/39] [td-225] fix invalid read of hb --- src/client/src/tscServer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index abef727040..8a645d5761 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -261,6 +261,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { SSqlCmd *pCmd = &pSql->cmd; assert(*pSql->self == pSql); + pSql->pRpcCtx = NULL; if (pObj->signature != pObj) { tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); @@ -398,7 +399,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } rpcFreeCont(rpcMsg->pCont); - } int doProcessSql(SSqlObj *pSql) { From 0e5cc6e46ad71bd94577bf9fe9aae9fc66c49b0b Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 1 Oct 2020 14:40:56 +0000 Subject: [PATCH 26/39] TD-1645 --- src/rpc/src/rpcTcp.c | 51 +++++++++++++++++++++++++++++--------------- src/rpc/src/rpcUdp.c | 12 ++++++++--- 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 2a3facdb36..97a3dad1eb 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -62,7 +62,7 @@ typedef struct { char label[TSDB_LABEL_LEN]; int numOfThreads; void * shandle; - SThreadObj *pThreadObj; + SThreadObj **pThreadObj; pthread_t thread; } SServerObj; @@ -90,7 +90,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread tstrncpy(pServerObj->label, label, sizeof(pServerObj->label)); pServerObj->numOfThreads = numOfThreads; - pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads); + pServerObj->pThreadObj = (SThreadObj **)calloc(sizeof(SThreadObj *), numOfThreads); if (pServerObj->pThreadObj == NULL) { tError("TCP:%s no enough memory", label); terrno = TAOS_SYSTEM_ERROR(errno); @@ -104,19 +104,28 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); // initialize parameters in case it may encounter error later - pThreadObj = pServerObj->pThreadObj; for (int i = 0; i < numOfThreads; ++i) { + pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), 1); + if (pThreadObj == NULL) { + tError("TCP:%s no enough memory", label); + terrno = TAOS_SYSTEM_ERROR(errno); + for (int j=0; jpThreadObj[j]); + free(pServerObj->pThreadObj); + free(pServerObj); + return NULL; + } + + pServerObj->pThreadObj[i] = pThreadObj; pThreadObj->pollFd = -1; taosResetPthread(&pThreadObj->thread); pThreadObj->processData = fp; tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); pThreadObj->shandle = shandle; - pThreadObj++; } // initialize mutex, thread, fd which may fail - pThreadObj = pServerObj->pThreadObj; for (int i = 0; i < numOfThreads; ++i) { + pThreadObj = pServerObj->pThreadObj[i]; code = pthread_mutex_init(&(pThreadObj->mutex), NULL); if (code < 0) { tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); @@ -137,7 +146,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } pThreadObj->threadId = i; - pThreadObj++; } pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); @@ -166,6 +174,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { pThreadObj->stop = true; eventfd_t fd = -1; + if (pThreadObj->thread == pthread_self()) { + pthread_detach(pthread_self()); + return; + } + if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) { // signal the thread to stop, try graceful method first, // and use pthread_cancel when failed @@ -184,14 +197,7 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { } if (taosCheckPthreadValid(pThreadObj->thread)) pthread_join(pThreadObj->thread, NULL); - if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd); if (fd != -1) taosCloseSocket(fd); - - while (pThreadObj->pHead) { - SFdObj *pFdObj = pThreadObj->pHead; - pThreadObj->pHead = pFdObj->next; - taosFreeFdObj(pFdObj); - } } void taosStopTcpServer(void *handle) { @@ -210,7 +216,7 @@ void taosCleanUpTcpServer(void *handle) { if (pServerObj == NULL) return; for (int i = 0; i < pServerObj->numOfThreads; ++i) { - pThreadObj = pServerObj->pThreadObj + i; + pThreadObj = pServerObj->pThreadObj[i]; taosStopTcpThread(pThreadObj); pthread_mutex_destroy(&(pThreadObj->mutex)); } @@ -249,7 +255,7 @@ static void *taosAcceptTcpConnection(void *arg) { taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); // pick up the thread to handle this connection - pThreadObj = pServerObj->pThreadObj + threadId; + pThreadObj = pServerObj->pThreadObj[threadId]; SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd); if (pFdObj) { @@ -329,8 +335,6 @@ void taosCleanUpTcpClient(void *chandle) { taosStopTcpThread(pThreadObj); tDebug ("%s TCP client is cleaned up", pThreadObj->label); - - taosTFree(pThreadObj); } void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { @@ -503,8 +507,21 @@ static void *taosProcessTcpData(void *param) { pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); } + + if (pThreadObj->stop) break; } + if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd); + + while (pThreadObj->pHead) { + SFdObj *pFdObj = pThreadObj->pHead; + pThreadObj->pHead = pFdObj->next; + taosFreeFdObj(pFdObj); + } + + tDebug("%s TCP thread exits ...", pThreadObj->label); + taosTFree(pThreadObj); + return NULL; } diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 4ea47582b9..250c4c38f2 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -140,15 +140,18 @@ void taosStopUdpConnection(void *handle) { pConn = pSet->udpConn + i; if (pConn->fd >=0) shutdown(pConn->fd, SHUT_RDWR); if (pConn->fd >=0) taosCloseSocket(pConn->fd); + pConn->fd = -1; } for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; if (taosCheckPthreadValid(pConn->thread)) { - pthread_join(pConn->thread, NULL); + if (pConn->thread == pthread_self()) { + pthread_detach(pthread_self()); + } else { + pthread_join(pConn->thread, NULL); + } } - taosTFree(pConn->buffer); - // tTrace("%s UDP thread is closed, index:%d", pConn->label, i); } tDebug("%s UDP is stopped", pSet->label); @@ -230,6 +233,9 @@ static void *taosRecvUdpData(void *param) { (*(pConn->processData))(&recvInfo); } + taosTFree(pConn->buffer); + tDebug("%s UDP recv thread exits", pConn->label); + return NULL; } From 30284cbe59f37a234c8ad61fe7c16f49c1648bf8 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 1 Oct 2020 14:48:10 +0000 Subject: [PATCH 27/39] TD-1645 --- src/sync/src/taosTcpPool.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/taosTcpPool.c index eda822b1ec..539cfb64da 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/taosTcpPool.c @@ -219,7 +219,10 @@ static void *taosProcessTcpData(void *param) { continue; } } + } + + if (pThread->stop) break; } uDebug("%p TCP epoll thread exits", pThread); From f949b732f5051ad95893c5ea994907362e9dcb5f Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 1 Oct 2020 14:55:00 +0000 Subject: [PATCH 28/39] TD-1645 --- src/rpc/src/rpcTcp.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 97a3dad1eb..2119824b3d 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -205,7 +205,14 @@ void taosStopTcpServer(void *handle) { if (pServerObj == NULL) return; if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD); - if (taosCheckPthreadValid(pServerObj->thread)) pthread_join(pServerObj->thread, NULL); + + if (taosCheckPthreadValid(pServerObj->thread)) { + if (pServerObj->thread == pthread_self()) { + pthread_detach(pthread_self()); + } else { + pthread_join(pServerObj->thread, NULL); + } + } tDebug("%s TCP server is stopped", pServerObj->label); } From ccc32feb221e237b84af621ebe07b665f9e7a43f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 2 Oct 2020 08:44:33 +0800 Subject: [PATCH 29/39] revert from TD-1632 --- src/rpc/src/rpcMain.c | 30 +++++++++++------------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 6a5d3b079a..f0b8c996c5 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -195,7 +195,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendReqHead(SRpcConn *pConn); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext); +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); static void rpcProcessConnError(void *param, void *id); static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); @@ -885,14 +885,13 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_SUCCESS; } -static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) { +static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { int32_t sid; SRpcConn *pConn = NULL; SRpcHead *pHead = (SRpcHead *)pRecv->msg; sid = htonl(pHead->destId); - *ppContext = NULL; if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); @@ -946,17 +945,6 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); } else { terrno = rpcProcessRspHead(pConn, pHead); - if (terrno == 0) { - SRpcReqContext *pContext = pConn->pContext; - *ppContext = pContext; - pConn->pContext = NULL; - pConn->pReqMsg = NULL; - - // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); - } - } } } @@ -1021,8 +1009,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { } terrno = 0; - SRpcReqContext *pContext; - pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); + pConn = rpcProcessMsgHead(pRpc, pRecv); if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) { tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, @@ -1042,7 +1029,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); } } else { // msg is passed to app only parsing is ok - rpcProcessIncomingMsg(pConn, pHead, pContext); + rpcProcessIncomingMsg(pConn, pHead); } } @@ -1073,7 +1060,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { rpcFreeCont(pContext->pCont); } -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { SRpcInfo *pRpc = pConn->pRpc; SRpcMsg rpcMsg; @@ -1102,10 +1089,15 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte } } else { // it's a response + SRpcReqContext *pContext = pConn->pContext; rpcMsg.handle = pContext; + pConn->pContext = NULL; + pConn->pReqMsg = NULL; // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code == TSDB_CODE_RPC_TOO_SLOW) { + if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); + } else { rpcCloseConn(pConn); } From 8fed6ecaa12940f2ea840693b582d9a6ffe735a8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Oct 2020 11:11:07 +0800 Subject: [PATCH 30/39] [td-225] fix compiler error on windows --- src/client/src/tscServer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8a645d5761..e84217b38b 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -178,7 +178,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { } if (pObj->pHb != NULL) { - int64_t waitingDuring = tsShellActivityTimer * 500; + int32_t waitingDuring = tsShellActivityTimer * 500; tscDebug("%p start heartbeat in %"PRId64"ms", pSql, waitingDuring); taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer); From ef1dc64667eb711961d43a24bec0ae2c22b547f7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Oct 2020 11:12:46 +0800 Subject: [PATCH 31/39] [td-225] fix compiler error on windows --- src/client/src/tscServer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ad677c2194..d3c0201169 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -179,7 +179,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (pObj->pHb != NULL) { int32_t waitingDuring = tsShellActivityTimer * 500; - tscDebug("%p start heartbeat in %"PRId64"ms", pSql, waitingDuring); + tscDebug("%p start heartbeat in %dms", pSql, waitingDuring); taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer); } else { From baf580ee8b21846676360b5feeca9a53cba0e75e Mon Sep 17 00:00:00 2001 From: slguan Date: Fri, 2 Oct 2020 12:59:15 +0800 Subject: [PATCH 32/39] TD-1632 revert --- src/rpc/src/rpcMain.c | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index f0b8c996c5..6a5d3b079a 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -195,7 +195,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendReqHead(SRpcConn *pConn); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext); static void rpcProcessConnError(void *param, void *id); static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); @@ -885,13 +885,14 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_SUCCESS; } -static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { +static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) { int32_t sid; SRpcConn *pConn = NULL; SRpcHead *pHead = (SRpcHead *)pRecv->msg; sid = htonl(pHead->destId); + *ppContext = NULL; if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); @@ -945,6 +946,17 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); } else { terrno = rpcProcessRspHead(pConn, pHead); + if (terrno == 0) { + SRpcReqContext *pContext = pConn->pContext; + *ppContext = pContext; + pConn->pContext = NULL; + pConn->pReqMsg = NULL; + + // for UDP, port may be changed by server, the port in epSet shall be used for cache + if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); + } + } } } @@ -1009,7 +1021,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { } terrno = 0; - pConn = rpcProcessMsgHead(pRpc, pRecv); + SRpcReqContext *pContext; + pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) { tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, @@ -1029,7 +1042,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); } } else { // msg is passed to app only parsing is ok - rpcProcessIncomingMsg(pConn, pHead); + rpcProcessIncomingMsg(pConn, pHead, pContext); } } @@ -1060,7 +1073,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { rpcFreeCont(pContext->pCont); } -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { SRpcInfo *pRpc = pConn->pRpc; SRpcMsg rpcMsg; @@ -1089,15 +1102,10 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { } } else { // it's a response - SRpcReqContext *pContext = pConn->pContext; rpcMsg.handle = pContext; - pConn->pContext = NULL; - pConn->pReqMsg = NULL; // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); - } else { + if (pHead->code == TSDB_CODE_RPC_TOO_SLOW) { rpcCloseConn(pConn); } From 38dc4f4d161f23653a2e69b199637a6f8f4723c9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 2 Oct 2020 13:31:15 +0800 Subject: [PATCH 33/39] TD-1645 fix compile error in windotws --- src/rpc/src/rpcTcp.c | 4 ++-- src/rpc/src/rpcUdp.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 2119824b3d..aacdede543 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -174,7 +174,7 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { pThreadObj->stop = true; eventfd_t fd = -1; - if (pThreadObj->thread == pthread_self()) { + if (taosComparePthread(pThreadObj->thread, pthread_self())) { pthread_detach(pthread_self()); return; } @@ -207,7 +207,7 @@ void taosStopTcpServer(void *handle) { if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD); if (taosCheckPthreadValid(pServerObj->thread)) { - if (pServerObj->thread == pthread_self()) { + if (taosComparePthread(pServerObj->thread, pthread_self())) { pthread_detach(pthread_self()); } else { pthread_join(pServerObj->thread, NULL); diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 250c4c38f2..4fd0318ae6 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -146,7 +146,7 @@ void taosStopUdpConnection(void *handle) { for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; if (taosCheckPthreadValid(pConn->thread)) { - if (pConn->thread == pthread_self()) { + if (taosComparePthread(pConn->thread, pthread_self())) { pthread_detach(pthread_self()); } else { pthread_join(pConn->thread, NULL); From c522e902fae8a7afe6d328731a008cab57fb04da Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 2 Oct 2020 05:40:37 +0000 Subject: [PATCH 34/39] scripts --- tests/script/general/http/restful_full.sim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/general/http/restful_full.sim b/tests/script/general/http/restful_full.sim index a02140a419..8d2f1a7c00 100644 --- a/tests/script/general/http/restful_full.sim +++ b/tests/script/general/http/restful_full.sim @@ -119,7 +119,7 @@ endi system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' used1' 127.0.0.1:7111/rest/sql print 17-> $system_content -if $system_content != @{"status":"error","code":534,"desc":"Syntax errr in SQL"}@ then +if $system_content != @{"status":"error","code":534,"desc":"Syntax error in SQL"}@ then return -1 endi From a8b8ecb20e38e23a59b99924e0c701adda458084 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 2 Oct 2020 14:56:54 +0000 Subject: [PATCH 35/39] TD-1645 --- src/connector/go | 2 +- src/rpc/src/rpcTcp.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector/go b/src/connector/go index 567b7b12f3..8c58c512b6 160000 --- a/src/connector/go +++ b/src/connector/go @@ -1 +1 @@ -Subproject commit 567b7b12f3fd2775c718d284beffc8c38dd6c219 +Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766 diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index aacdede543..46555b3647 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -196,7 +196,7 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { } } - if (taosCheckPthreadValid(pThreadObj->thread)) pthread_join(pThreadObj->thread, NULL); + if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) pthread_join(pThreadObj->thread, NULL); if (fd != -1) taosCloseSocket(fd); } From 09d2c5c31b9816b9f3a70b5e69024a9fb5abba60 Mon Sep 17 00:00:00 2001 From: Bison 'goldeagle' Fan Date: Sat, 3 Oct 2020 10:57:24 +0800 Subject: [PATCH 36/39] upd: README.md with submodule contents of go & grafana connectors --- README.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 522fc0ebc1..36436dd549 100644 --- a/README.md +++ b/README.md @@ -83,12 +83,18 @@ sudo dnf install -y maven ## Get the source codes -- github: +First of all, you may clone the source codes from github: ```bash git clone https://github.com/taosdata/TDengine.git cd TDengine ``` +The connectors for go & grafana have been moved to separated repositories, +so you should run this command in the TDengine directory to install them: +```bash +git submodule update --init --recursive +``` + ## Build TDengine ### On Linux platform From c608fba88ed0c69d19551bb64de77994f8d971e3 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sat, 3 Oct 2020 04:30:02 +0000 Subject: [PATCH 37/39] TD-1645 --- src/rpc/src/rpcTcp.c | 7 +++++-- src/rpc/src/rpcUdp.c | 12 +++--------- src/sync/src/taosTcpPool.c | 2 +- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 46555b3647..06b5d39d61 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -196,7 +196,10 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { } } - if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) pthread_join(pThreadObj->thread, NULL); + if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) { + pthread_join(pThreadObj->thread, NULL); + } + if (fd != -1) taosCloseSocket(fd); } @@ -225,7 +228,6 @@ void taosCleanUpTcpServer(void *handle) { for (int i = 0; i < pServerObj->numOfThreads; ++i) { pThreadObj = pServerObj->pThreadObj[i]; taosStopTcpThread(pThreadObj); - pthread_mutex_destroy(&(pThreadObj->mutex)); } tDebug("%s TCP server is cleaned up", pServerObj->label); @@ -526,6 +528,7 @@ static void *taosProcessTcpData(void *param) { taosFreeFdObj(pFdObj); } + pthread_mutex_destroy(&(pThreadObj->mutex)); tDebug("%s TCP thread exits ...", pThreadObj->label); taosTFree(pThreadObj); diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 4fd0318ae6..4ea47582b9 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -140,18 +140,15 @@ void taosStopUdpConnection(void *handle) { pConn = pSet->udpConn + i; if (pConn->fd >=0) shutdown(pConn->fd, SHUT_RDWR); if (pConn->fd >=0) taosCloseSocket(pConn->fd); - pConn->fd = -1; } for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; if (taosCheckPthreadValid(pConn->thread)) { - if (taosComparePthread(pConn->thread, pthread_self())) { - pthread_detach(pthread_self()); - } else { - pthread_join(pConn->thread, NULL); - } + pthread_join(pConn->thread, NULL); } + taosTFree(pConn->buffer); + // tTrace("%s UDP thread is closed, index:%d", pConn->label, i); } tDebug("%s UDP is stopped", pSet->label); @@ -233,9 +230,6 @@ static void *taosRecvUdpData(void *param) { (*(pConn->processData))(&recvInfo); } - taosTFree(pConn->buffer); - tDebug("%s UDP recv thread exits", pConn->label); - return NULL; } diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/taosTcpPool.c index 539cfb64da..6a210a136f 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/taosTcpPool.c @@ -324,5 +324,5 @@ static void taosStopPoolThread(SThreadObj *pThread) { } pthread_join(thread, NULL); - taosClose(fd); + if (fd >= 0) taosClose(fd); } From 7367b31275d0ad6930725bf21c0707807f7ef3e3 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 6 Oct 2020 06:20:07 +0000 Subject: [PATCH 38/39] TD-1645 --- src/rpc/src/rpcTcp.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 06b5d39d61..20b03b34e3 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -342,8 +342,8 @@ void taosCleanUpTcpClient(void *chandle) { SThreadObj *pThreadObj = chandle; if (pThreadObj == NULL) return; + tDebug ("%s TCP client will be cleaned up", pThreadObj->label); taosStopTcpThread(pThreadObj); - tDebug ("%s TCP client is cleaned up", pThreadObj->label); } void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { @@ -378,7 +378,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin void taosCloseTcpConnection(void *chandle) { SFdObj *pFdObj = chandle; - if (pFdObj == NULL) return; + if (pFdObj == NULL || pFdObj->signature != pFdObj) return; SThreadObj *pThreadObj = pFdObj->pThreadObj; tDebug("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj); @@ -391,7 +391,7 @@ void taosCloseTcpConnection(void *chandle) { int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { SFdObj *pFdObj = chandle; - if (chandle == NULL) return -1; + if (pFdObj == NULL || pFdObj->signature != pFdObj) return -1; return taosWriteMsg(pFdObj->fd, data, len); } From 12687355c07b28c21e4ef628482a1a26858ac351 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 6 Oct 2020 11:46:33 +0000 Subject: [PATCH 39/39] TD-1632 --- src/connector/go | 2 +- src/rpc/src/rpcMain.c | 40 +++++++++++++++++++--------------------- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/src/connector/go b/src/connector/go index 567b7b12f3..8c58c512b6 160000 --- a/src/connector/go +++ b/src/connector/go @@ -1 +1 @@ -Subproject commit 567b7b12f3fd2775c718d284beffc8c38dd6c219 +Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766 diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 6a5d3b079a..b86b95b858 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -881,6 +881,20 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { pConn->outType = 0; pConn->pReqMsg = NULL; pConn->reqMsgLen = 0; + SRpcReqContext *pContext = pConn->pContext; + + if (pHead->code == TSDB_CODE_RPC_REDIRECT) { + if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SRpcEpSet)) { + // if EpSet is not included in the msg, treat it as NOT_READY + pHead->code = TSDB_CODE_RPC_NOT_READY; + } else { + pContext->redirect++; + if (pContext->redirect > TSDB_MAX_REPLICA) { + pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + tWarn("%s, too many redirects, quit", pConn->info); + } + } + } return TSDB_CODE_SUCCESS; } @@ -950,12 +964,6 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont SRpcReqContext *pContext = pConn->pContext; *ppContext = pContext; pConn->pContext = NULL; - pConn->pReqMsg = NULL; - - // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); - } } } } @@ -1083,9 +1091,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte rpcMsg.pCont = pHead->content; rpcMsg.msgType = pHead->msgType; rpcMsg.code = pHead->code; - rpcMsg.ahandle = pConn->ahandle; if ( rpcIsReq(pHead->msgType) ) { + rpcMsg.ahandle = pConn->ahandle; if (rpcMsg.contLen > 0) { rpcMsg.handle = pConn; rpcAddRef(pRpc); // add the refCount for requests @@ -1103,25 +1111,15 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte } else { // it's a response rpcMsg.handle = pContext; + rpcMsg.ahandle = pContext->ahandle; // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code == TSDB_CODE_RPC_TOO_SLOW) { + if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); + } else { rpcCloseConn(pConn); } - if (pHead->code == TSDB_CODE_RPC_REDIRECT) { - if (rpcMsg.contLen < sizeof(SRpcEpSet)) { - // if EpSet is not included in the msg, treat it as NOT_READY - pHead->code = TSDB_CODE_RPC_NOT_READY; - } else { - pContext->redirect++; - if (pContext->redirect > TSDB_MAX_REPLICA) { - pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - tWarn("%s, too many redirects, quit", pConn->info); - } - } - } - if (pHead->code == TSDB_CODE_RPC_REDIRECT) { pContext->numOfTry = 0; SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content;