Merge branch '3.0' into fix/TD-30677-3.0

kailixu 2024-06-22 21:01:36 +08:00
commit 6f7b0145e0
22 changed files with 205 additions and 116 deletions

View File

@ -335,7 +335,7 @@ tdengine-1 1/1 Running 1 (6m48s ago) 20m 10.244.0.59 node84
tdengine-2 1/1 Running 0 21m 10.244.1.223 node85 <none> <none>
```
At this time, the cluster mnodes hold a re-election, and the mnode on dnode1 becomes the leader.
At this time, the cluster mnodes hold a re-election, and the mnode on dnode2 becomes the leader.
```Bash
kubectl exec -it tdengine-1 -n tdengine-test -- taos -s "show mnodes\G"

View File

@ -12,7 +12,7 @@ description: A detailed guide to deploying a TDengine cluster with Kubernetes
- 3 or more dnodes: the vnodes of one TDengine vgroup may not be placed on the same dnode, so a database with 3 replicas requires at least 3 dnodes
- 3 mnodes: the mnode manages the whole cluster; TDengine creates one mnode by default, and if the dnode hosting that mnode goes offline the whole cluster becomes unavailable
- 3 database replicas: TDengine replication is configured per database, so with 3 replicas a 3-dnode cluster keeps working when any single dnode goes offline. **If 2 dnodes go offline, the cluster becomes unavailable, because RAFT cannot complete the leader election.** Enterprise edition: in disaster-recovery scenarios, a damaged data file on any node can be recovered by restarting the dnode
- 3 database replicas: TDengine replication is configured per database, so with 3 replicas a 3-dnode cluster keeps working when any single dnode goes offline. **If 2 dnodes go offline, the cluster becomes unavailable, because RAFT cannot complete the leader election.** (Enterprise edition: in disaster-recovery scenarios, a damaged data file on any node can be recovered by restarting the dnode)
## Prerequisites
@ -335,7 +335,7 @@ tdengine-1 1/1 Running 1 (6m48s ago) 20m 10.244.0.59 node84
tdengine-2 1/1 Running 0 21m 10.244.1.223 node85 <none> <none>
```
At this time, the cluster mnodes hold a re-election, and the mnode on dnode1 becomes the leader.
At this time, the cluster mnodes hold a re-election, and the mnode on dnode2 becomes the leader.
```Bash
kubectl exec -it tdengine-1 -n tdengine-test -- taos -s "show mnodes\G"

View File

@ -29,6 +29,8 @@ extern "C" {
#endif
#define GRANT_HEART_BEAT_MIN 2
#define GRANT_EXPIRE_VALUE (31556995201)
#define GRANT_EXPIRE_UNLIMITED(v) ((v) == GRANT_EXPIRE_VALUE)
#define GRANT_ACTIVE_CODE "activeCode"
#define GRANT_FLAG_ALL (0x01)
#define GRANT_FLAG_AUDIT (0x02)

View File

@ -134,13 +134,14 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat
if (pTscObj->whiteListInfo.fp) {
SWhiteListInfo *whiteListInfo = &pTscObj->whiteListInfo;
int64_t oldVer = atomic_load_64(&whiteListInfo->ver);
if (oldVer < pRsp->whiteListVer) {
if (oldVer < pRsp->whiteListVer || pRsp->whiteListVer == 0) {
atomic_store_64(&whiteListInfo->ver, pRsp->whiteListVer);
if (whiteListInfo->fp) {
(*whiteListInfo->fp)(whiteListInfo->param, &pRsp->whiteListVer, TAOS_NOTIFY_WHITELIST_VER);
}
tscDebug("update whitelist version of user %s from %"PRId64" to %"PRId64", tscRid:%" PRIi64, pRsp->user, oldVer,
atomic_load_64(&whiteListInfo->ver), pTscObj->id);
tscDebug("update whitelist version of user %s from %" PRId64 " to %" PRId64 ", tscRid:%" PRIi64, pRsp->user,
oldVer, atomic_load_64(&whiteListInfo->ver), pTscObj->id);
}
}
releaseTscObj(pReq->connKey.tscRid);
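
The hunk above loosens the version check so that a reported whitelist version of 0 (which, per the new mndGetUserIpWhiteListVer further down, means the IP white list is disabled server-side) is also pushed to the registered callback. As a rough sketch of the consumer side: taos_set_notify_cb() and the exact callback signature are assumptions about the public client API, so verify them against the taos.h shipped with your build; TAOS_NOTIFY_WHITELIST_VER is the constant used in the hunk above.

```c
// Hedged sketch, not part of this commit: observing the whitelist-version push on the client.
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include "taos.h"

static void onWhiteListVer(void *param, void *ext, int type) {
  (void)param;
  if (type == TAOS_NOTIFY_WHITELIST_VER) {
    int64_t ver = *(int64_t *)ext;  // 0 now also reaches the callback: white list disabled
    printf("whitelist version changed to %" PRId64 "\n", ver);
  }
}

int main(void) {
  TAOS *conn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  if (conn == NULL) return 1;
  taos_set_notify_cb(conn, onWhiteListVer, NULL, TAOS_NOTIFY_WHITELIST_VER);
  // ... run queries; the heartbeat thread invokes the callback when the version changes ...
  taos_close(conn);
  return 0;
}
```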
@ -202,8 +203,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
for (int32_t i = 0; i < numOfBatchs; ++i) {
SDbHbRsp *rsp = taosArrayGet(batchRsp.pArray, i);
if (rsp->useDbRsp) {
tscDebug("hb use db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64,
rsp->useDbRsp->db, rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid);
tscDebug("hb use db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->useDbRsp->db,
rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid);
if (rsp->useDbRsp->vgVersion < 0) {
tscDebug("hb to remove db, db:%s", rsp->useDbRsp->db);
@ -225,7 +226,9 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
goto _return;
}
catalogUpdateDBVgInfo(pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB, rsp->useDbRsp->uid, vgInfo);
catalogUpdateDBVgInfo(pCatalog,
(rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB,
rsp->useDbRsp->uid, vgInfo);
}
}
}
@ -294,7 +297,6 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
return TSDB_CODE_SUCCESS;
}
static int32_t hbProcessDynViewRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
return catalogUpdateDynViewVer(pCatalog, (SDynViewVersion *)value);
}
@ -799,7 +801,8 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
for (int32_t i = 0; i < dbNum; ++i) {
SDbCacheInfo *db = &dbs[i];
tscDebug("the %dth expired dbFName:%s, dbId:%" PRId64 ", vgVersion:%d, cfgVersion:%d, numOfTable:%d, startTs:%" PRId64,
tscDebug("the %dth expired dbFName:%s, dbId:%" PRId64
", vgVersion:%d, cfgVersion:%d, numOfTable:%d, startTs:%" PRId64,
i, db->dbFName, db->dbId, db->vgVersion, db->cfgVersion, db->numOfTable, db->stateTs);
db->dbId = htobe64(db->dbId);
@ -1151,7 +1154,8 @@ static void *hbThreadFunc(void *param) {
if (sz > 0) {
hbGatherAppInfo();
if (sz > 1 && !clientHbMgr.appHbHash) {
clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
clientHbMgr.appHbHash =
taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
}
taosHashClear(clientHbMgr.appHbHash);
}
@ -1433,6 +1437,4 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
}
// set heartbeat thread quit mode: if quitByKill is 1, kill the thread, otherwise quit from inside
void taos_set_hb_quit(int8_t quitByKill) {
clientHbMgr.quitByKill = quitByKill;
}
void taos_set_hb_quit(int8_t quitByKill) { clientHbMgr.quitByKill = quitByKill; }

View File

@ -169,15 +169,22 @@ int32_t uploadByRsync(const char* id, const char* path) {
#else
if (path[strlen(path) - 1] != '/') {
#endif
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s/ rsync://%s/checkpoint/%s/",
snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ "
"rsync://%s/checkpoint/%s/",
tsLogDir,
#ifdef WINDOWS
pathTransform
#else
path
#endif
, tsSnodeAddress, id);
,
tsSnodeAddress, id);
} else {
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/",
snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s "
"rsync://%s/checkpoint/%s/",
tsLogDir,
#ifdef WINDOWS
pathTransform
#else
@ -213,8 +220,9 @@ int32_t downloadRsync(const char* id, const char* path) {
#endif
char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX, "rsync -av --debug=all --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s",
tsSnodeAddress, id,
snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s",
tsLogDir, tsSnodeAddress, id,
#ifdef WINDOWS
pathTransform
#else
@ -249,7 +257,9 @@ int32_t deleteRsync(const char* id) {
}
char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tmp, tsSnodeAddress, id);
snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tsLogDir,
tmp, tsSnodeAddress, id);
code = execCommand(command);
taosRemoveDir(tmp);
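
All three rsync commands above gain --debug=all and --log-file=%s/rsynclog, so rsync output lands under the TDengine log directory instead of being lost. Below is a standalone sketch of the string the upload branch builds for a path without a trailing '/'; every value is made up for illustration.

```c
// Standalone sketch, not TDengine code: the command string produced after this change.
#include <limits.h>
#include <stdio.h>

int main(void) {
  const char *tsLogDir       = "/var/log/taos";                    // assumed log directory
  const char *tsSnodeAddress = "192.168.1.10";                     // assumed snode address
  const char *id             = "stream-task-1";                    // assumed checkpoint id
  const char *path           = "/var/lib/taos/chkp/stream-task-1"; // assumed local path

  char command[PATH_MAX] = {0};
  snprintf(command, PATH_MAX,
           "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ "
           "rsync://%s/checkpoint/%s/",
           tsLogDir, path, tsSnodeAddress, id);
  puts(command);  // the rsync log ends up at /var/log/taos/rsynclog
  return 0;
}
```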

View File

@ -85,7 +85,7 @@ static const SSysDbTableSchema clusterSchema[] = {
{.name = "uptime", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "expire_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
};
static const SSysDbTableSchema userDBSchema[] = {

View File

@ -23,6 +23,14 @@
extern "C" {
#endif
#define COL_DATA_SET_VAL_RET(pData, isNull, pObj) \
do { \
if ((code = colDataSetVal(pColInfo, numOfRows, (pData), (isNull))) != 0) { \
if (pObj) sdbRelease(pSdb, (pObj)); \
return code; \
} \
} while (0)
int32_t mndInitShow(SMnode *pMnode);
void mndCleanupShow(SMnode *pMnode);
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp);
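
The new COL_DATA_SET_VAL_RET macro lets the mnode retrieve functions below propagate the colDataSetVal error code and release the sdb object in one statement; wrapping the body in do { ... } while (0) keeps it safe inside unbraced if/else branches. A minimal standalone illustration of the same pattern, using hypothetical stand-in names rather than TDengine APIs (like the real macro, it relies on a `code` variable in the enclosing scope):

```c
// Minimal illustration of the do { ... } while (0) error-propagation pattern.
#include <stdio.h>

static int  set_cell(int row, int value) { return (row < 0 || value < 0) ? -1 : 0; }
static void release_obj(const char *name) { printf("released %s\n", name); }

#define SET_CELL_RET(row, value, obj)             \
  do {                                            \
    if ((code = set_cell((row), (value))) != 0) { \
      if (obj) release_obj(obj);                  \
      return code;                                \
    }                                             \
  } while (0)

static int fill_row(int row) {
  int         code = 0;
  const char *obj = "cluster-object";
  SET_CELL_RET(row, 42, obj);  // succeeds, execution continues
  SET_CELL_RET(row, -1, obj);  // fails: the object is released and the error code returned
  return code;
}

int main(void) {
  printf("fill_row -> %d\n", fill_row(0));
  return 0;
}
```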

View File

@ -53,6 +53,7 @@ void mndUpdateIpWhiteForAllUser(SMnode *pMnode, char *user, char *fqdn, int8_t t
int32_t mndRefreshUserIpWhiteList(SMnode *pMnode);
int64_t mndGetUserIpWhiteListVer(SMnode *pMnode, SUserObj *pUser);
#ifdef __cplusplus
}
#endif

View File

@ -280,6 +280,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pMsg->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t code = 0;
int32_t numOfRows = 0;
int32_t cols = 0;
SClusterObj *pCluster = NULL;
@ -290,31 +291,44 @@ static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *
cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pCluster->id, false);
COL_DATA_SET_VAL_RET((const char *)&pCluster->id, false, pCluster);
char buf[tListLen(pCluster->name) + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(buf, pCluster->name, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, buf, false);
COL_DATA_SET_VAL_RET(buf, false, pCluster);
int32_t upTime = mndGetClusterUpTimeImp(pCluster);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&upTime, false);
COL_DATA_SET_VAL_RET((const char *)&upTime, false, pCluster);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pCluster->createdTime, false);
COL_DATA_SET_VAL_RET((const char *)&pCluster->createdTime, false, pCluster);
char ver[12] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(ver, tsVersionName, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)ver, false);
COL_DATA_SET_VAL_RET((const char *)ver, false, pCluster);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (tsExpireTime <= 0) {
char expireTime[25] = {0};
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
if (GRANT_EXPIRE_UNLIMITED(tsExpireTime / 1000)) {
STR_WITH_MAXSIZE_TO_VARSTR(expireTime, "unlimited", pShow->pMeta->pSchemas[cols].bytes);
COL_DATA_SET_VAL_RET(expireTime, false, pCluster);
} else if (tsExpireTime <= 0) {
colDataSetNULL(pColInfo, numOfRows);
} else {
colDataSetVal(pColInfo, numOfRows, (const char *)&tsExpireTime, false);
char ts[20] = {0};
time_t expireSec = tsExpireTime / 1000;
struct tm ptm;
if (taosLocalTime(&expireSec, &ptm, ts) != NULL) {
strftime(ts, 20, "%Y-%m-%d %H:%M:%S", &ptm);
} else {
ts[0] = 0;
}
STR_WITH_MAXSIZE_TO_VARSTR(expireTime, ts, pShow->pMeta->pSchemas[cols].bytes);
COL_DATA_SET_VAL_RET(expireTime, false, pCluster);
}
sdbRelease(pSdb, pCluster);
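
The expire_time cell is now rendered as a 19-character "YYYY-MM-DD HH:MM:SS" string (matching the 19 + VARSTR_HEADER_SIZE schema change earlier), or "unlimited" when the grant never expires. A standalone sketch of that conversion, with localtime_r standing in for TDengine's taosLocalTime wrapper and an assumed expiry value:

```c
// Standalone sketch of the conversion performed above: milliseconds since the epoch
// formatted into a fixed 19-character timestamp string.
#include <stdio.h>
#include <time.h>

int main(void) {
  long long tsExpireTime = 1735689600000LL;  // assumed expiry in ms since the epoch
  time_t    expireSec = (time_t)(tsExpireTime / 1000);

  struct tm ptm;
  char      ts[20] = {0};  // 19 chars + terminating NUL
  if (localtime_r(&expireSec, &ptm) != NULL) {
    strftime(ts, sizeof(ts), "%Y-%m-%d %H:%M:%S", &ptm);
  }
  printf("expire_time = %s\n", ts);
  return 0;
}
```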

View File

@ -31,7 +31,6 @@ int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType op
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic) { return 0; }
int32_t mndSetUserWhiteListRsp(SMnode *pMnode, SUserObj *pUser, SGetUserWhiteListRsp *pWhiteListRsp) {
memcpy(pWhiteListRsp->user, pUser->user, TSDB_USER_LEN);
pWhiteListRsp->numWhiteLists = 1;
@ -41,25 +40,6 @@ int32_t mndSetUserWhiteListRsp(SMnode *pMnode, SUserObj *pUser, SGetUserWhiteLis
}
memset(pWhiteListRsp->pWhiteLists, 0, pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range));
// if (tsEnableWhiteList) {
// memcpy(pWhiteListRsp->user, pUser->user, TSDB_USER_LEN);
// pWhiteListRsp->numWhiteLists = pUser->pIpWhiteList->num;
// pWhiteListRsp->pWhiteLists = taosMemoryMalloc(pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range));
// if (pWhiteListRsp->pWhiteLists == NULL) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
// memcpy(pWhiteListRsp->pWhiteLists, pUser->pIpWhiteList->pIpRange,
// pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range));
// } else {
// memcpy(pWhiteListRsp->user, pUser->user, TSDB_USER_LEN);
// pWhiteListRsp->numWhiteLists = 1;
// pWhiteListRsp->pWhiteLists = taosMemoryMalloc(pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range));
// if (pWhiteListRsp->pWhiteLists == NULL) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
// memset(pWhiteListRsp->pWhiteLists, 0, pWhiteListRsp->numWhiteLists * sizeof(SIpV4Range));
// }
return 0;
}
@ -70,7 +50,7 @@ int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp
pRsp->sysInfo = pUser->sysInfo;
pRsp->version = pUser->authVersion;
pRsp->passVer = pUser->passVersion;
pRsp->whiteListVer = pUser->ipWhiteListVer;
pRsp->whiteListVer = mndGetUserIpWhiteListVer(pMnode, pUser);
return 0;
}

View File

@ -300,7 +300,7 @@ _CONNECT:
connectRsp.svrTimestamp = taosGetTimestampSec();
connectRsp.passVer = pUser->passVersion;
connectRsp.authVer = pUser->authVersion;
connectRsp.whiteListVer = pUser->ipWhiteListVer;
connectRsp.whiteListVer = mndGetUserIpWhiteListVer(pMnode, pUser);
strcpy(connectRsp.sVer, version);
snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,

View File

@ -3042,3 +3042,9 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) {
mndUserFreeObj(&newUser);
return code;
}
int64_t mndGetUserIpWhiteListVer(SMnode *pMnode, SUserObj *pUser) {
// ver = 0, disable ip white list
// ver > 0, enable ip white list
return tsEnableWhiteList ? pUser->ipWhiteListVer : 0;
}

View File

@ -2914,7 +2914,8 @@ SNode* createSyncdbStmt(SAstCreateContext* pCxt, const SToken* pDbName) {
SNode* createGrantStmt(SAstCreateContext* pCxt, int64_t privileges, STokenPair* pPrivLevel, SToken* pUserName,
SNode* pTagCond) {
CHECK_PARSER_STATUS(pCxt);
if (!checkDbName(pCxt, &pPrivLevel->first, false) || !checkUserName(pCxt, pUserName)) {
if (!checkDbName(pCxt, &pPrivLevel->first, false) || !checkUserName(pCxt, pUserName) ||
!checkTableName(pCxt, &pPrivLevel->second)) {
return NULL;
}
SGrantStmt* pStmt = (SGrantStmt*)nodesMakeNode(QUERY_NODE_GRANT_STMT);

View File

@ -567,7 +567,7 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
}
SSchLevel *pLevel = pTask->level;
int32_t doneNum = atomic_add_fetch_32(&pLevel->taskExecDoneNum, 1);
int32_t doneNum = atomic_load_32(&pLevel->taskExecDoneNum);
if (doneNum == pLevel->taskNum) {
atomic_sub_fetch_32(&pJob->levelIdx, 1);

View File

@ -248,6 +248,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_LOG_TASK_END_TS(pTask);
int32_t taskDone = atomic_add_fetch_32(&pTask->level->taskExecDoneNum, 1);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);
SCH_ERR_RET(schRecordTaskSucceedNode(pJob, pTask));
@ -317,7 +319,9 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
}
}
if (taskDone == pTask->level->taskNum) {
SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));
}
return TSDB_CODE_SUCCESS;
}
@ -483,6 +487,34 @@ _return:
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
int32_t schResetTaskSetLevelInfo(SSchJob *pJob, SSchTask *pTask) {
SSchLevel *pLevel = pTask->level;
SCH_TASK_DLOG("start to reset level for current task set, execDone:%d, launched:%d",
atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) {
atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
}
atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
int32_t childrenNum = taosArrayGetSize(pTask->children);
for (int32_t i = 0; i < childrenNum; ++i) {
SSchTask *pChild = taosArrayGetP(pTask->children, i);
SCH_LOCK_TASK(pChild);
pLevel = pChild->level;
atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
SCH_UNLOCK_TASK(pChild);
}
SCH_TASK_DLOG("end to reset level for current task set, execDone:%d, launched:%d",
atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
return TSDB_CODE_SUCCESS;
}
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t code = 0;
@ -498,12 +530,7 @@ int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, i
SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode));
for (int32_t i = 0; i < pJob->levelNum; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
pLevel->taskExecDoneNum = 0;
pLevel->taskLaunchedNum = 0;
}
SCH_ERR_JRET(schResetTaskSetLevelInfo(pJob, pTask));
SCH_RESET_JOB_LEVEL_IDX(pJob);
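
With this change the level's taskExecDoneNum is incremented only in schProcessOnTaskSuccess and merely read in schLaunchJobLowerLevel, and a task-set retry decrements the counters of the affected tasks instead of zeroing every level. A toy sketch (not scheduler code) of the "bump once where the task finishes, read where the next level is launched" idea:

```c
// Toy illustration of the counting change: the done counter is bumped exactly once where a
// task finishes, and the launch path only reads it, so no task is counted twice.
#include <stdatomic.h>
#include <stdio.h>

typedef struct {
  atomic_int taskExecDoneNum;
  int        taskNum;
} Level;

static void on_task_success(Level *lvl) {
  int done = atomic_fetch_add(&lvl->taskExecDoneNum, 1) + 1;  // the only place that increments
  if (done == lvl->taskNum) {
    printf("all %d tasks done, launch the next level\n", lvl->taskNum);
  }
}

static void launch_lower_level(Level *lvl) {
  int done = atomic_load(&lvl->taskExecDoneNum);  // read-only, like the atomic_load_32 above
  printf("progress: %d/%d\n", done, lvl->taskNum);
}

int main(void) {
  Level lvl;
  lvl.taskNum = 2;
  atomic_init(&lvl.taskExecDoneNum, 0);
  on_task_success(&lvl);
  launch_lower_level(&lvl);
  on_task_success(&lvl);
  return 0;
}
```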

View File

@ -2210,7 +2210,7 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
sprintf(temp, "%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "tmp", chkpId);
if (taosDirExist(temp)) {
cleanDir(temp, NULL);
cleanDir(temp, "");
} else {
taosMkDir(temp);
}

View File

@ -299,12 +299,10 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
taosThreadMutexLock(&pInfo->checkInfoLock);
streamTaskCompleteCheckRsp(pInfo, false, id);
pInfo->stopCheckProcess = 1;
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s set stop check-rsp monit", id);
stDebug("s-task:%s set stop check-rsp monitor flag", id);
return TSDB_CODE_SUCCESS;
}
@ -438,6 +436,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
ASSERT(pInfo->startTs > 0);
stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id,
pInfo->startTs);
pInfo->stopCheckProcess = 0; // disable auto stop of check process
return TSDB_CODE_FAILED;
}

View File

@ -194,11 +194,11 @@ class TDTestCase(TBase):
# alter float(c9) double(c10) to tsz
comp = "tsz"
sql = f"alter table {tbname} modify column c9 COMPRESS '{comp}';"
tdSql.execute(sql)
tdSql.execute(sql, show=True)
self.checkDataDesc(tbname, 10, 5, comp)
self.writeData(10000)
sql = f"alter table {tbname} modify column c10 COMPRESS '{comp}';"
tdSql.execute(sql)
tdSql.execute(sql, show=True)
self.checkDataDesc(tbname, 11, 5, comp)
self.writeData(10000)
@ -207,9 +207,48 @@ class TDTestCase(TBase):
for i in range(self.colCnt - 1):
col = f"c{i}"
sql = f"alter table {tbname} modify column {col} LEVEL '{level}';"
tdSql.execute(sql)
tdSql.execute(sql, show=True)
self.checkDataDesc(tbname, i + 1, 6, level)
self.writeData(1000)
# modify two options combined
i = 9
encode = "delta-d"
compress = "zlib"
sql = f"alter table {tbname} modify column c{i} ENCODE '{encode}' COMPRESS '{compress}';"
tdSql.execute(sql, show=True)
self.checkDataDesc(tbname, i + 1, 4, encode)
self.checkDataDesc(tbname, i + 1, 5, compress)
i = 10
encode = "delta-d"
level = "high"
sql = f"alter table {tbname} modify column c{i} ENCODE '{encode}' LEVEL '{level}';"
tdSql.execute(sql, show=True)
self.checkDataDesc(tbname, i + 1, 4, encode)
self.checkDataDesc(tbname, i + 1, 6, level)
i = 2
compress = "zlib"
level = "high"
sql = f"alter table {tbname} modify column c{i} COMPRESS '{compress}' LEVEL '{level}';"
tdSql.execute(sql, show=True)
self.checkDataDesc(tbname, i + 1, 5, compress)
self.checkDataDesc(tbname, i + 1, 6, level)
# modify three options combined
i = 7
encode = "simple8b"
compress = "zstd"
level = "medium"
sql = f"alter table {tbname} modify column c{i} ENCODE '{encode}' COMPRESS '{compress}' LEVEL '{level}';"
tdSql.execute(sql, show=True)
self.checkDataDesc(tbname, i + 1, 4, encode)
self.checkDataDesc(tbname, i + 1, 5, compress)
self.checkDataDesc(tbname, i + 1, 6, level)
# alter error
sqls = [
"alter table nodb.nostb modify column ts LEVEL 'high';",

View File

@ -15,7 +15,7 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2 from st interval(1s) ;
sleep 500
sleep 1000
sql insert into t1 values(1648791211000,1,2,3);
sql insert into t1 values(1648791212000,2,2,3);
@ -46,7 +46,7 @@ sql alter table streamt1 add column c3 double;
print create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2, avg(b) c3 from st interval(1s) ;
sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2, avg(b) c3 from st interval(1s) ;
sleep 500
sleep 1000
sql insert into t2 values(1648791213000,1,2,3);
sql insert into t1 values(1648791214000,1,2,3);