From c3abd9d554c9d9dc72feef05d5548e7c29c9be7a Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 19 Jun 2024 09:38:55 +0800 Subject: [PATCH 01/16] enh: support display unlimited for expiration time --- include/common/tgrant.h | 1 + source/common/src/systable.c | 2 +- source/dnode/mnode/impl/inc/mndInt.h | 2 +- source/dnode/mnode/impl/src/mndCluster.c | 20 +++++++++++++++++--- source/dnode/mnode/impl/src/mndMain.c | 6 ++++-- 5 files changed, 24 insertions(+), 7 deletions(-) diff --git a/include/common/tgrant.h b/include/common/tgrant.h index b707045bd1..fc1e45fb2a 100644 --- a/include/common/tgrant.h +++ b/include/common/tgrant.h @@ -29,6 +29,7 @@ extern "C" { #endif #define GRANT_HEART_BEAT_MIN 2 +#define GRANT_UNIQ_UNLIMITED (-1) #define GRANT_ACTIVE_CODE "activeCode" #define GRANT_FLAG_ALL (0x01) #define GRANT_FLAG_AUDIT (0x02) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 6558df1fc1..35f2908968 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -85,7 +85,7 @@ static const SSysDbTableSchema clusterSchema[] = { {.name = "uptime", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, {.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, - {.name = "expire_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, + {.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, }; static const SSysDbTableSchema userDBSchema[] = { diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 2da14c65d2..0c446182a6 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -100,7 +100,7 @@ typedef struct { } SSyncMgmt; typedef struct { - int64_t expireTimeMS; + int64_t expireTimeSec; int64_t timeseriesAllowed; } SGrantInfo; diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index f2b279276e..ca7544877c 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -310,11 +310,25 @@ static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock * pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)ver, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + char expireTime[25] = {0}; + pColInfo = taosArrayGet(pBlock->pDataBlock, cols); if (tsExpireTime <= 0) { - colDataSetNULL(pColInfo, numOfRows); + if (tsExpireTime == GRANT_UNIQ_UNLIMITED) { + STR_WITH_MAXSIZE_TO_VARSTR(expireTime, "unlimited", pShow->pMeta->pSchemas[cols].bytes); + colDataSetVal(pColInfo, numOfRows, expireTime, false); + } else { + colDataSetNULL(pColInfo, numOfRows); + } } else { - colDataSetVal(pColInfo, numOfRows, (const char *)&tsExpireTime, false); + char ts[20] = {0}; + struct tm ptm; + if (taosLocalTime(&tsExpireTime, &ptm, ts) != NULL) { + strftime(ts, 20, "%Y-%m-%d %H:%M:%S", &ptm); + } else { + ts[0] = 0; + } + STR_WITH_MAXSIZE_TO_VARSTR(expireTime, ts, pShow->pMeta->pSchemas[cols].bytes); + colDataSetVal(pColInfo, numOfRows, expireTime, false); } sdbRelease(pSdb, pCluster); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index cad8c6d745..7c8ef2f0b7 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -1028,9 +1028,11 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr } // grant info - pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 1000; + pGrantInfo->expire_time = pMnode->grant.expireTimeSec == GRANT_UNIQ_UNLIMITED + ? GRANT_UNIQ_UNLIMITED + : (pMnode->grant.expireTimeSec - ms / 1000); pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed; - if (pMnode->grant.expireTimeMS == 0) { + if (pMnode->grant.expireTimeSec == 0) { pGrantInfo->expire_time = 0; pGrantInfo->timeseries_total = 0; } From 7c429e5799fe1a0dad51106e9ae40b7d6fc881d0 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 19 Jun 2024 14:25:17 +0800 Subject: [PATCH 02/16] enh: support display unlimited for expiration time --- include/common/tgrant.h | 13 +++++++------ source/dnode/mnode/impl/inc/mndInt.h | 2 +- source/dnode/mnode/impl/src/mndCluster.c | 15 +++++++-------- source/dnode/mnode/impl/src/mndMain.c | 6 ++---- 4 files changed, 17 insertions(+), 19 deletions(-) diff --git a/include/common/tgrant.h b/include/common/tgrant.h index fc1e45fb2a..ae58ba88ad 100644 --- a/include/common/tgrant.h +++ b/include/common/tgrant.h @@ -28,12 +28,13 @@ extern "C" { #define GRANTS_COL_MAX_LEN 196 #endif -#define GRANT_HEART_BEAT_MIN 2 -#define GRANT_UNIQ_UNLIMITED (-1) -#define GRANT_ACTIVE_CODE "activeCode" -#define GRANT_FLAG_ALL (0x01) -#define GRANT_FLAG_AUDIT (0x02) -#define GRANT_FLAG_VIEW (0x04) +#define GRANT_HEART_BEAT_MIN 2 +#define GRANT_MAX_EXPIRE_SEC (31556995201) +#define GRANT_EXPIRE_UNLIMITED(v) ((v) == GRANT_MAX_EXPIRE_SEC) +#define GRANT_ACTIVE_CODE "activeCode" +#define GRANT_FLAG_ALL (0x01) +#define GRANT_FLAG_AUDIT (0x02) +#define GRANT_FLAG_VIEW (0x04) typedef enum { TSDB_GRANT_ALL, diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 0c446182a6..2da14c65d2 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -100,7 +100,7 @@ typedef struct { } SSyncMgmt; typedef struct { - int64_t expireTimeSec; + int64_t expireTimeMS; int64_t timeseriesAllowed; } SGrantInfo; diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index ca7544877c..a811a1ada0 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -312,17 +312,16 @@ static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock * char expireTime[25] = {0}; pColInfo = taosArrayGet(pBlock->pDataBlock, cols); - if (tsExpireTime <= 0) { - if (tsExpireTime == GRANT_UNIQ_UNLIMITED) { - STR_WITH_MAXSIZE_TO_VARSTR(expireTime, "unlimited", pShow->pMeta->pSchemas[cols].bytes); - colDataSetVal(pColInfo, numOfRows, expireTime, false); - } else { - colDataSetNULL(pColInfo, numOfRows); - } + if (GRANT_EXPIRE_UNLIMITED(tsExpireTime / 1000)) { + STR_WITH_MAXSIZE_TO_VARSTR(expireTime, "unlimited", pShow->pMeta->pSchemas[cols].bytes); + colDataSetVal(pColInfo, numOfRows, expireTime, false); + } else if (tsExpireTime <= 0) { + colDataSetNULL(pColInfo, numOfRows); } else { char ts[20] = {0}; + time_t expireSec = tsExpireTime / 1000; struct tm ptm; - if (taosLocalTime(&tsExpireTime, &ptm, ts) != NULL) { + if (taosLocalTime(&expireSec, &ptm, ts) != NULL) { strftime(ts, 20, "%Y-%m-%d %H:%M:%S", &ptm); } else { ts[0] = 0; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 7c8ef2f0b7..cad8c6d745 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -1028,11 +1028,9 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr } // grant info - pGrantInfo->expire_time = pMnode->grant.expireTimeSec == GRANT_UNIQ_UNLIMITED - ? GRANT_UNIQ_UNLIMITED - : (pMnode->grant.expireTimeSec - ms / 1000); + pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 1000; pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed; - if (pMnode->grant.expireTimeSec == 0) { + if (pMnode->grant.expireTimeMS == 0) { pGrantInfo->expire_time = 0; pGrantInfo->timeseries_total = 0; } From 780ce7aa95233e125bfc92b59a463946505ca27f Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 19 Jun 2024 14:33:17 +0800 Subject: [PATCH 03/16] enh: support display unlimited for expiration time --- include/common/tgrant.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/common/tgrant.h b/include/common/tgrant.h index ae58ba88ad..42adabd6f7 100644 --- a/include/common/tgrant.h +++ b/include/common/tgrant.h @@ -29,8 +29,8 @@ extern "C" { #endif #define GRANT_HEART_BEAT_MIN 2 -#define GRANT_MAX_EXPIRE_SEC (31556995201) -#define GRANT_EXPIRE_UNLIMITED(v) ((v) == GRANT_MAX_EXPIRE_SEC) +#define GRANT_EXPIRE_VALUE (31556995201) +#define GRANT_EXPIRE_UNLIMITED(v) ((v) == GRANT_EXPIRE_VALUE) #define GRANT_ACTIVE_CODE "activeCode" #define GRANT_FLAG_ALL (0x01) #define GRANT_FLAG_AUDIT (0x02) From 61edc7904563d60612bb11d03a7cc00230d95078 Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 19 Jun 2024 08:01:58 +0000 Subject: [PATCH 04/16] fix/TD-30642 --- source/libs/parser/src/parAstCreater.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 64d1260719..baccd6e399 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -2902,7 +2902,8 @@ SNode* createSyncdbStmt(SAstCreateContext* pCxt, const SToken* pDbName) { SNode* createGrantStmt(SAstCreateContext* pCxt, int64_t privileges, STokenPair* pPrivLevel, SToken* pUserName, SNode* pTagCond) { CHECK_PARSER_STATUS(pCxt); - if (!checkDbName(pCxt, &pPrivLevel->first, false) || !checkUserName(pCxt, pUserName)) { + if (!checkDbName(pCxt, &pPrivLevel->first, false) || !checkUserName(pCxt, pUserName) || + !checkTableName(pCxt, &pPrivLevel->second)) { return NULL; } SGrantStmt* pStmt = (SGrantStmt*)nodesMakeNode(QUERY_NODE_GRANT_STMT); From 49d9552ab603b8171d0b61d53a2c0075f42d65ab Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 20 Jun 2024 11:18:23 +0800 Subject: [PATCH 05/16] fix: task reset redirect issue --- source/libs/scheduler/src/schJob.c | 2 +- source/libs/scheduler/src/schTask.c | 37 ++++++++++++++++++++++++----- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 380862f745..278768981a 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -567,7 +567,7 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) { } SSchLevel *pLevel = pTask->level; - int32_t doneNum = atomic_add_fetch_32(&pLevel->taskExecDoneNum, 1); + int32_t doneNum = atomic_load_32(&pLevel->taskExecDoneNum); if (doneNum == pLevel->taskNum) { atomic_sub_fetch_32(&pJob->levelIdx, 1); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 97c3c7d276..7f60353b1c 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -248,6 +248,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_LOG_TASK_END_TS(pTask); + atomic_add_fetch_32(&pTask->level->taskExecDoneNum, 1); + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC); SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask)); @@ -483,6 +485,34 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } +int32_t schResetTaskSetLevelInfo(SSchJob *pJob, SSchTask *pTask) { + SSchLevel *pLevel = pTask->level; + + SCH_TASK_DLOG("start to reset level for current task set, execDone:%d, launched:%d", + atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum)); + + if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) { + atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1); + } + + atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1); + + int32_t childrenNum = taosArrayGetSize(pTask->children); + for (int32_t i = 0; i < childrenNum; ++i) { + SSchTask *pChild = taosArrayGetP(pTask->children, i); + SCH_LOCK_TASK(pChild); + pLevel = pChild->level; + atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1); + atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1); + SCH_UNLOCK_TASK(pChild); + } + + SCH_TASK_DLOG("end to reset level for current task set, execDone:%d, launched:%d", + atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum)); + + return TSDB_CODE_SUCCESS; +} + int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t code = 0; @@ -498,12 +528,7 @@ int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, i SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode)); - for (int32_t i = 0; i < pJob->levelNum; ++i) { - SSchLevel *pLevel = taosArrayGet(pJob->levels, i); - - pLevel->taskExecDoneNum = 0; - pLevel->taskLaunchedNum = 0; - } + SCH_ERR_JRET(schResetTaskSetLevelInfo(pJob, pTask)); SCH_RESET_JOB_LEVEL_IDX(pJob); From 7000d597fc4fb4e802bb22a5408099772e0c3a19 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 20 Jun 2024 03:23:27 +0000 Subject: [PATCH 06/16] Fix the issue of dynamically adjusting the IP whitelist. --- source/client/src/clientHb.c | 56 ++++--- source/dnode/mnode/impl/inc/mndUser.h | 1 + source/dnode/mnode/impl/src/mndPrivilege.c | 22 +-- source/dnode/mnode/impl/src/mndProfile.c | 2 +- source/dnode/mnode/impl/src/mndUser.c | 181 ++++++++++++--------- 5 files changed, 135 insertions(+), 127 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 1b6cb8fd22..a060dab24c 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -133,14 +133,15 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat if (pTscObj->whiteListInfo.fp) { SWhiteListInfo *whiteListInfo = &pTscObj->whiteListInfo; - int64_t oldVer = atomic_load_64(&whiteListInfo->ver); - if (oldVer < pRsp->whiteListVer) { + int64_t oldVer = atomic_load_64(&whiteListInfo->ver); + + if (oldVer < pRsp->whiteListVer || pRsp->whiteListVer == 0) { atomic_store_64(&whiteListInfo->ver, pRsp->whiteListVer); if (whiteListInfo->fp) { (*whiteListInfo->fp)(whiteListInfo->param, &pRsp->whiteListVer, TAOS_NOTIFY_WHITELIST_VER); } - tscDebug("update whitelist version of user %s from %"PRId64" to %"PRId64", tscRid:%" PRIi64, pRsp->user, oldVer, - atomic_load_64(&whiteListInfo->ver), pTscObj->id); + tscDebug("update whitelist version of user %s from %" PRId64 " to %" PRId64 ", tscRid:%" PRIi64, pRsp->user, + oldVer, atomic_load_64(&whiteListInfo->ver), pTscObj->id); } } releaseTscObj(pReq->connKey.tscRid); @@ -202,8 +203,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog for (int32_t i = 0; i < numOfBatchs; ++i) { SDbHbRsp *rsp = taosArrayGet(batchRsp.pArray, i); if (rsp->useDbRsp) { - tscDebug("hb use db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, - rsp->useDbRsp->db, rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid); + tscDebug("hb use db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->useDbRsp->db, + rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid); if (rsp->useDbRsp->vgVersion < 0) { tscDebug("hb to remove db, db:%s", rsp->useDbRsp->db); @@ -225,7 +226,9 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog goto _return; } - catalogUpdateDBVgInfo(pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB, rsp->useDbRsp->uid, vgInfo); + catalogUpdateDBVgInfo(pCatalog, + (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB, + rsp->useDbRsp->uid, vgInfo); } } } @@ -238,7 +241,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog if (rsp->pTsmaRsp) { if (rsp->pTsmaRsp->pTsmas) { for (int32_t i = 0; i < rsp->pTsmaRsp->pTsmas->size; ++i) { - STableTSMAInfo* pTsma = taosArrayGetP(rsp->pTsmaRsp->pTsmas, i); + STableTSMAInfo *pTsma = taosArrayGetP(rsp->pTsmaRsp->pTsmas, i); catalogAsyncUpdateTSMA(pCatalog, &pTsma, rsp->dbTsmaVersion); } taosArrayClear(rsp->pTsmaRsp->pTsmas); @@ -294,16 +297,15 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo return TSDB_CODE_SUCCESS; } - static int32_t hbProcessDynViewRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { - return catalogUpdateDynViewVer(pCatalog, (SDynViewVersion*)value); + return catalogUpdateDynViewVer(pCatalog, (SDynViewVersion *)value); } -static void hbFreeSViewMetaInRsp(void* p) { - if (NULL == p || NULL == *(void**)p) { +static void hbFreeSViewMetaInRsp(void *p) { + if (NULL == p || NULL == *(void **)p) { return; } - SViewMetaRsp *pRsp = *(SViewMetaRsp**)p; + SViewMetaRsp *pRsp = *(SViewMetaRsp **)p; tFreeSViewMetaRsp(pRsp); taosMemoryFreeClear(pRsp); } @@ -337,7 +339,7 @@ static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatal return TSDB_CODE_SUCCESS; } -static int32_t hbprocessTSMARsp(void* value, int32_t valueLen, struct SCatalog* pCatalog) { +static int32_t hbprocessTSMARsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { int32_t code = 0; STSMAHbRsp hbRsp = {0}; @@ -348,7 +350,7 @@ static int32_t hbprocessTSMARsp(void* value, int32_t valueLen, struct SCatalog* int32_t numOfTsma = taosArrayGetSize(hbRsp.pTsmas); for (int32_t i = 0; i < numOfTsma; ++i) { - STableTSMAInfo* pTsmaInfo = taosArrayGetP(hbRsp.pTsmas, i); + STableTSMAInfo *pTsmaInfo = taosArrayGetP(hbRsp.pTsmas, i); if (!pTsmaInfo->pFuncs) { tscDebug("hb to remove tsma: %s.%s", pTsmaInfo->dbFName, pTsmaInfo->name); @@ -365,7 +367,7 @@ static int32_t hbprocessTSMARsp(void* value, int32_t valueLen, struct SCatalog* return TSDB_CODE_SUCCESS; } -static void hbProcessQueryRspKvs(int32_t kvNum, SArray* pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) { +static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) { for (int32_t i = 0; i < kvNum; ++i) { SKv *kv = taosArrayGet(pKvs, i); switch (kv->key) { @@ -489,14 +491,14 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { if (kvNum > 0) { struct SCatalog *pCatalog = NULL; - int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog); + int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code)); } else { hbProcessQueryRspKvs(kvNum, pRsp->info, pCatalog, pAppHbMgr); } } - + taosHashRelease(pAppHbMgr->activeInfo, pReq); return TSDB_CODE_SUCCESS; @@ -799,8 +801,9 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl for (int32_t i = 0; i < dbNum; ++i) { SDbCacheInfo *db = &dbs[i]; - tscDebug("the %dth expired dbFName:%s, dbId:%" PRId64 ", vgVersion:%d, cfgVersion:%d, numOfTable:%d, startTs:%" PRId64, - i, db->dbFName, db->dbId, db->vgVersion, db->cfgVersion, db->numOfTable, db->stateTs); + tscDebug("the %dth expired dbFName:%s, dbId:%" PRId64 + ", vgVersion:%d, cfgVersion:%d, numOfTable:%d, startTs:%" PRId64, + i, db->dbFName, db->dbId, db->vgVersion, db->cfgVersion, db->numOfTable, db->stateTs); db->dbId = htobe64(db->dbId); db->vgVersion = htonl(db->vgVersion); @@ -916,7 +919,7 @@ int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S return TSDB_CODE_SUCCESS; } -int32_t hbGetExpiredTSMAInfo(SClientHbKey* connKey, struct SCatalog* pCatalog, SClientHbReq* pReq) { +int32_t hbGetExpiredTSMAInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *pReq) { int32_t code = 0; uint32_t tsmaNum = 0; STSMAVersion *tsmas = NULL; @@ -933,7 +936,7 @@ int32_t hbGetExpiredTSMAInfo(SClientHbKey* connKey, struct SCatalog* pCatalog, S } for (int32_t i = 0; i < tsmaNum; ++i) { - STSMAVersion* tsma = &tsmas[i]; + STSMAVersion *tsma = &tsmas[i]; tsma->dbId = htobe64(tsma->dbId); tsma->tsmaId = htobe64(tsma->tsmaId); tsma->version = htonl(tsma->version); @@ -1012,7 +1015,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req if (TSDB_CODE_SUCCESS != code) { return code; } -#endif +#endif code = hbGetExpiredTSMAInfo(connKey, pCatalog, req); } else { req->app.appId = 0; @@ -1151,7 +1154,8 @@ static void *hbThreadFunc(void *param) { if (sz > 0) { hbGatherAppInfo(); if (sz > 1 && !clientHbMgr.appHbHash) { - clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); + clientHbMgr.appHbHash = + taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); } taosHashClear(clientHbMgr.appHbHash); } @@ -1433,6 +1437,4 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) { } // set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner -void taos_set_hb_quit(int8_t quitByKill) { - clientHbMgr.quitByKill = quitByKill; -} +void taos_set_hb_quit(int8_t quitByKill) { clientHbMgr.quitByKill = quitByKill; } diff --git a/source/dnode/mnode/impl/inc/mndUser.h b/source/dnode/mnode/impl/inc/mndUser.h index 907ec86623..5782890f73 100644 --- a/source/dnode/mnode/impl/inc/mndUser.h +++ b/source/dnode/mnode/impl/inc/mndUser.h @@ -53,6 +53,7 @@ void mndUpdateIpWhiteForAllUser(SMnode *pMnode, char *user, char *fqdn, int8_t t int32_t mndRefreshUserIpWhiteList(SMnode *pMnode); +int64_t mndGetUserIpWhiteListVer(SMnode *pMnode, SUserObj *pUser); #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndPrivilege.c b/source/dnode/mnode/impl/src/mndPrivilege.c index 13a80cb1a6..fd7f9e5fb3 100644 --- a/source/dnode/mnode/impl/src/mndPrivilege.c +++ b/source/dnode/mnode/impl/src/mndPrivilege.c @@ -31,7 +31,6 @@ int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType op int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic) { return 0; } - int32_t mndSetUserWhiteListRsp(SMnode *pMnode, SUserObj *pUser, SGetUserWhiteListRsp *pWhiteListRsp) { memcpy(pWhiteListRsp->user, pUser->user, TSDB_USER_LEN); pWhiteListRsp->numWhiteLists = 1; @@ -41,25 +40,6 @@ int32_t mndSetUserWhiteListRsp(SMnode *pMnode, SUserObj *pUser, SGetUserWhiteLis } memset(pWhiteListRsp->pWhiteLists, 0, pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range)); -// if (tsEnableWhiteList) { -// memcpy(pWhiteListRsp->user, pUser->user, TSDB_USER_LEN); -// pWhiteListRsp->numWhiteLists = pUser->pIpWhiteList->num; -// pWhiteListRsp->pWhiteLists = taosMemoryMalloc(pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range)); -// if (pWhiteListRsp->pWhiteLists == NULL) { -// return TSDB_CODE_OUT_OF_MEMORY; -// } -// memcpy(pWhiteListRsp->pWhiteLists, pUser->pIpWhiteList->pIpRange, -// pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range)); -// } else { -// memcpy(pWhiteListRsp->user, pUser->user, TSDB_USER_LEN); -// pWhiteListRsp->numWhiteLists = 1; -// pWhiteListRsp->pWhiteLists = taosMemoryMalloc(pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range)); -// if (pWhiteListRsp->pWhiteLists == NULL) { -// return TSDB_CODE_OUT_OF_MEMORY; -// } -// memset(pWhiteListRsp->pWhiteLists, 0, pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range)); -// } - return 0; } @@ -70,7 +50,7 @@ int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp pRsp->sysInfo = pUser->sysInfo; pRsp->version = pUser->authVersion; pRsp->passVer = pUser->passVersion; - pRsp->whiteListVer = pUser->ipWhiteListVer; + pRsp->whiteListVer = mndGetUserIpWhiteListVer(pMnode, pUser); return 0; } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index fafdb539fb..fcd4403aa4 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -300,7 +300,7 @@ _CONNECT: connectRsp.svrTimestamp = taosGetTimestampSec(); connectRsp.passVer = pUser->passVersion; connectRsp.authVer = pUser->authVersion; - connectRsp.whiteListVer = pUser->ipWhiteListVer; + connectRsp.whiteListVer = mndGetUserIpWhiteListVer(pMnode, pUser); strcpy(connectRsp.sVer, version); snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 7cde8c5508..cbb72ee1f4 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -45,35 +45,55 @@ #define ALTER_USER_DEL_PRIVS(_type) ((_type) == TSDB_ALTER_USER_DEL_PRIVILEGES) #define ALTER_USER_ALL_PRIV(_priv) (BIT_FLAG_TEST_MASK((_priv), PRIVILEGE_TYPE_ALL)) -#define ALTER_USER_READ_PRIV(_priv) (BIT_FLAG_TEST_MASK((_priv), PRIVILEGE_TYPE_READ) || BIT_FLAG_TEST_MASK((_priv), PRIVILEGE_TYPE_ALL)) -#define ALTER_USER_WRITE_PRIV(_priv) (BIT_FLAG_TEST_MASK((_priv), PRIVILEGE_TYPE_WRITE) || BIT_FLAG_TEST_MASK((_priv), PRIVILEGE_TYPE_ALL)) -#define ALTER_USER_ALTER_PRIV(_priv) (BIT_FLAG_TEST_MASK((_priv), PRIVILEGE_TYPE_ALTER) || BIT_FLAG_TEST_MASK((_priv), PRIVILEGE_TYPE_ALL)) +#define ALTER_USER_READ_PRIV(_priv) \ + (BIT_FLAG_TEST_MASK((_priv), PRIVILEGE_TYPE_READ) || BIT_FLAG_TEST_MASK((_priv), PRIVILEGE_TYPE_ALL)) +#define ALTER_USER_WRITE_PRIV(_priv) \ + (BIT_FLAG_TEST_MASK((_priv), PRIVILEGE_TYPE_WRITE) || BIT_FLAG_TEST_MASK((_priv), PRIVILEGE_TYPE_ALL)) +#define ALTER_USER_ALTER_PRIV(_priv) \ + (BIT_FLAG_TEST_MASK((_priv), PRIVILEGE_TYPE_ALTER) || BIT_FLAG_TEST_MASK((_priv), PRIVILEGE_TYPE_ALL)) #define ALTER_USER_SUBSCRIBE_PRIV(_priv) (BIT_FLAG_TEST_MASK((_priv), PRIVILEGE_TYPE_SUBSCRIBE)) #define ALTER_USER_TARGET_DB(_tbname) (0 == (_tbname)[0]) #define ALTER_USER_TARGET_TB(_tbname) (0 != (_tbname)[0]) -#define ALTER_USER_ADD_READ_DB_PRIV(_type, _priv, _tbname) (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_READ_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) -#define ALTER_USER_DEL_READ_DB_PRIV(_type, _priv, _tbname) (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_READ_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) -#define ALTER_USER_ADD_WRITE_DB_PRIV(_type, _priv, _tbname) (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_WRITE_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) -#define ALTER_USER_DEL_WRITE_DB_PRIV(_type, _priv, _tbname) (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_WRITE_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) -#define ALTER_USER_ADD_ALTER_DB_PRIV(_type, _priv, _tbname) (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_ALTER_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) -#define ALTER_USER_DEL_ALTER_DB_PRIV(_type, _priv, _tbname) (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_ALTER_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) -#define ALTER_USER_ADD_ALL_DB_PRIV(_type, _priv, _tbname) (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_ALL_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) -#define ALTER_USER_DEL_ALL_DB_PRIV(_type, _priv, _tbname) (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_ALL_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) +#define ALTER_USER_ADD_READ_DB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_READ_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) +#define ALTER_USER_DEL_READ_DB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_READ_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) +#define ALTER_USER_ADD_WRITE_DB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_WRITE_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) +#define ALTER_USER_DEL_WRITE_DB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_WRITE_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) +#define ALTER_USER_ADD_ALTER_DB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_ALTER_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) +#define ALTER_USER_DEL_ALTER_DB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_ALTER_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) +#define ALTER_USER_ADD_ALL_DB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_ALL_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) +#define ALTER_USER_DEL_ALL_DB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_ALL_PRIV(_priv) && ALTER_USER_TARGET_DB(_tbname)) -#define ALTER_USER_ADD_READ_TB_PRIV(_type, _priv, _tbname) (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_READ_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) -#define ALTER_USER_DEL_READ_TB_PRIV(_type, _priv, _tbname) (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_READ_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) -#define ALTER_USER_ADD_WRITE_TB_PRIV(_type, _priv, _tbname) (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_WRITE_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) -#define ALTER_USER_DEL_WRITE_TB_PRIV(_type, _priv, _tbname) (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_WRITE_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) -#define ALTER_USER_ADD_ALTER_TB_PRIV(_type, _priv, _tbname) (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_ALTER_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) -#define ALTER_USER_DEL_ALTER_TB_PRIV(_type, _priv, _tbname) (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_ALTER_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) -#define ALTER_USER_ADD_ALL_TB_PRIV(_type, _priv, _tbname) (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_ALL_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) -#define ALTER_USER_DEL_ALL_TB_PRIV(_type, _priv, _tbname) (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_ALL_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) - -#define ALTER_USER_ADD_SUBSCRIBE_TOPIC_PRIV(_type, _priv) (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_SUBSCRIBE_PRIV(_priv)) -#define ALTER_USER_DEL_SUBSCRIBE_TOPIC_PRIV(_type, _priv) (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_SUBSCRIBE_PRIV(_priv)) +#define ALTER_USER_ADD_READ_TB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_READ_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) +#define ALTER_USER_DEL_READ_TB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_READ_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) +#define ALTER_USER_ADD_WRITE_TB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_WRITE_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) +#define ALTER_USER_DEL_WRITE_TB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_WRITE_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) +#define ALTER_USER_ADD_ALTER_TB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_ALTER_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) +#define ALTER_USER_DEL_ALTER_TB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_ALTER_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) +#define ALTER_USER_ADD_ALL_TB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_ALL_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) +#define ALTER_USER_DEL_ALL_TB_PRIV(_type, _priv, _tbname) \ + (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_ALL_PRIV(_priv) && ALTER_USER_TARGET_TB(_tbname)) +#define ALTER_USER_ADD_SUBSCRIBE_TOPIC_PRIV(_type, _priv) \ + (ALTER_USER_ADD_PRIVS(_type) && ALTER_USER_SUBSCRIBE_PRIV(_priv)) +#define ALTER_USER_DEL_SUBSCRIBE_TOPIC_PRIV(_type, _priv) \ + (ALTER_USER_DEL_PRIVS(_type) && ALTER_USER_SUBSCRIBE_PRIV(_priv)) static SIpWhiteList *createDefaultIpWhiteList(); SIpWhiteList *createIpWhiteList(void *buf, int32_t len); @@ -1575,8 +1595,8 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) { if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; char detail[1000] = {0}; - sprintf(detail, "enable:%d, superUser:%d, sysInfo:%d, password:xxx", - createReq.enable, createReq.superUser, createReq.sysInfo); + sprintf(detail, "enable:%d, superUser:%d, sysInfo:%d, password:xxx", createReq.enable, createReq.superUser, + createReq.sysInfo); auditRecord(pReq, pMnode->clusterId, "createUser", "", createReq.user, detail, strlen(detail)); @@ -1766,7 +1786,7 @@ static int32_t mndRemoveTablePriviledge(SMnode *pMnode, SHashObj *hash, SHashObj if (NULL == currRef) { return 0; } - + if (1 == *currRef) { if (taosHashRemove(useDbHash, alterReq->objname, dbKeyLen) != 0) { return -1; @@ -1800,12 +1820,12 @@ static char *mndUserAuditTypeStr(int32_t type) { return "error"; } -static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode *pMnode, SUserObj* pNewUser) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; +static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode *pMnode, SUserObj *pNewUser) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; - if (ALTER_USER_ADD_READ_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName) || - ALTER_USER_ADD_ALL_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName)) { + if (ALTER_USER_ADD_READ_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName) || + ALTER_USER_ADD_ALL_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName)) { if (strcmp(pAlterReq->objname, "1.*") != 0) { int32_t len = strlen(pAlterReq->objname) + 1; SDbObj *pDb = mndAcquireDb(pMnode, pAlterReq->objname); @@ -1830,7 +1850,8 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode } } - if (ALTER_USER_ADD_WRITE_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName) || ALTER_USER_ADD_ALL_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName)) { + if (ALTER_USER_ADD_WRITE_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName) || + ALTER_USER_ADD_ALL_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName)) { if (strcmp(pAlterReq->objname, "1.*") != 0) { int32_t len = strlen(pAlterReq->objname) + 1; SDbObj *pDb = mndAcquireDb(pMnode, pAlterReq->objname); @@ -1855,7 +1876,8 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode } } - if (ALTER_USER_DEL_READ_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName) || ALTER_USER_DEL_ALL_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName)) { + if (ALTER_USER_DEL_READ_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName) || + ALTER_USER_DEL_ALL_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName)) { if (strcmp(pAlterReq->objname, "1.*") != 0) { int32_t len = strlen(pAlterReq->objname) + 1; SDbObj *pDb = mndAcquireDb(pMnode, pAlterReq->objname); @@ -1870,7 +1892,8 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode } } - if (ALTER_USER_DEL_WRITE_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName) || ALTER_USER_DEL_ALL_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName)) { + if (ALTER_USER_DEL_WRITE_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName) || + ALTER_USER_DEL_ALL_DB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName)) { if (strcmp(pAlterReq->objname, "1.*") != 0) { int32_t len = strlen(pAlterReq->objname) + 1; SDbObj *pDb = mndAcquireDb(pMnode, pAlterReq->objname); @@ -1885,9 +1908,9 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode } } - SHashObj* pReadTbs = pNewUser->readTbs; - SHashObj* pWriteTbs = pNewUser->writeTbs; - SHashObj* pAlterTbs = pNewUser->alterTbs; + SHashObj *pReadTbs = pNewUser->readTbs; + SHashObj *pWriteTbs = pNewUser->writeTbs; + SHashObj *pAlterTbs = pNewUser->alterTbs; #ifdef TD_ENTERPRISE if (pAlterReq->isView) { @@ -1897,15 +1920,18 @@ static int32_t mndProcessAlterUserPrivilegesReq(SAlterUserReq *pAlterReq, SMnode } #endif - if (ALTER_USER_ADD_READ_TB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName) || ALTER_USER_ADD_ALL_TB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName)) { + if (ALTER_USER_ADD_READ_TB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName) || + ALTER_USER_ADD_ALL_TB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName)) { if (mndTablePriviledge(pMnode, pReadTbs, pNewUser->useDbs, pAlterReq, pSdb) != 0) return -1; } - if (ALTER_USER_ADD_WRITE_TB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName) || ALTER_USER_ADD_ALL_TB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName)) { + if (ALTER_USER_ADD_WRITE_TB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName) || + ALTER_USER_ADD_ALL_TB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName)) { if (mndTablePriviledge(pMnode, pWriteTbs, pNewUser->useDbs, pAlterReq, pSdb) != 0) return -1; } - if (ALTER_USER_ADD_ALTER_TB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName) || ALTER_USER_ADD_ALL_TB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName)) { + if (ALTER_USER_ADD_ALTER_TB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName) || + ALTER_USER_ADD_ALL_TB_PRIV(pAlterReq->alterType, pAlterReq->privileges, pAlterReq->tabName)) { if (mndTablePriviledge(pMnode, pAlterTbs, pNewUser->useDbs, pAlterReq, pSdb) != 0) return -1; } @@ -2016,7 +2042,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { newUser.sysInfo = alterReq.sysInfo; } - if(alterReq.alterType == TSDB_ALTER_USER_CREATEDB) { + if (alterReq.alterType == TSDB_ALTER_USER_CREATEDB) { newUser.createdb = alterReq.createdb; } @@ -2119,52 +2145,43 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { code = mndAlterUser(pMnode, pUser, &newUser, pReq); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; - if(alterReq.alterType == TSDB_ALTER_USER_PASSWD){ + if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) { char detail[1000] = {0}; sprintf(detail, "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, createdb:%d, tabName:%s, password:xxx", mndUserAuditTypeStr(alterReq.alterType), alterReq.enable, alterReq.superUser, alterReq.sysInfo, alterReq.createdb ? 1 : 0, alterReq.tabName); auditRecord(pReq, pMnode->clusterId, "alterUser", "", alterReq.user, detail, strlen(detail)); - } - else if(alterReq.alterType == TSDB_ALTER_USER_SUPERUSER || - alterReq.alterType == TSDB_ALTER_USER_ENABLE || - alterReq.alterType == TSDB_ALTER_USER_SYSINFO || - alterReq.alterType == TSDB_ALTER_USER_CREATEDB){ + } else if (alterReq.alterType == TSDB_ALTER_USER_SUPERUSER || alterReq.alterType == TSDB_ALTER_USER_ENABLE || + alterReq.alterType == TSDB_ALTER_USER_SYSINFO || alterReq.alterType == TSDB_ALTER_USER_CREATEDB) { auditRecord(pReq, pMnode->clusterId, "alterUser", "", alterReq.user, alterReq.sql, alterReq.sqlLen); - } - else if(ALTER_USER_ADD_READ_DB_PRIV(alterReq.alterType, alterReq.privileges, alterReq.tabName)|| - ALTER_USER_ADD_WRITE_DB_PRIV(alterReq.alterType, alterReq.privileges, alterReq.tabName)|| - ALTER_USER_ADD_ALL_DB_PRIV(alterReq.alterType, alterReq.privileges, alterReq.tabName)|| - ALTER_USER_ADD_READ_TB_PRIV(alterReq.alterType, alterReq.privileges, alterReq.tabName)|| - ALTER_USER_ADD_WRITE_TB_PRIV(alterReq.alterType, alterReq.privileges, alterReq.tabName)|| - ALTER_USER_ADD_ALL_TB_PRIV(alterReq.alterType, alterReq.privileges, alterReq.tabName)){ - if (strcmp(alterReq.objname, "1.*") != 0){ + } else if (ALTER_USER_ADD_READ_DB_PRIV(alterReq.alterType, alterReq.privileges, alterReq.tabName) || + ALTER_USER_ADD_WRITE_DB_PRIV(alterReq.alterType, alterReq.privileges, alterReq.tabName) || + ALTER_USER_ADD_ALL_DB_PRIV(alterReq.alterType, alterReq.privileges, alterReq.tabName) || + ALTER_USER_ADD_READ_TB_PRIV(alterReq.alterType, alterReq.privileges, alterReq.tabName) || + ALTER_USER_ADD_WRITE_TB_PRIV(alterReq.alterType, alterReq.privileges, alterReq.tabName) || + ALTER_USER_ADD_ALL_TB_PRIV(alterReq.alterType, alterReq.privileges, alterReq.tabName)) { + if (strcmp(alterReq.objname, "1.*") != 0) { SName name = {0}; tNameFromString(&name, alterReq.objname, T_NAME_ACCT | T_NAME_DB); - auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", name.dbname, alterReq.user, - alterReq.sql, alterReq.sqlLen); - }else{ - auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", "", alterReq.user, - alterReq.sql, alterReq.sqlLen); + auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", name.dbname, alterReq.user, alterReq.sql, + alterReq.sqlLen); + } else { + auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", "", alterReq.user, alterReq.sql, alterReq.sqlLen); } - } - else if(ALTER_USER_ADD_SUBSCRIBE_TOPIC_PRIV(alterReq.alterType, alterReq.privileges)){ - auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", alterReq.objname, alterReq.user, - alterReq.sql, alterReq.sqlLen); - } - else if(ALTER_USER_DEL_SUBSCRIBE_TOPIC_PRIV(alterReq.alterType, alterReq.privileges)){ - auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", alterReq.objname, alterReq.user, - alterReq.sql, alterReq.sqlLen); - } - else{ - if (strcmp(alterReq.objname, "1.*") != 0){ + } else if (ALTER_USER_ADD_SUBSCRIBE_TOPIC_PRIV(alterReq.alterType, alterReq.privileges)) { + auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", alterReq.objname, alterReq.user, alterReq.sql, + alterReq.sqlLen); + } else if (ALTER_USER_DEL_SUBSCRIBE_TOPIC_PRIV(alterReq.alterType, alterReq.privileges)) { + auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", alterReq.objname, alterReq.user, alterReq.sql, + alterReq.sqlLen); + } else { + if (strcmp(alterReq.objname, "1.*") != 0) { SName name = {0}; tNameFromString(&name, alterReq.objname, T_NAME_ACCT | T_NAME_DB); - auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", name.dbname, alterReq.user, - alterReq.sql, alterReq.sqlLen); - }else{ - auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", "", alterReq.user, - alterReq.sql, alterReq.sqlLen); + auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", name.dbname, alterReq.user, alterReq.sql, + alterReq.sqlLen); + } else { + auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", "", alterReq.user, alterReq.sql, alterReq.sqlLen); } } @@ -2480,11 +2497,14 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock int32_t numOfReadViews = taosHashGetSize(pUser->readViews); int32_t numOfWriteViews = taosHashGetSize(pUser->writeViews); int32_t numOfAlterViews = taosHashGetSize(pUser->alterViews); - if (numOfRows + numOfReadDbs + numOfWriteDbs + numOfTopics + numOfReadTbs + numOfWriteTbs + numOfAlterTbs + numOfReadViews + numOfWriteViews + numOfAlterViews >= rows) { + if (numOfRows + numOfReadDbs + numOfWriteDbs + numOfTopics + numOfReadTbs + numOfWriteTbs + numOfAlterTbs + + numOfReadViews + numOfWriteViews + numOfAlterViews >= + rows) { mInfo( "will restore. current num of rows: %d, read dbs %d, write dbs %d, topics %d, read tables %d, write tables " "%d, alter tables %d, read views %d, write views %d, alter views %d", - numOfRows, numOfReadDbs, numOfWriteDbs, numOfTopics, numOfReadTbs, numOfWriteTbs, numOfAlterTbs, numOfReadViews, numOfWriteViews, numOfAlterViews); + numOfRows, numOfReadDbs, numOfWriteDbs, numOfTopics, numOfReadTbs, numOfWriteTbs, numOfAlterTbs, + numOfReadViews, numOfWriteViews, numOfAlterViews); pShow->restore = true; sdbRelease(pSdb, pUser); break; @@ -2870,7 +2890,6 @@ int32_t mndUserRemoveView(SMnode *pMnode, STrans *pTrans, char *view) { return code; } - int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) { int32_t code = 0; SSdb *pSdb = pMnode->pSdb; @@ -2910,3 +2929,9 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) { mndUserFreeObj(&newUser); return code; } + +int64_t mndGetUserIpWhiteListVer(SMnode *pMnode, SUserObj *pUser) { + // ver = 0, disable ip white list + // ver > 0, enable ip white list + return tsEnableWhiteList ? pUser->ipWhiteListVer : 0; +} From b7710fa9b91a1dbaa2a73f699d34ec08782d9e05 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 20 Jun 2024 11:35:23 +0800 Subject: [PATCH 07/16] adj test case --- tests/script/tsim/stream/checkStreamSTable1.sim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/script/tsim/stream/checkStreamSTable1.sim b/tests/script/tsim/stream/checkStreamSTable1.sim index 942a947feb..d5781a4e85 100644 --- a/tests/script/tsim/stream/checkStreamSTable1.sim +++ b/tests/script/tsim/stream/checkStreamSTable1.sim @@ -15,7 +15,7 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2 from st interval(1s) ; -sleep 500 +sleep 1000 sql insert into t1 values(1648791211000,1,2,3); sql insert into t1 values(1648791212000,2,2,3); @@ -46,7 +46,7 @@ sql alter table streamt1 add column c3 double; print create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2, avg(b) c3 from st interval(1s) ; sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2, avg(b) c3 from st interval(1s) ; -sleep 500 +sleep 1000 sql insert into t2 values(1648791213000,1,2,3); sql insert into t1 values(1648791214000,1,2,3); From c9d847787efc30eaaf158f2c4f75503a3e6ce2c2 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 20 Jun 2024 14:21:33 +0800 Subject: [PATCH 08/16] fix: level task done number issue --- source/libs/scheduler/src/schTask.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 7f60353b1c..c317a63ce1 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -248,7 +248,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_LOG_TASK_END_TS(pTask); - atomic_add_fetch_32(&pTask->level->taskExecDoneNum, 1); + int32_t taskDone = atomic_add_fetch_32(&pTask->level->taskExecDoneNum, 1); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC); @@ -319,8 +319,10 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { } } - SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask)); - + if (taskDone == pTask->level->taskNum) { + SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask)); + } + return TSDB_CODE_SUCCESS; } From 606b9cfcb0af5d46ffaa9dbe7cea0e2cf475e1da Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 20 Jun 2024 15:38:22 +0800 Subject: [PATCH 09/16] fix: error process for oom --- source/dnode/mnode/impl/inc/mndShow.h | 8 ++++++++ source/dnode/mnode/impl/src/mndCluster.c | 15 ++++++++------- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndShow.h b/source/dnode/mnode/impl/inc/mndShow.h index 5a3e487e3d..de7068e9ad 100644 --- a/source/dnode/mnode/impl/inc/mndShow.h +++ b/source/dnode/mnode/impl/inc/mndShow.h @@ -23,6 +23,14 @@ extern "C" { #endif +#define COL_DATA_SET_VAL_RET(pData, isNull, pObj) \ + do { \ + if ((code = colDataSetVal(pColInfo, numOfRows, (pData), (isNull))) != 0) { \ + if (pObj) sdbRelease(pSdb, (pObj)); \ + return code; \ + } \ + } while (0) + int32_t mndInitShow(SMnode *pMnode); void mndCleanupShow(SMnode *pMnode); void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp); diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index a811a1ada0..31ebc6609d 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -280,6 +280,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pMsg->info.node; SSdb *pSdb = pMnode->pSdb; + int32_t code = 0; int32_t numOfRows = 0; int32_t cols = 0; SClusterObj *pCluster = NULL; @@ -290,31 +291,31 @@ static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock * cols = 0; SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pCluster->id, false); + COL_DATA_SET_VAL_RET((const char *)&pCluster->id, false, pCluster); char buf[tListLen(pCluster->name) + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(buf, pCluster->name, pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, buf, false); + COL_DATA_SET_VAL_RET(buf, false, pCluster); int32_t upTime = mndGetClusterUpTimeImp(pCluster); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&upTime, false); + COL_DATA_SET_VAL_RET((const char *)&upTime, false, pCluster); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pCluster->createdTime, false); + COL_DATA_SET_VAL_RET((const char *)&pCluster->createdTime, false, pCluster); char ver[12] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(ver, tsVersionName, pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)ver, false); + COL_DATA_SET_VAL_RET((const char *)ver, false, pCluster); char expireTime[25] = {0}; pColInfo = taosArrayGet(pBlock->pDataBlock, cols); if (GRANT_EXPIRE_UNLIMITED(tsExpireTime / 1000)) { STR_WITH_MAXSIZE_TO_VARSTR(expireTime, "unlimited", pShow->pMeta->pSchemas[cols].bytes); - colDataSetVal(pColInfo, numOfRows, expireTime, false); + COL_DATA_SET_VAL_RET(expireTime, false, pCluster); } else if (tsExpireTime <= 0) { colDataSetNULL(pColInfo, numOfRows); } else { @@ -327,7 +328,7 @@ static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock * ts[0] = 0; } STR_WITH_MAXSIZE_TO_VARSTR(expireTime, ts, pShow->pMeta->pSchemas[cols].bytes); - colDataSetVal(pColInfo, numOfRows, expireTime, false); + COL_DATA_SET_VAL_RET(expireTime, false, pCluster); } sdbRelease(pSdb, pCluster); From 8e065ab47e267c59def9cb637d71b242423cdfda Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Jun 2024 21:28:54 +0800 Subject: [PATCH 10/16] fix(stream): add some logs. --- source/common/src/rsync.c | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index 2ed21616dc..5d056cdade 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -169,15 +169,22 @@ int32_t uploadByRsync(const char* id, const char* path) { #else if (path[strlen(path) - 1] != '/') { #endif - snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s/ rsync://%s/checkpoint/%s/", + snprintf(command, PATH_MAX, + "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ " + "rsync://%s/checkpoint/%s/", + tsLogDir, #ifdef WINDOWS pathTransform #else path #endif - , tsSnodeAddress, id); + , + tsSnodeAddress, id); } else { - snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/", + snprintf(command, PATH_MAX, + "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s " + "rsync://%s/checkpoint/%s/", + tsLogDir, #ifdef WINDOWS pathTransform #else @@ -213,14 +220,15 @@ int32_t downloadRsync(const char* id, const char* path) { #endif char command[PATH_MAX] = {0}; - snprintf(command, PATH_MAX, "rsync -av --debug=all --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", - tsSnodeAddress, id, + snprintf(command, PATH_MAX, + "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", + tsLogDir, tsSnodeAddress, id, #ifdef WINDOWS pathTransform #else path #endif - ); + ); uDebug("[rsync] %s start to sync data from remote to:%s, %s", id, path, command); @@ -249,7 +257,7 @@ int32_t deleteRsync(const char* id) { } char command[PATH_MAX] = {0}; - snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tmp, tsSnodeAddress, id); + snprintf(command, PATH_MAX, "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tmp, tsSnodeAddress, id); code = execCommand(command); taosRemoveDir(tmp); From 2ecc725b1a646cdb38d216b00651b4e058f8fed5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Jun 2024 21:31:23 +0800 Subject: [PATCH 11/16] fix(stream): not clear check downstream info. --- source/libs/stream/src/streamCheckStatus.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index b577147171..b64e0bb6d2 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -299,12 +299,10 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { taosThreadMutexLock(&pInfo->checkInfoLock); - streamTaskCompleteCheckRsp(pInfo, false, id); - pInfo->stopCheckProcess = 1; taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s set stop check-rsp monit", id); + stDebug("s-task:%s set stop check-rsp monitor flag", id); return TSDB_CODE_SUCCESS; } @@ -438,6 +436,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { ASSERT(pInfo->startTs > 0); stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id, pInfo->startTs); + pInfo->stopCheckProcess = 0; // disable auto stop of check process return TSDB_CODE_FAILED; } From 28b0148df97f85a1b8766adf43e4ecd073f6d5e1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Jun 2024 22:41:33 +0800 Subject: [PATCH 12/16] fix(stream): fix syntax error. --- source/common/src/rsync.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index 5d056cdade..84e9615ddd 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -257,7 +257,9 @@ int32_t deleteRsync(const char* id) { } char command[PATH_MAX] = {0}; - snprintf(command, PATH_MAX, "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tmp, tsSnodeAddress, id); + snprintf(command, PATH_MAX, + "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tsLogDir, + tmp, tsSnodeAddress, id); code = execCommand(command); taosRemoveDir(tmp); From ebf2df965c8dc15450695c7a399556a06d07e745 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 09:28:42 +0800 Subject: [PATCH 13/16] Update streamBackendRocksdb.c --- source/libs/stream/src/streamBackendRocksdb.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 15c5272f3c..f614ecfd48 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2210,7 +2210,7 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 sprintf(temp, "%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "tmp", chkpId); if (taosDirExist(temp)) { - cleanDir(temp, NULL); + cleanDir(temp, ""); } else { taosMkDir(temp); } @@ -4323,4 +4323,4 @@ int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname) { taosThreadRwlockUnlock(&bm->rwLock); return code; } -#endif \ No newline at end of file +#endif From 6896d2493af1761581ea183348c8b94becb14182 Mon Sep 17 00:00:00 2001 From: tjuzyp Date: Fri, 21 Jun 2024 14:18:34 +0800 Subject: [PATCH 14/16] fix doc issue for ISNULL and NOTNULL on branch 3.0 --- docs/en/12-taos-sql/20-keywords.md | 4 ++-- docs/zh/12-taos-sql/20-keywords.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/en/12-taos-sql/20-keywords.md b/docs/en/12-taos-sql/20-keywords.md index 36cbc0948f..91be44c561 100644 --- a/docs/en/12-taos-sql/20-keywords.md +++ b/docs/en/12-taos-sql/20-keywords.md @@ -151,7 +151,7 @@ The following list shows all reserved keywords: - INTERVAL - INTO - IS -- ISNULL +- IS NULL ### J @@ -197,7 +197,7 @@ The following list shows all reserved keywords: - NMATCH - NONE - NOT -- NOTNULL +- NOT NULL - NOW - NULL - NULLS diff --git a/docs/zh/12-taos-sql/20-keywords.md b/docs/zh/12-taos-sql/20-keywords.md index f59eda1689..4d2454ed5a 100644 --- a/docs/zh/12-taos-sql/20-keywords.md +++ b/docs/zh/12-taos-sql/20-keywords.md @@ -151,7 +151,7 @@ description: TDengine 保留关键字的详细列表 - INTERVAL - INTO - IS -- ISNULL +- IS NULL ### J @@ -197,7 +197,7 @@ description: TDengine 保留关键字的详细列表 - NMATCH - NONE - NOT -- NOTNULL +- NOT NULL - NOW - NULL - NULLS From 67d661ac55f78e23fb3dc7d53fb61989e74730f0 Mon Sep 17 00:00:00 2001 From: shiyabin <470905797@qq.com> Date: Fri, 21 Jun 2024 16:52:03 +0800 Subject: [PATCH 15/16] Fix the kubernetes deployment documentation --- docs/en/10-deployment/03-k8s.md | 4 ++-- docs/zh/10-deployment/03-k8s.md | 40 ++++++++++++++++----------------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/docs/en/10-deployment/03-k8s.md b/docs/en/10-deployment/03-k8s.md index affb492169..3e5ba4c349 100644 --- a/docs/en/10-deployment/03-k8s.md +++ b/docs/en/10-deployment/03-k8s.md @@ -243,7 +243,7 @@ curl -u root:taosdata -d "show databases" 127.0.0.1:6041/rest/sql {"code":0,"column_meta":[["name","VARCHAR",64]],"data":[["information_schema"],["performance_schema"],["test"],["test1"]],"rows":4} ``` -## Test cluster +## Test cluster ### Data preparation @@ -335,7 +335,7 @@ tdengine-1 1/1 Running 1 (6m48s ago) 20m 10.244.0.59 node84 tdengine-2 1/1 Running 0 21m 10.244.1.223 node85 ``` -At this time, the cluster mnode has a re-election, and the monde on dnode1 becomes the leader. +At this time, the cluster mnode has a re-election, and the monde on dnode2 becomes the leader. ```Bash kubectl exec -it tdengine-1 -n tdengine-test -- taos -s "show mnodes\G" diff --git a/docs/zh/10-deployment/03-k8s.md b/docs/zh/10-deployment/03-k8s.md index 31e909f02d..8be7ec9e9e 100644 --- a/docs/zh/10-deployment/03-k8s.md +++ b/docs/zh/10-deployment/03-k8s.md @@ -10,9 +10,9 @@ description: 利用 Kubernetes 部署 TDengine 集群的详细指南 为了满足[高可用](https://docs.taosdata.com/tdinternal/high-availability/)的需求,集群需要满足如下要求: -- 3个及以上 dnode :TDengine 的同一个 vgroup 中的多个 vnode ,不允许同时分布在一个 dnode ,所以如果创建3副本的数据库,则 dnode 数大于等于3 -- 3个 mnode :mnode 负责整个集群的管理工作,TDengine 默认是一个 mnode。如果这个 mnode 所在的 dnode 掉线,则整个集群不可用。 -- 数据库的3副本:TDengine 的副本配置是数据库级别,所以数据库3副本可满足在3个 dnode 的集群中,任意一个 dnode 下线,都不影响集群的正常使用。**如果下线** **dnode** **个数为2时,此时集群不可用,****因为****RAFT无法完成选举****。**(企业版:在灾难恢复场景,任一节点数据文件损坏,都可以通过重新拉起dnode进行恢复) +- 3 个及以上 dnode :TDengine 的同一个 vgroup 中的多个 vnode ,不允许同时分布在一个 dnode ,所以如果创建 3 副本的数据库,则 dnode 数大于等于 3 +- 3 个 mnode :mnode 负责整个集群的管理工作,TDengine 默认是一个 mnode。如果这个 mnode 所在的 dnode 掉线,则整个集群不可用。 +- 数据库的 3 副本:TDengine 的副本配置是数据库级别,所以数据库 3 副本可满足在 3 个 dnode 的集群中,任意一个 dnode 下线,都不影响集群的正常使用。**如果下线** **dnode** **个数为 2 时,此时集群不可用,\*\***因为\***\*RAFT 无法完成选举\*\***。\*\*(企业版:在灾难恢复场景,任一节点数据文件损坏,都可以通过重新拉起 dnode 进行恢复) ## 前置条件 @@ -52,7 +52,7 @@ spec: 根据 Kubernetes 对各类部署的说明,我们将使用 StatefulSet 作为 TDengine 的部署资源类型。 创建文件 `tdengine.yaml`,其中 replicas 定义集群节点的数量为 3。节点时区为中国(Asia/Shanghai),每个节点分配 5G 标准(standard)存储(参考[Storage Classes](https://kubernetes.io/docs/concepts/storage/storage-classes/) 配置 storage class )。你也可以根据实际情况进行相应修改。 -请特别注意startupProbe的配置,在 dnode 的 Pod 掉线一段时间后,再重新启动,这个时候新上线的 dnode 会短暂不可用。如果startupProbe配置过小,Kubernetes 会认为该 Pod 处于不正常的状态,并尝试重启该 Pod,该 dnode 的 Pod 会频繁重启,始终无法恢复到正常状态。参考 [Configure Liveness, Readiness and Startup Probes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/) +请特别注意 startupProbe 的配置,在 dnode 的 Pod 掉线一段时间后,再重新启动,这个时候新上线的 dnode 会短暂不可用。如果 startupProbe 配置过小,Kubernetes 会认为该 Pod 处于不正常的状态,并尝试重启该 Pod,该 dnode 的 Pod 会频繁重启,始终无法恢复到正常状态。参考 [Configure Liveness, Readiness and Startup Probes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/) ```YAML --- @@ -176,7 +176,7 @@ taos> show dnodes Query OK, 3 row(s) in set (0.001853s) ``` -查看当前mnode +查看当前 mnode ```Bash kubectl exec -it tdengine-1 -n tdengine-test -- taos -s "show mnodes\G" @@ -191,14 +191,14 @@ reboot_time: 2023-07-19 17:54:19.520 Query OK, 1 row(s) in set (0.001282s) ``` -## 创建mnode +## 创建 mnode ```Bash kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "create mnode on dnode 2" kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "create mnode on dnode 3" ``` -查看mnode +查看 mnode ```Bash kubectl exec -it tdengine-1 -n tdengine-test -- taos -s "show mnodes\G" @@ -249,7 +249,7 @@ curl -u root:taosdata -d "show databases" 127.0.0.1:6041/rest/sql #### taosBenchmark -通过taosBenchmark 创建一个3副本的数据库,同时写入1亿条数据,同时查看数据 +通过 taosBenchmark 创建一个 3 副本的数据库,同时写入 1 亿条数据,同时查看数据 ```Bash kubectl exec -it tdengine-0 -n tdengine-test -- taosBenchmark -I stmt -d test -n 10000 -t 10000 -a 3 @@ -264,7 +264,7 @@ taos> select count(*) from test.meters; Query OK, 1 row(s) in set (0.103537s) ``` -查看vnode分布,通过show dnodes +查看 vnode 分布,通过 show dnodes ```Bash kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "show dnodes" @@ -278,7 +278,7 @@ taos> show dnodes Query OK, 3 row(s) in set (0.001357s) ``` -通过show vgroup 查看 vnode 分布情况 +通过 show vgroup 查看 vnode 分布情况 ```Bash kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "show test.vgroups" @@ -299,7 +299,7 @@ Query OK, 8 row(s) in set (0.001488s) #### 手工创建 -常见一个三副本的test1,并创建一张表,写入2条数据 +常见一个三副本的 test1,并创建一张表,写入 2 条数据 ```Bash kubectl exec -it tdengine-0 -n tdengine-test -- \ @@ -310,7 +310,7 @@ kubectl exec -it tdengine-0 -n tdengine-test -- \ insert into t1 values(now, 1)(now+1s, 2);" ``` -通过show test1.vgroup 查看xnode分布情况 +通过 show test1.vgroup 查看 xnode 分布情况 ```Bash kubectl exec -it tdengine-0 -n tdengine-test -- taos -s "show test1.vgroups" @@ -335,7 +335,7 @@ tdengine-1 1/1 Running 1 (6m48s ago) 20m 10.244.0.59 node84 tdengine-2 1/1 Running 0 21m 10.244.1.223 node85 ``` -此时集群mnode发生重新选举,dnode1上的monde 成为leader +此时集群 mnode 发生重新选举,dnode2 上的 monde 成为 leader ```Bash kubectl exec -it tdengine-1 -n tdengine-test -- taos -s "show mnodes\G" @@ -389,7 +389,7 @@ taos> select *from test1.t1 Query OK, 4 row(s) in set (0.001994s) ``` -同理,至于非leader得mnode掉线,读写当然可以正常进行,这里就不做过多的展示。 +同理,至于非 leader 得 mnode 掉线,读写当然可以正常进行,这里就不做过多的展示。 ## 集群扩容 @@ -436,7 +436,7 @@ Query OK, 4 row(s) in set (0.003628s) ## 集群缩容 -由于 TDengine 集群在扩缩容时会对数据进行节点间迁移,使用 kubectl 命令进行缩容需要首先使用 "drop dnodes" 命令(**如果集群中存在3副本的db,那么缩容后的** **dnode** **个数也要必须大于等于3,否则drop dnode操作会被中止**),然后再节点删除完成后再进行 Kubernetes 集群缩容。 +由于 TDengine 集群在扩缩容时会对数据进行节点间迁移,使用 kubectl 命令进行缩容需要首先使用 "drop dnodes" 命令(**如果集群中存在 3 副本的 db,那么缩容后的** **dnode** **个数也要必须大于等于 3,否则 drop dnode 操作会被中止**),然后再节点删除完成后再进行 Kubernetes 集群缩容。 注意:由于 Kubernetes Statefulset 中 Pod 的只能按创建顺序逆序移除,所以 TDengine drop dnode 也需要按照创建顺序逆序移除,否则会导致 Pod 处于错误状态。 @@ -459,7 +459,7 @@ Query OK, 3 row(s) in set (0.003324s) kubectl scale statefulsets tdengine --replicas=3 -n tdengine-test ``` -最后一个 POD 将会被删除。使用命令 kubectl get pods -l app=tdengine 查看POD状态: +最后一个 POD 将会被删除。使用命令 kubectl get pods -l app=tdengine 查看 POD 状态: ```Plain kubectl get pod -l app=tdengine -n tdengine-test -o wide @@ -469,7 +469,7 @@ tdengine-1 1/1 Running 1 (7h9m ago) 7h23m 10.244.0.59 node84 < tdengine-2 1/1 Running 0 5h45m 10.244.1.224 node85 ``` -POD删除后,需要手动删除PVC,否则下次扩容时会继续使用以前的数据导致无法正常加入集群。 +POD 删除后,需要手动删除 PVC,否则下次扩容时会继续使用以前的数据导致无法正常加入集群。 ```Bash kubectl delete pvc aosdata-tdengine-3 -n tdengine-test @@ -502,7 +502,7 @@ Query OK, 4 row(s) in set (0.003881s) ## 清理 TDengine 集群 -> **删除pvc时需要注意下pv persistentVolumeReclaimPolicy策略,建议改为Delete,这样在删除pvc时才会自动清理pv,同时会清理底层的csi存储资源,如果没有配置删除pvc自动清理pv的策略,再删除pvc后,在手动清理pv时,pv对应的csi存储资源可能不会被释放。** +> **删除 pvc 时需要注意下 pv persistentVolumeReclaimPolicy 策略,建议改为 Delete,这样在删除 pvc 时才会自动清理 pv,同时会清理底层的 csi 存储资源,如果没有配置删除 pvc 自动清理 pv 的策略,再删除 pvc 后,在手动清理 pv 时,pv 对应的 csi 存储资源可能不会被释放。** 完整移除 TDengine 集群,需要分别清理 statefulset、svc、configmap、pvc。 @@ -537,8 +537,8 @@ Query OK, 4 row(s) in set (0.003862s) 对于在 Kubernetes 环境下 TDengine 的高可用和高可靠来说,对于硬件损坏、灾难恢复,分为两个层面来讲: 1. 底层的分布式块存储具备的灾难恢复能力,块存储的多副本,当下流行的分布式块存储如 Ceph,就具备多副本能力,将存储副本扩展到不同的机架、机柜、机房、数据中心(或者直接使用公有云厂商提供的块存储服务) -2. TDengine的灾难恢复,在 TDengine Enterprise 中,本身具备了当一个 dnode 永久下线(物理机磁盘损坏,数据分拣丢失)后,重新拉起一个空白的dnode来恢复原dnode的工作。 +2. TDengine 的灾难恢复,在 TDengine Enterprise 中,本身具备了当一个 dnode 永久下线(物理机磁盘损坏,数据分拣丢失)后,重新拉起一个空白的 dnode 来恢复原 dnode 的工作。 -最后,欢迎使用[TDengine Cloud](https://cloud.taosdata.com/),来体验一站式全托管的TDengine云服务。 +最后,欢迎使用[TDengine Cloud](https://cloud.taosdata.com/),来体验一站式全托管的 TDengine 云服务。 > TDengine Cloud 是一个极简的全托管时序数据处理云服务平台,它是基于开源的时序数据库 TDengine 而开发的。除高性能的时序数据库之外,它还具有缓存、订阅和流计算等系统功能,而且提供了便利而又安全的数据分享、以及众多的企业级功能。它可以让物联网、工业互联网、金融、IT 运维监控等领域企业在时序数据的管理上大幅降低人力成本和运营成本。 From eb784f7e04b08264bcd255392a056c04fd072fc7 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 22 Jun 2024 14:48:25 +0800 Subject: [PATCH 16/16] case: add modify two three combine item --- .../cluster/incSnapshot.py | 0 tests/army/storage/compressBasic.py | 45 +++++++++++++++++-- 2 files changed, 42 insertions(+), 3 deletions(-) rename tests/army/{community => cluster}/cluster/incSnapshot.py (100%) diff --git a/tests/army/community/cluster/incSnapshot.py b/tests/army/cluster/cluster/incSnapshot.py similarity index 100% rename from tests/army/community/cluster/incSnapshot.py rename to tests/army/cluster/cluster/incSnapshot.py diff --git a/tests/army/storage/compressBasic.py b/tests/army/storage/compressBasic.py index 64d0d4148f..0d62ab9e8b 100644 --- a/tests/army/storage/compressBasic.py +++ b/tests/army/storage/compressBasic.py @@ -194,11 +194,11 @@ class TDTestCase(TBase): # alter float(c9) double(c10) to tsz comp = "tsz" sql = f"alter table {tbname} modify column c9 COMPRESS '{comp}';" - tdSql.execute(sql) + tdSql.execute(sql, show=True) self.checkDataDesc(tbname, 10, 5, comp) self.writeData(10000) sql = f"alter table {tbname} modify column c10 COMPRESS '{comp}';" - tdSql.execute(sql) + tdSql.execute(sql, show=True) self.checkDataDesc(tbname, 11, 5, comp) self.writeData(10000) @@ -207,9 +207,48 @@ class TDTestCase(TBase): for i in range(self.colCnt - 1): col = f"c{i}" sql = f"alter table {tbname} modify column {col} LEVEL '{level}';" - tdSql.execute(sql) + tdSql.execute(sql, show=True) + self.checkDataDesc(tbname, i + 1, 6, level) self.writeData(1000) + # modify two combine + + + i = 9 + encode = "delta-d" + compress = "zlib" + sql = f"alter table {tbname} modify column c{i} ENCODE '{encode}' COMPRESS '{compress}';" + tdSql.execute(sql, show=True) + self.checkDataDesc(tbname, i + 1, 4, encode) + self.checkDataDesc(tbname, i + 1, 5, compress) + + i = 10 + encode = "delta-d" + level = "high" + sql = f"alter table {tbname} modify column c{i} ENCODE '{encode}' LEVEL '{level}';" + tdSql.execute(sql, show=True) + self.checkDataDesc(tbname, i + 1, 4, encode) + self.checkDataDesc(tbname, i + 1, 6, level) + + i = 2 + compress = "zlib" + level = "high" + sql = f"alter table {tbname} modify column c{i} COMPRESS '{compress}' LEVEL '{level}';" + tdSql.execute(sql, show=True) + self.checkDataDesc(tbname, i + 1, 5, compress) + self.checkDataDesc(tbname, i + 1, 6, level) + + # modify three combine + i = 7 + encode = "simple8b" + compress = "zstd" + level = "medium" + sql = f"alter table {tbname} modify column c{i} ENCODE '{encode}' COMPRESS '{compress}' LEVEL '{level}';" + tdSql.execute(sql, show=True) + self.checkDataDesc(tbname, i + 1, 4, encode) + self.checkDataDesc(tbname, i + 1, 5, compress) + self.checkDataDesc(tbname, i + 1, 6, level) + # alter error sqls = [ "alter table nodb.nostb modify column ts LEVEL 'high';",