From 62127398993168ee33b2694e6c83c668b0cddf85 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Dec 2020 22:49:59 +0800 Subject: [PATCH 01/12] [TD-2378]: optimize the client memory requirement. --- src/client/inc/tsclient.h | 3 +- src/client/src/tscFunctionImpl.c | 1 - src/client/src/tscParseInsert.c | 2 +- src/client/src/tscSQLParser.c | 2 +- src/client/src/tscSchemaUtil.c | 30 +++--- src/client/src/tscServer.c | 91 ++++++------------ src/client/src/tscUtil.c | 46 +++------ src/query/inc/qUtil.h | 4 - src/query/src/qExecutor.c | 7 +- src/query/src/qUtil.c | 154 ------------------------------- 10 files changed, 59 insertions(+), 281 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 33ea06ba9c..0f3b4b7566 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -69,9 +69,10 @@ typedef struct STableMeta { int16_t sversion; int16_t tversion; char sTableId[TSDB_TABLE_FNAME_LEN]; - SVgroupInfo vgroupInfo; + int32_t vgId; SCorVgroupInfo corVgroupInfo; STableId id; +// union {int64_t stableUid; SSchema* schema;}; SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info } STableMeta; diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 6afc5ba223..22240efe14 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -2625,7 +2625,6 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx) { char *tmp = (char *)pInfo + sizeof(SAPercentileInfo); pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN); - printf("%p, %p\n", pInfo->pHisto, pInfo->pHisto->elems); return true; } diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index ded12f64ea..b43fe1fcd7 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -731,7 +731,7 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColI return code; } - dataBuf->vgId = pTableMeta->vgroupInfo.vgId; + dataBuf->vgId = pTableMeta->vgId; dataBuf->numOfTables = 1; *totalNum += numOfRows; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index f1acea71c5..4dad65fe9e 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4936,7 +4936,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { } SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload; - pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId); + pUpdateMsg->head.vgId = htonl(pTableMeta->vgId); pUpdateMsg->tid = htonl(pTableMeta->id.tid); pUpdateMsg->uid = htobe64(pTableMeta->id.uid); pUpdateMsg->colId = htons(pTagsSchema->colId); diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index fcc93ffadc..d77bb9990c 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -130,13 +130,14 @@ SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId) { return NULL; } -static void tscInitCorVgroupInfo(SCorVgroupInfo *corVgroupInfo, SVgroupInfo *vgroupInfo) { +static void tscInitCorVgroupInfo(SCorVgroupInfo *corVgroupInfo, SVgroupMsg *pVgroupMsg) { corVgroupInfo->version = 0; - corVgroupInfo->inUse = 0; - corVgroupInfo->numOfEps = vgroupInfo->numOfEps; - for (int32_t i = 0; i < corVgroupInfo->numOfEps; i++) { - corVgroupInfo->epAddr[i].fqdn = strdup(vgroupInfo->epAddr[i].fqdn); - corVgroupInfo->epAddr[i].port = vgroupInfo->epAddr[i].port; + corVgroupInfo->inUse = 0; + corVgroupInfo->numOfEps = pVgroupMsg->numOfEps; + + for (int32_t i = 0; i < pVgroupMsg->numOfEps; i++) { + corVgroupInfo->epAddr[i].fqdn = strndup(pVgroupMsg->epAddr[i].fqdn, tListLen(pVgroupMsg->epAddr[0].fqdn)); + corVgroupInfo->epAddr[i].port = pVgroupMsg->epAddr[i].port; } } @@ -145,8 +146,10 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size int32_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema); STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize); + pTableMeta->tableType = pTableMetaMsg->tableType; - + pTableMeta->vgId = pTableMetaMsg->vgroup.vgId; + pTableMeta->tableInfo = (STableComInfo) { .numOfTags = pTableMetaMsg->numOfTags, .precision = pTableMetaMsg->precision, @@ -156,18 +159,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size pTableMeta->id.tid = pTableMetaMsg->tid; pTableMeta->id.uid = pTableMetaMsg->uid; - SVgroupInfo* pVgroupInfo = &pTableMeta->vgroupInfo; - pVgroupInfo->numOfEps = pTableMetaMsg->vgroup.numOfEps; - pVgroupInfo->vgId = pTableMetaMsg->vgroup.vgId; - - for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { - SEpAddrMsg* pEpMsg = &pTableMetaMsg->vgroup.epAddr[i]; - - pVgroupInfo->epAddr[i].fqdn = strndup(pEpMsg->fqdn, tListLen(pEpMsg->fqdn)); - pVgroupInfo->epAddr[i].port = pEpMsg->port; - } - - tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, pVgroupInfo); + tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMetaMsg->vgroup); pTableMeta->sversion = pTableMetaMsg->sversion; pTableMeta->tversion = pTableMetaMsg->tversion; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ded04388f4..958ae28427 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -45,32 +45,30 @@ static int32_t getWaitingTimeInterval(int32_t count) { return 0; } - return initial * (2<<(count - 2)); + return initial * ((2u)<<(count - 2)); } -static void tscSetDnodeEpSet(SSqlObj* pSql, SVgroupInfo* pVgroupInfo) { - assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); - - SRpcEpSet* pEpSet = &pSql->epSet; +static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) { + assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); // Issue the query to one of the vnode among a vgroup randomly. // change the inUse property would not affect the isUse attribute of STableMeta pEpSet->inUse = rand() % pVgroupInfo->numOfEps; // apply the FQDN string length check here - bool hasFqdn = false; + bool existed = false; pEpSet->numOfEps = pVgroupInfo->numOfEps; for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { - tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); pEpSet->port[i] = pVgroupInfo->epAddr[i].port; - if (!hasFqdn) { - hasFqdn = (strlen(pEpSet->fqdn[i]) > 0); + int32_t len = (int32_t) strnlen(pVgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN); + if (len > 0) { + tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); + existed = true; } } - - assert(hasFqdn); + assert(existed); } static void tscDumpMgmtEpSet(SSqlObj *pSql) { @@ -102,7 +100,8 @@ void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) { pCorEpSet->epSet = *pEpSet; taosCorEndWrite(&pCorEpSet->version); } -static void tscDumpEpSetFromVgroupInfo(SCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) { + +static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SCorVgroupInfo *pVgroupInfo) { if (pVgroupInfo == NULL) { return;} taosCorBeginRead(&pVgroupInfo->version); int8_t inUse = pVgroupInfo->inUse; @@ -515,8 +514,8 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } } else { STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId); - tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId); + pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId); + tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgId); } pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg); @@ -535,7 +534,6 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // NOTE: shell message size should not include SMsgDesc int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc); - int32_t vgId = pTableMeta->vgroupInfo.vgId; SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg; pMsgDesc->numOfVnodes = htonl(1); // always one vnode @@ -543,7 +541,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += sizeof(SMsgDesc); SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg; - pShellMsg->header.vgId = htonl(vgId); + pShellMsg->header.vgId = htonl(pTableMeta->vgId); pShellMsg->header.contLen = htonl(size); // the length not includes the size of SMsgDesc pShellMsg->length = pShellMsg->header.contLen; @@ -551,9 +549,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // pSql->cmd.payloadLen is set during copying data into payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; - tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet); + tscDumpEpSetFromVgroupInfo(&pSql->epSet, &pTableMeta->corVgroupInfo); - tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, + tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, pTableMeta->vgId, pSql->cmd.numOfTablesInSubmit, pSql->epSet.numOfEps); return TSDB_CODE_SUCCESS; } @@ -597,24 +595,28 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) { - SVgroupInfo* pVgroupInfo = NULL; + int32_t vgId = -1; if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { int32_t index = pTableMetaInfo->vgroupIndex; assert(index >= 0); - + + SVgroupInfo* pVgroupInfo = NULL; if (pTableMetaInfo->vgroupList->numOfVgroups > 0) { assert(index < pTableMetaInfo->vgroupList->numOfVgroups); pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index]; } + + vgId = pVgroupInfo->vgId; + tscSetDnodeEpSet(&pSql->epSet, pVgroupInfo); tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups); } else { - pVgroupInfo = &pTableMeta->vgroupInfo; + vgId = pTableMeta->vgId; + tscDumpEpSetFromVgroupInfo(&pSql->epSet, &pTableMeta->corVgroupInfo); } - assert(pVgroupInfo != NULL); + pSql->epSet.inUse = rand()%pSql->epSet.numOfEps; - tscSetDnodeEpSet(pSql, pVgroupInfo); - pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); + pQueryMsg->head.vgId = htonl(vgId); STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->tid = htonl(pTableMeta->id.tid); @@ -633,7 +635,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); // set the vgroup info - tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo); + tscSetDnodeEpSet(&pSql->epSet, &pTableIdList->vgInfo); pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList); @@ -1448,48 +1450,11 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet); + tscDumpEpSetFromVgroupInfo(&pSql->epSet, &pTableMetaInfo->pTableMeta->corVgroupInfo); return TSDB_CODE_SUCCESS; } -//int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { -// SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload; -// pCancelMsg->qhandle = htobe64(pSql->res.qhandle); -// -// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); -// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); -// -// if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { -// int32_t vgIndex = pTableMetaInfo->vgroupIndex; -// if (pTableMetaInfo->pVgroupTables == NULL) { -// SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList; -// assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups); -// -// pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId); -// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex); -// } else { -// int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); -// assert(vgIndex >= 0 && vgIndex < numOfVgroups); -// -// SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex); -// -// pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId); -// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex); -// } -// } else { -// STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; -// pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId); -// tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId); -// } -// -// pSql->cmd.payloadLen = sizeof(SCancelQueryMsg); -// pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY; -// -// pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg)); -// return TSDB_CODE_SUCCESS; -//} - int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; pCmd->payloadLen = sizeof(SAlterDbMsg); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 2991e89581..e7e0134f36 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -466,13 +466,6 @@ void tscFreeRegisteredSqlObj(void *pSql) { void tscFreeTableMetaHelper(void *pTableMeta) { STableMeta* p = (STableMeta*) pTableMeta; - int32_t numOfEps = p->vgroupInfo.numOfEps; - assert(numOfEps >= 0 && numOfEps <= TSDB_MAX_REPLICA); - - for(int32_t i = 0; i < numOfEps; ++i) { - tfree(p->vgroupInfo.epAddr[i].fqdn); - } - int32_t numOfEps1 = p->corVgroupInfo.numOfEps; assert(numOfEps1 >= 0 && numOfEps1 <= TSDB_MAX_REPLICA); @@ -1941,30 +1934,24 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm return NULL; } - pNew->fp = fp; + pNew->fp = fp; pNew->fetchFp = fp; - pNew->param = param; + pNew->param = param; + pNew->sqlstr = NULL; pNew->maxRetry = TSDB_MAX_REPLICA; - pNew->sqlstr = strdup(pSql->sqlstr); - if (pNew->sqlstr == NULL) { - tscError("%p new subquery failed", pSql); - tscFreeSqlObj(pNew); - return NULL; - } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, 0); assert(pSql->cmd.clauseIndex == 0); STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); - registerSqlObj(pNew); + return pNew; } -static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pQueryInfo, SQueryInfo* pNewQueryInfo, int64_t uid) { +static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pNewQueryInfo, int64_t uid) { int32_t numOfOutput = (int32_t)tscSqlExprNumOfExprs(pNewQueryInfo); if (numOfOutput == 0) { return; @@ -2017,15 +2004,9 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, tableIndex); - pNew->pTscObj = pSql->pTscObj; + pNew->pTscObj = pSql->pTscObj; pNew->signature = pNew; - - pNew->sqlstr = strdup(pSql->sqlstr); - if (pNew->sqlstr == NULL) { - tscError("%p new subquery failed, tableIndex:%d, vgroupIndex:%d", pSql, tableIndex, pTableMetaInfo->vgroupIndex); - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error; - } + pNew->sqlstr = NULL; SSqlCmd* pnCmd = &pNew->cmd; memcpy(pnCmd, pCmd, sizeof(SSqlCmd)); @@ -2113,23 +2094,22 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void goto _error; } - doSetSqlExprAndResultFieldInfo(pQueryInfo, pNewQueryInfo, uid); + doSetSqlExprAndResultFieldInfo(pNewQueryInfo, uid); - pNew->fp = fp; + pNew->fp = fp; pNew->fetchFp = fp; - - pNew->param = param; + pNew->param = param; pNew->maxRetry = TSDB_MAX_REPLICA; char* name = pTableMetaInfo->name; STableMetaInfo* pFinalInfo = NULL; - if (pPrevSql == NULL) { - STableMeta* pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMetaInfo->pTableMeta); // get by name may failed due to the cache cleanup + if (pPrevSql == NULL) { // get by name may failed due to the cache cleanup + STableMeta* pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMetaInfo->pTableMeta); assert(pTableMeta != NULL); pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, - pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables); + pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables); } else { // transfer the ownership of pTableMeta to the newly create sql object. STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 974d93f89c..4620e3d61e 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -34,17 +34,13 @@ int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void resetResultRowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); -void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int32_t num); -void clearClosedResultRows(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pResultRowInfo); int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo); void closeAllResultRows(SResultRowInfo* pResultRowInfo); -void removeRedundantResultRows(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order); int32_t initResultRow(SResultRow *pResultRow); void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot); bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot); void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); -void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type); SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 2b992a931d..64eba616ad 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5182,10 +5182,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) { scanMultiTableDataBlocks(pQInfo); pQInfo->groupIndex += 1; - SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + taosArrayDestroy(s); // no results generated for current group, continue to try the next group - taosArrayDestroy(s); + SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (pWindowResInfo->size <= 0) { continue; } @@ -5212,8 +5212,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pQInfo->groupIndex = currentGroupIndex; // restore the group index assert(pQuery->rec.rows == pWindowResInfo->size); - - clearClosedResultRows(pRuntimeEnv, &pRuntimeEnv->windowResInfo); + resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); break; } } else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTsBuf == NULL && !isTSCompQuery(pQuery)) { diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index d09993ae4e..dc01de0f92 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -20,18 +20,6 @@ #include "qExecutor.h" #include "qUtil.h" -static int32_t getResultRowKeyInfo(SResultRow* pResult, int16_t type, char** key, int16_t* bytes) { - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - *key = varDataVal(pResult->key); - *bytes = varDataLen(pResult->key); - } else { - *key = (char*) &pResult->win.skey; - *bytes = tDataTypeDesc[type].nSize; - } - - return 0; -} - int32_t getOutputInterResultBufSize(SQuery* pQuery) { int32_t size = 0; @@ -99,73 +87,6 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; } -void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int32_t num) { - if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0 || num == 0) { - return; - } - - int32_t numOfClosed = numOfClosedResultRows(pResultRowInfo); - assert(num >= 0 && num <= numOfClosed); - - int16_t type = pResultRowInfo->type; - int64_t uid = 0; - - char *key = NULL; - int16_t bytes = -1; - - for (int32_t i = 0; i < num; ++i) { - SResultRow *pResult = pResultRowInfo->pResult[i]; - if (pResult->closed) { // remove the window slot from hash table - getResultRowKeyInfo(pResult, type, &key, &bytes); - SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); - taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); - } else { - break; - } - } - - int32_t remain = pResultRowInfo->size - num; - - // clear all the closed windows from the window list - for (int32_t k = 0; k < remain; ++k) { - copyResultRow(pRuntimeEnv, pResultRowInfo->pResult[k], pResultRowInfo->pResult[num + k], type); - } - - // move the unclosed window in the front of the window list - for (int32_t k = remain; k < pResultRowInfo->size; ++k) { - SResultRow *pWindowRes = pResultRowInfo->pResult[k]; - clearResultRow(pRuntimeEnv, pWindowRes, pResultRowInfo->type); - } - - pResultRowInfo->size = remain; - - for (int32_t k = 0; k < pResultRowInfo->size; ++k) { - SResultRow *pResult = pResultRowInfo->pResult[k]; - getResultRowKeyInfo(pResult, type, &key, &bytes); - SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); - - int32_t *p = (int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); - assert(p != NULL); - - int32_t v = (*p - num); - assert(v >= 0 && v <= pResultRowInfo->size); - - SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); - taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&v, sizeof(int32_t)); - } - - pResultRowInfo->curIndex = -1; -} - -void clearClosedResultRows(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { - if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0) { - return; - } - - int32_t numOfClosed = numOfClosedResultRows(pResultRowInfo); - popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, numOfClosed); -} - int32_t numOfClosedResultRows(SResultRowInfo *pResultRowInfo) { int32_t i = 0; while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) { @@ -188,40 +109,6 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) { } } -/* - * remove the results that are not the FIRST time window that spreads beyond the - * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time. - * NOTE: remove redundant, only when the result set order equals to traverse order - */ -void removeRedundantResultRows(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order) { - assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); - if (pResultRowInfo->size <= 1) { - return; - } - - // get the result order - int32_t resultOrder = (pResultRowInfo->pResult[0]->win.skey < pResultRowInfo->pResult[1]->win.skey)? 1:-1; - if (order != resultOrder) { - return; - } - - int32_t i = 0; - if (order == QUERY_ASC_FORWARD_STEP) { - TSKEY ekey = pResultRowInfo->pResult[i]->win.ekey; - while (i < pResultRowInfo->size && (ekey < lastKey)) { - ++i; - } - } else if (order == QUERY_DESC_FORWARD_STEP) { - while (i < pResultRowInfo->size && (pResultRowInfo->pResult[i]->win.skey > lastKey)) { - ++i; - } - } - - if (i < pResultRowInfo->size) { - pResultRowInfo->size = (i + 1); - } -} - bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot) { return (getResultRow(pResultRowInfo, slot)->closed == true); } @@ -262,47 +149,6 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16 } } -/** - * The source window result pos attribution of the source window result does not assign to the destination, - * since the attribute of "Pos" is bound to each window result when the window result is created in the - * disk-based result buffer. - */ -void copyResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *dst, const SResultRow *src, int16_t type) { - dst->numOfRows = src->numOfRows; - - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - dst->key = realloc(dst->key, varDataTLen(src->key)); - varDataCopy(dst->key, src->key); - } else { - dst->win = src->win; - } - dst->closed = src->closed; - - int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutput; - - for (int32_t i = 0; i < nOutputCols; ++i) { - SResultRowCellInfo *pDst = getResultCell(pRuntimeEnv, dst, i); - SResultRowCellInfo *pSrc = getResultCell(pRuntimeEnv, src, i); - -// char *buf = pDst->interResultBuf; - memcpy(pDst, pSrc, sizeof(SResultRowCellInfo) + pRuntimeEnv->pCtx[i].interBufBytes); -// pDst->interResultBuf = buf; // restore the allocated buffer - - // copy the result info struct -// memcpy(pDst->interResultBuf, pSrc->interResultBuf, pRuntimeEnv->pCtx[i].interBufBytes); - - // copy the output buffer data from src to dst, the position info keep unchanged - tFilePage *dstpage = getResBufPage(pRuntimeEnv->pResultBuf, dst->pageId); - char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst, dstpage); - - tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pageId); - char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SResultRow *)src, srcpage); - size_t s = pRuntimeEnv->pQuery->pExpr1[i].bytes; - - memcpy(dstBuf, srcBuf, s); - } -} - SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index) { assert(index >= 0 && index < pRuntimeEnv->pQuery->numOfOutput); return (SResultRowCellInfo*)((char*) pRow->pCellInfo + pRuntimeEnv->rowCellInfoOffset[index]); From 001d28f278fd899875343b20d4d3a79fc51be130 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Dec 2020 22:55:32 +0800 Subject: [PATCH 02/12] [TD-225] refactor codes. --- src/client/inc/tscLocalMerge.h | 1 - src/client/src/tscLocalMerge.c | 27 +-------------------------- 2 files changed, 1 insertion(+), 27 deletions(-) diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index 43ba31f331..eaaf2f1ac6 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -56,7 +56,6 @@ typedef struct SLocalReducer { tFilePage * pTempBuffer; struct SQLFunctionCtx *pCtx; int32_t rowSize; // size of each intermediate result. - int32_t status; // denote it is in reduce process, in reduce process, it bool hasPrevRow; // cannot be released bool hasUnprocessedRow; tOrderDescriptor * pDesc; diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index da763b9b0c..14e8b0084e 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -493,13 +493,6 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { // there is no more result, so we release all allocated resource SLocalReducer *pLocalReducer = (SLocalReducer *)atomic_exchange_ptr(&pRes->pLocalReducer, NULL); if (pLocalReducer != NULL) { - int32_t status = 0; - while ((status = atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, - TSC_LOCALREDUCE_TOBE_FREED)) == TSC_LOCALREDUCE_IN_PROGRESS) { - taosMsleep(100); - tscDebug("%p waiting for delete procedure, status: %d", pSql, status); - } - pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo); if (pLocalReducer->pCtx != NULL) { @@ -1437,24 +1430,13 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { SLocalReducer *pLocalReducer = pRes->pLocalReducer; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - - // set the data merge in progress - int32_t prevStatus = - atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, TSC_LOCALREDUCE_IN_PROGRESS); - if (prevStatus != TSC_LOCALREDUCE_READY) { - assert(prevStatus == TSC_LOCALREDUCE_TOBE_FREED); // it is in tscDestroyLocalReducer function already - return TSDB_CODE_SUCCESS; - } - - tFilePage *tmpBuffer = pLocalReducer->pTempBuffer; + tFilePage *tmpBuffer = pLocalReducer->pTempBuffer; if (doHandleLastRemainData(pSql)) { - pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result. return TSDB_CODE_SUCCESS; } if (doBuildFilledResultForGroup(pSql)) { - pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result. return TSDB_CODE_SUCCESS; } @@ -1510,7 +1492,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { pLocalReducer->discardData->num = 0; if (saveGroupResultInfo(pSql)) { - pLocalReducer->status = TSC_LOCALREDUCE_READY; return TSDB_CODE_SUCCESS; } @@ -1556,7 +1537,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { // here we do not check the return value adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree); - assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS); if (pRes->numOfRows == 0) { handleUnprocessedRow(pCmd, pLocalReducer, tmpBuffer); @@ -1567,7 +1547,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { * If previous group is not skipped, keep it in pRes->numOfGroups */ if (notSkipped && saveGroupResultInfo(pSql)) { - pLocalReducer->status = TSC_LOCALREDUCE_READY; return TSDB_CODE_SUCCESS; } @@ -1587,7 +1566,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { if (pRes->numOfRows == 0) { continue; } else { - pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result. return TSDB_CODE_SUCCESS; } } else { // result buffer is not full @@ -1612,9 +1590,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { genFinalResults(pSql, pLocalReducer, true); } - assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS && pRes->row == 0); - pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result. - return TSDB_CODE_SUCCESS; } From 78c3ec8770d494b696f215896073f4cc8f649ce1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Dec 2020 15:07:13 +0800 Subject: [PATCH 03/12] [TD-225] refactor: 1. reduce hash node size. 2. function remained. --- src/client/inc/tscLocalMerge.h | 6 ------ src/client/inc/tsclient.h | 17 +++++++++-------- src/client/src/tscSystem.c | 15 +++++++-------- src/client/src/tscUtil.c | 11 ++++++++++- src/util/inc/hash.h | 12 ++++++------ src/util/src/hash.c | 4 ++-- src/util/src/tcache.c | 2 +- src/vnode/src/vnodeMgmt.c | 2 +- 8 files changed, 36 insertions(+), 33 deletions(-) diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index eaaf2f1ac6..0617645188 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -38,12 +38,6 @@ typedef struct SLocalDataSource { tFilePage filePage; } SLocalDataSource; -enum { - TSC_LOCALREDUCE_READY = 0x0, - TSC_LOCALREDUCE_IN_PROGRESS = 0x1, - TSC_LOCALREDUCE_TOBE_FREED = 0x2, -}; - typedef struct SLocalReducer { SLocalDataSource ** pLocalDataSrc; int32_t numOfBuffer; diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 0f3b4b7566..ade8449b0f 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -308,6 +308,7 @@ typedef struct STscObj { SRpcCorEpSet *tscCorMgmtEpSet; void* pDnodeConn; pthread_mutex_t mutex; + int32_t numOfObj; // number of sqlObj from this tscObj } STscObj; typedef struct SSubqueryState { @@ -478,14 +479,14 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField } } -extern SCacheObj* tscMetaCache; -extern int tscObjRef; -extern void * tscTmr; -extern void * tscQhandle; -extern int tscKeepConn[]; -extern int tscNumOfThreads; -extern int tscRefId; - +extern SCacheObj *tscMetaCache; + +extern int tscObjRef; +extern void *tscTmr; +extern void *tscQhandle; +extern int tscKeepConn[]; +extern int tscRefId; +extern int tscNumOfObj; // number of existed sqlObj in current process. extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index d98ab2facc..35ee916d7d 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -31,17 +31,16 @@ #include "tlocale.h" // global, not configurable -SCacheObj* tscMetaCache; +SCacheObj *tscMetaCache; // table meta cache +SHashObj *tscHashMap; // hash map to keep the global vgroup info int tscObjRef = -1; -void * tscTmr; -void * tscQhandle; -void * tscCheckDiskUsageTmr; +void *tscTmr; +void *tscQhandle; +void *tscCheckDiskUsageTmr; int tscRefId = -1; - -int tscNumOfThreads; +int tscNumOfObj = 0; // number of sqlObj in current process. static pthread_once_t tscinit = PTHREAD_ONCE_INIT; -//void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet); void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { taosGetDisk(); @@ -114,7 +113,7 @@ void taos_init_imp(void) { int queueSize = tsMaxConnections*2; double factor = (tscEmbedded == 0)? 2.0:4.0; - tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor); + int32_t tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor); if (tscNumOfThreads < 2) { tscNumOfThreads = 2; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index e7e0134f36..bbad7bf257 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -458,9 +458,14 @@ void tscFreeRegisteredSqlObj(void *pSql) { SSqlObj* p = *(SSqlObj**)pSql; STscObj* pTscObj = p->pTscObj; - assert(p->self != 0); + assert(RID_VALID(p->self)); + tscFreeSqlObj(p); taosReleaseRef(tscRefId, pTscObj->rid); + + int32_t num = atomic_sub_fetch_32(&pTscObj->numOfObj, 1); + int32_t total = atomic_sub_fetch_32(&tscNumOfObj, 1); + tscDebug("%p free SqlObj, total in tscObj:%d, total:%d", pSql, num, total); } void tscFreeTableMetaHelper(void *pTableMeta) { @@ -1905,6 +1910,10 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { void registerSqlObj(SSqlObj* pSql) { taosAcquireRef(tscRefId, pSql->pTscObj->rid); pSql->self = taosAddRef(tscObjRef, pSql); + + int32_t num = atomic_add_fetch_32(&pSql->pTscObj->numOfObj, 1); + int32_t total = atomic_add_fetch_32(&tscNumOfObj, 1); + tscDebug("%p new SqlObj, total in tscObj:%d, total:%d", pSql, num, total); } SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) { diff --git a/src/util/inc/hash.h b/src/util/inc/hash.h index 5bada93d1c..6a97a74c04 100644 --- a/src/util/inc/hash.h +++ b/src/util/inc/hash.h @@ -32,11 +32,11 @@ typedef void (*_hash_free_fn_t)(void *param); typedef struct SHashNode { struct SHashNode *next; - uint32_t hashVal; // the hash value of key - uint32_t keyLen; // length of the key - size_t dataLen; // length of data - int8_t count; // reference count - int8_t removed; // flag to indicate removed + uint32_t hashVal; // the hash value of key + uint32_t dataLen; // length of data + uint32_t keyLen; // length of the key + int8_t removed; // flag to indicate removed + int8_t count; // reference count char data[]; } SHashNode; @@ -115,7 +115,7 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen); * @param dsize * @return */ -void* taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize); +void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize); /** * remove item with the specified key diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 7a835e87e7..d4163cb9ea 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -271,10 +271,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da } void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { - return taosHashGetCB(pHashObj, key, keyLen, NULL, NULL, 0); + return taosHashGetClone(pHashObj, key, keyLen, NULL, NULL, 0); } -void* taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize) { +void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d, size_t dsize) { if (pHashObj->size <= 0 || keyLen == 0 || key == NULL) { return NULL; } diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 379221e352..3afdf41d05 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -278,7 +278,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen } SCacheDataNode* ptNode = NULL; - taosHashGetCB(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode, sizeof(void*)); + taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode, sizeof(void*)); void* pData = (ptNode != NULL)? ptNode->data:NULL; diff --git a/src/vnode/src/vnodeMgmt.c b/src/vnode/src/vnodeMgmt.c index 5cae7b7606..196e488210 100644 --- a/src/vnode/src/vnodeMgmt.c +++ b/src/vnode/src/vnodeMgmt.c @@ -91,7 +91,7 @@ static void vnodeIncRef(void *ptNode) { void *vnodeAcquire(int32_t vgId) { SVnodeObj **ppVnode = NULL; if (tsVnodesHash != NULL) { - ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *)); + ppVnode = taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *)); } if (ppVnode == NULL || *ppVnode == NULL) { From 0c607c46ce3720964c0c713087bba34212a65da4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Dec 2020 17:15:37 +0800 Subject: [PATCH 04/12] [TD-225] fix compiler error. --- src/util/src/hash.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/src/hash.c b/src/util/src/hash.c index d4163cb9ea..2af25a7b3a 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -654,7 +654,7 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s pNewNode->keyLen = (uint32_t)keyLen; pNewNode->hashVal = hashVal; - pNewNode->dataLen = dsize; + pNewNode->dataLen = (uint32_t) dsize; pNewNode->count = 1; memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize); From 8748df275d446451fc59cf0e81f1a5d7e851b9ae Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Dec 2020 17:34:05 +0800 Subject: [PATCH 05/12] [TD-225]refactor. --- src/client/src/tscSystem.c | 3 ++- src/client/src/tscUtil.c | 2 +- src/util/inc/hash.h | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 35ee916d7d..ff605dad72 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -132,7 +132,8 @@ void taos_init_imp(void) { int64_t refreshTime = 10; // 10 seconds by default if (tscMetaCache == NULL) { tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta"); - tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj); + tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj); + tscHashMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); } tscRefId = taosOpenRef(200, tscCloseTscObj); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index bbad7bf257..172887c110 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1913,7 +1913,7 @@ void registerSqlObj(SSqlObj* pSql) { int32_t num = atomic_add_fetch_32(&pSql->pTscObj->numOfObj, 1); int32_t total = atomic_add_fetch_32(&tscNumOfObj, 1); - tscDebug("%p new SqlObj, total in tscObj:%d, total:%d", pSql, num, total); + tscDebug("%p new SqlObj from %p, total in tscObj:%d, total:%d", pSql, pSql->pTscObj, num, total); } SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) { diff --git a/src/util/inc/hash.h b/src/util/inc/hash.h index 6a97a74c04..7ec4e5445a 100644 --- a/src/util/inc/hash.h +++ b/src/util/inc/hash.h @@ -43,6 +43,7 @@ typedef struct SHashNode { #define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->dataLen) #define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode)) #define GET_HASH_PNODE(_n) ((char*)(_n) - sizeof(SHashNode)); + typedef enum SHashLockTypeE { HASH_NO_LOCK = 0, HASH_ENTRY_LOCK = 1, From f8adfac4141cbcaed01b0878fbe6c12c4c0830c8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 26 Dec 2020 13:51:28 +0800 Subject: [PATCH 06/12] adjust log --- src/client/inc/tscLog.h | 4 +- src/common/inc/tglobal.h | 57 +++++++++---------- src/common/inc/tulog.h | 2 +- src/common/src/tglobal.c | 107 ++++++++++++++++++++---------------- src/dnode/src/dnodeVnodes.c | 10 +++- src/inc/taosmsg.h | 6 +- src/mnode/inc/mnodeDnode.h | 3 + src/mnode/src/mnodeDnode.c | 21 +++++-- src/query/inc/queryLog.h | 4 +- src/rpc/inc/rpcLog.h | 2 +- src/tsdb/inc/tsdbMain.h | 2 +- src/util/inc/tconfig.h | 1 + src/util/src/tconfig.c | 20 +++++++ src/util/src/tlog.c | 2 +- src/util/src/ttimer.c | 2 +- 15 files changed, 150 insertions(+), 93 deletions(-) diff --git a/src/client/inc/tscLog.h b/src/client/inc/tscLog.h index 5bbf05aa5f..5273a87ea0 100644 --- a/src/client/inc/tscLog.h +++ b/src/client/inc/tscLog.h @@ -22,8 +22,8 @@ extern "C" { #include "tlog.h" -extern uint32_t cDebugFlag; -extern uint32_t tscEmbedded; +extern int32_t cDebugFlag; +extern int8_t tscEmbedded; #define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0) #define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0) diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index d7fd9c1711..dbd8ae5b33 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -32,8 +32,8 @@ extern uint16_t tsSyncPort; extern uint16_t tsArbitratorPort; extern int32_t tsStatusInterval; extern int32_t tsNumOfMnodes; -extern int32_t tsEnableVnodeBak; -extern int32_t tsEnableTelemetryReporting; +extern int8_t tsEnableVnodeBak; +extern int8_t tsEnableTelemetryReporting; extern char tsEmail[]; extern char tsArbitrator[]; @@ -51,7 +51,7 @@ extern int8_t tsDaylight; extern char tsTimezone[]; extern char tsLocale[]; extern char tsCharset[]; // default encode string -extern int32_t tsEnableCoreFile; +extern int8_t tsEnableCoreFile; extern int32_t tsCompressMsgSize; extern char tsTempDir[]; @@ -59,12 +59,12 @@ extern char tsTempDir[]; extern int32_t tsQueryBufferSize; // maximum allowed usage buffer for each data node during query processing extern int32_t tsRetrieveBlockingModel;// retrieve threads will be blocked -extern int32_t tsKeepOriginalColumnName; +extern int8_t tsKeepOriginalColumnName; // client extern int32_t tsTableMetaKeepTimer; extern int32_t tsMaxSQLStringLen; -extern int32_t tsTscEnableRecordSql; +extern int8_t tsTscEnableRecordSql; extern int32_t tsMaxNumOfOrderedResults; extern int32_t tsMinSlidingTime; extern int32_t tsMinIntervalTime; @@ -93,49 +93,50 @@ extern int16_t tsWAL; extern int32_t tsFsyncPeriod; extern int32_t tsReplications; extern int32_t tsQuorum; -extern int32_t tsUpdate; +extern int8_t tsUpdate; // balance -extern int32_t tsEnableBalance; -extern int32_t tsAlternativeRole; +extern int8_t tsEnableBalance; +extern int8_t tsAlternativeRole; extern int32_t tsBalanceInterval; extern int32_t tsOfflineThreshold; extern int32_t tsMnodeEqualVnodeNum; -extern int32_t tsEnableFlowCtrl; -extern int32_t tsEnableSlaveQuery; +extern int8_t tsEnableFlowCtrl; +extern int8_t tsEnableSlaveQuery; +extern int8_t tsEnableAdjustMaster; // restful -extern int32_t tsEnableHttpModule; +extern int8_t tsEnableHttpModule; extern int32_t tsRestRowLimit; extern uint16_t tsHttpPort; extern int32_t tsHttpCacheSessions; extern int32_t tsHttpSessionExpire; extern int32_t tsHttpMaxThreads; -extern int32_t tsHttpEnableCompress; -extern int32_t tsHttpEnableRecordSql; -extern int32_t tsTelegrafUseFieldNum; +extern int8_t tsHttpEnableCompress; +extern int8_t tsHttpEnableRecordSql; +extern int8_t tsTelegrafUseFieldNum; // mqtt -extern int32_t tsEnableMqttModule; -extern char tsMqttHostName[]; -extern char tsMqttPort[]; -extern char tsMqttUser[]; -extern char tsMqttPass[]; -extern char tsMqttClientId[]; -extern char tsMqttTopic[]; +extern int8_t tsEnableMqttModule; +extern char tsMqttHostName[]; +extern char tsMqttPort[]; +extern char tsMqttUser[]; +extern char tsMqttPass[]; +extern char tsMqttClientId[]; +extern char tsMqttTopic[]; // monitor -extern int32_t tsEnableMonitorModule; +extern int8_t tsEnableMonitorModule; extern char tsMonitorDbName[]; extern char tsInternalPass[]; extern int32_t tsMonitorInterval; // stream -extern int32_t tsEnableStream; +extern int8_t tsEnableStream; // internal -extern int32_t tsPrintAuth; -extern uint32_t tscEmbedded; +extern int8_t tsPrintAuth; +extern int8_t tscEmbedded; extern char configDir[]; extern char tsVnodeDir[]; extern char tsDnodeDir[]; @@ -172,13 +173,13 @@ extern char gitinfoOfInternal[]; extern char buildinfo[]; // log -extern int32_t tsAsyncLog; +extern int8_t tsAsyncLog; extern int32_t tsNumOfLogLines; extern int32_t tsLogKeepDays; extern int32_t dDebugFlag; extern int32_t vDebugFlag; extern int32_t mDebugFlag; -extern uint32_t cDebugFlag; +extern int32_t cDebugFlag; extern int32_t jniDebugFlag; extern int32_t tmrDebugFlag; extern int32_t sdbDebugFlag; @@ -188,7 +189,7 @@ extern int32_t monDebugFlag; extern int32_t uDebugFlag; extern int32_t rpcDebugFlag; extern int32_t odbcDebugFlag; -extern uint32_t qDebugFlag; +extern int32_t qDebugFlag; extern int32_t wDebugFlag; extern int32_t cqDebugFlag; extern int32_t debugFlag; diff --git a/src/common/inc/tulog.h b/src/common/inc/tulog.h index 74f572619b..566da40a10 100644 --- a/src/common/inc/tulog.h +++ b/src/common/inc/tulog.h @@ -23,7 +23,7 @@ extern "C" { #include "tlog.h" extern int32_t uDebugFlag; -extern uint32_t tscEmbedded; +extern int8_t tscEmbedded; #define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} #define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 6a0fc2a63d..5d783f8a83 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -39,8 +39,8 @@ uint16_t tsSyncPort = 6040; uint16_t tsArbitratorPort = 6042; int32_t tsStatusInterval = 1; // second int32_t tsNumOfMnodes = 3; -int32_t tsEnableVnodeBak = 1; -int32_t tsEnableTelemetryReporting = 1; +int8_t tsEnableVnodeBak = 1; +int8_t tsEnableTelemetryReporting = 1; char tsEmail[TSDB_FQDN_LEN] = {0}; // common @@ -56,7 +56,7 @@ int8_t tsDaylight = 0; char tsTimezone[TSDB_TIMEZONE_LEN] = {0}; char tsLocale[TSDB_LOCALE_LEN] = {0}; char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string -int32_t tsEnableCoreFile = 0; +int8_t tsEnableCoreFile = 0; int32_t tsMaxBinaryDisplayWidth = 30; char tsTempDir[TSDB_FILENAME_LEN] = "/tmp/"; @@ -73,7 +73,7 @@ int32_t tsCompressMsgSize = -1; // client int32_t tsTableMetaKeepTimer = 7200; // second int32_t tsMaxSQLStringLen = TSDB_MAX_SQL_LEN; -int32_t tsTscEnableRecordSql = 0; +int8_t tsTscEnableRecordSql = 0; // the maximum number of results for projection query on super table that are returned from // one virtual node, to order according to timestamp @@ -110,7 +110,7 @@ int32_t tsQueryBufferSize = -1; int32_t tsRetrieveBlockingModel = 0; // last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name -int32_t tsKeepOriginalColumnName = 0; +int8_t tsKeepOriginalColumnName = 0; // db parameters int32_t tsCacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE; @@ -126,34 +126,35 @@ int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL; int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD; int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION; int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION; -int32_t tsUpdate = TSDB_DEFAULT_DB_UPDATE_OPTION; +int8_t tsUpdate = TSDB_DEFAULT_DB_UPDATE_OPTION; int32_t tsMaxVgroupsPerDb = 0; int32_t tsMinTablePerVnode = TSDB_TABLES_STEP; int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES; int32_t tsTableIncStepPerVnode = TSDB_TABLES_STEP; // balance -int32_t tsEnableBalance = 1; -int32_t tsAlternativeRole = 0; -int32_t tsBalanceInterval = 300; // seconds -int32_t tsOfflineThreshold = 86400*100; // seconds 10days +int8_t tsEnableBalance = 1; +int8_t tsAlternativeRole = 0; +int32_t tsBalanceInterval = 300; // seconds +int32_t tsOfflineThreshold = 86400 * 100; // seconds 10days int32_t tsMnodeEqualVnodeNum = 4; -int32_t tsEnableFlowCtrl = 1; -int32_t tsEnableSlaveQuery = 1; +int8_t tsEnableFlowCtrl = 1; +int8_t tsEnableSlaveQuery = 1; +int8_t tsEnableAdjustMaster = 1; // restful -int32_t tsEnableHttpModule = 1; +int8_t tsEnableHttpModule = 1; int32_t tsRestRowLimit = 10240; uint16_t tsHttpPort = 6041; // only tcp, range tcp[6041] int32_t tsHttpCacheSessions = 1000; int32_t tsHttpSessionExpire = 36000; int32_t tsHttpMaxThreads = 2; -int32_t tsHttpEnableCompress = 1; -int32_t tsHttpEnableRecordSql = 0; -int32_t tsTelegrafUseFieldNum = 0; +int8_t tsHttpEnableCompress = 1; +int8_t tsHttpEnableRecordSql = 0; +int8_t tsTelegrafUseFieldNum = 0; // mqtt -int32_t tsEnableMqttModule = 0; // not finished yet, not started it by default +int8_t tsEnableMqttModule = 0; // not finished yet, not started it by default char tsMqttHostName[TSDB_MQTT_HOSTNAME_LEN] = "test.mosquitto.org"; char tsMqttPort[TSDB_MQTT_PORT_LEN] = "1883"; char tsMqttUser[TSDB_MQTT_USER_LEN] = {0}; @@ -162,24 +163,24 @@ char tsMqttClientId[TSDB_MQTT_CLIENT_ID_LEN] = "TDengineMqttSubscriber"; char tsMqttTopic[TSDB_MQTT_TOPIC_LEN] = "/test"; // # // monitor -int32_t tsEnableMonitorModule = 1; +int8_t tsEnableMonitorModule = 1; char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log"; char tsInternalPass[] = "secretkey"; int32_t tsMonitorInterval = 30; // seconds // stream -int32_t tsEnableStream = 1; +int8_t tsEnableStream = 1; // internal -int32_t tsPrintAuth = 0; -uint32_t tscEmbedded = 0; -char configDir[TSDB_FILENAME_LEN] = {0}; -char tsVnodeDir[TSDB_FILENAME_LEN] = {0}; -char tsDnodeDir[TSDB_FILENAME_LEN] = {0}; -char tsMnodeDir[TSDB_FILENAME_LEN] = {0}; -char tsDataDir[TSDB_FILENAME_LEN] = {0}; -char tsScriptDir[TSDB_FILENAME_LEN] = {0}; -char tsVnodeBakDir[TSDB_FILENAME_LEN] = {0}; +int8_t tsPrintAuth = 0; +int8_t tscEmbedded = 0; +char configDir[TSDB_FILENAME_LEN] = {0}; +char tsVnodeDir[TSDB_FILENAME_LEN] = {0}; +char tsDnodeDir[TSDB_FILENAME_LEN] = {0}; +char tsMnodeDir[TSDB_FILENAME_LEN] = {0}; +char tsDataDir[TSDB_FILENAME_LEN] = {0}; +char tsScriptDir[TSDB_FILENAME_LEN] = {0}; +char tsVnodeBakDir[TSDB_FILENAME_LEN] = {0}; /* * minimum scale for whole system, millisecond by default @@ -210,19 +211,19 @@ int32_t mDebugFlag = 131; int32_t sdbDebugFlag = 131; int32_t dDebugFlag = 135; int32_t vDebugFlag = 135; -uint32_t cDebugFlag = 131; +int32_t cDebugFlag = 131; int32_t jniDebugFlag = 131; int32_t odbcDebugFlag = 131; int32_t httpDebugFlag = 131; int32_t mqttDebugFlag = 131; int32_t monDebugFlag = 131; -uint32_t qDebugFlag = 131; +int32_t qDebugFlag = 131; int32_t rpcDebugFlag = 131; int32_t uDebugFlag = 131; int32_t debugFlag = 0; int32_t sDebugFlag = 135; int32_t wDebugFlag = 135; -uint32_t tsdbDebugFlag = 131; +int32_t tsdbDebugFlag = 131; int32_t cqDebugFlag = 131; int32_t (*monStartSystemFp)() = NULL; @@ -469,7 +470,7 @@ static void doInitGlobalConfig(void) { cfg.option = "vnodeBak"; cfg.ptr = &tsEnableVnodeBak; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = 0; cfg.maxValue = 1; @@ -479,7 +480,7 @@ static void doInitGlobalConfig(void) { cfg.option = "telemetryReporting"; cfg.ptr = &tsEnableTelemetryReporting; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = 0; cfg.maxValue = 1; @@ -489,7 +490,7 @@ static void doInitGlobalConfig(void) { cfg.option = "balance"; cfg.ptr = &tsEnableBalance; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = 0; cfg.maxValue = 1; @@ -510,7 +511,7 @@ static void doInitGlobalConfig(void) { // 0-any; 1-mnode; 2-vnode cfg.option = "role"; cfg.ptr = &tsAlternativeRole; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; cfg.minValue = 0; cfg.maxValue = 2; @@ -812,7 +813,7 @@ static void doInitGlobalConfig(void) { cfg.option = "update"; cfg.ptr = &tsUpdate; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = TSDB_MIN_DB_UPDATE; cfg.maxValue = TSDB_MAX_DB_UPDATE; @@ -902,7 +903,7 @@ static void doInitGlobalConfig(void) { cfg.option = "keepColumnName"; cfg.ptr = &tsKeepOriginalColumnName; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT; cfg.minValue = 0; cfg.maxValue = 1; @@ -1006,7 +1007,7 @@ static void doInitGlobalConfig(void) { // module configs cfg.option = "flowctrl"; cfg.ptr = &tsEnableFlowCtrl; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = 0; cfg.maxValue = 1; @@ -1016,7 +1017,17 @@ static void doInitGlobalConfig(void) { cfg.option = "slaveQuery"; cfg.ptr = &tsEnableSlaveQuery; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + cfg.option = "adjustMaster"; + cfg.ptr = &tsEnableAdjustMaster; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = 0; cfg.maxValue = 1; @@ -1026,7 +1037,7 @@ static void doInitGlobalConfig(void) { cfg.option = "http"; cfg.ptr = &tsEnableHttpModule; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = 0; cfg.maxValue = 1; @@ -1036,7 +1047,7 @@ static void doInitGlobalConfig(void) { cfg.option = "mqtt"; cfg.ptr = &tsEnableMqttModule; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = 0; cfg.maxValue = 1; @@ -1046,7 +1057,7 @@ static void doInitGlobalConfig(void) { cfg.option = "monitor"; cfg.ptr = &tsEnableMonitorModule; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = 0; cfg.maxValue = 1; @@ -1056,7 +1067,7 @@ static void doInitGlobalConfig(void) { cfg.option = "stream"; cfg.ptr = &tsEnableStream; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = 0; cfg.maxValue = 1; @@ -1066,7 +1077,7 @@ static void doInitGlobalConfig(void) { cfg.option = "httpEnableRecordSql"; cfg.ptr = &tsHttpEnableRecordSql; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; cfg.minValue = 0; cfg.maxValue = 1; @@ -1076,7 +1087,7 @@ static void doInitGlobalConfig(void) { cfg.option = "telegrafUseFieldNum"; cfg.ptr = &tsTelegrafUseFieldNum; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = 0; cfg.maxValue = 1; @@ -1127,7 +1138,7 @@ static void doInitGlobalConfig(void) { cfg.option = "asyncLog"; cfg.ptr = &tsAsyncLog; - cfg.valType = TAOS_CFG_VTYPE_INT16; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT; cfg.minValue = 0; cfg.maxValue = 1; @@ -1328,7 +1339,7 @@ static void doInitGlobalConfig(void) { cfg.option = "enableRecordSql"; cfg.ptr = &tsTscEnableRecordSql; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; cfg.minValue = 0; cfg.maxValue = 1; @@ -1338,7 +1349,7 @@ static void doInitGlobalConfig(void) { cfg.option = "enableCoreFile"; cfg.ptr = &tsEnableCoreFile; - cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; cfg.minValue = 0; cfg.maxValue = 1; diff --git a/src/dnode/src/dnodeVnodes.c b/src/dnode/src/dnodeVnodes.c index f6307b67d6..03b51feb9c 100644 --- a/src/dnode/src/dnodeVnodes.c +++ b/src/dnode/src/dnodeVnodes.c @@ -245,12 +245,11 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { pStatus->lastReboot = htonl(tsRebootTime); pStatus->numOfCores = htons((uint16_t) tsNumOfCores); pStatus->diskAvailable = tsAvailDataDirGB; - pStatus->alternativeRole = (uint8_t) tsAlternativeRole; + pStatus->alternativeRole = tsAlternativeRole; tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN); // fill cluster cfg parameters pStatus->clusterCfg.numOfMnodes = htonl(tsNumOfMnodes); - pStatus->clusterCfg.enableBalance = htonl(tsEnableBalance); pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(tsMnodeEqualVnodeNum); pStatus->clusterCfg.offlineThreshold = htonl(tsOfflineThreshold); pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval); @@ -262,7 +261,12 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { char timestr[32] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN); - tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN); + tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN); + + pStatus->clusterCfg.enableBalance = tsEnableBalance; + pStatus->clusterCfg.flowCtrl = tsEnableFlowCtrl; + pStatus->clusterCfg.slaveQuery = tsEnableSlaveQuery; + pStatus->clusterCfg.adjustMaster = tsEnableAdjustMaster; vnodeBuildStatusMsg(pStatus); contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 2df243eb3e..a4322fa6cd 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -604,7 +604,6 @@ typedef struct { typedef struct { int32_t numOfMnodes; // tsNumOfMnodes - int32_t enableBalance; // tsEnableBalance int32_t mnodeEqualVnodeNum; // tsMnodeEqualVnodeNum int32_t offlineThreshold; // tsOfflineThreshold int32_t statusInterval; // tsStatusInterval @@ -615,6 +614,11 @@ typedef struct { int64_t checkTime; // 1970-01-01 00:00:00.000 char locale[TSDB_LOCALE_LEN]; // tsLocale char charset[TSDB_LOCALE_LEN]; // tsCharset + int8_t enableBalance; // tsEnableBalance + int8_t flowCtrl; + int8_t slaveQuery; + int8_t adjustMaster; + int8_t reserved[4]; } SClusterCfg; typedef struct { diff --git a/src/mnode/inc/mnodeDnode.h b/src/mnode/inc/mnodeDnode.h index b959da73e8..56d7455ad2 100644 --- a/src/mnode/inc/mnodeDnode.h +++ b/src/mnode/inc/mnodeDnode.h @@ -52,6 +52,9 @@ typedef enum EDnodeOfflineReason { TAOS_DN_OFF_TIME_ZONE_NOT_MATCH, TAOS_DN_OFF_LOCALE_NOT_MATCH, TAOS_DN_OFF_CHARSET_NOT_MATCH, + TAOS_DN_OFF_FLOW_CTRL_NOT_MATCH, + TAOS_DN_OFF_SLAVE_QUERY_NOT_MATCH, + TAOS_DN_OFF_ADJUST_MASTER_NOT_MATCH, TAOS_DN_OFF_OTHERS } EDnodeOfflineReason; diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 27588669eb..ba8afecce1 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -375,10 +375,6 @@ static int32_t mnodeCheckClusterCfgPara(const SClusterCfg *clusterCfg) { mError("\"numOfMnodes\"[%d - %d] cfg parameters inconsistent", clusterCfg->numOfMnodes, htonl(tsNumOfMnodes)); return TAOS_DN_OFF_NUM_OF_MNODES_NOT_MATCH; } - if (clusterCfg->enableBalance != htonl(tsEnableBalance)) { - mError("\"balance\"[%d - %d] cfg parameters inconsistent", clusterCfg->enableBalance, htonl(tsEnableBalance)); - return TAOS_DN_OFF_ENABLE_BALANCE_NOT_MATCH; - } if (clusterCfg->mnodeEqualVnodeNum != htonl(tsMnodeEqualVnodeNum)) { mError("\"mnodeEqualVnodeNum\"[%d - %d] cfg parameters inconsistent", clusterCfg->mnodeEqualVnodeNum, htonl(tsMnodeEqualVnodeNum)); @@ -428,6 +424,23 @@ static int32_t mnodeCheckClusterCfgPara(const SClusterCfg *clusterCfg) { return TAOS_DN_OFF_CHARSET_NOT_MATCH; } + if (clusterCfg->enableBalance != tsEnableBalance) { + mError("\"balance\"[%d - %d] cfg parameters inconsistent", clusterCfg->enableBalance, tsEnableBalance); + return TAOS_DN_OFF_ENABLE_BALANCE_NOT_MATCH; + } + if (clusterCfg->flowCtrl != tsEnableFlowCtrl) { + mError("\"flowCtrl\"[%d - %d] cfg parameters inconsistent", clusterCfg->flowCtrl, tsEnableFlowCtrl); + return TAOS_DN_OFF_FLOW_CTRL_NOT_MATCH; + } + if (clusterCfg->slaveQuery != tsEnableSlaveQuery) { + mError("\"slaveQuery\"[%d - %d] cfg parameters inconsistent", clusterCfg->slaveQuery, tsEnableSlaveQuery); + return TAOS_DN_OFF_SLAVE_QUERY_NOT_MATCH; + } + if (clusterCfg->adjustMaster != tsEnableAdjustMaster) { + mError("\"adjustMaster\"[%d - %d] cfg parameters inconsistent", clusterCfg->adjustMaster, tsEnableAdjustMaster); + return TAOS_DN_OFF_ADJUST_MASTER_NOT_MATCH; + } + return 0; } diff --git a/src/query/inc/queryLog.h b/src/query/inc/queryLog.h index 825ff12538..d4e909d33a 100644 --- a/src/query/inc/queryLog.h +++ b/src/query/inc/queryLog.h @@ -22,8 +22,8 @@ extern "C" { #include "tlog.h" -extern uint32_t qDebugFlag; -extern uint32_t tscEmbedded; +extern int32_t qDebugFlag; +extern int8_t tscEmbedded; #define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", 255, __VA_ARGS__); }} while(0) #define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", 255, __VA_ARGS__); }} while(0) diff --git a/src/rpc/inc/rpcLog.h b/src/rpc/inc/rpcLog.h index 3c1614cda2..6c4a281d2c 100644 --- a/src/rpc/inc/rpcLog.h +++ b/src/rpc/inc/rpcLog.h @@ -23,7 +23,7 @@ extern "C" { #include "tlog.h" extern int32_t rpcDebugFlag; -extern uint32_t tscEmbedded; +extern int8_t tscEmbedded; #define tFatal(...) { if (rpcDebugFlag & DEBUG_FATAL) { taosPrintLog("RPC FATAL ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }} #define tError(...) { if (rpcDebugFlag & DEBUG_ERROR) { taosPrintLog("RPC ERROR ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }} diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 15da19d95d..b7ff88b50c 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -31,7 +31,7 @@ extern "C" { #endif -extern uint32_t tsdbDebugFlag; +extern int32_t tsdbDebugFlag; #define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0) #define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0) diff --git a/src/util/inc/tconfig.h b/src/util/inc/tconfig.h index 665528f140..bc1da9858a 100644 --- a/src/util/inc/tconfig.h +++ b/src/util/inc/tconfig.h @@ -41,6 +41,7 @@ enum { }; enum { + TAOS_CFG_VTYPE_INT8, TAOS_CFG_VTYPE_INT16, TAOS_CFG_VTYPE_INT32, TAOS_CFG_VTYPE_FLOAT, diff --git a/src/util/src/tconfig.c b/src/util/src/tconfig.c index 1d2bd6252f..0944ce8fac 100644 --- a/src/util/src/tconfig.c +++ b/src/util/src/tconfig.c @@ -95,6 +95,23 @@ static void taosReadInt16Config(SGlobalCfg *cfg, char *input_value) { } } +static void taosReadInt8Config(SGlobalCfg *cfg, char *input_value) { + int32_t value = atoi(input_value); + int8_t *option = (int8_t *)cfg->ptr; + if (value < cfg->minValue || value > cfg->maxValue) { + uError("config option:%s, input value:%s, out of range[%f, %f], use default value:%d", + cfg->option, input_value, cfg->minValue, cfg->maxValue, *option); + } else { + if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_FILE) { + *option = (int8_t)value; + cfg->cfgStatus = TAOS_CFG_CSTATUS_FILE; + } else { + uWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, input_value, + tsCfgStatusStr[cfg->cfgStatus], *option); + } + } +} + static void taosReadDirectoryConfig(SGlobalCfg *cfg, char *input_value) { int length = (int)strlen(input_value); char *option = (char *)cfg->ptr; @@ -204,6 +221,9 @@ static void taosReadConfigOption(const char *option, char *value) { if (strcasecmp(cfg->option, option) != 0) continue; switch (cfg->valType) { + case TAOS_CFG_VTYPE_INT8: + taosReadInt8Config(cfg, value); + break; case TAOS_CFG_VTYPE_INT16: taosReadInt16Config(cfg, value); break; diff --git a/src/util/src/tlog.c b/src/util/src/tlog.c index 76ea6fa1e5..fa6a9db8ec 100644 --- a/src/util/src/tlog.c +++ b/src/util/src/tlog.c @@ -64,7 +64,7 @@ typedef struct { } SLogObj; int32_t tsLogKeepDays = 0; -int32_t tsAsyncLog = 1; +int8_t tsAsyncLog = 1; float tsTotalLogDirGB = 0; float tsAvailLogDirGB = 0; float tsMinimalLogDirGB = 1.0f; diff --git a/src/util/src/ttimer.c b/src/util/src/ttimer.c index 114f0764e8..3c0462e980 100644 --- a/src/util/src/ttimer.c +++ b/src/util/src/ttimer.c @@ -19,7 +19,7 @@ #include "ttimer.h" #include "tutil.h" -extern uint32_t tscEmbedded; +extern int8_t tscEmbedded; #define tmrFatal(...) { if (tmrDebugFlag & DEBUG_FATAL) { taosPrintLog("TMR FATAL ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }} #define tmrError(...) { if (tmrDebugFlag & DEBUG_ERROR) { taosPrintLog("TMR ERROR ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }} From ccd7249e2f4fede2b0c46ee52abd23170f43d6aa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 26 Dec 2020 16:22:31 +0800 Subject: [PATCH 07/12] [TD-2457]: check the invalid order by clause for top/bottom query. --- src/client/src/tscSQLParser.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 4dad65fe9e..e82863ff14 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -666,6 +666,7 @@ int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQ const char* msg1 = "invalid query expression"; const char* msg2 = "interval cannot be less than 10 ms"; const char* msg3 = "sliding cannot be used without interval"; + const char* msg4 = "top/bottom query does not support order by value in interval query"; SSqlCmd* pCmd = &pSql->cmd; @@ -712,6 +713,11 @@ int32_t parseIntervalClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQ return TSDB_CODE_TSC_INVALID_SQL; } + int32_t colId = pQueryInfo->order.orderColId; + if (pQueryInfo->interval.interval > 0 && colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4); + } + return TSDB_CODE_SUCCESS; } @@ -4646,7 +4652,7 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu if (!(orderByTags || orderByTS) && !isTopBottomQuery(pQueryInfo)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); - } else { + } else { // order by top/bottom result value column is not supported in case of interval query. assert(!(orderByTags && orderByTS)); } From ab2f4ad389294ea87d52543230d7e6ac67fe40ea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 26 Dec 2020 16:24:45 +0800 Subject: [PATCH 08/12] [TD-2562]: fix bug in twa query. --- src/query/src/qExecutor.c | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 64eba616ad..fb6f5f2044 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -753,11 +753,12 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, } static void updateResultRowIndex(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, bool ascQuery) { - if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) { + if ((pTableQueryInfo->lastKey > pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey < pTableQueryInfo->win.ekey && (!ascQuery))) { closeAllResultRows(pResultRowInfo); pResultRowInfo->curIndex = pResultRowInfo->size - 1; } else { - doUpdateResultRowIndex(pResultRowInfo, pTableQueryInfo->lastKey, ascQuery); + int32_t step = ascQuery? 1:-1; + doUpdateResultRowIndex(pResultRowInfo, pTableQueryInfo->lastKey - step, ascQuery); } } @@ -1198,8 +1199,12 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * // prev time window not interpolation yet. int32_t curIndex = curTimeWindowIndex(pWindowResInfo); if (prevIndex != -1 && prevIndex < curIndex && pRuntimeEnv->timeWindowInterpo) { - for(int32_t j = prevIndex; j < curIndex; ++j) { + for(int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. SResultRow *pRes = pWindowResInfo->pResult[j]; + if (pRes->closed) { + assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP)); + continue; + } STimeWindow w = pRes->win; ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &w, masterScan, &pResult, groupId); @@ -1600,6 +1605,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS if (prevWindowIndex != -1 && prevWindowIndex < curIndex) { for (int32_t k = prevWindowIndex; k < curIndex; ++k) { SResultRow *pRes = pWindowResInfo->pResult[k]; + if (pRes->closed) { + assert(resultRowInterpolated(pResult, RESULT_ROW_START_INTERP) && resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); + continue; + } ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &pRes->win, masterScan, &pResult, groupId); assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); @@ -1713,10 +1722,6 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock); } - // update the lastkey of current table - TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey; - pTableQueryInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - // interval query with limit applied int32_t numOfRes = 0; if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { From b4d667d574f5e8a4cd51eb4b560d94cd875fb350 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Sat, 26 Dec 2020 17:04:50 +0800 Subject: [PATCH 09/12] [TD-2561]: new field clusterId for SConnectRsp --- src/dnode/inc/dnodeCfg.h | 1 + src/dnode/src/dnodeCfg.c | 6 ++++++ src/inc/dnode.h | 4 +++- src/inc/taosmsg.h | 1 + src/mnode/src/mnodeShow.c | 2 ++ 5 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/dnode/inc/dnodeCfg.h b/src/dnode/inc/dnodeCfg.h index d74303f325..896b3f574c 100644 --- a/src/dnode/inc/dnodeCfg.h +++ b/src/dnode/inc/dnodeCfg.h @@ -25,6 +25,7 @@ int32_t dnodeInitCfg(); void dnodeCleanupCfg(); void dnodeUpdateCfg(SDnodeCfg *cfg); int32_t dnodeGetDnodeId(); +void dnodeGetClusterId(char *clusterId); void dnodeGetCfg(int32_t *dnodeId, char *clusterId); #ifdef __cplusplus diff --git a/src/dnode/src/dnodeCfg.c b/src/dnode/src/dnodeCfg.c index 89249d773b..f495dbe285 100644 --- a/src/dnode/src/dnodeCfg.c +++ b/src/dnode/src/dnodeCfg.c @@ -51,6 +51,12 @@ int32_t dnodeGetDnodeId() { return dnodeId; } +void dnodeGetClusterId(char *clusterId) { + pthread_mutex_lock(&tsCfgMutex); + tstrncpy(clusterId, tsCfg.clusterId, TSDB_CLUSTER_ID_LEN); + pthread_mutex_unlock(&tsCfgMutex); +} + void dnodeGetCfg(int32_t *dnodeId, char *clusterId) { pthread_mutex_lock(&tsCfgMutex); *dnodeId = tsCfg.dnodeId; diff --git a/src/inc/dnode.h b/src/inc/dnode.h index dd41360e68..877738778b 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -36,6 +36,8 @@ bool dnodeIsMasterEp(char *ep); void dnodeGetEpSetForPeer(SRpcEpSet *epSet); void dnodeGetEpSetForShell(SRpcEpSet *epSet); int32_t dnodeGetDnodeId(); +void dnodeGetClusterId(char *clusterId); + void dnodeUpdateEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr); bool dnodeStartMnode(SMInfos *pMinfos); @@ -80,4 +82,4 @@ void dnodeReportStep(char *name, char *desc, int8_t finished); } #endif -#endif \ No newline at end of file +#endif diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 2df243eb3e..f27af2a37b 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -324,6 +324,7 @@ typedef struct { typedef struct { char acctId[TSDB_ACCT_LEN]; char serverVersion[TSDB_VERSION_LEN]; + char clusterId[TSDB_CLUSTER_ID_LEN]; int8_t writeAuth; int8_t superAuth; int8_t reserved1; diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 3c1c92226a..6b9f0e26a7 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -351,6 +351,8 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { mnodeGetMnodeEpSetForShell(&pConnectRsp->epSet, false); + dnodeGetClusterId(pConnectRsp->clusterId); + connect_over: if (code != TSDB_CODE_SUCCESS) { if (pConnectRsp) rpcFreeCont(pConnectRsp); From 04e5bb2c389406137c8aadb1cf0f95ceca88f24e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 26 Dec 2020 17:08:26 +0800 Subject: [PATCH 10/12] [TD-2563]: fix invalid write caused by top/bottom query at client side. --- src/client/src/tscLocalMerge.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 14e8b0084e..6f9bc6debc 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -93,7 +93,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc // for top/bottom function, the output of timestamp is the first column int32_t functionId = pExpr->functionId; - if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { + if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { pCtx->ptsOutputBuf = pReducer->pCtx[0].aOutputBuf; pCtx->param[2].i64Key = pQueryInfo->order.order; pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; @@ -1296,6 +1296,10 @@ void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {// re for (int32_t i = 0; i < t; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); pLocalReducer->pCtx[i].aOutputBuf = pLocalReducer->pResultBuf->data + pExpr->offset * pLocalReducer->resColModel->capacity; + + if (pExpr->functionId == TSDB_FUNC_TOP || pExpr->functionId == TSDB_FUNC_BOTTOM || pExpr->functionId == TSDB_FUNC_DIFF) { + pLocalReducer->pCtx[i].ptsOutputBuf = pLocalReducer->pCtx[0].aOutputBuf; + } } memset(pLocalReducer->pResultBuf, 0, pLocalReducer->nResultBufSize + sizeof(tFilePage)); From 927a2eea7f63e3b2fac4ffdbbead9f223920f90c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 26 Dec 2020 18:07:24 +0800 Subject: [PATCH 11/12] adjust log --- src/common/src/tglobal.c | 8 ++++++-- src/mnode/src/mnodeDnode.c | 5 +++++ src/util/src/tconfig.c | 6 ++++++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 5d783f8a83..1bb638296e 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -277,12 +277,16 @@ bool taosCfgDynamicOptions(char *msg) { for (int32_t i = 0; i < tsGlobalConfigNum; ++i) { SGlobalCfg *cfg = tsGlobalConfig + i; //if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue; - if (cfg->valType != TAOS_CFG_VTYPE_INT32) continue; + if (cfg->valType != TAOS_CFG_VTYPE_INT32 && cfg->valType != TAOS_CFG_VTYPE_INT8) continue; int32_t cfgLen = (int32_t)strlen(cfg->option); if (cfgLen != olen) continue; if (strncasecmp(option, cfg->option, olen) != 0) continue; - *((int32_t *)cfg->ptr) = vint; + if (cfg->valType != TAOS_CFG_VTYPE_INT32) { + *((int32_t *)cfg->ptr) = vint; + } else { + *((int8_t *)cfg->ptr) = (int8_t)vint; + } if (strncasecmp(cfg->option, "monitor", olen) == 0) { if (1 == vint) { diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index ba8afecce1..1ff2404834 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -1044,6 +1044,11 @@ static int32_t mnodeRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, v pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; switch (cfg->valType) { + case TAOS_CFG_VTYPE_INT8: + t = snprintf(varDataVal(pWrite), TSDB_CFG_VALUE_LEN, "%d", *((int8_t *)cfg->ptr)); + varDataSetLen(pWrite, t); + numOfRows++; + break; case TAOS_CFG_VTYPE_INT16: t = snprintf(varDataVal(pWrite), TSDB_CFG_VALUE_LEN, "%d", *((int16_t *)cfg->ptr)); varDataSetLen(pWrite, t); diff --git a/src/util/src/tconfig.c b/src/util/src/tconfig.c index 0944ce8fac..0a9f5a98c0 100644 --- a/src/util/src/tconfig.c +++ b/src/util/src/tconfig.c @@ -396,6 +396,9 @@ void taosPrintGlobalCfg() { blank[blankLen] = 0; switch (cfg->valType) { + case TAOS_CFG_VTYPE_INT8: + uInfo(" %s:%s%d%s", cfg->option, blank, *((int8_t *)cfg->ptr), tsGlobalUnit[cfg->unitType]); + break; case TAOS_CFG_VTYPE_INT16: uInfo(" %s:%s%d%s", cfg->option, blank, *((int16_t *)cfg->ptr), tsGlobalUnit[cfg->unitType]); break; @@ -428,6 +431,9 @@ static void taosDumpCfg(SGlobalCfg *cfg) { blank[blankLen] = 0; switch (cfg->valType) { + case TAOS_CFG_VTYPE_INT8: + printf(" %s:%s%d%s\n", cfg->option, blank, *((int8_t *)cfg->ptr), tsGlobalUnit[cfg->unitType]); + break; case TAOS_CFG_VTYPE_INT16: printf(" %s:%s%d%s\n", cfg->option, blank, *((int16_t *)cfg->ptr), tsGlobalUnit[cfg->unitType]); break; From 108814cf14eb2fae4e049234e6a94b8d4ebe2fd9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 26 Dec 2020 21:37:22 +0800 Subject: [PATCH 12/12] header files --- src/dnode/src/dnodeMRead.c | 2 -- src/dnode/src/dnodeMWrite.c | 1 - 2 files changed, 3 deletions(-) diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index 0fc6400d99..9027c346f5 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -16,9 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "tqueue.h" -#include "twal.h" #include "mnode.h" -#include "dnodeVMgmt.h" #include "dnodeMInfos.h" #include "dnodeMRead.h" diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 414b66653d..ea1cf028c5 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -18,7 +18,6 @@ #include "ttimer.h" #include "tqueue.h" #include "mnode.h" -#include "dnodeVMgmt.h" #include "dnodeMInfos.h" #include "dnodeMWrite.h"