diff --git a/deps/arm/dm_static/libdmodule.a b/deps/arm/dm_static/libdmodule.a
index 37077ef63b..072e9f857d 100644
Binary files a/deps/arm/dm_static/libdmodule.a and b/deps/arm/dm_static/libdmodule.a differ
diff --git a/deps/darwin/arm/dm_static/libdmodule.a b/deps/darwin/arm/dm_static/libdmodule.a
index 246b2247af..2bede61caa 100644
Binary files a/deps/darwin/arm/dm_static/libdmodule.a and b/deps/darwin/arm/dm_static/libdmodule.a differ
diff --git a/deps/darwin/x64/dm_static/libdmodule.a b/deps/darwin/x64/dm_static/libdmodule.a
index 8745f57636..ae1abc5dfc 100644
Binary files a/deps/darwin/x64/dm_static/libdmodule.a and b/deps/darwin/x64/dm_static/libdmodule.a differ
diff --git a/deps/mips/dm_static/libdmodule.a b/deps/mips/dm_static/libdmodule.a
index 855a6a41d9..868ac62d3e 100644
Binary files a/deps/mips/dm_static/libdmodule.a and b/deps/mips/dm_static/libdmodule.a differ
diff --git a/deps/x86/dm_static/libdmodule.a b/deps/x86/dm_static/libdmodule.a
index 6a3c0d45c2..658bb95c27 100644
Binary files a/deps/x86/dm_static/libdmodule.a and b/deps/x86/dm_static/libdmodule.a differ
diff --git a/docs/en/14-reference/12-config/index.md b/docs/en/14-reference/12-config/index.md
index fd6d3a2930..5e4eadcceb 100755
--- a/docs/en/14-reference/12-config/index.md
+++ b/docs/en/14-reference/12-config/index.md
@@ -421,7 +421,7 @@ The charset that takes effect is UTF-8.
| Applicable | Server Only |
| Meaning | Maximum number of vnodes per dnode |
| Value Range | 0-4096 |
-| Default Value | 2x the CPU cores |
+| Default Value | 2x the CPU cores plus 5 |
## Performance Tuning
diff --git a/docs/en/28-releases/01-tdengine.md b/docs/en/28-releases/01-tdengine.md
index d4487e6148..f295e57bb5 100644
--- a/docs/en/28-releases/01-tdengine.md
+++ b/docs/en/28-releases/01-tdengine.md
@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://t
import Release from "/components/ReleaseV3";
+## 3.3.0.3
+
+
+
## 3.3.0.0
diff --git a/docs/zh/14-reference/12-config/index.md b/docs/zh/14-reference/12-config/index.md
index f0347f424c..6fce985927 100755
--- a/docs/zh/14-reference/12-config/index.md
+++ b/docs/zh/14-reference/12-config/index.md
@@ -419,7 +419,7 @@ charset 的有效值是 UTF-8。
| 适用范围 | 仅服务端适用 |
| 含义 | dnode 支持的最大 vnode 数目 |
| 取值范围 | 0-4096 |
-| 缺省值 | CPU 核数的 2 倍 |
+| 缺省值 | CPU 核数的 2 倍 + 5 |
## 性能调优
diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md
index 93b24c9b69..f69e1fd4a8 100644
--- a/docs/zh/28-releases/01-tdengine.md
+++ b/docs/zh/28-releases/01-tdengine.md
@@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do
import Release from "/components/ReleaseV3";
+## 3.3.0.3
+
+
+
## 3.3.0.0
diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h
index 32cd7bb2ab..197fa125f5 100644
--- a/include/common/tdatablock.h
+++ b/include/common/tdatablock.h
@@ -274,7 +274,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, c
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId,
tb_uid_t suid);
-bool alreadyAddGroupId(char* ctbName);
+bool alreadyAddGroupId(char* ctbName, int64_t groupId);
bool isAutoTableName(char* ctbName);
void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
diff --git a/include/common/tglobal.h b/include/common/tglobal.h
index 17ed732af8..95b7591263 100644
--- a/include/common/tglobal.h
+++ b/include/common/tglobal.h
@@ -257,8 +257,10 @@ extern bool tsExperimental;
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc);
+int32_t taosReadDataFolder(const char *cfgDir, const char **envCmd,
+ const char *envFile, char *apolloUrl, SArray *pArgs);
int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs,
- bool tsc, bool isDumpCfg);
+ bool tsc);
void taosCleanupCfg();
int32_t taosCfgDynamicOptions(SConfig *pCfg, const char *name, bool forServer);
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index d101b64f0e..5df3c2bd60 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -2358,20 +2358,24 @@ void tFreeSVArbSetAssignedLeaderRsp(SVArbSetAssignedLeaderRsp* pRsp);
typedef struct {
int32_t dnodeId;
char* token;
-} SMArbUpdateGroupReqMember;
+} SMArbUpdateGroupMember;
typedef struct {
- int32_t vgId;
- int64_t dbUid;
- SMArbUpdateGroupReqMember members[2];
- int8_t isSync;
- SMArbUpdateGroupReqMember assignedLeader;
- int64_t version;
-} SMArbUpdateGroupReq;
+ int32_t vgId;
+ int64_t dbUid;
+ SMArbUpdateGroupMember members[2];
+ int8_t isSync;
+ SMArbUpdateGroupMember assignedLeader;
+ int64_t version;
+} SMArbUpdateGroup;
-int32_t tSerializeSMArbUpdateGroupReq(void* buf, int32_t bufLen, SMArbUpdateGroupReq* pReq);
-int32_t tDeserializeSMArbUpdateGroupReq(void* buf, int32_t bufLen, SMArbUpdateGroupReq* pReq);
-void tFreeSMArbUpdateGroupReq(SMArbUpdateGroupReq* pReq);
+typedef struct {
+ SArray* updateArray; // SMArbUpdateGroup
+} SMArbUpdateGroupBatchReq;
+
+int32_t tSerializeSMArbUpdateGroupBatchReq(void* buf, int32_t bufLen, SMArbUpdateGroupBatchReq* pReq);
+int32_t tDeserializeSMArbUpdateGroupBatchReq(void* buf, int32_t bufLen, SMArbUpdateGroupBatchReq* pReq);
+void tFreeSMArbUpdateGroupBatchReq(SMArbUpdateGroupBatchReq* pReq);
typedef struct {
char queryStrId[TSDB_QUERY_ID_LEN];
diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h
index 12f92b1242..2d07b56e4c 100644
--- a/include/common/tmsgdef.h
+++ b/include/common/tmsgdef.h
@@ -389,7 +389,8 @@
TD_NEW_MSG_SEG(TDMT_MND_ARB_MSG) //9 << 8
TD_DEF_MSG_TYPE(TDMT_MND_ARB_HEARTBEAT_TIMER, "mnd-arb-hb-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ARB_CHECK_SYNC_TIMER, "mnd-arb-check-sync-tmr", NULL, NULL)
- TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL)
+ TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL) // no longer used
+ TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP_BATCH, "mnd-arb-update-group-batch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ARB_MAX_MSG, "mnd-arb-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_ARB_MSG)
diff --git a/include/util/taoserror.h b/include/util/taoserror.h
index dafdac9649..9ae75bade2 100644
--- a/include/util/taoserror.h
+++ b/include/util/taoserror.h
@@ -327,7 +327,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_DB_IN_CREATING TAOS_DEF_ERROR_CODE(0, 0x0396) //
#define TSDB_CODE_MND_INVALID_SYS_TABLENAME TAOS_DEF_ERROR_CODE(0, 0x039A)
#define TSDB_CODE_MND_ENCRYPT_NOT_ALLOW_CHANGE TAOS_DEF_ERROR_CODE(0, 0x039B)
-#define TSDB_CODE_MND_DB_ENCRYPT_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x039C)
// mnode-node
#define TSDB_CODE_MND_MNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A0)
@@ -612,6 +611,16 @@ int32_t* taosGetErrno();
#define TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0821)
#define TSDB_CODE_GRANT_DUPLICATED_ACTIVE TAOS_DEF_ERROR_CODE(0, 0x0822)
#define TSDB_CODE_GRANT_VIEW_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0823)
+#define TSDB_CODE_GRANT_BASIC_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0824)
+#define TSDB_CODE_GRANT_STREAM_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0825)
+#define TSDB_CODE_GRANT_SUBSCRIPTION_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0826)
+#define TSDB_CODE_GRANT_VIEW_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0827)
+#define TSDB_CODE_GRANT_AUDIT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0828)
+#define TSDB_CODE_GRANT_CSV_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0829)
+#define TSDB_CODE_GRANT_MULTI_STORAGE_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082A)
+#define TSDB_CODE_GRANT_OBJECT_STROAGE_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082B)
+#define TSDB_CODE_GRANT_DUAL_REPLICA_HA_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082C)
+#define TSDB_CODE_GRANT_DB_ENCRYPTION_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082D)
// sync
// #define TSDB_CODE_SYN_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0900) // 2.x
diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c
index 036cf0420a..439103e5c4 100644
--- a/source/client/src/clientEnv.c
+++ b/source/client/src/clientEnv.c
@@ -725,7 +725,7 @@ void taos_init_imp(void) {
return;
}
- if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1, true) != 0) {
+ if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) {
tscInitRes = -1;
return;
}
diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c
index 7d30a19140..1b6cb8fd22 100644
--- a/source/client/src/clientHb.c
+++ b/source/client/src/clientHb.c
@@ -92,6 +92,7 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat
}
if (!pRsp) {
releaseTscObj(pReq->connKey.tscRid);
+ taosHashCancelIterate(hbMgr->activeInfo, pReq);
break;
}
}
diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c
index 1c0b696aaa..ac4811fb1b 100644
--- a/source/common/src/tdatablock.c
+++ b/source/common/src/tdatablock.c
@@ -2435,18 +2435,13 @@ void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId
// the total length is fixed to be 34 bytes.
bool isAutoTableName(char* ctbName) { return (strlen(ctbName) == 34 && ctbName[0] == 't' && ctbName[1] == '_'); }
-bool alreadyAddGroupId(char* ctbName) {
- size_t len = strlen(ctbName);
- if (len == 0) return false;
- size_t _location = len - 1;
- while (_location > 0) {
- if (ctbName[_location] < '0' || ctbName[_location] > '9') {
- break;
- }
- _location--;
- }
-
- return ctbName[_location] == '_' && len - 1 - _location >= 15; // 15 means the min length of groupid
+bool alreadyAddGroupId(char* ctbName, int64_t groupId) {
+ char tmp[64] = {0};
+ snprintf(tmp, sizeof(tmp), "%" PRIu64, groupId);
+ size_t len1 = strlen(ctbName);
+ size_t len2 = strlen(tmp);
+ if (len1 < len2) return false;
+ return memcmp(ctbName + len1 - len2, tmp, len2) == 0;
}
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index 650a639964..691eccd174 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -624,7 +624,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
- tsNumOfSupportVnodes = tsNumOfCores * 2;
+ tsNumOfSupportVnodes = tsNumOfCores * 2 + 5;
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
return -1;
@@ -878,7 +878,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem = cfgGetItem(tsCfg, "supportVnodes");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
- tsNumOfSupportVnodes = numOfCores * 2;
+ tsNumOfSupportVnodes = numOfCores * 2 + 5;
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
pItem->i32 = tsNumOfSupportVnodes;
pItem->stype = stype;
@@ -1377,6 +1377,35 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
return 0;
}
+int32_t taosReadDataFolder(const char *cfgDir, const char **envCmd,
+ const char *envFile, char *apolloUrl, SArray *pArgs) {
+ if (tsCfg == NULL) osDefaultInit();
+
+ SConfig *pCfg = cfgInit();
+ if (pCfg == NULL) return -1;
+
+ if (cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
+ if (cfgAddInt32(pCfg, "dDebugFlag", dDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
+
+ if (taosLoadCfg(pCfg, envCmd, cfgDir, envFile, apolloUrl) != 0) {
+ printf("failed to load cfg since %s", terrstr());
+ cfgCleanup(pCfg);
+ return -1;
+ }
+
+ if (cfgLoadFromArray(pCfg, pArgs) != 0) {
+ printf("failed to load cfg from array since %s", terrstr());
+ cfgCleanup(pCfg);
+ return -1;
+ }
+
+ tstrncpy(tsDataDir, cfgGetItem(pCfg, "dataDir")->str, PATH_MAX);
+ dDebugFlag = cfgGetItem(pCfg, "dDebugFlag")->i32;
+
+ cfgCleanup(pCfg);
+ return 0;
+}
+
static int32_t taosCheckGlobalCfg() {
uint32_t ipv4 = taosGetIpv4FromFqdn(tsLocalFqdn);
if (ipv4 == 0xffffffff) {
@@ -1394,7 +1423,7 @@ static int32_t taosCheckGlobalCfg() {
}
int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs,
- bool tsc, bool isDumpCfg) {
+ bool tsc) {
if (tsCfg != NULL) return 0;
tsCfg = cfgInit();
@@ -1441,7 +1470,7 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile
taosSetAllDebugFlag(tsCfg, cfgGetItem(tsCfg, "debugFlag")->i32);
- if(isDumpCfg) cfgDumpCfg(tsCfg, tsc, false);
+ cfgDumpCfg(tsCfg, tsc, false);
if (taosCheckGlobalCfg() != 0) {
return -1;
diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c
index b7d1417451..ef37a41fcf 100644
--- a/source/common/src/tmsg.c
+++ b/source/common/src/tmsg.c
@@ -6437,21 +6437,28 @@ void tFreeSVArbSetAssignedLeaderRsp(SVArbSetAssignedLeaderRsp *pRsp) {
taosMemoryFreeClear(pRsp->memberToken);
}
-int32_t tSerializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGroupReq *pReq) {
+int32_t tSerializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdateGroupBatchReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
- if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
- if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
- for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
- if (tEncodeI32(&encoder, pReq->members[i].dnodeId) < 0) return -1;
- if (tEncodeCStr(&encoder, pReq->members[i].token) < 0) return -1;
+
+ int32_t sz = taosArrayGetSize(pReq->updateArray);
+ if (tEncodeI32(&encoder, sz) < 0) return -1;
+
+ for (int32_t i = 0; i < sz; i++) {
+ SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i);
+ if (tEncodeI32(&encoder, pGroup->vgId) < 0) return -1;
+ if (tEncodeI64(&encoder, pGroup->dbUid) < 0) return -1;
+ for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
+ if (tEncodeI32(&encoder, pGroup->members[i].dnodeId) < 0) return -1;
+ if (tEncodeCStr(&encoder, pGroup->members[i].token) < 0) return -1;
+ }
+ if (tEncodeI8(&encoder, pGroup->isSync) < 0) return -1;
+ if (tEncodeI32(&encoder, pGroup->assignedLeader.dnodeId) < 0) return -1;
+ if (tEncodeCStr(&encoder, pGroup->assignedLeader.token) < 0) return -1;
+ if (tEncodeI64(&encoder, pGroup->version) < 0) return -1;
}
- if (tEncodeI8(&encoder, pReq->isSync) < 0) return -1;
- if (tEncodeI32(&encoder, pReq->assignedLeader.dnodeId) < 0) return -1;
- if (tEncodeCStr(&encoder, pReq->assignedLeader.token) < 0) return -1;
- if (tEncodeI64(&encoder, pReq->version) < 0) return -1;
tEndEncode(&encoder);
@@ -6460,23 +6467,34 @@ int32_t tSerializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGrou
return tlen;
}
-int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGroupReq *pReq) {
+int32_t tDeserializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdateGroupBatchReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
- if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
- if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
- for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
- if (tDecodeI32(&decoder, &pReq->members[i].dnodeId) < 0) return -1;
- pReq->members[i].token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
- if (tDecodeCStrTo(&decoder, pReq->members[i].token) < 0) return -1;
+ int32_t sz = 0;
+ if (tDecodeI32(&decoder, &sz) < 0) return -1;
+
+ SArray *updateArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
+ if (!updateArray) return -1;
+
+ for (int32_t i = 0; i < sz; i++) {
+ SMArbUpdateGroup group = {0};
+ if (tDecodeI32(&decoder, &group.vgId) < 0) return -1;
+ if (tDecodeI64(&decoder, &group.dbUid) < 0) return -1;
+ for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
+ if (tDecodeI32(&decoder, &group.members[i].dnodeId) < 0) return -1;
+ group.members[i].token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
+ if (tDecodeCStrTo(&decoder, group.members[i].token) < 0) return -1;
+ }
+ if (tDecodeI8(&decoder, &group.isSync) < 0) return -1;
+ if (tDecodeI32(&decoder, &group.assignedLeader.dnodeId) < 0) return -1;
+ group.assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
+ if (tDecodeCStrTo(&decoder, group.assignedLeader.token) < 0) return -1;
+ if (tDecodeI64(&decoder, &group.version) < 0) return -1;
+ taosArrayPush(updateArray, &group);
}
- if (tDecodeI8(&decoder, &pReq->isSync) < 0) return -1;
- if (tDecodeI32(&decoder, &pReq->assignedLeader.dnodeId) < 0) return -1;
- pReq->assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
- if (tDecodeCStrTo(&decoder, pReq->assignedLeader.token) < 0) return -1;
- if (tDecodeI64(&decoder, &pReq->version) < 0) return -1;
+ pReq->updateArray = updateArray;
tEndDecode(&decoder);
@@ -6484,14 +6502,20 @@ int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGr
return 0;
}
-void tFreeSMArbUpdateGroupReq(SMArbUpdateGroupReq *pReq) {
- if (NULL == pReq) {
+void tFreeSMArbUpdateGroupBatchReq(SMArbUpdateGroupBatchReq *pReq) {
+ if (NULL == pReq || NULL == pReq->updateArray) {
return;
}
- for (int i = 0; i < 2; i++) {
- taosMemoryFreeClear(pReq->members[i].token);
+
+ int32_t sz = taosArrayGetSize(pReq->updateArray);
+ for (int32_t i = 0; i < sz; i++) {
+ SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i);
+ for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
+ taosMemoryFreeClear(pGroup->members[i].token);
+ }
+ taosMemoryFreeClear(pGroup->assignedLeader.token);
}
- taosMemoryFreeClear(pReq->assignedLeader.token);
+ taosArrayDestroy(pReq->updateArray);
}
// int32_t tSerializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {
diff --git a/source/common/test/commonTests.cpp b/source/common/test/commonTests.cpp
index 8e0e50165f..d30b26e564 100644
--- a/source/common/test/commonTests.cpp
+++ b/source/common/test/commonTests.cpp
@@ -692,4 +692,38 @@ TEST(timeTest, epSet) {
ASSERT_EQ(ep.numOfEps, 1);
}
}
+
+// Define test cases
+TEST(AlreadyAddGroupIdTest, GroupIdAdded) {
+ // Test case 1: Group ID has been added
+ char ctbName[64] = "abc123";
+ int64_t groupId = 123;
+ bool result = alreadyAddGroupId(ctbName, groupId);
+ EXPECT_TRUE(result);
+}
+
+TEST(AlreadyAddGroupIdTest, GroupIdNotAdded) {
+ // Test case 2: Group ID has not been added
+ char ctbName[64] = "abc456";
+ int64_t groupId = 123;
+ bool result = alreadyAddGroupId(ctbName, groupId);
+ EXPECT_FALSE(result);
+}
+
+TEST(AlreadyAddGroupIdTest, GroupIdAddedAtTheEnd) {
+ // Test case 3: Group ID has been added at the end
+ char ctbName[64] = "xyz1";
+ int64_t groupId = 1;
+ bool result = alreadyAddGroupId(ctbName, groupId);
+ EXPECT_TRUE(result);
+}
+
+TEST(AlreadyAddGroupIdTest, GroupIdAddedWithDifferentLength) {
+ // Test case 4: Group ID has been added with different length
+ char ctbName[64] = "def";
+ int64_t groupId = 123456;
+ bool result = alreadyAddGroupId(ctbName, groupId);
+ EXPECT_FALSE(result);
+}
+
#pragma GCC diagnostic pop
diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c
index f7fcd7697f..76b0565402 100644
--- a/source/dnode/mgmt/exe/dmMain.c
+++ b/source/dnode/mgmt/exe/dmMain.c
@@ -205,11 +205,11 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
if(i < argc - 1) {
int32_t len = strlen(argv[++i]);
if (len < ENCRYPT_KEY_LEN_MIN) {
- printf("Error: Encrypt key should be at least %d characters\n", ENCRYPT_KEY_LEN_MIN);
+ printf("ERROR: Encrypt key should be at least %d characters\n", ENCRYPT_KEY_LEN_MIN);
return -1;
}
if (len > ENCRYPT_KEY_LEN) {
- printf("Error: Encrypt key overflow, it should be at most %d characters\n", ENCRYPT_KEY_LEN);
+ printf("ERROR: Encrypt key overflow, it should be at most %d characters\n", ENCRYPT_KEY_LEN);
return -1;
}
tstrncpy(global.encryptKey, argv[i], ENCRYPT_KEY_LEN);
@@ -371,6 +371,22 @@ int mainWindows(int argc, char **argv) {
printf("memory dbg enabled\n");
}
#endif
+ if(global.generateCode) {
+ bool toLogFile = false;
+ if(taosReadDataFolder(configDir, global.envCmd, global.envFile, global.apolloUrl, global.pArgs) != 0){
+ encryptError("failed to generate encrypt code since taosd is running, please stop it first");
+ return -1;
+ };
+
+ if(dmCheckRunning(tsDataDir) == NULL) {
+ encryptError("failed to generate encrypt code since taosd is running, please stop it first");
+ return -1;
+ }
+ int ret = dmUpdateEncryptKey(global.encryptKey, toLogFile);
+ taosCloseLog();
+ taosCleanupArgs();
+ return ret;
+ }
if (dmInitLog() != 0) {
printf("failed to start since init log error\n");
@@ -380,28 +396,13 @@ int mainWindows(int argc, char **argv) {
dmPrintArgs(argc, argv);
- bool isDumpCfg = true;
- if(global.generateCode) {
- isDumpCfg = false;
- }
- if (taosInitCfg(configDir, global.envCmd, global.envFile, global.apolloUrl, global.pArgs, 0, isDumpCfg) != 0) {
+ if (taosInitCfg(configDir, global.envCmd, global.envFile, global.apolloUrl, global.pArgs, 0) != 0) {
dError("failed to start since read config error");
taosCloseLog();
taosCleanupArgs();
return -1;
}
- if(global.generateCode) {
- if(dmCheckRunning(tsDataDir) == NULL) {
- dError("failed to generate encrypt code since taosd is running, please stop it first");
- return -1;
- }
- int ret = dmUpdateEncryptKey(global.encryptKey);
- taosCloseLog();
- taosCleanupArgs();
- return ret;
- }
-
if(dmGetEncryptKey() != 0){
dError("failed to start since failed to get encrypt key");
taosCloseLog();
diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
index 3ec080fb21..800f7f7864 100644
--- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
+++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
@@ -220,7 +220,7 @@ int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
goto _exit;
}
- code = dmUpdateEncryptKey(cfgReq.value);
+ code = dmUpdateEncryptKey(cfgReq.value, true);
if (code == 0) {
tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;
diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h
index 9fdd5e50ed..aea3286d76 100644
--- a/source/dnode/mgmt/node_util/inc/dmUtil.h
+++ b/source/dnode/mgmt/node_util/inc/dmUtil.h
@@ -53,6 +53,36 @@ extern "C" {
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
+#define encryptDebug(...) { \
+ if (toLogFile) { \
+ if (dDebugFlag & DEBUG_DEBUG) {taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__);} \
+ } else { \
+ /*if (dDebugFlag & DEBUG_DEBUG) {taosPrintLog("DND ", DEBUG_SCREEN, dDebugFlag, __VA_ARGS__);}*/ \
+ if (dDebugFlag & DEBUG_DEBUG) {printf(__VA_ARGS__); printf("\n");} \
+ } \
+}
+
+#define encryptInfo(...) { \
+ if (toLogFile) { \
+ taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); \
+ } else { \
+ /*if (dDebugFlag & DEBUG_DEBUG) {taosPrintLog("DND ", DEBUG_SCREEN, dDebugFlag, __VA_ARGS__);}*/ \
+ printf(__VA_ARGS__); \
+ printf("\n"); \
+ } \
+}
+
+#define encryptError(...) { \
+ if (toLogFile) { \
+ taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
+ }\
+ else{ \
+ /*taosPrintLog("DND ", DEBUG_SCREEN, 255, __VA_ARGS__); */\
+ printf("ERROR: " __VA_ARGS__); \
+ printf("\n"); \
+ }\
+}
+
#define dGFatal(param, ...) {if (dDebugFlag & DEBUG_FATAL) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", gtid:%s", __VA_ARGS__, buf);}}
#define dGError(param, ...) {if (dDebugFlag & DEBUG_ERROR) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", gtid:%s", __VA_ARGS__, buf);}}
#define dGWarn(param, ...) {if (dDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn(param ", gtid:%s", __VA_ARGS__, buf);}}
@@ -194,7 +224,7 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
bool dmUpdateDnodeInfo(void *pData, int32_t *dnodeId, int64_t *clusterId, char *fqdn, uint16_t *port);
void dmRemoveDnodePairs(SDnodeData *pData);
void dmGetDnodeEp(void *pData, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
-int32_t dmUpdateEncryptKey(char *key);
+int32_t dmUpdateEncryptKey(char *key, bool toLogFile);
int32_t dmGetEncryptKey();
#ifdef __cplusplus
}
diff --git a/source/dnode/mgmt/node_util/src/dmFile.c b/source/dnode/mgmt/node_util/src/dmFile.c
index e2c52cfb7e..752abb83a2 100644
--- a/source/dnode/mgmt/node_util/src/dmFile.c
+++ b/source/dnode/mgmt/node_util/src/dmFile.c
@@ -186,7 +186,7 @@ TdFilePtr dmCheckRunning(const char *dataDir) {
extern int32_t generateEncryptCode(const char *key, const char *machineId, char **encryptCode);
-static int32_t dmWriteCheckCodeFile(char* file, char* realfile, char* key){
+static int32_t dmWriteCheckCodeFile(char* file, char* realfile, char* key, bool toLogFile){
TdFilePtr pFile = NULL;
char *result = NULL;
int32_t code = -1;
@@ -211,7 +211,8 @@ static int32_t dmWriteCheckCodeFile(char* file, char* realfile, char* key){
taosCloseFile(&pFile);
if (taosRenameFile(file, realfile) != 0) goto _OVER;
- dInfo("succeed to write checkCode file:%s", realfile);
+ encryptDebug("succeed to write checkCode file:%s", realfile);
+
code = 0;
_OVER:
if(pFile != NULL) taosCloseFile(&pFile);
@@ -220,7 +221,7 @@ _OVER:
return code;
}
-static int32_t dmWriteEncryptCodeFile(char* file, char* realfile, char* encryptCode){
+static int32_t dmWriteEncryptCodeFile(char* file, char* realfile, char* encryptCode, bool toLogFile){
TdFilePtr pFile = NULL;
int32_t code = -1;
@@ -234,7 +235,7 @@ static int32_t dmWriteEncryptCodeFile(char* file, char* realfile, char* encryptC
taosCloseFile(&pFile);
if (taosRenameFile(file, realfile) != 0) goto _OVER;
- dInfo("succeed to write encryptCode file:%s", realfile);
+ encryptDebug("succeed to write encryptCode file:%s", realfile);
code = 0;
_OVER:
@@ -243,7 +244,7 @@ _OVER:
return code;
}
-static int32_t dmCompareEncryptKey(char* file, char* key){
+static int32_t dmCompareEncryptKey(char* file, char* key, bool toLogFile){
char *content = NULL;
int64_t size = 0;
TdFilePtr pFile = NULL;
@@ -253,13 +254,13 @@ static int32_t dmCompareEncryptKey(char* file, char* key){
pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
- dError("failed to open dnode file:%s since %s", file, terrstr());
+ encryptError("failed to open dnode file:%s since %s", file, terrstr());
goto _OVER;
}
if (taosFStatFile(pFile, &size, NULL) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
- dError("failed to fstat dnode file:%s since %s", file, terrstr());
+ encryptError("failed to fstat dnode file:%s since %s", file, terrstr());
goto _OVER;
}
@@ -271,11 +272,11 @@ static int32_t dmCompareEncryptKey(char* file, char* key){
if (taosReadFile(pFile, content, size) != size) {
terrno = TAOS_SYSTEM_ERROR(errno);
- dError("failed to read dnode file:%s since %s", file, terrstr());
+ encryptError("failed to read dnode file:%s since %s", file, terrstr());
goto _OVER;
}
- dInfo("succeed to read checkCode file:%s", file);
+ encryptDebug("succeed to read checkCode file:%s", file);
int len = ENCRYPTED_LEN(size);
result = taosMemoryMalloc(len);
@@ -290,11 +291,11 @@ static int32_t dmCompareEncryptKey(char* file, char* key){
if(strcmp(opts.result, DM_KEY_INDICATOR) != 0) {
terrno = TSDB_CODE_DNODE_ENCRYPTKEY_CHANGED;
- dError("failed to compare decrypted result");
+ encryptError("failed to compare decrypted result");
goto _OVER;
}
- dInfo("succeed to compare checkCode file:%s", file);
+ encryptDebug("succeed to compare checkCode file:%s", file);
code = 0;
_OVER:
if(result != NULL) taosMemoryFree(result);
@@ -304,7 +305,7 @@ _OVER:
return code;
}
-int32_t dmUpdateEncryptKey(char *key) {
+int32_t dmUpdateEncryptKey(char *key, bool toLogFile) {
#ifdef TD_ENTERPRISE
int32_t code = -1;
char *machineId = NULL;
@@ -328,12 +329,12 @@ int32_t dmUpdateEncryptKey(char *key) {
if (taosMkDir(folder) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
- dError("failed to create dir:%s since %s", folder, terrstr());
+ encryptError("failed to create dir:%s since %s", folder, terrstr());
goto _OVER;
}
if(taosCheckExistFile(realCheckFile)){
- if(dmCompareEncryptKey(realCheckFile, key) != 0){
+ if(dmCompareEncryptKey(realCheckFile, key, toLogFile) != 0){
goto _OVER;
}
}
@@ -347,21 +348,23 @@ int32_t dmUpdateEncryptKey(char *key) {
goto _OVER;
}
- if(dmWriteEncryptCodeFile(encryptFile, realEncryptFile, encryptCode) != 0){
+ if(dmWriteEncryptCodeFile(encryptFile, realEncryptFile, encryptCode, toLogFile) != 0){
goto _OVER;
}
- if(dmWriteCheckCodeFile(checkFile, realCheckFile, key) != 0){
+ if(dmWriteCheckCodeFile(checkFile, realCheckFile, key, toLogFile) != 0){
goto _OVER;
}
+ encryptInfo("Succeed to update encrypt key\n");
+
code = 0;
_OVER:
taosMemoryFree(encryptCode);
taosMemoryFree(machineId);
if (code != 0) {
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
- dError("failed to update encrypt key since %s", terrstr());
+ encryptError("failed to update encrypt key since %s", terrstr());
}
return code;
#else
@@ -453,7 +456,7 @@ int32_t dmGetEncryptKey(){
goto _OVER;
}
- if(dmCompareEncryptKey(checkFile, encryptKey) != 0){
+ if(dmCompareEncryptKey(checkFile, encryptKey, true) != 0){
goto _OVER;
}
diff --git a/source/dnode/mgmt/test/sut/src/server.cpp b/source/dnode/mgmt/test/sut/src/server.cpp
index 81e6dcf495..541c5a42f4 100644
--- a/source/dnode/mgmt/test/sut/src/server.cpp
+++ b/source/dnode/mgmt/test/sut/src/server.cpp
@@ -38,7 +38,7 @@ bool TestServer::Start() {
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
taosThreadCreate(&threadId, &thAttr, serverLoop, this);
taosThreadAttrDestroy(&thAttr);
- taosMsleep(2100);
+ taosMsleep(10000);
return runnning;
}
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index 68c55e235f..0c40622d08 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -180,7 +180,7 @@ typedef struct {
tmsg_t originRpcType;
char dbname[TSDB_TABLE_FNAME_LEN];
char stbname[TSDB_TABLE_FNAME_LEN];
- int32_t arbGroupId;
+ SHashObj* arbGroupIds;
int32_t startFunc;
int32_t stopFunc;
int32_t paramLen;
diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h
index 8c9ca87fb1..8008eb76e7 100644
--- a/source/dnode/mnode/impl/inc/mndTrans.h
+++ b/source/dnode/mnode/impl/inc/mndTrans.h
@@ -78,7 +78,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname);
-void mndTransSetArbGroupId(STrans *pTrans, int32_t groupId);
+void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId);
void mndTransSetSerial(STrans *pTrans);
void mndTransSetParallel(STrans *pTrans);
void mndTransSetChangeless(STrans *pTrans);
diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c
index d0a86bdde7..50338fe889 100644
--- a/source/dnode/mnode/impl/src/mndArbGroup.c
+++ b/source/dnode/mnode/impl/src/mndArbGroup.c
@@ -39,10 +39,11 @@ static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew);
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup);
+static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray);
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq);
-static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq);
+static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq);
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp);
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp);
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp);
@@ -68,7 +69,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer);
mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndProcessArbCheckSyncTimer);
- mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP, mndProcessArbUpdateGroupReq);
+ mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP_BATCH, mndProcessArbUpdateGroupBatchReq);
mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp);
mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp);
@@ -81,9 +82,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table);
}
-void mndCleanupArbGroup(SMnode *pMnode) {
- taosHashCleanup(arbUpdateHash);
-}
+void mndCleanupArbGroup(SMnode *pMnode) { taosHashCleanup(arbUpdateHash); }
SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
@@ -541,6 +540,8 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
return -1;
}
+ SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
+
while (1) {
pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
if (pIter == NULL) break;
@@ -612,40 +613,27 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
SArbGroup newGroup = {0};
mndArbGroupDupObj(&arbGroupDup, &newGroup);
mndArbGroupSetAssignedLeader(&newGroup, candidateIndex);
- if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) {
- mError("vgId:%d, arb failed to pullup set assigned leader to dnodeId:%d, since %s", vgId, pMember->info.dnodeId,
- terrstr());
- sdbRelease(pSdb, pArbGroup);
- return -1;
- }
-
- mInfo("vgId:%d, arb pull up set assigned leader to dnodeId:%d", vgId, pMember->info.dnodeId);
+ taosArrayPush(pUpdateArray, &newGroup);
sdbRelease(pSdb, pArbGroup);
}
+ (void)mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray);
+
+ taosArrayDestroy(pUpdateArray);
return 0;
}
-static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup) {
- SMArbUpdateGroupReq req = {0};
- req.vgId = pNewGroup->vgId;
- req.dbUid = pNewGroup->dbUid;
- for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
- req.members[i].dnodeId = pNewGroup->members[i].info.dnodeId;
- req.members[i].token = pNewGroup->members[i].state.token;
- }
- req.isSync = pNewGroup->isSync;
- req.assignedLeader.dnodeId = pNewGroup->assignedLeader.dnodeId;
- req.assignedLeader.token = pNewGroup->assignedLeader.token;
- req.version = pNewGroup->version;
+static void *mndBuildArbUpdateGroupBatchReq(int32_t *pContLen, SArray *updateArray) {
+ SMArbUpdateGroupBatchReq req = {0};
+ req.updateArray = updateArray;
- int32_t contLen = tSerializeSMArbUpdateGroupReq(NULL, 0, &req);
+ int32_t contLen = tSerializeSMArbUpdateGroupBatchReq(NULL, 0, &req);
if (contLen <= 0) return NULL;
SMsgHead *pHead = rpcMallocCont(contLen);
if (pHead == NULL) return NULL;
- if (tSerializeSMArbUpdateGroupReq(pHead, contLen, &req) <= 0) {
+ if (tSerializeSMArbUpdateGroupBatchReq(pHead, contLen, &req) <= 0) {
rpcFreeCont(pHead);
return NULL;
}
@@ -653,60 +641,172 @@ static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup)
return pHead;
}
+static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) {
+ outGroup->vgId = pGroup->vgId;
+ outGroup->dbUid = pGroup->dbUid;
+ for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
+ outGroup->members[i].dnodeId = pGroup->members[i].info.dnodeId;
+ outGroup->members[i].token = pGroup->members[i].state.token; // just copy the pointer
+ }
+ outGroup->isSync = pGroup->isSync;
+ outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId;
+ outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer
+ outGroup->version = pGroup->version;
+}
+
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
return 0;
}
- int32_t contLen = 0;
- void *pHead = mndBuildArbUpdateGroupReq(&contLen, pNewGroup);
- if (!pHead) {
- mError("vgId:%d, failed to build arb-update-group request", pNewGroup->vgId);
- return -1;
- }
- SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_UPDATE_GROUP, .pCont = pHead, .contLen = contLen, .info.noResp = true};
+ int32_t ret = -1;
- int32_t ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
- if (ret == 0) {
- taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
+ SMArbUpdateGroup newGroup = {0};
+ mndInitArbUpdateGroup(pNewGroup, &newGroup);
+
+ SArray *pArray = taosArrayInit(1, sizeof(SMArbUpdateGroup));
+ taosArrayPush(pArray, &newGroup);
+
+ int32_t contLen = 0;
+ void *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
+ if (!pHead) {
+ mError("failed to build arb-update-group request");
+ goto _OVER;
}
+
+ SRpcMsg rpcMsg = {
+ .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
+ ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
+ if (ret != 0) goto _OVER;
+
+ taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
+
+_OVER:
+ taosArrayDestroy(pArray);
return ret;
}
-static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) {
- int ret = 0;
+static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray) {
+ int32_t ret = -1;
- SMArbUpdateGroupReq req = {0};
- tDeserializeSMArbUpdateGroupReq(pReq->pCont, pReq->contLen, &req);
+ size_t sz = taosArrayGetSize(newGroupArray);
+ SArray *pArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
+ for (size_t i = 0; i < sz; i++) {
+ SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
+ if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
+ mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
+ continue;
+ }
- SArbGroup newGroup = {0};
- newGroup.vgId = req.vgId;
- newGroup.dbUid = req.dbUid;
- for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
- newGroup.members[i].info.dnodeId = req.members[i].dnodeId;
- memcpy(newGroup.members[i].state.token, req.members[i].token, TSDB_ARB_TOKEN_SIZE);
+ SMArbUpdateGroup newGroup = {0};
+ mndInitArbUpdateGroup(pNewGroup, &newGroup);
+
+ taosArrayPush(pArray, &newGroup);
+ taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
}
- newGroup.isSync = req.isSync;
- newGroup.assignedLeader.dnodeId = req.assignedLeader.dnodeId;
- memcpy(newGroup.assignedLeader.token, req.assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
- newGroup.version = req.version;
-
- SMnode *pMnode = pReq->info.node;
- SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
- if (!pOldGroup) {
- mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
- return 0;
- }
- sdbRelease(pMnode->pSdb, pOldGroup);
-
- if (mndArbGroupUpdateTrans(pMnode, &newGroup) != 0) {
- mError("vgId:%d, arb failed to update arbgroup, since %s", newGroup.vgId, terrstr());
- ret = -1;
+ if (taosArrayGetSize(pArray) == 0) {
+ ret = 0;
+ goto _OVER;
}
- tFreeSMArbUpdateGroupReq(&req);
+ int32_t contLen = 0;
+ void *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
+ if (!pHead) {
+ mError("failed to build arb-update-group request");
+ goto _OVER;
+ }
+
+ SRpcMsg rpcMsg = {
+ .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
+ ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
+
+_OVER:
+ taosArrayDestroy(pArray);
+
+ if (ret != 0) {
+ for (size_t i = 0; i < sz; i++) {
+ SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
+ taosHashRemove(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId));
+ }
+ }
+
+ return ret;
+}
+
+static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
+ int ret = -1;
+ size_t sz = 0;
+
+ SMArbUpdateGroupBatchReq req = {0};
+ if (tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req) != 0) {
+ mError("arb failed to decode arb-update-group request");
+ return -1;
+ }
+
+ SMnode *pMnode = pReq->info.node;
+ STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "update-arbgroup");
+ if (pTrans == NULL) {
+ mError("failed to update arbgroup in create trans, since %s", terrstr());
+ goto _OVER;
+ }
+
+ sz = taosArrayGetSize(req.updateArray);
+ for (size_t i = 0; i < sz; i++) {
+ SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
+ SArbGroup newGroup = {0};
+ newGroup.vgId = pUpdateGroup->vgId;
+ newGroup.dbUid = pUpdateGroup->dbUid;
+ for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
+ newGroup.members[i].info.dnodeId = pUpdateGroup->members[i].dnodeId;
+ memcpy(newGroup.members[i].state.token, pUpdateGroup->members[i].token, TSDB_ARB_TOKEN_SIZE);
+ }
+
+ newGroup.isSync = pUpdateGroup->isSync;
+ newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
+ memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
+ newGroup.version = pUpdateGroup->version;
+
+ SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
+ if (!pOldGroup) {
+ mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
+ taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t));
+ continue;
+ }
+
+ mndTransAddArbGroupId(pTrans, newGroup.vgId);
+
+ if (mndSetCreateArbGroupCommitLogs(pTrans, &newGroup) != 0) {
+ mError("failed to update arbgroup in set commit log, vgId:%d, trans:%d, since %s", newGroup.vgId, pTrans->id,
+ terrstr());
+ goto _OVER;
+ }
+
+ mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]",
+ pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
+ newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
+ newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token);
+
+ sdbRelease(pMnode->pSdb, pOldGroup);
+ }
+
+ if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
+ if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
+
+ ret = 0;
+
+_OVER:
+ if (ret != 0) {
+ // failed to update arbgroup
+ for (size_t i = 0; i < sz; i++) {
+ SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
+ taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t));
+ }
+ }
+
+ mndTransDrop(pTrans);
+ tFreeSMArbUpdateGroupBatchReq(&req);
return ret;
}
@@ -739,7 +839,7 @@ static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId,
pNew->assignedLeader.token);
- mndTransSetArbGroupId(pTrans, pNew->vgId);
+ mndTransAddArbGroupId(pTrans, pNew->vgId);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
ret = -1;
goto _OVER;
@@ -816,10 +916,10 @@ _OVER:
}
static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
- int ret = 0;
int64_t nowMs = taosGetTimestampMs();
+ size_t size = taosArrayGetSize(memberArray);
+ SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));
- size_t size = taosArrayGetSize(memberArray);
for (size_t i = 0; i < size; i++) {
SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i);
@@ -832,17 +932,16 @@ static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *me
bool updateToken = mndUpdateArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
if (updateToken) {
- ret = mndPullupArbUpdateGroup(pMnode, &newGroup);
- if (ret != 0) {
- mInfo("failed to pullup update arb token, vgId:%d, since %s", pRspMember->vgId, terrstr());
- }
+ taosArrayPush(pUpdateArray, &newGroup);
}
sdbRelease(pMnode->pSdb, pGroup);
- if (ret != 0) break;
}
- return ret;
+ (void)mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray);
+
+ taosArrayDestroy(pUpdateArray);
+ return 0;
}
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
@@ -900,6 +999,11 @@ static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token
}
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
+ if (pRsp->contLen == 0) {
+ mDebug("arb hb-rsp contLen is 0");
+ return 0;
+ }
+
int32_t ret = -1;
SMnode *pMnode = pRsp->info.node;
@@ -914,6 +1018,7 @@ static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
SVArbHeartBeatRsp arbHbRsp = {0};
if (tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp) != 0) {
+ mInfo("arb hb-rsp des failed, since:%s", tstrerror(pRsp->code));
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
@@ -934,6 +1039,11 @@ _OVER:
}
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
+ if (pRsp->contLen == 0) {
+ mDebug("arb check-sync-rsp contLen is 0");
+ return 0;
+ }
+
int32_t ret = -1;
SMnode *pMnode = pRsp->info.node;
@@ -948,7 +1058,7 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
SVArbCheckSyncRsp syncRsp = {0};
if (tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) {
- mInfo("arb sync check failed, since:%s", tstrerror(pRsp->code));
+ mInfo("arb check-sync-rsp des failed, since:%s", tstrerror(pRsp->code));
if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
terrno = TSDB_CODE_SUCCESS;
return 0;
@@ -1008,6 +1118,11 @@ _OVER:
}
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
+ if (pRsp->contLen == 0) {
+ mDebug("arb set-assigned-rsp contLen is 0");
+ return 0;
+ }
+
int32_t ret = -1;
SMnode *pMnode = pRsp->info.node;
@@ -1022,8 +1137,8 @@ static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
SVArbSetAssignedLeaderRsp setAssignedRsp = {0};
if (tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp) != 0) {
+ mInfo("arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code));
terrno = TSDB_CODE_INVALID_MSG;
- mInfo("arb set assigned failed, des failed since:%s", tstrerror(pRsp->code));
return -1;
}
diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c
index f254fb16a5..ad596289de 100644
--- a/source/dnode/mnode/impl/src/mndDb.c
+++ b/source/dnode/mnode/impl/src/mndDb.c
@@ -821,8 +821,7 @@ static int32_t mndCheckDbEncryptKey(SMnode *pMnode, SCreateDbReq *pReq) {
#ifdef TD_ENTERPRISE
if (pReq->encryptAlgorithm == TSDB_ENCRYPT_ALGO_NONE) goto _exit;
- if (grantCheck(TSDB_GRANT_DB_ENCRYPTION) != 0) {
- code = TSDB_CODE_MND_DB_ENCRYPT_GRANT_EXPIRED;
+ if ((code = grantCheck(TSDB_GRANT_DB_ENCRYPTION)) != 0) {
goto _exit;
}
if (tsEncryptionKeyStat != ENCRYPT_KEY_STAT_LOADED) {
@@ -1226,7 +1225,7 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
if (terrno != 0) code = terrno;
- mError("db:%s, failed to alter since %s", alterReq.db, terrstr());
+ mError("db:%s, failed to alter since %s", alterReq.db, tstrerror(code));
}
mndReleaseDb(pMnode, pDb);
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index cb0a6c5d99..989d9970cd 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -1914,8 +1914,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
- if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) {
- terrno = TSDB_CODE_GRANT_EXPIRED;
+ if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
return -1;
}
diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c
index a81a391c3d..9bd7b3b18f 100644
--- a/source/dnode/mnode/impl/src/mndStreamHb.c
+++ b/source/dnode/mnode/impl/src/mndStreamHb.c
@@ -225,7 +225,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SArray *pFailedTasks = NULL;
SArray *pOrphanTasks = NULL;
- if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) {
+ if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
if (suspendAllStreams(pMnode, &pReq->info) < 0) {
return -1;
}
diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c
index 84940e01d4..8b01d296a3 100644
--- a/source/dnode/mnode/impl/src/mndTrans.c
+++ b/source/dnode/mnode/impl/src/mndTrans.c
@@ -26,7 +26,7 @@
#define TRANS_VER1_NUMBER 1
#define TRANS_VER2_NUMBER 2
#define TRANS_ARRAY_SIZE 8
-#define TRANS_RESERVE_SIZE 48
+#define TRANS_RESERVE_SIZE 44
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld);
@@ -196,10 +196,21 @@ SSdbRaw *mndTransEncode(STrans *pTrans) {
}
SDB_SET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER)
+
+ int32_t arbGroupNum = taosHashGetSize(pTrans->arbGroupIds);
+ SDB_SET_INT32(pRaw, dataPos, arbGroupNum, _OVER)
+ void *pIter = NULL;
+ pIter = taosHashIterate(pTrans->arbGroupIds, NULL);
+ while (pIter) {
+ int32_t arbGroupId = *(int32_t *)pIter;
+ SDB_SET_INT32(pRaw, dataPos, arbGroupId, _OVER)
+ pIter = taosHashIterate(pTrans->arbGroupIds, pIter);
+ }
+
SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
- terrno = 0;
+ terrno = 0;
_OVER:
if (terrno != 0) {
@@ -279,6 +290,7 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
int32_t undoActionNum = 0;
int32_t commitActionNum = 0;
int32_t dataPos = 0;
+ int32_t arbgroupIdNum = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
@@ -350,6 +362,16 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
}
SDB_GET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER);
+
+ pTrans->arbGroupIds = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
+
+ SDB_GET_INT32(pRaw, dataPos, &arbgroupIdNum, _OVER)
+ for (int32_t i = 0; i < arbgroupIdNum; ++i) {
+ int32_t arbGroupId = 0;
+ SDB_GET_INT32(pRaw, dataPos, &arbGroupId, _OVER)
+ taosHashPut(pTrans->arbGroupIds, &arbGroupId, sizeof(int32_t), NULL, 0);
+ }
+
SDB_GET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER)
terrno = 0;
@@ -462,6 +484,9 @@ void mndTransDropData(STrans *pTrans) {
mndTransDropActions(pTrans->commitActions);
pTrans->commitActions = NULL;
}
+ if (pTrans->arbGroupIds != NULL) {
+ taosHashCleanup(pTrans->arbGroupIds);
+ }
if (pTrans->pRpcArray != NULL) {
taosArrayDestroy(pTrans->pRpcArray);
pTrans->pRpcArray = NULL;
@@ -581,6 +606,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
+ pTrans->arbGroupIds = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : tGenIdPI64();
taosInitRWLatch(&pTrans->lockRpcArray);
@@ -733,7 +759,9 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname)
}
}
-void mndTransSetArbGroupId(STrans *pTrans, int32_t groupId) { pTrans->arbGroupId = groupId; }
+void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId) {
+ taosHashPut(pTrans->arbGroupIds, &groupId, sizeof(int32_t), NULL, 0);
+}
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
@@ -821,7 +849,16 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
if (pNew->conflict == TRN_CONFLICT_ARBGROUP) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) {
- if (pNew->arbGroupId == pTrans->arbGroupId) conflict = true;
+ void *pIter = taosHashIterate(pNew->arbGroupIds, NULL);
+ while (pIter != NULL) {
+ int32_t groupId = *(int32_t *)pIter;
+ if (taosHashGet(pTrans->arbGroupIds, &groupId, sizeof(int32_t)) != NULL) {
+ taosHashCancelIterate(pNew->arbGroupIds, pIter);
+ conflict = true;
+ break;
+ }
+ pIter = taosHashIterate(pNew->arbGroupIds, pIter);
+ }
}
}
@@ -1372,7 +1409,7 @@ static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArr
mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->actionPos);
for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) {
- STransAction *pAction = taosArrayGet(pActions, pTrans->actionPos);
+ STransAction *pAction = taosArrayGet(pActions, action);
code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
if (code == 0) {
diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c
index ce6db55cef..5929c6b591 100644
--- a/source/dnode/vnode/src/tq/tqSink.c
+++ b/source/dnode/vnode/src/tq/tqSink.c
@@ -71,7 +71,7 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
if (varTbName != NULL && varTbName != (void*)-1) {
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
- if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0 && stbFullName) {
+ if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 && stbFullName) {
buildCtbNameAddGroupId(stbFullName, name, groupId);
}
} else if (stbFullName) {
@@ -182,7 +182,7 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa
int64_t gid, bool newSubTableRule) {
if (pDataBlock->info.parTbName[0]) {
if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
- !alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0 && stbFullName) {
+ !alreadyAddGroupId(pDataBlock->info.parTbName, gid) && gid != 0 && stbFullName) {
pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid);
@@ -713,7 +713,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
} else {
if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
- !alreadyAddGroupId(dstTableName) && groupId != 0) {
+ !alreadyAddGroupId(dstTableName, groupId) && groupId != 0) {
tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
buildCtbNameAddGroupId(NULL, dstTableName, groupId);
diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c
index f602ee5fe3..d0c753d280 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCache.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCache.c
@@ -1238,6 +1238,18 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo
ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
// 1. prepare last
+ STsdbRowKey tsdbRowKey = {0};
+ tsdbRowGetKey(&lRow, &tsdbRowKey);
+
+ {
+ SLastUpdateCtx updateCtx = {
+ .lflag = LFLAG_LAST,
+ .tsdbRowKey = tsdbRowKey,
+ .colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP,
+ .val = lRow.pBlockData->aTSKEY[lRow.iRow]}))};
+ taosArrayPush(ctxArray, &updateCtx);
+ }
+
TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
@@ -1263,9 +1275,6 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo
}
// 2. prepare last row
- STsdbRowKey tsdbRowKey = {0};
- tsdbRowGetKey(&lRow, &tsdbRowKey);
-
STSDBRowIter iter = {0};
tsdbRowIterOpen(&iter, &lRow, pTSchema);
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {
diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c
index 2cba6e3241..29907e6f1f 100644
--- a/source/libs/executor/src/eventwindowoperator.c
+++ b/source/libs/executor/src/eventwindowoperator.c
@@ -202,6 +202,7 @@ static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) {
}
eventWindowAggImpl(pOperator, pInfo, pBlock);
+ doFilter(pRes, pSup->pFilterInfo, NULL);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
return pRes;
}
diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c
index e2ea6c1374..2a5e594ba3 100644
--- a/source/libs/function/src/udfd.c
+++ b/source/libs/function/src/udfd.c
@@ -1424,7 +1424,7 @@ int main(int argc, char *argv[]) {
printf("failed to start since init log error\n");
}
- if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0, true) != 0) {
+ if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
fnError("failed to start since read config error");
return -2;
}
diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c
index eef6cdc3d3..aa8b88b738 100644
--- a/source/libs/function/test/runUdf.c
+++ b/source/libs/function/test/runUdf.c
@@ -127,7 +127,7 @@ int aggregateFuncTest() {
int main(int argc, char *argv[]) {
parseArgs(argc, argv);
initLog();
- if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0, true) != 0) {
+ if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
fnError("failed to start since read config error");
return -1;
}
diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c
index 9ae9dfffd7..5399aa7f09 100644
--- a/source/libs/parser/src/parAstCreater.c
+++ b/source/libs/parser/src/parAstCreater.c
@@ -167,7 +167,7 @@ static bool checkDbName(SAstCreateContext* pCxt, SToken* pDbName, bool demandDb)
}
} else {
trimEscape(pDbName);
- if (pDbName->n >= TSDB_DB_NAME_LEN) {
+ if (pDbName->n >= TSDB_DB_NAME_LEN || pDbName->n == 0) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pDbName->z);
}
}
@@ -176,7 +176,7 @@ static bool checkDbName(SAstCreateContext* pCxt, SToken* pDbName, bool demandDb)
static bool checkTableName(SAstCreateContext* pCxt, SToken* pTableName) {
trimEscape(pTableName);
- if (NULL != pTableName && pTableName->n >= TSDB_TABLE_NAME_LEN) {
+ if (NULL != pTableName && pTableName->type != TK_NK_NIL && (pTableName->n >= TSDB_TABLE_NAME_LEN || pTableName->n == 0)) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pTableName->z);
return false;
}
@@ -185,7 +185,7 @@ static bool checkTableName(SAstCreateContext* pCxt, SToken* pTableName) {
static bool checkColumnName(SAstCreateContext* pCxt, SToken* pColumnName) {
trimEscape(pColumnName);
- if (NULL != pColumnName && pColumnName->n >= TSDB_COL_NAME_LEN) {
+ if (NULL != pColumnName && pColumnName->type != TK_NK_NIL && (pColumnName->n >= TSDB_COL_NAME_LEN || pColumnName->n == 0)) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pColumnName->z);
return false;
}
@@ -203,7 +203,7 @@ static bool checkIndexName(SAstCreateContext* pCxt, SToken* pIndexName) {
static bool checkTopicName(SAstCreateContext* pCxt, SToken* pTopicName) {
trimEscape(pTopicName);
- if (pTopicName->n >= TSDB_TOPIC_NAME_LEN) {
+ if (pTopicName->n >= TSDB_TOPIC_NAME_LEN || pTopicName->n == 0) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pTopicName->z);
return false;
}
@@ -221,7 +221,7 @@ static bool checkCGroupName(SAstCreateContext* pCxt, SToken* pCGroup) {
static bool checkViewName(SAstCreateContext* pCxt, SToken* pViewName) {
trimEscape(pViewName);
- if (pViewName->n >= TSDB_VIEW_NAME_LEN) {
+ if (pViewName->n >= TSDB_VIEW_NAME_LEN || pViewName->n == 0) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pViewName->z);
return false;
}
@@ -230,7 +230,7 @@ static bool checkViewName(SAstCreateContext* pCxt, SToken* pViewName) {
static bool checkStreamName(SAstCreateContext* pCxt, SToken* pStreamName) {
trimEscape(pStreamName);
- if (pStreamName->n >= TSDB_STREAM_NAME_LEN) {
+ if (pStreamName->n >= TSDB_STREAM_NAME_LEN || pStreamName->n == 0) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pStreamName->z);
return false;
}
@@ -252,6 +252,8 @@ static bool checkTsmaName(SAstCreateContext* pCxt, SToken* pTsmaToken) {
pCxt->errCode = TSDB_CODE_PAR_SYNTAX_ERROR;
} else if (pTsmaToken->n >= TSDB_TABLE_NAME_LEN - strlen(TSMA_RES_STB_POSTFIX)) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_TSMA_NAME_TOO_LONG);
+ } else if (pTsmaToken->n == 0) {
+ pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pTsmaToken->z);
}
return pCxt->errCode == TSDB_CODE_SUCCESS;
}
@@ -1935,6 +1937,7 @@ SNode* createAlterTableRenameCol(SAstCreateContext* pCxt, SNode* pRealTable, int
SNode* createAlterTableSetTag(SAstCreateContext* pCxt, SNode* pRealTable, SToken* pTagName, SNode* pVal) {
CHECK_PARSER_STATUS(pCxt);
if (!checkColumnName(pCxt, pTagName)) {
+ nodesDestroyNode(pVal);
return NULL;
}
SAlterTableStmt* pStmt = (SAlterTableStmt*)nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
@@ -1946,6 +1949,7 @@ SNode* createAlterTableSetTag(SAstCreateContext* pCxt, SNode* pRealTable, SToken
}
SNode* setAlterSuperTableType(SNode* pStmt) {
+ if (!pStmt) return NULL;
setNodeType(pStmt, QUERY_NODE_ALTER_SUPER_TABLE_STMT);
return pStmt;
}
@@ -2436,6 +2440,7 @@ SNode* createRestoreComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, c
SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pQuery) {
CHECK_PARSER_STATUS(pCxt);
if (!checkTopicName(pCxt, pTopicName)) {
+ nodesDestroyNode(pQuery);
return NULL;
}
SCreateTopicStmt* pStmt = (SCreateTopicStmt*)nodesMakeNode(QUERY_NODE_CREATE_TOPIC_STMT);
@@ -2721,6 +2726,8 @@ SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols) {
CHECK_PARSER_STATUS(pCxt);
if (!checkStreamName(pCxt, pStreamName)) {
+ nodesDestroyNode(pQuery);
+ nodesDestroyNode(pOptions);
return NULL;
}
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)nodesMakeNode(QUERY_NODE_CREATE_STREAM_STMT);
diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c
index 370514e1fc..22f274b21c 100644
--- a/source/libs/parser/src/parInsertSql.c
+++ b/source/libs/parser/src/parInsertSql.c
@@ -2393,7 +2393,7 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif
// no data in the sql string anymore.
if (0 == pTbName->n) {
if (0 != pTbName->type && '\0' != pStmt->pSql[0]) {
- return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", pTbName->z);
+ return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pTbName->z);
}
if (0 == pStmt->totalRowsNum && (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {
diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c
index 5ad9feddbc..7c43113ad2 100644
--- a/source/libs/parser/src/parInsertUtil.c
+++ b/source/libs/parser/src/parInsertUtil.c
@@ -103,6 +103,9 @@ int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const c
if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
+ if (pTableName->n == 0) {
+ return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "invalid table name");
+ }
char name[TSDB_TABLE_FNAME_LEN] = {0};
strncpy(name, pTableName->z, pTableName->n);
@@ -111,6 +114,8 @@ int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const c
if (dbName == NULL) {
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
+ if (name[0] == '\0')
+ return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
if (code != TSDB_CODE_SUCCESS) {
@@ -732,6 +737,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
}
}
} else {
+ bool hasTs = false;
for (int i = 0; i < numFields; i++) {
for (int j = 0; j < boundInfo->numOfBound; j++) {
SSchema* pColSchema = &pSchema[j];
@@ -743,6 +749,10 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
goto end;
}
+ if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
+ hasTs = true;
+ }
+
int8_t* offset = pStart;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
pStart += numOfRows * sizeof(int32_t);
@@ -768,6 +778,12 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
}
}
+ if(!hasTs){
+ if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
+ ret = TSDB_CODE_INVALID_PARA;
+ goto end;
+ }
+
for (int c = 0; c < boundInfo->numOfBound; ++c) {
if (boundInfo->pColIndex[c] != -1) {
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c
index faa90dcbf8..11e9350e14 100644
--- a/source/libs/qworker/src/qwMsg.c
+++ b/source/libs/qworker/src/qwMsg.c
@@ -370,12 +370,12 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran
if ((TEST_VIEW_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_VIEW)) {
QW_ELOG("query failed cause of view grant expired, msgMask:%d", msg.msgMask);
tFreeSSubQueryMsg(&msg);
- QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
+ QW_ERR_RET(TSDB_CODE_GRANT_VIEW_EXPIRED);
}
if ((TEST_AUDIT_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_AUDIT)) {
QW_ELOG("query failed cause of audit grant expired, msgMask:%d", msg.msgMask);
tFreeSSubQueryMsg(&msg);
- QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
+ QW_ERR_RET(TSDB_CODE_GRANT_AUDIT_EXPIRED);
}
}
}
diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c
index 159bb9d38e..28db0a76c6 100644
--- a/source/libs/stream/src/streamBackendRocksdb.c
+++ b/source/libs/stream/src/streamBackendRocksdb.c
@@ -1150,10 +1150,13 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
taskDbAddRef(pTaskDb);
int64_t chkpId = pTaskDb->chkpId;
+ taskDbRefChkp(pTaskDb, chkpId);
code = taskDbDoCheckpoint(pTaskDb, chkpId);
- taskDbRemoveRef(pTaskDb);
+ if (code != 0) {
+ taskDbUnRefChkp(pTaskDb, chkpId);
+ }
- taskDbRefChkp(pTaskDb, pTaskDb->chkpId);
+ taskDbRemoveRef(pTaskDb);
SStreamTask* pTask = pTaskDb->pTask;
SStreamTaskSnap snap = {.streamId = pTask->id.streamId,
@@ -1177,14 +1180,15 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
for (int i = 0; i < taosArrayGetSize(pSnapInfo); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, i);
sprintf(buf, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId);
- STaskDbWrapper* pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf));
- if (pTaskDb == NULL) {
+ STaskDbWrapper** pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf));
+ if (pTaskDb == NULL || *pTaskDb == NULL) {
stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId);
+ memset(buf, 0, sizeof(buf));
continue;
}
memset(buf, 0, sizeof(buf));
- taskDbUnRefChkp(pTaskDb, pSnap->chkpId);
+ taskDbUnRefChkp(*pTaskDb, pSnap->chkpId);
}
taosThreadMutexUnlock(&pMeta->backendMutex);
return 0;
@@ -1985,7 +1989,8 @@ void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
- for (int i = 0; i < taosArrayGetSize(pTaskDb->chkpInUse); i++) {
+ int32_t size = taosArrayGetSize(pTaskDb->chkpInUse);
+ for (int i = 0; i < size; i++) {
int64_t* p = taosArrayGet(pTaskDb->chkpInUse, i);
if (*p == chkp) {
taosArrayRemove(pTaskDb->chkpInUse, i);
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index 7c80d15307..3115c2cb43 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -494,7 +494,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
if (pDataBlock->info.parTbName[0]) {
if(pTask->subtableWithoutMd5 != 1 &&
!isAutoTableName(pDataBlock->info.parTbName) &&
- !alreadyAddGroupId(pDataBlock->info.parTbName) &&
+ !alreadyAddGroupId(pDataBlock->info.parTbName, groupId) &&
groupId != 0){
if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId);
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 9daca9a99c..b60164fca9 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -184,16 +184,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return code;
}
-static int32_t handleResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) {
+static int32_t handleSanhistoryResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) {
int32_t code = TSDB_CODE_SUCCESS;
if (taosArrayGetSize(pRes) > 0) {
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes);
code = doOutputResultBlockImpl(pTask, pStreamBlocks);
- if (code != TSDB_CODE_SUCCESS) {
- stDebug("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code));
+ if (code != TSDB_CODE_SUCCESS) { // should not have error code
+ stError("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code));
}
} else {
- taosArrayDestroy(pRes);
+ taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
}
return code;
}
@@ -268,6 +268,17 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
}
+ // output queue is full, idle for 5 sec.
+ if (streamQueueIsFull(pTask->outputq.queue)) {
+ stWarn("s-task:%s outputQ is full, idle for 1sec and retry", id);
+ return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, STREAM_SCAN_HISTORY_TIMESLICE);
+ }
+
+ if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
+ stWarn("s-task:%s downstream task inputQ blocked, idle for 5sec and retry", id);
+ return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL);
+ }
+
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -284,19 +295,13 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
}
// dispatch the generated results
- /*int32_t code = */handleResultBlocks(pTask, pRes, size);
-
- int64_t el = taosGetTimestampMs() - st;
-
- // downstream task input queue is full, try in 5sec
- if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) {
- return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL);
- }
+ /*int32_t code = */handleSanhistoryResultBlocks(pTask, pRes, size);
if (finished) {
return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0);
}
+ int64_t el = taosGetTimestampMs() - st;
if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) {
stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id,
pTask->info.fillHistory, el / 1000.0);
@@ -558,7 +563,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
if (streamQueueIsFull(pTask->outputq.queue)) {
stWarn("s-task:%s outputQ is full, idle for 500ms and retry", id);
- streamTaskSetIdleInfo(pTask, 500);
+ streamTaskSetIdleInfo(pTask, 1000);
return 0;
}
diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c
index 5596eb3dee..247baea16f 100644
--- a/source/libs/stream/src/streamQueue.c
+++ b/source/libs/stream/src/streamQueue.c
@@ -381,7 +381,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
}
}
- return TSDB_CODE_SUCCESS;
+ return code;
}
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c
index eb67e61c4c..25015c4d33 100644
--- a/source/libs/stream/src/streamSnapshot.c
+++ b/source/libs/stream/src/streamSnapshot.c
@@ -144,7 +144,7 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
int32_t streamCreateTaskDbSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); }
-int32_t streamDestroyTasdDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); }
+int32_t streamDestroyTaskDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); }
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (qDebugFlag & DEBUG_DEBUG) {
@@ -333,7 +333,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
}
taosArrayDestroy(handle->pDbSnapSet);
}
- streamDestroyTasdDbSnapInfo(handle->pMeta, handle->pSnapInfoSet);
+ streamDestroyTaskDbSnapInfo(handle->pMeta, handle->pSnapInfoSet);
if (handle->pSnapInfoSet) {
for (int32_t i = 0; i < taosArrayGetSize(handle->pSnapInfoSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(handle->pSnapInfoSet, i);
diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c
index 83de642e51..fb745f86cb 100644
--- a/source/libs/stream/src/tstreamFileState.c
+++ b/source/libs/stream/src/tstreamFileState.c
@@ -759,7 +759,7 @@ void* getRowStateBuff(SStreamFileState* pFileState) { return pFileState->rowStat
void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; }
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
- return pFileState->deleteMark > 0 && ts < (pFileState->maxTs - pFileState->deleteMark);
+ return pFileState->deleteMark != INT64_MAX && pFileState->maxTs > 0 && ts < (pFileState->maxTs - pFileState->deleteMark);
}
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }
diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c
index 5a3dc867e6..4635ec340d 100644
--- a/source/util/src/tcompression.c
+++ b/source/util/src/tcompression.c
@@ -2741,7 +2741,7 @@ int32_t tsDecompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut,
int32_t tsCompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
DEFINE_VAR(cmprAlg)
- if (lvl != 0 && lossyFloat) {
+ if (l2 == L2_TSZ && lvl != 0 && lossyFloat) {
return tsCompressFloatLossyImp(pIn, nEle, pOut);
}
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 1);
@@ -2760,7 +2760,7 @@ int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
DEFINE_VAR(cmprAlg)
- if (lvl != 0 && lossyDouble) {
+ if (l2 == L2_TSZ && lvl != 0 && lossyDouble) {
// lossy mode
return tsCompressDoubleLossyImp(pIn, nEle, pOut);
}
diff --git a/source/util/src/terror.c b/source/util/src/terror.c
index ad811cc891..0f594af0e9 100644
--- a/source/util/src/terror.c
+++ b/source/util/src/terror.c
@@ -259,7 +259,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_IN_CREATING, "Database in creating
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ENCRYPT_NOT_ALLOW_CHANGE, "Encryption is not allowed to be changed after database is created")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INCONSIST_ENCRYPT_KEY, "Inconsistent encryption key")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ENCRYPT_KEY, "The cluster has not been set properly for database encryption")
-TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ENCRYPT_GRANT_EXPIRED, "The database encryption function grant expired")
// mnode-node
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_ALREADY_EXIST, "Mnode already exists")
@@ -499,9 +498,20 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_LACK_OF_BASIC, "Lack of basic functio
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OBJ_NOT_EXIST, "Grant object not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_LAST_ACTIVE_NOT_FOUND, "The historial active code does not match")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_MACHINES_MISMATCH, "Cluster machines mismatch with active code")
-TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE, "Expire time of optional grant item is too large")
+TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE, "Expiration time of optional grant item is too large")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DUPLICATED_ACTIVE, "The active code can't be activated repeatedly")
-TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_LIMITED, "Number of view has reached the licensed upper limit")
+TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_LIMITED, "Number of views has reached the licensed upper limit")
+TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_BASIC_EXPIRED, "License expired for basic functions")
+TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_STREAM_EXPIRED, "License expired for stream function")
+TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_SUBSCRIPTION_EXPIRED, "License expired for subscription function")
+TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_EXPIRED, "License expired for view function")
+TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_AUDIT_EXPIRED, "License expired for audit function")
+TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CSV_EXPIRED, "License expired for CSV function")
+TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_MULTI_STORAGE_EXPIRED, "License expired for multi-tier storage function")
+TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OBJECT_STROAGE_EXPIRED, "License expired for object storage function")
+TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DUAL_REPLICA_HA_EXPIRED,"License expired for dual-replica HA function")
+TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DB_ENCRYPTION_EXPIRED, "License expired for database encryption function")
+
// sync
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout")
diff --git a/tests/army/community/insert/test_column_tag_boundary.py b/tests/army/community/insert/test_column_tag_boundary.py
index 4c04fd3f9b..4f1245d728 100644
--- a/tests/army/community/insert/test_column_tag_boundary.py
+++ b/tests/army/community/insert/test_column_tag_boundary.py
@@ -22,6 +22,7 @@ class TDTestCase(TBase):
self.child_table_num = 1
self.insert_round_num = 700
self.row_num_per_round = 15
+ self.start_ts = 1704082431000
def prepare_data(self):
# database
@@ -39,7 +40,7 @@ class TDTestCase(TBase):
for j in range(self.insert_round_num):
sql = "insert into ct_binary%s values" % (i+1)
for k in range(self.row_num_per_round):
- sql += "(now+%ss, '%s')," % (str(j * 10 + k + 1), 'a' * self.max_column_length)
+ sql += "(%s, '%s')," % (str(self.start_ts + (j * self.insert_round_num + k * self.row_num_per_round + 1)), 'a' * self.max_column_length)
tdSql.execute(sql)
tdLog.info(f"Insert {self.row_num_per_round} rows data into ct_binary{i+1} {j+1} times successfully")
tdSql.execute("flush database db;")
@@ -63,7 +64,7 @@ class TDTestCase(TBase):
for j in range(self.insert_round_num):
sql = "insert into ct_varchar%s values" % (i+1)
for k in range(self.row_num_per_round):
- sql += "(now+%ss, '%s')," % (str(j * 10 + k + 1), 'b' * self.max_column_length)
+ sql += "(%s, '%s')," % (str(self.start_ts + (j * self.insert_round_num + k * self.row_num_per_round + 1)), 'b' * self.max_column_length)
tdSql.execute(sql)
tdLog.info(f"Insert {self.row_num_per_round} rows data into ct_varchar{i+1} {j+1} times successfully")
tdSql.execute("flush database db;")
@@ -98,7 +99,7 @@ class TDTestCase(TBase):
for j in range(self.insert_round_num):
sql = "insert into ct_nchar%s values" % (i+1)
for k in range(self.row_num_per_round):
- sql += "(now+%ss, '%s')," % (str(j * 10 + k + 1), column)
+ sql += "(%s, '%s')," % (str(self.start_ts + (j * self.insert_round_num + k * self.row_num_per_round + 1)), column)
tdSql.execute(sql)
tdLog.info(f"Insert {self.row_num_per_round} rows data into ct_nchar{i+1} {j+1} times successfully")
tdSql.execute("flush database db;")
@@ -124,7 +125,7 @@ class TDTestCase(TBase):
for j in range(self.insert_round_num):
sql = "insert into ct_varbinary%s values" % (i+1)
for k in range(row_num_per_round):
- sql += "(now+%ss, '%s')," % (str(j * 10 + k + 1), '\\x' + column)
+ sql += "(%s, '%s')," % (str(self.start_ts + (j * self.insert_round_num + k * self.row_num_per_round + 1)), '\\x' + column)
tdSql.execute(sql)
tdLog.info(f"Insert {row_num_per_round} rows data into ct_varbinary{i+1} {j+1} times successfully")
tdSql.execute("flush database db;")
@@ -153,7 +154,7 @@ class TDTestCase(TBase):
for j in range(self.insert_round_num):
sql = "insert into ct_json_tag%s values" % (i+1)
for k in range(row_num_per_round):
- sql += "(now+%ss, '%s')," % (str(j * 10 + k + 1), '\\x' + column)
+ sql += "(%s, '%s')," % (str(self.start_ts + (j * self.insert_round_num + k * self.row_num_per_round + 1)), '\\x' + column)
tdSql.execute(sql)
tdLog.info(f"Insert {row_num_per_round} rows data into ct_json_tag{i+1} {j+1} times successfully")
tdSql.execute("flush database db;")
diff --git a/tests/army/community/query/show.py b/tests/army/community/query/show.py
index 980e60387a..72b84954cd 100644
--- a/tests/army/community/query/show.py
+++ b/tests/army/community/query/show.py
@@ -118,6 +118,22 @@ class TDTestCase(TBase):
tdSql.query(sql)
tdSql.checkRows(5)
+ # verification for TD-30030
+ tdSql.execute("create table t100 (ts timestamp, pk varchar(20) primary key, c1 varchar(100)) tags (id int)")
+ tdSql.execute("insert into ct1 using t100 tags(1) values('2024-05-17 14:58:52.902', 'a1', '100')")
+ tdSql.execute("insert into ct1 using t100 tags(1) values('2024-05-17 14:58:52.902', 'a2', '200')")
+ tdSql.execute("insert into ct1 using t100 tags(1) values('2024-05-17 14:58:52.902', 'a3', '300')")
+ tdSql.execute("insert into ct2 using t100 tags(2) values('2024-05-17 14:58:52.902', 'a2', '200')")
+ tdSql.execute("create view v100 as select * from t100")
+ tdSql.execute("create view v200 as select * from ct1")
+
+ tdSql.error("show tags from v100", expectErrInfo="Tags can only applied to super table and child table")
+ tdSql.error("show tags from v200", expectErrInfo="Tags can only applied to super table and child table")
+
+ tdSql.execute("create table t200 (ts timestamp, pk varchar(20) primary key, c1 varchar(100))")
+
+ tdSql.error("show tags from t200", expectErrInfo="Tags can only applied to super table and child table")
+
def checkShow(self):
# not support
sql = "show accounts;"
diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task
index 7348b1fea0..342389562a 100644
--- a/tests/parallel_test/cases.task
+++ b/tests/parallel_test/cases.task
@@ -323,6 +323,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/delete_check.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/test_hot_refresh_configurations.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/subscribe_stream_privilege.py
+,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/empty_identifier.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_create.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_insert.py
diff --git a/tests/parallel_test/run.sh b/tests/parallel_test/run.sh
index ef9413a9d5..e58f890ccd 100755
--- a/tests/parallel_test/run.sh
+++ b/tests/parallel_test/run.sh
@@ -315,19 +315,22 @@ function run_thread() {
fi
if [ -n "$corefile" ]; then
echo -e "\e[34m corefiles: $corefile \e[0m"
- local build_dir=$log_dir/build_${hosts[index]}
- local remote_build_dir="${workdirs[index]}/${DEBUGPATH}/build"
- # if [ $ent -ne 0 ]; then
- # remote_build_dir="${workdirs[index]}/{DEBUGPATH}/build"
- # fi
- mkdir "$build_dir" 2>/dev/null
- if [ $? -eq 0 ]; then
- # scp build binary
- cmd="$scpcmd:${remote_build_dir}/* ${build_dir}/"
- echo "$cmd"
- $cmd >/dev/null
- fi
fi
+ # scp build binary and unit test log
+ local build_dir=$log_dir/build_${hosts[index]}
+ local remote_build_dir="${workdirs[index]}/${DEBUGPATH}/build"
+ local remote_unit_test_log_dir="${workdirs[index]}/${DEBUGPATH}/Testing/Temporary/"
+
+ mkdir "$build_dir" 2>/dev/null
+ if [ $? -eq 0 ]; then
+ cmd="$scpcmd:${remote_build_dir}/* ${build_dir}/"
+ echo "$cmd"
+ $cmd >/dev/null
+ cmd="$scpcmd:${remote_unit_test_log_dir}/* ${build_dir}/"
+ echo "$cmd"
+ $cmd >/dev/null
+ fi
+
# get remote sim dir
local remote_sim_dir="${workdirs[index]}/tmp/thread_volume/$thread_no"
local tarcmd="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}"
diff --git a/tests/system-test/0-others/empty_identifier.py b/tests/system-test/0-others/empty_identifier.py
new file mode 100644
index 0000000000..a576781d86
--- /dev/null
+++ b/tests/system-test/0-others/empty_identifier.py
@@ -0,0 +1,180 @@
+import taos
+import sys
+import time
+import socket
+import os
+import threading
+import math
+from datetime import datetime
+
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import *
+from util.common import *
+# from tmqCommon import *
+
+COMPARE_DATA = 0
+COMPARE_LEN = 1
+
+class TDTestCase:
+ def __init__(self):
+ self.vgroups = 4
+ self.ctbNum = 10
+ self.rowsPerTbl = 10000
+ self.duraion = '1h'
+
+ def init(self, conn, logSql, replicaVar=1):
+ self.replicaVar = int(replicaVar)
+ tdLog.debug(f"start to excute {__file__}")
+ tdSql.init(conn.cursor(), True)
+
+ def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'):
+ if dropFlag == 1:
+ tsql.execute("drop database if exists %s"%(dbName))
+
+ tsql.execute("create database if not exists %s vgroups %d replica %d duration %s"%(dbName, vgroups, replica, duration))
+ tdLog.debug("complete to create database %s"%(dbName))
+ return
+
+ def create_stable(self,tsql, paraDict):
+ colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"])
+ tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"])
+ sqlString = f"create table if not exists %s.%s (%s) tags (%s)"%(paraDict["dbName"], paraDict["stbName"], colString, tagString)
+ tdLog.debug("%s"%(sqlString))
+ tsql.execute(sqlString)
+ return
+
+ def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0):
+ for i in range(ctbNum):
+ sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % \
+ (dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,(i+ctbStartIdx) % 5,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx)
+ tsql.execute(sqlString)
+
+ tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
+ return
+
+ def insert_data(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,tsStep):
+ tdLog.debug("start to insert data ............")
+ tsql.execute("use %s" %dbName)
+ pre_insert = "insert into "
+ sql = pre_insert
+
+ for i in range(ctbNum):
+ rowsBatched = 0
+ sql += " %s%d values "%(ctbPrefix,i)
+ for j in range(rowsPerTbl):
+ if (i < ctbNum/2):
+ sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10, j%10, j%10)
+ else:
+ sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10)
+ rowsBatched += 1
+ if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
+ tsql.execute(sql)
+ rowsBatched = 0
+ if j < rowsPerTbl - 1:
+ sql = "insert into %s%d values " %(ctbPrefix,i)
+ else:
+ sql = "insert into "
+ if sql != pre_insert:
+ tsql.execute(sql)
+ tdLog.debug("insert data ............ [OK]")
+ return
+
+ def prepareTestEnv(self):
+ tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
+ paraDict = {'dbName': 'test',
+ 'dropFlag': 1,
+ 'vgroups': 2,
+ 'stbName': 'meters',
+ 'colPrefix': 'c',
+ 'tagPrefix': 't',
+ 'colSchema': [{'type': 'INT', 'count':1},
+ {'type': 'BIGINT', 'count':1},
+ {'type': 'FLOAT', 'count':1},
+ {'type': 'DOUBLE', 'count':1},
+ {'type': 'smallint', 'count':1},
+ {'type': 'tinyint', 'count':1},
+ {'type': 'bool', 'count':1},
+ {'type': 'binary', 'len':10, 'count':1},
+ {'type': 'nchar', 'len':10, 'count':1}],
+ 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}],
+ 'ctbPrefix': 't',
+ 'ctbStartIdx': 0,
+ 'ctbNum': 100,
+ 'rowsPerTbl': 10000,
+ 'batchNum': 3000,
+ 'startTs': 1537146000000,
+ 'tsStep': 600000}
+
+ paraDict['vgroups'] = self.vgroups
+ paraDict['ctbNum'] = self.ctbNum
+ paraDict['rowsPerTbl'] = self.rowsPerTbl
+
+ tdLog.info("create database")
+ self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion)
+
+ tdLog.info("create stb")
+ self.create_stable(tsql=tdSql, paraDict=paraDict)
+
+ tdLog.info("create child tables")
+ self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"], \
+ stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],\
+ ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict["ctbStartIdx"])
+ self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],\
+ ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],\
+ rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\
+ startTs=paraDict["startTs"],tsStep=paraDict["tsStep"])
+ return
+
+ def run(self):
+ self.prepareTestEnv()
+ self.test_empty_identifier()
+
+ def execute_sql_and_expect_err(self, sql: str, err: int):
+ tdSql.error(sql, err)
+
+ def test_empty_identifier(self):
+ ## invalid identifier
+ sqls = [
+ 'show create table ``',
+ 'show create table test.``',
+ 'create table `` (ts timestamp, c1 int)',
+ 'drop table ``',
+ 'alter table `` add column c2 int',
+ 'select * from ``',
+ 'alter table meters add column `` int',
+ 'alter table meters drop column ``',
+ 'alter table t0 set tag `` = ""',
+ 'alter stable meters add tag `` int',
+ 'alter stable meters rename tag cc ``',
+ 'alter stable meters drop tag ``',
+ 'insert into `` select * from t0',
+ 'insert into t100 using `` tags('', '') values(1,1,1)',
+ 'create view `` as select count(*) from meters interval(10s)',
+ 'create view ``.view1 as select count(*) from meters'
+ 'create tsma `` on meters function(count(c1)) interval(1m)',
+ 'create tsma tsma1 on `` function(count(c1)) interval(1m)',
+ 'create stream `` into st1 as select count(*) from meters interval(10s)',
+ 'create stream stream1 into `` as select count(*) from meters interval(10s)',
+ 'create stream stream1 into st1 as select count(*) from `` interval(10s)',
+ 'create stream stream1 trigger max_delay 100s into st1 as select count(*) from `` interval(10s)',
+ 'drop view ``',
+ 'drop tsma ``',
+ 'drop view ``.st1',
+ 'create topic `` as select count(*) from meters interval(10s)',
+ 'drop topic ``',
+ 'insert into `` values(1,1,1)',
+ ]
+
+ for sql in sqls:
+ self.execute_sql_and_expect_err(sql, -2147473897)
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success(f"{__file__} successfully executed")
+
+event = threading.Event()
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())
diff --git a/tests/system-test/2-query/group_partition.py b/tests/system-test/2-query/group_partition.py
index 4b236c1bce..c63b8af9df 100644
--- a/tests/system-test/2-query/group_partition.py
+++ b/tests/system-test/2-query/group_partition.py
@@ -237,6 +237,9 @@ class TDTestCase:
tdSql.query(f"select tbname, count(*) from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and t2=0;")
tdSql.checkRows(1)
+ tdSql.query(f"select tbname, count(*) from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and t2=0 having count(*) > 10;")
+ tdSql.checkRows(0)
+
tdSql.query(f"select tbname, count(*) from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and _rowts>0;")
tdSql.checkRows(nonempty_tb_num)
diff --git a/tests/system-test/8-stream/stream_multi_agg.py b/tests/system-test/8-stream/stream_multi_agg.py
index 3532825493..acb80f528b 100644
--- a/tests/system-test/8-stream/stream_multi_agg.py
+++ b/tests/system-test/8-stream/stream_multi_agg.py
@@ -39,7 +39,7 @@ class TDTestCase:
os.system("nohup taosBenchmark -y -B 1 -t 40 -S 1000 -n 10 -i 1000 -v 5 > /dev/null 2>&1 &")
time.sleep(10)
- tdSql.query("use test")
+ tdSql.execute("use test", queryTimes=100)
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
tdLog.debug("========create stream and insert data ok========")
time.sleep(15)
@@ -66,7 +66,7 @@ class TDTestCase:
os.system("taosBenchmark -d db -t 20 -v 6 -n 1000 -y > /dev/null 2>&1")
# create stream
- tdSql.execute("use db")
+ tdSql.execute("use db", queryTimes=100)
tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
time.sleep(5)
diff --git a/tests/system-test/pytest.sh b/tests/system-test/pytest.sh
index 2837c817be..060717c20e 100755
--- a/tests/system-test/pytest.sh
+++ b/tests/system-test/pytest.sh
@@ -89,8 +89,8 @@ else
export LD_PRELOAD="$(realpath "$(gcc -print-file-name=libasan.so)") $(realpath "$(gcc -print-file-name=libstdc++.so)")"
echo "Preload AsanSo:" $?
- $* -a 2>$AsanFile
-
+ $* -a 2> $AsanFile
+ cat $AsanFile
unset LD_PRELOAD
for ((i = 1; i <= 20; i++)); do
AsanFileLen=$(cat $AsanFile | wc -l)
diff --git a/utils/tsim/src/simSystem.c b/utils/tsim/src/simSystem.c
index 645cf0e6fe..dcf5d6ab12 100644
--- a/utils/tsim/src/simSystem.c
+++ b/utils/tsim/src/simSystem.c
@@ -28,7 +28,7 @@ extern bool simExecSuccess;
int32_t simInitCfg() {
taosCreateLog("simlog", 1, configDir, NULL, NULL, NULL, NULL, 1);
- taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1, true);
+ taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1);
SConfig *pCfg = taosGetCfg();
tstrncpy(simScriptDir, cfgGetItem(pCfg, "scriptDir")->str, PATH_MAX);