other: merge 3.0

This commit is contained in:
Haojun Liao 2024-05-24 11:41:51 +08:00
commit 89fb56e77d
58 changed files with 804 additions and 256 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -421,7 +421,7 @@ The charset that takes effect is UTF-8.
| Applicable | Server Only |
| Meaning | Maximum number of vnodes per dnode |
| Value Range | 0-4096 |
| Default Value | 2x the CPU cores |
| Default Value | 2x the CPU cores plus 5 |
## Performance Tuning

View File

@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://t
import Release from "/components/ReleaseV3";
## 3.3.0.3
<Release type="tdengine" version="3.3.0.3" />
## 3.3.0.0
<Release type="tdengine" version="3.3.0.0" />

View File

@ -419,7 +419,7 @@ charset 的有效值是 UTF-8。
| 适用范围 | 仅服务端适用 |
| 含义 | dnode 支持的最大 vnode 数目 |
| 取值范围 | 0-4096 |
| 缺省值 | CPU 核数的 2 倍 |
| 缺省值 | CPU 核数的 2 倍 + 5 |
## 性能调优

View File

@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do
import Release from "/components/ReleaseV3";
## 3.3.0.3
<Release type="tdengine" version="3.3.0.3" />
## 3.3.0.0
<Release type="tdengine" version="3.3.0.0" />

View File

@ -274,7 +274,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, c
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId,
tb_uid_t suid);
bool alreadyAddGroupId(char* ctbName);
bool alreadyAddGroupId(char* ctbName, int64_t groupId);
bool isAutoTableName(char* ctbName);
void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);

View File

@ -257,8 +257,10 @@ extern bool tsExperimental;
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc);
int32_t taosReadDataFolder(const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs);
int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs,
bool tsc, bool isDumpCfg);
bool tsc);
void taosCleanupCfg();
int32_t taosCfgDynamicOptions(SConfig *pCfg, const char *name, bool forServer);

View File

@ -2358,20 +2358,24 @@ void tFreeSVArbSetAssignedLeaderRsp(SVArbSetAssignedLeaderRsp* pRsp);
typedef struct {
int32_t dnodeId;
char* token;
} SMArbUpdateGroupReqMember;
} SMArbUpdateGroupMember;
typedef struct {
int32_t vgId;
int64_t dbUid;
SMArbUpdateGroupReqMember members[2];
int8_t isSync;
SMArbUpdateGroupReqMember assignedLeader;
int64_t version;
} SMArbUpdateGroupReq;
int32_t vgId;
int64_t dbUid;
SMArbUpdateGroupMember members[2];
int8_t isSync;
SMArbUpdateGroupMember assignedLeader;
int64_t version;
} SMArbUpdateGroup;
int32_t tSerializeSMArbUpdateGroupReq(void* buf, int32_t bufLen, SMArbUpdateGroupReq* pReq);
int32_t tDeserializeSMArbUpdateGroupReq(void* buf, int32_t bufLen, SMArbUpdateGroupReq* pReq);
void tFreeSMArbUpdateGroupReq(SMArbUpdateGroupReq* pReq);
typedef struct {
SArray* updateArray; // SMArbUpdateGroup
} SMArbUpdateGroupBatchReq;
int32_t tSerializeSMArbUpdateGroupBatchReq(void* buf, int32_t bufLen, SMArbUpdateGroupBatchReq* pReq);
int32_t tDeserializeSMArbUpdateGroupBatchReq(void* buf, int32_t bufLen, SMArbUpdateGroupBatchReq* pReq);
void tFreeSMArbUpdateGroupBatchReq(SMArbUpdateGroupBatchReq* pReq);
typedef struct {
char queryStrId[TSDB_QUERY_ID_LEN];

View File

@ -389,7 +389,8 @@
TD_NEW_MSG_SEG(TDMT_MND_ARB_MSG) //9 << 8
TD_DEF_MSG_TYPE(TDMT_MND_ARB_HEARTBEAT_TIMER, "mnd-arb-hb-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ARB_CHECK_SYNC_TIMER, "mnd-arb-check-sync-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP_BATCH, "mnd-arb-update-group-batch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ARB_MAX_MSG, "mnd-arb-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_ARB_MSG)

View File

@ -327,7 +327,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_DB_IN_CREATING TAOS_DEF_ERROR_CODE(0, 0x0396) //
#define TSDB_CODE_MND_INVALID_SYS_TABLENAME TAOS_DEF_ERROR_CODE(0, 0x039A)
#define TSDB_CODE_MND_ENCRYPT_NOT_ALLOW_CHANGE TAOS_DEF_ERROR_CODE(0, 0x039B)
#define TSDB_CODE_MND_DB_ENCRYPT_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x039C)
// mnode-node
#define TSDB_CODE_MND_MNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A0)
@ -612,6 +611,16 @@ int32_t* taosGetErrno();
#define TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0821)
#define TSDB_CODE_GRANT_DUPLICATED_ACTIVE TAOS_DEF_ERROR_CODE(0, 0x0822)
#define TSDB_CODE_GRANT_VIEW_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0823)
#define TSDB_CODE_GRANT_BASIC_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0824)
#define TSDB_CODE_GRANT_STREAM_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0825)
#define TSDB_CODE_GRANT_SUBSCRIPTION_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0826)
#define TSDB_CODE_GRANT_VIEW_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0827)
#define TSDB_CODE_GRANT_AUDIT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0828)
#define TSDB_CODE_GRANT_CSV_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0829)
#define TSDB_CODE_GRANT_MULTI_STORAGE_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082A)
#define TSDB_CODE_GRANT_OBJECT_STROAGE_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082B)
#define TSDB_CODE_GRANT_DUAL_REPLICA_HA_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082C)
#define TSDB_CODE_GRANT_DB_ENCRYPTION_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082D)
// sync
// #define TSDB_CODE_SYN_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0900) // 2.x

View File

@ -725,7 +725,7 @@ void taos_init_imp(void) {
return;
}
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1, true) != 0) {
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) {
tscInitRes = -1;
return;
}

View File

@ -92,6 +92,7 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat
}
if (!pRsp) {
releaseTscObj(pReq->connKey.tscRid);
taosHashCancelIterate(hbMgr->activeInfo, pReq);
break;
}
}

View File

@ -2435,18 +2435,13 @@ void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId
// the total length is fixed to be 34 bytes.
bool isAutoTableName(char* ctbName) { return (strlen(ctbName) == 34 && ctbName[0] == 't' && ctbName[1] == '_'); }
bool alreadyAddGroupId(char* ctbName) {
size_t len = strlen(ctbName);
if (len == 0) return false;
size_t _location = len - 1;
while (_location > 0) {
if (ctbName[_location] < '0' || ctbName[_location] > '9') {
break;
}
_location--;
}
return ctbName[_location] == '_' && len - 1 - _location >= 15; // 15 means the min length of groupid
bool alreadyAddGroupId(char* ctbName, int64_t groupId) {
char tmp[64] = {0};
snprintf(tmp, sizeof(tmp), "%" PRIu64, groupId);
size_t len1 = strlen(ctbName);
size_t len2 = strlen(tmp);
if (len1 < len2) return false;
return memcmp(ctbName + len1 - len2, tmp, len2) == 0;
}
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {

View File

@ -624,7 +624,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
tsNumOfSupportVnodes = tsNumOfCores * 2;
tsNumOfSupportVnodes = tsNumOfCores * 2 + 5;
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
return -1;
@ -878,7 +878,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem = cfgGetItem(tsCfg, "supportVnodes");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfSupportVnodes = numOfCores * 2;
tsNumOfSupportVnodes = numOfCores * 2 + 5;
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
pItem->i32 = tsNumOfSupportVnodes;
pItem->stype = stype;
@ -1377,6 +1377,35 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
return 0;
}
int32_t taosReadDataFolder(const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs) {
if (tsCfg == NULL) osDefaultInit();
SConfig *pCfg = cfgInit();
if (pCfg == NULL) return -1;
if (cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "dDebugFlag", dDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (taosLoadCfg(pCfg, envCmd, cfgDir, envFile, apolloUrl) != 0) {
printf("failed to load cfg since %s", terrstr());
cfgCleanup(pCfg);
return -1;
}
if (cfgLoadFromArray(pCfg, pArgs) != 0) {
printf("failed to load cfg from array since %s", terrstr());
cfgCleanup(pCfg);
return -1;
}
tstrncpy(tsDataDir, cfgGetItem(pCfg, "dataDir")->str, PATH_MAX);
dDebugFlag = cfgGetItem(pCfg, "dDebugFlag")->i32;
cfgCleanup(pCfg);
return 0;
}
static int32_t taosCheckGlobalCfg() {
uint32_t ipv4 = taosGetIpv4FromFqdn(tsLocalFqdn);
if (ipv4 == 0xffffffff) {
@ -1394,7 +1423,7 @@ static int32_t taosCheckGlobalCfg() {
}
int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs,
bool tsc, bool isDumpCfg) {
bool tsc) {
if (tsCfg != NULL) return 0;
tsCfg = cfgInit();
@ -1441,7 +1470,7 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile
taosSetAllDebugFlag(tsCfg, cfgGetItem(tsCfg, "debugFlag")->i32);
if(isDumpCfg) cfgDumpCfg(tsCfg, tsc, false);
cfgDumpCfg(tsCfg, tsc, false);
if (taosCheckGlobalCfg() != 0) {
return -1;

View File

@ -6437,21 +6437,28 @@ void tFreeSVArbSetAssignedLeaderRsp(SVArbSetAssignedLeaderRsp *pRsp) {
taosMemoryFreeClear(pRsp->memberToken);
}
int32_t tSerializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGroupReq *pReq) {
int32_t tSerializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdateGroupBatchReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
if (tEncodeI32(&encoder, pReq->members[i].dnodeId) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->members[i].token) < 0) return -1;
int32_t sz = taosArrayGetSize(pReq->updateArray);
if (tEncodeI32(&encoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i);
if (tEncodeI32(&encoder, pGroup->vgId) < 0) return -1;
if (tEncodeI64(&encoder, pGroup->dbUid) < 0) return -1;
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
if (tEncodeI32(&encoder, pGroup->members[i].dnodeId) < 0) return -1;
if (tEncodeCStr(&encoder, pGroup->members[i].token) < 0) return -1;
}
if (tEncodeI8(&encoder, pGroup->isSync) < 0) return -1;
if (tEncodeI32(&encoder, pGroup->assignedLeader.dnodeId) < 0) return -1;
if (tEncodeCStr(&encoder, pGroup->assignedLeader.token) < 0) return -1;
if (tEncodeI64(&encoder, pGroup->version) < 0) return -1;
}
if (tEncodeI8(&encoder, pReq->isSync) < 0) return -1;
if (tEncodeI32(&encoder, pReq->assignedLeader.dnodeId) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->assignedLeader.token) < 0) return -1;
if (tEncodeI64(&encoder, pReq->version) < 0) return -1;
tEndEncode(&encoder);
@ -6460,23 +6467,34 @@ int32_t tSerializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGrou
return tlen;
}
int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGroupReq *pReq) {
int32_t tDeserializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdateGroupBatchReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
if (tDecodeI32(&decoder, &pReq->members[i].dnodeId) < 0) return -1;
pReq->members[i].token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
if (tDecodeCStrTo(&decoder, pReq->members[i].token) < 0) return -1;
int32_t sz = 0;
if (tDecodeI32(&decoder, &sz) < 0) return -1;
SArray *updateArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
if (!updateArray) return -1;
for (int32_t i = 0; i < sz; i++) {
SMArbUpdateGroup group = {0};
if (tDecodeI32(&decoder, &group.vgId) < 0) return -1;
if (tDecodeI64(&decoder, &group.dbUid) < 0) return -1;
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
if (tDecodeI32(&decoder, &group.members[i].dnodeId) < 0) return -1;
group.members[i].token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
if (tDecodeCStrTo(&decoder, group.members[i].token) < 0) return -1;
}
if (tDecodeI8(&decoder, &group.isSync) < 0) return -1;
if (tDecodeI32(&decoder, &group.assignedLeader.dnodeId) < 0) return -1;
group.assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
if (tDecodeCStrTo(&decoder, group.assignedLeader.token) < 0) return -1;
if (tDecodeI64(&decoder, &group.version) < 0) return -1;
taosArrayPush(updateArray, &group);
}
if (tDecodeI8(&decoder, &pReq->isSync) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->assignedLeader.dnodeId) < 0) return -1;
pReq->assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
if (tDecodeCStrTo(&decoder, pReq->assignedLeader.token) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->version) < 0) return -1;
pReq->updateArray = updateArray;
tEndDecode(&decoder);
@ -6484,14 +6502,20 @@ int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGr
return 0;
}
void tFreeSMArbUpdateGroupReq(SMArbUpdateGroupReq *pReq) {
if (NULL == pReq) {
void tFreeSMArbUpdateGroupBatchReq(SMArbUpdateGroupBatchReq *pReq) {
if (NULL == pReq || NULL == pReq->updateArray) {
return;
}
for (int i = 0; i < 2; i++) {
taosMemoryFreeClear(pReq->members[i].token);
int32_t sz = taosArrayGetSize(pReq->updateArray);
for (int32_t i = 0; i < sz; i++) {
SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i);
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
taosMemoryFreeClear(pGroup->members[i].token);
}
taosMemoryFreeClear(pGroup->assignedLeader.token);
}
taosMemoryFreeClear(pReq->assignedLeader.token);
taosArrayDestroy(pReq->updateArray);
}
// int32_t tSerializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {

View File

@ -692,4 +692,38 @@ TEST(timeTest, epSet) {
ASSERT_EQ(ep.numOfEps, 1);
}
}
// Define test cases
TEST(AlreadyAddGroupIdTest, GroupIdAdded) {
// Test case 1: Group ID has been added
char ctbName[64] = "abc123";
int64_t groupId = 123;
bool result = alreadyAddGroupId(ctbName, groupId);
EXPECT_TRUE(result);
}
TEST(AlreadyAddGroupIdTest, GroupIdNotAdded) {
// Test case 2: Group ID has not been added
char ctbName[64] = "abc456";
int64_t groupId = 123;
bool result = alreadyAddGroupId(ctbName, groupId);
EXPECT_FALSE(result);
}
TEST(AlreadyAddGroupIdTest, GroupIdAddedAtTheEnd) {
// Test case 3: Group ID has been added at the end
char ctbName[64] = "xyz1";
int64_t groupId = 1;
bool result = alreadyAddGroupId(ctbName, groupId);
EXPECT_TRUE(result);
}
TEST(AlreadyAddGroupIdTest, GroupIdAddedWithDifferentLength) {
// Test case 4: Group ID has been added with different length
char ctbName[64] = "def";
int64_t groupId = 123456;
bool result = alreadyAddGroupId(ctbName, groupId);
EXPECT_FALSE(result);
}
#pragma GCC diagnostic pop

View File

@ -205,11 +205,11 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
if(i < argc - 1) {
int32_t len = strlen(argv[++i]);
if (len < ENCRYPT_KEY_LEN_MIN) {
printf("Error: Encrypt key should be at least %d characters\n", ENCRYPT_KEY_LEN_MIN);
printf("ERROR: Encrypt key should be at least %d characters\n", ENCRYPT_KEY_LEN_MIN);
return -1;
}
if (len > ENCRYPT_KEY_LEN) {
printf("Error: Encrypt key overflow, it should be at most %d characters\n", ENCRYPT_KEY_LEN);
printf("ERROR: Encrypt key overflow, it should be at most %d characters\n", ENCRYPT_KEY_LEN);
return -1;
}
tstrncpy(global.encryptKey, argv[i], ENCRYPT_KEY_LEN);
@ -371,6 +371,22 @@ int mainWindows(int argc, char **argv) {
printf("memory dbg enabled\n");
}
#endif
if(global.generateCode) {
bool toLogFile = false;
if(taosReadDataFolder(configDir, global.envCmd, global.envFile, global.apolloUrl, global.pArgs) != 0){
encryptError("failed to generate encrypt code since taosd is running, please stop it first");
return -1;
};
if(dmCheckRunning(tsDataDir) == NULL) {
encryptError("failed to generate encrypt code since taosd is running, please stop it first");
return -1;
}
int ret = dmUpdateEncryptKey(global.encryptKey, toLogFile);
taosCloseLog();
taosCleanupArgs();
return ret;
}
if (dmInitLog() != 0) {
printf("failed to start since init log error\n");
@ -380,28 +396,13 @@ int mainWindows(int argc, char **argv) {
dmPrintArgs(argc, argv);
bool isDumpCfg = true;
if(global.generateCode) {
isDumpCfg = false;
}
if (taosInitCfg(configDir, global.envCmd, global.envFile, global.apolloUrl, global.pArgs, 0, isDumpCfg) != 0) {
if (taosInitCfg(configDir, global.envCmd, global.envFile, global.apolloUrl, global.pArgs, 0) != 0) {
dError("failed to start since read config error");
taosCloseLog();
taosCleanupArgs();
return -1;
}
if(global.generateCode) {
if(dmCheckRunning(tsDataDir) == NULL) {
dError("failed to generate encrypt code since taosd is running, please stop it first");
return -1;
}
int ret = dmUpdateEncryptKey(global.encryptKey);
taosCloseLog();
taosCleanupArgs();
return ret;
}
if(dmGetEncryptKey() != 0){
dError("failed to start since failed to get encrypt key");
taosCloseLog();

View File

@ -220,7 +220,7 @@ int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
goto _exit;
}
code = dmUpdateEncryptKey(cfgReq.value);
code = dmUpdateEncryptKey(cfgReq.value, true);
if (code == 0) {
tsEncryptionKeyChksum = taosCalcChecksum(0, cfgReq.value, strlen(cfgReq.value));
tsEncryptionKeyStat = ENCRYPT_KEY_STAT_LOADED;

View File

@ -53,6 +53,36 @@ extern "C" {
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
#define encryptDebug(...) { \
if (toLogFile) { \
if (dDebugFlag & DEBUG_DEBUG) {taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__);} \
} else { \
/*if (dDebugFlag & DEBUG_DEBUG) {taosPrintLog("DND ", DEBUG_SCREEN, dDebugFlag, __VA_ARGS__);}*/ \
if (dDebugFlag & DEBUG_DEBUG) {printf(__VA_ARGS__); printf("\n");} \
} \
}
#define encryptInfo(...) { \
if (toLogFile) { \
taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); \
} else { \
/*if (dDebugFlag & DEBUG_DEBUG) {taosPrintLog("DND ", DEBUG_SCREEN, dDebugFlag, __VA_ARGS__);}*/ \
printf(__VA_ARGS__); \
printf("\n"); \
} \
}
#define encryptError(...) { \
if (toLogFile) { \
taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
}\
else{ \
/*taosPrintLog("DND ", DEBUG_SCREEN, 255, __VA_ARGS__); */\
printf("ERROR: " __VA_ARGS__); \
printf("\n"); \
}\
}
#define dGFatal(param, ...) {if (dDebugFlag & DEBUG_FATAL) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", gtid:%s", __VA_ARGS__, buf);}}
#define dGError(param, ...) {if (dDebugFlag & DEBUG_ERROR) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", gtid:%s", __VA_ARGS__, buf);}}
#define dGWarn(param, ...) {if (dDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn(param ", gtid:%s", __VA_ARGS__, buf);}}
@ -194,7 +224,7 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
bool dmUpdateDnodeInfo(void *pData, int32_t *dnodeId, int64_t *clusterId, char *fqdn, uint16_t *port);
void dmRemoveDnodePairs(SDnodeData *pData);
void dmGetDnodeEp(void *pData, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
int32_t dmUpdateEncryptKey(char *key);
int32_t dmUpdateEncryptKey(char *key, bool toLogFile);
int32_t dmGetEncryptKey();
#ifdef __cplusplus
}

View File

@ -186,7 +186,7 @@ TdFilePtr dmCheckRunning(const char *dataDir) {
extern int32_t generateEncryptCode(const char *key, const char *machineId, char **encryptCode);
static int32_t dmWriteCheckCodeFile(char* file, char* realfile, char* key){
static int32_t dmWriteCheckCodeFile(char* file, char* realfile, char* key, bool toLogFile){
TdFilePtr pFile = NULL;
char *result = NULL;
int32_t code = -1;
@ -211,7 +211,8 @@ static int32_t dmWriteCheckCodeFile(char* file, char* realfile, char* key){
taosCloseFile(&pFile);
if (taosRenameFile(file, realfile) != 0) goto _OVER;
dInfo("succeed to write checkCode file:%s", realfile);
encryptDebug("succeed to write checkCode file:%s", realfile);
code = 0;
_OVER:
if(pFile != NULL) taosCloseFile(&pFile);
@ -220,7 +221,7 @@ _OVER:
return code;
}
static int32_t dmWriteEncryptCodeFile(char* file, char* realfile, char* encryptCode){
static int32_t dmWriteEncryptCodeFile(char* file, char* realfile, char* encryptCode, bool toLogFile){
TdFilePtr pFile = NULL;
int32_t code = -1;
@ -234,7 +235,7 @@ static int32_t dmWriteEncryptCodeFile(char* file, char* realfile, char* encryptC
taosCloseFile(&pFile);
if (taosRenameFile(file, realfile) != 0) goto _OVER;
dInfo("succeed to write encryptCode file:%s", realfile);
encryptDebug("succeed to write encryptCode file:%s", realfile);
code = 0;
_OVER:
@ -243,7 +244,7 @@ _OVER:
return code;
}
static int32_t dmCompareEncryptKey(char* file, char* key){
static int32_t dmCompareEncryptKey(char* file, char* key, bool toLogFile){
char *content = NULL;
int64_t size = 0;
TdFilePtr pFile = NULL;
@ -253,13 +254,13 @@ static int32_t dmCompareEncryptKey(char* file, char* key){
pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to open dnode file:%s since %s", file, terrstr());
encryptError("failed to open dnode file:%s since %s", file, terrstr());
goto _OVER;
}
if (taosFStatFile(pFile, &size, NULL) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to fstat dnode file:%s since %s", file, terrstr());
encryptError("failed to fstat dnode file:%s since %s", file, terrstr());
goto _OVER;
}
@ -271,11 +272,11 @@ static int32_t dmCompareEncryptKey(char* file, char* key){
if (taosReadFile(pFile, content, size) != size) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to read dnode file:%s since %s", file, terrstr());
encryptError("failed to read dnode file:%s since %s", file, terrstr());
goto _OVER;
}
dInfo("succeed to read checkCode file:%s", file);
encryptDebug("succeed to read checkCode file:%s", file);
int len = ENCRYPTED_LEN(size);
result = taosMemoryMalloc(len);
@ -290,11 +291,11 @@ static int32_t dmCompareEncryptKey(char* file, char* key){
if(strcmp(opts.result, DM_KEY_INDICATOR) != 0) {
terrno = TSDB_CODE_DNODE_ENCRYPTKEY_CHANGED;
dError("failed to compare decrypted result");
encryptError("failed to compare decrypted result");
goto _OVER;
}
dInfo("succeed to compare checkCode file:%s", file);
encryptDebug("succeed to compare checkCode file:%s", file);
code = 0;
_OVER:
if(result != NULL) taosMemoryFree(result);
@ -304,7 +305,7 @@ _OVER:
return code;
}
int32_t dmUpdateEncryptKey(char *key) {
int32_t dmUpdateEncryptKey(char *key, bool toLogFile) {
#ifdef TD_ENTERPRISE
int32_t code = -1;
char *machineId = NULL;
@ -328,12 +329,12 @@ int32_t dmUpdateEncryptKey(char *key) {
if (taosMkDir(folder) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to create dir:%s since %s", folder, terrstr());
encryptError("failed to create dir:%s since %s", folder, terrstr());
goto _OVER;
}
if(taosCheckExistFile(realCheckFile)){
if(dmCompareEncryptKey(realCheckFile, key) != 0){
if(dmCompareEncryptKey(realCheckFile, key, toLogFile) != 0){
goto _OVER;
}
}
@ -347,21 +348,23 @@ int32_t dmUpdateEncryptKey(char *key) {
goto _OVER;
}
if(dmWriteEncryptCodeFile(encryptFile, realEncryptFile, encryptCode) != 0){
if(dmWriteEncryptCodeFile(encryptFile, realEncryptFile, encryptCode, toLogFile) != 0){
goto _OVER;
}
if(dmWriteCheckCodeFile(checkFile, realCheckFile, key) != 0){
if(dmWriteCheckCodeFile(checkFile, realCheckFile, key, toLogFile) != 0){
goto _OVER;
}
encryptInfo("Succeed to update encrypt key\n");
code = 0;
_OVER:
taosMemoryFree(encryptCode);
taosMemoryFree(machineId);
if (code != 0) {
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to update encrypt key since %s", terrstr());
encryptError("failed to update encrypt key since %s", terrstr());
}
return code;
#else
@ -453,7 +456,7 @@ int32_t dmGetEncryptKey(){
goto _OVER;
}
if(dmCompareEncryptKey(checkFile, encryptKey) != 0){
if(dmCompareEncryptKey(checkFile, encryptKey, true) != 0){
goto _OVER;
}

View File

@ -38,7 +38,7 @@ bool TestServer::Start() {
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
taosThreadCreate(&threadId, &thAttr, serverLoop, this);
taosThreadAttrDestroy(&thAttr);
taosMsleep(2100);
taosMsleep(10000);
return runnning;
}

View File

@ -180,7 +180,7 @@ typedef struct {
tmsg_t originRpcType;
char dbname[TSDB_TABLE_FNAME_LEN];
char stbname[TSDB_TABLE_FNAME_LEN];
int32_t arbGroupId;
SHashObj* arbGroupIds;
int32_t startFunc;
int32_t stopFunc;
int32_t paramLen;

View File

@ -78,7 +78,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname);
void mndTransSetArbGroupId(STrans *pTrans, int32_t groupId);
void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId);
void mndTransSetSerial(STrans *pTrans);
void mndTransSetParallel(STrans *pTrans);
void mndTransSetChangeless(STrans *pTrans);

View File

@ -39,10 +39,11 @@ static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew);
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup);
static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray);
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq);
static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq);
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq);
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp);
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp);
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp);
@ -68,7 +69,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer);
mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndProcessArbCheckSyncTimer);
mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP, mndProcessArbUpdateGroupReq);
mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP_BATCH, mndProcessArbUpdateGroupBatchReq);
mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp);
mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp);
@ -81,9 +82,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupArbGroup(SMnode *pMnode) {
taosHashCleanup(arbUpdateHash);
}
void mndCleanupArbGroup(SMnode *pMnode) { taosHashCleanup(arbUpdateHash); }
SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
@ -541,6 +540,8 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
return -1;
}
SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
while (1) {
pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
if (pIter == NULL) break;
@ -612,40 +613,27 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
SArbGroup newGroup = {0};
mndArbGroupDupObj(&arbGroupDup, &newGroup);
mndArbGroupSetAssignedLeader(&newGroup, candidateIndex);
if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) {
mError("vgId:%d, arb failed to pullup set assigned leader to dnodeId:%d, since %s", vgId, pMember->info.dnodeId,
terrstr());
sdbRelease(pSdb, pArbGroup);
return -1;
}
mInfo("vgId:%d, arb pull up set assigned leader to dnodeId:%d", vgId, pMember->info.dnodeId);
taosArrayPush(pUpdateArray, &newGroup);
sdbRelease(pSdb, pArbGroup);
}
(void)mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray);
taosArrayDestroy(pUpdateArray);
return 0;
}
static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup) {
SMArbUpdateGroupReq req = {0};
req.vgId = pNewGroup->vgId;
req.dbUid = pNewGroup->dbUid;
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
req.members[i].dnodeId = pNewGroup->members[i].info.dnodeId;
req.members[i].token = pNewGroup->members[i].state.token;
}
req.isSync = pNewGroup->isSync;
req.assignedLeader.dnodeId = pNewGroup->assignedLeader.dnodeId;
req.assignedLeader.token = pNewGroup->assignedLeader.token;
req.version = pNewGroup->version;
static void *mndBuildArbUpdateGroupBatchReq(int32_t *pContLen, SArray *updateArray) {
SMArbUpdateGroupBatchReq req = {0};
req.updateArray = updateArray;
int32_t contLen = tSerializeSMArbUpdateGroupReq(NULL, 0, &req);
int32_t contLen = tSerializeSMArbUpdateGroupBatchReq(NULL, 0, &req);
if (contLen <= 0) return NULL;
SMsgHead *pHead = rpcMallocCont(contLen);
if (pHead == NULL) return NULL;
if (tSerializeSMArbUpdateGroupReq(pHead, contLen, &req) <= 0) {
if (tSerializeSMArbUpdateGroupBatchReq(pHead, contLen, &req) <= 0) {
rpcFreeCont(pHead);
return NULL;
}
@ -653,60 +641,172 @@ static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup)
return pHead;
}
static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) {
outGroup->vgId = pGroup->vgId;
outGroup->dbUid = pGroup->dbUid;
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
outGroup->members[i].dnodeId = pGroup->members[i].info.dnodeId;
outGroup->members[i].token = pGroup->members[i].state.token; // just copy the pointer
}
outGroup->isSync = pGroup->isSync;
outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId;
outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer
outGroup->version = pGroup->version;
}
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
return 0;
}
int32_t contLen = 0;
void *pHead = mndBuildArbUpdateGroupReq(&contLen, pNewGroup);
if (!pHead) {
mError("vgId:%d, failed to build arb-update-group request", pNewGroup->vgId);
return -1;
}
SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_UPDATE_GROUP, .pCont = pHead, .contLen = contLen, .info.noResp = true};
int32_t ret = -1;
int32_t ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
if (ret == 0) {
taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
SMArbUpdateGroup newGroup = {0};
mndInitArbUpdateGroup(pNewGroup, &newGroup);
SArray *pArray = taosArrayInit(1, sizeof(SMArbUpdateGroup));
taosArrayPush(pArray, &newGroup);
int32_t contLen = 0;
void *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
if (!pHead) {
mError("failed to build arb-update-group request");
goto _OVER;
}
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
if (ret != 0) goto _OVER;
taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
_OVER:
taosArrayDestroy(pArray);
return ret;
}
static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) {
int ret = 0;
static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray) {
int32_t ret = -1;
SMArbUpdateGroupReq req = {0};
tDeserializeSMArbUpdateGroupReq(pReq->pCont, pReq->contLen, &req);
size_t sz = taosArrayGetSize(newGroupArray);
SArray *pArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
for (size_t i = 0; i < sz; i++) {
SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
continue;
}
SArbGroup newGroup = {0};
newGroup.vgId = req.vgId;
newGroup.dbUid = req.dbUid;
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
newGroup.members[i].info.dnodeId = req.members[i].dnodeId;
memcpy(newGroup.members[i].state.token, req.members[i].token, TSDB_ARB_TOKEN_SIZE);
SMArbUpdateGroup newGroup = {0};
mndInitArbUpdateGroup(pNewGroup, &newGroup);
taosArrayPush(pArray, &newGroup);
taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
}
newGroup.isSync = req.isSync;
newGroup.assignedLeader.dnodeId = req.assignedLeader.dnodeId;
memcpy(newGroup.assignedLeader.token, req.assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
newGroup.version = req.version;
SMnode *pMnode = pReq->info.node;
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
if (!pOldGroup) {
mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
return 0;
}
sdbRelease(pMnode->pSdb, pOldGroup);
if (mndArbGroupUpdateTrans(pMnode, &newGroup) != 0) {
mError("vgId:%d, arb failed to update arbgroup, since %s", newGroup.vgId, terrstr());
ret = -1;
if (taosArrayGetSize(pArray) == 0) {
ret = 0;
goto _OVER;
}
tFreeSMArbUpdateGroupReq(&req);
int32_t contLen = 0;
void *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
if (!pHead) {
mError("failed to build arb-update-group request");
goto _OVER;
}
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
_OVER:
taosArrayDestroy(pArray);
if (ret != 0) {
for (size_t i = 0; i < sz; i++) {
SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
taosHashRemove(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId));
}
}
return ret;
}
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
int ret = -1;
size_t sz = 0;
SMArbUpdateGroupBatchReq req = {0};
if (tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req) != 0) {
mError("arb failed to decode arb-update-group request");
return -1;
}
SMnode *pMnode = pReq->info.node;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "update-arbgroup");
if (pTrans == NULL) {
mError("failed to update arbgroup in create trans, since %s", terrstr());
goto _OVER;
}
sz = taosArrayGetSize(req.updateArray);
for (size_t i = 0; i < sz; i++) {
SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
SArbGroup newGroup = {0};
newGroup.vgId = pUpdateGroup->vgId;
newGroup.dbUid = pUpdateGroup->dbUid;
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
newGroup.members[i].info.dnodeId = pUpdateGroup->members[i].dnodeId;
memcpy(newGroup.members[i].state.token, pUpdateGroup->members[i].token, TSDB_ARB_TOKEN_SIZE);
}
newGroup.isSync = pUpdateGroup->isSync;
newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
newGroup.version = pUpdateGroup->version;
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
if (!pOldGroup) {
mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t));
continue;
}
mndTransAddArbGroupId(pTrans, newGroup.vgId);
if (mndSetCreateArbGroupCommitLogs(pTrans, &newGroup) != 0) {
mError("failed to update arbgroup in set commit log, vgId:%d, trans:%d, since %s", newGroup.vgId, pTrans->id,
terrstr());
goto _OVER;
}
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]",
pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token);
sdbRelease(pMnode->pSdb, pOldGroup);
}
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
ret = 0;
_OVER:
if (ret != 0) {
// failed to update arbgroup
for (size_t i = 0; i < sz; i++) {
SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t));
}
}
mndTransDrop(pTrans);
tFreeSMArbUpdateGroupBatchReq(&req);
return ret;
}
@ -739,7 +839,7 @@ static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId,
pNew->assignedLeader.token);
mndTransSetArbGroupId(pTrans, pNew->vgId);
mndTransAddArbGroupId(pTrans, pNew->vgId);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
ret = -1;
goto _OVER;
@ -816,10 +916,10 @@ _OVER:
}
static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
int ret = 0;
int64_t nowMs = taosGetTimestampMs();
size_t size = taosArrayGetSize(memberArray);
SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));
size_t size = taosArrayGetSize(memberArray);
for (size_t i = 0; i < size; i++) {
SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i);
@ -832,17 +932,16 @@ static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *me
bool updateToken = mndUpdateArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
if (updateToken) {
ret = mndPullupArbUpdateGroup(pMnode, &newGroup);
if (ret != 0) {
mInfo("failed to pullup update arb token, vgId:%d, since %s", pRspMember->vgId, terrstr());
}
taosArrayPush(pUpdateArray, &newGroup);
}
sdbRelease(pMnode->pSdb, pGroup);
if (ret != 0) break;
}
return ret;
(void)mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray);
taosArrayDestroy(pUpdateArray);
return 0;
}
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
@ -900,6 +999,11 @@ static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token
}
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
if (pRsp->contLen == 0) {
mDebug("arb hb-rsp contLen is 0");
return 0;
}
int32_t ret = -1;
SMnode *pMnode = pRsp->info.node;
@ -914,6 +1018,7 @@ static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
SVArbHeartBeatRsp arbHbRsp = {0};
if (tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp) != 0) {
mInfo("arb hb-rsp des failed, since:%s", tstrerror(pRsp->code));
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
@ -934,6 +1039,11 @@ _OVER:
}
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
if (pRsp->contLen == 0) {
mDebug("arb check-sync-rsp contLen is 0");
return 0;
}
int32_t ret = -1;
SMnode *pMnode = pRsp->info.node;
@ -948,7 +1058,7 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
SVArbCheckSyncRsp syncRsp = {0};
if (tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) {
mInfo("arb sync check failed, since:%s", tstrerror(pRsp->code));
mInfo("arb check-sync-rsp des failed, since:%s", tstrerror(pRsp->code));
if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
terrno = TSDB_CODE_SUCCESS;
return 0;
@ -1008,6 +1118,11 @@ _OVER:
}
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
if (pRsp->contLen == 0) {
mDebug("arb set-assigned-rsp contLen is 0");
return 0;
}
int32_t ret = -1;
SMnode *pMnode = pRsp->info.node;
@ -1022,8 +1137,8 @@ static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
SVArbSetAssignedLeaderRsp setAssignedRsp = {0};
if (tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp) != 0) {
mInfo("arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code));
terrno = TSDB_CODE_INVALID_MSG;
mInfo("arb set assigned failed, des failed since:%s", tstrerror(pRsp->code));
return -1;
}

View File

@ -821,8 +821,7 @@ static int32_t mndCheckDbEncryptKey(SMnode *pMnode, SCreateDbReq *pReq) {
#ifdef TD_ENTERPRISE
if (pReq->encryptAlgorithm == TSDB_ENCRYPT_ALGO_NONE) goto _exit;
if (grantCheck(TSDB_GRANT_DB_ENCRYPTION) != 0) {
code = TSDB_CODE_MND_DB_ENCRYPT_GRANT_EXPIRED;
if ((code = grantCheck(TSDB_GRANT_DB_ENCRYPTION)) != 0) {
goto _exit;
}
if (tsEncryptionKeyStat != ENCRYPT_KEY_STAT_LOADED) {
@ -1226,7 +1225,7 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
if (terrno != 0) code = terrno;
mError("db:%s, failed to alter since %s", alterReq.db, terrstr());
mError("db:%s, failed to alter since %s", alterReq.db, tstrerror(code));
}
mndReleaseDb(pMnode, pDb);

View File

@ -1914,8 +1914,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) {
terrno = TSDB_CODE_GRANT_EXPIRED;
if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
return -1;
}

View File

@ -225,7 +225,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SArray *pFailedTasks = NULL;
SArray *pOrphanTasks = NULL;
if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) {
if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
if (suspendAllStreams(pMnode, &pReq->info) < 0) {
return -1;
}

View File

@ -26,7 +26,7 @@
#define TRANS_VER1_NUMBER 1
#define TRANS_VER2_NUMBER 2
#define TRANS_ARRAY_SIZE 8
#define TRANS_RESERVE_SIZE 48
#define TRANS_RESERVE_SIZE 44
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld);
@ -196,10 +196,21 @@ SSdbRaw *mndTransEncode(STrans *pTrans) {
}
SDB_SET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER)
int32_t arbGroupNum = taosHashGetSize(pTrans->arbGroupIds);
SDB_SET_INT32(pRaw, dataPos, arbGroupNum, _OVER)
void *pIter = NULL;
pIter = taosHashIterate(pTrans->arbGroupIds, NULL);
while (pIter) {
int32_t arbGroupId = *(int32_t *)pIter;
SDB_SET_INT32(pRaw, dataPos, arbGroupId, _OVER)
pIter = taosHashIterate(pTrans->arbGroupIds, pIter);
}
SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
terrno = 0;
terrno = 0;
_OVER:
if (terrno != 0) {
@ -279,6 +290,7 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
int32_t undoActionNum = 0;
int32_t commitActionNum = 0;
int32_t dataPos = 0;
int32_t arbgroupIdNum = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
@ -350,6 +362,16 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
}
SDB_GET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER);
pTrans->arbGroupIds = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
SDB_GET_INT32(pRaw, dataPos, &arbgroupIdNum, _OVER)
for (int32_t i = 0; i < arbgroupIdNum; ++i) {
int32_t arbGroupId = 0;
SDB_GET_INT32(pRaw, dataPos, &arbGroupId, _OVER)
taosHashPut(pTrans->arbGroupIds, &arbGroupId, sizeof(int32_t), NULL, 0);
}
SDB_GET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER)
terrno = 0;
@ -462,6 +484,9 @@ void mndTransDropData(STrans *pTrans) {
mndTransDropActions(pTrans->commitActions);
pTrans->commitActions = NULL;
}
if (pTrans->arbGroupIds != NULL) {
taosHashCleanup(pTrans->arbGroupIds);
}
if (pTrans->pRpcArray != NULL) {
taosArrayDestroy(pTrans->pRpcArray);
pTrans->pRpcArray = NULL;
@ -581,6 +606,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->arbGroupIds = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : tGenIdPI64();
taosInitRWLatch(&pTrans->lockRpcArray);
@ -733,7 +759,9 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname)
}
}
void mndTransSetArbGroupId(STrans *pTrans, int32_t groupId) { pTrans->arbGroupId = groupId; }
void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId) {
taosHashPut(pTrans->arbGroupIds, &groupId, sizeof(int32_t), NULL, 0);
}
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
@ -821,7 +849,16 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
if (pNew->conflict == TRN_CONFLICT_ARBGROUP) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) {
if (pNew->arbGroupId == pTrans->arbGroupId) conflict = true;
void *pIter = taosHashIterate(pNew->arbGroupIds, NULL);
while (pIter != NULL) {
int32_t groupId = *(int32_t *)pIter;
if (taosHashGet(pTrans->arbGroupIds, &groupId, sizeof(int32_t)) != NULL) {
taosHashCancelIterate(pNew->arbGroupIds, pIter);
conflict = true;
break;
}
pIter = taosHashIterate(pNew->arbGroupIds, pIter);
}
}
}
@ -1372,7 +1409,7 @@ static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArr
mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->actionPos);
for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pActions, pTrans->actionPos);
STransAction *pAction = taosArrayGet(pActions, action);
code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
if (code == 0) {

View File

@ -71,7 +71,7 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
if (varTbName != NULL && varTbName != (void*)-1) {
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0 && stbFullName) {
if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 && stbFullName) {
buildCtbNameAddGroupId(stbFullName, name, groupId);
}
} else if (stbFullName) {
@ -182,7 +182,7 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa
int64_t gid, bool newSubTableRule) {
if (pDataBlock->info.parTbName[0]) {
if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
!alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0 && stbFullName) {
!alreadyAddGroupId(pDataBlock->info.parTbName, gid) && gid != 0 && stbFullName) {
pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid);
@ -713,7 +713,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
} else {
if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
!alreadyAddGroupId(dstTableName) && groupId != 0) {
!alreadyAddGroupId(dstTableName, groupId) && groupId != 0) {
tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
buildCtbNameAddGroupId(NULL, dstTableName, groupId);

View File

@ -1238,6 +1238,18 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo
ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx));
// 1. prepare last
STsdbRowKey tsdbRowKey = {0};
tsdbRowGetKey(&lRow, &tsdbRowKey);
{
SLastUpdateCtx updateCtx = {
.lflag = LFLAG_LAST,
.tsdbRowKey = tsdbRowKey,
.colVal = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP,
.val = lRow.pBlockData->aTSKEY[lRow.iRow]}))};
taosArrayPush(ctxArray, &updateCtx);
}
TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
@ -1263,9 +1275,6 @@ int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlo
}
// 2. prepare last row
STsdbRowKey tsdbRowKey = {0};
tsdbRowGetKey(&lRow, &tsdbRowKey);
STSDBRowIter iter = {0};
tsdbRowIterOpen(&iter, &lRow, pTSchema);
for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) {

View File

@ -202,6 +202,7 @@ static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) {
}
eventWindowAggImpl(pOperator, pInfo, pBlock);
doFilter(pRes, pSup->pFilterInfo, NULL);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
return pRes;
}

View File

@ -1424,7 +1424,7 @@ int main(int argc, char *argv[]) {
printf("failed to start since init log error\n");
}
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0, true) != 0) {
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
fnError("failed to start since read config error");
return -2;
}

View File

@ -127,7 +127,7 @@ int aggregateFuncTest() {
int main(int argc, char *argv[]) {
parseArgs(argc, argv);
initLog();
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0, true) != 0) {
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
fnError("failed to start since read config error");
return -1;
}

View File

@ -167,7 +167,7 @@ static bool checkDbName(SAstCreateContext* pCxt, SToken* pDbName, bool demandDb)
}
} else {
trimEscape(pDbName);
if (pDbName->n >= TSDB_DB_NAME_LEN) {
if (pDbName->n >= TSDB_DB_NAME_LEN || pDbName->n == 0) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pDbName->z);
}
}
@ -176,7 +176,7 @@ static bool checkDbName(SAstCreateContext* pCxt, SToken* pDbName, bool demandDb)
static bool checkTableName(SAstCreateContext* pCxt, SToken* pTableName) {
trimEscape(pTableName);
if (NULL != pTableName && pTableName->n >= TSDB_TABLE_NAME_LEN) {
if (NULL != pTableName && pTableName->type != TK_NK_NIL && (pTableName->n >= TSDB_TABLE_NAME_LEN || pTableName->n == 0)) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pTableName->z);
return false;
}
@ -185,7 +185,7 @@ static bool checkTableName(SAstCreateContext* pCxt, SToken* pTableName) {
static bool checkColumnName(SAstCreateContext* pCxt, SToken* pColumnName) {
trimEscape(pColumnName);
if (NULL != pColumnName && pColumnName->n >= TSDB_COL_NAME_LEN) {
if (NULL != pColumnName && pColumnName->type != TK_NK_NIL && (pColumnName->n >= TSDB_COL_NAME_LEN || pColumnName->n == 0)) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pColumnName->z);
return false;
}
@ -203,7 +203,7 @@ static bool checkIndexName(SAstCreateContext* pCxt, SToken* pIndexName) {
static bool checkTopicName(SAstCreateContext* pCxt, SToken* pTopicName) {
trimEscape(pTopicName);
if (pTopicName->n >= TSDB_TOPIC_NAME_LEN) {
if (pTopicName->n >= TSDB_TOPIC_NAME_LEN || pTopicName->n == 0) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pTopicName->z);
return false;
}
@ -221,7 +221,7 @@ static bool checkCGroupName(SAstCreateContext* pCxt, SToken* pCGroup) {
static bool checkViewName(SAstCreateContext* pCxt, SToken* pViewName) {
trimEscape(pViewName);
if (pViewName->n >= TSDB_VIEW_NAME_LEN) {
if (pViewName->n >= TSDB_VIEW_NAME_LEN || pViewName->n == 0) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pViewName->z);
return false;
}
@ -230,7 +230,7 @@ static bool checkViewName(SAstCreateContext* pCxt, SToken* pViewName) {
static bool checkStreamName(SAstCreateContext* pCxt, SToken* pStreamName) {
trimEscape(pStreamName);
if (pStreamName->n >= TSDB_STREAM_NAME_LEN) {
if (pStreamName->n >= TSDB_STREAM_NAME_LEN || pStreamName->n == 0) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pStreamName->z);
return false;
}
@ -252,6 +252,8 @@ static bool checkTsmaName(SAstCreateContext* pCxt, SToken* pTsmaToken) {
pCxt->errCode = TSDB_CODE_PAR_SYNTAX_ERROR;
} else if (pTsmaToken->n >= TSDB_TABLE_NAME_LEN - strlen(TSMA_RES_STB_POSTFIX)) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_TSMA_NAME_TOO_LONG);
} else if (pTsmaToken->n == 0) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, pTsmaToken->z);
}
return pCxt->errCode == TSDB_CODE_SUCCESS;
}
@ -1935,6 +1937,7 @@ SNode* createAlterTableRenameCol(SAstCreateContext* pCxt, SNode* pRealTable, int
SNode* createAlterTableSetTag(SAstCreateContext* pCxt, SNode* pRealTable, SToken* pTagName, SNode* pVal) {
CHECK_PARSER_STATUS(pCxt);
if (!checkColumnName(pCxt, pTagName)) {
nodesDestroyNode(pVal);
return NULL;
}
SAlterTableStmt* pStmt = (SAlterTableStmt*)nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT);
@ -1946,6 +1949,7 @@ SNode* createAlterTableSetTag(SAstCreateContext* pCxt, SNode* pRealTable, SToken
}
SNode* setAlterSuperTableType(SNode* pStmt) {
if (!pStmt) return NULL;
setNodeType(pStmt, QUERY_NODE_ALTER_SUPER_TABLE_STMT);
return pStmt;
}
@ -2436,6 +2440,7 @@ SNode* createRestoreComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, c
SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pQuery) {
CHECK_PARSER_STATUS(pCxt);
if (!checkTopicName(pCxt, pTopicName)) {
nodesDestroyNode(pQuery);
return NULL;
}
SCreateTopicStmt* pStmt = (SCreateTopicStmt*)nodesMakeNode(QUERY_NODE_CREATE_TOPIC_STMT);
@ -2721,6 +2726,8 @@ SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols) {
CHECK_PARSER_STATUS(pCxt);
if (!checkStreamName(pCxt, pStreamName)) {
nodesDestroyNode(pQuery);
nodesDestroyNode(pOptions);
return NULL;
}
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)nodesMakeNode(QUERY_NODE_CREATE_STREAM_STMT);

View File

@ -2393,7 +2393,7 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif
// no data in the sql string anymore.
if (0 == pTbName->n) {
if (0 != pTbName->type && '\0' != pStmt->pSql[0]) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", pTbName->z);
return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pTbName->z);
}
if (0 == pStmt->totalRowsNum && (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) {

View File

@ -103,6 +103,9 @@ int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const c
if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
if (pTableName->n == 0) {
return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "invalid table name");
}
char name[TSDB_TABLE_FNAME_LEN] = {0};
strncpy(name, pTableName->z, pTableName->n);
@ -111,6 +114,8 @@ int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const c
if (dbName == NULL) {
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
if (name[0] == '\0')
return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, msg4);
code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
if (code != TSDB_CODE_SUCCESS) {
@ -732,6 +737,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
}
}
} else {
bool hasTs = false;
for (int i = 0; i < numFields; i++) {
for (int j = 0; j < boundInfo->numOfBound; j++) {
SSchema* pColSchema = &pSchema[j];
@ -743,6 +749,10 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
goto end;
}
if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
hasTs = true;
}
int8_t* offset = pStart;
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
pStart += numOfRows * sizeof(int32_t);
@ -768,6 +778,12 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
}
}
if(!hasTs){
if (errstr != NULL) snprintf(errstr, errstrLen, "timestamp column(primary key) not found in raw data");
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
for (int c = 0; c < boundInfo->numOfBound; ++c) {
if (boundInfo->pColIndex[c] != -1) {
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);

View File

@ -370,12 +370,12 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran
if ((TEST_VIEW_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_VIEW)) {
QW_ELOG("query failed cause of view grant expired, msgMask:%d", msg.msgMask);
tFreeSSubQueryMsg(&msg);
QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
QW_ERR_RET(TSDB_CODE_GRANT_VIEW_EXPIRED);
}
if ((TEST_AUDIT_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_AUDIT)) {
QW_ELOG("query failed cause of audit grant expired, msgMask:%d", msg.msgMask);
tFreeSSubQueryMsg(&msg);
QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
QW_ERR_RET(TSDB_CODE_GRANT_AUDIT_EXPIRED);
}
}
}

View File

@ -1150,10 +1150,13 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
taskDbAddRef(pTaskDb);
int64_t chkpId = pTaskDb->chkpId;
taskDbRefChkp(pTaskDb, chkpId);
code = taskDbDoCheckpoint(pTaskDb, chkpId);
taskDbRemoveRef(pTaskDb);
if (code != 0) {
taskDbUnRefChkp(pTaskDb, chkpId);
}
taskDbRefChkp(pTaskDb, pTaskDb->chkpId);
taskDbRemoveRef(pTaskDb);
SStreamTask* pTask = pTaskDb->pTask;
SStreamTaskSnap snap = {.streamId = pTask->id.streamId,
@ -1177,14 +1180,15 @@ int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
for (int i = 0; i < taosArrayGetSize(pSnapInfo); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, i);
sprintf(buf, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId);
STaskDbWrapper* pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf));
if (pTaskDb == NULL) {
STaskDbWrapper** pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf));
if (pTaskDb == NULL || *pTaskDb == NULL) {
stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId);
memset(buf, 0, sizeof(buf));
continue;
}
memset(buf, 0, sizeof(buf));
taskDbUnRefChkp(pTaskDb, pSnap->chkpId);
taskDbUnRefChkp(*pTaskDb, pSnap->chkpId);
}
taosThreadMutexUnlock(&pMeta->backendMutex);
return 0;
@ -1985,7 +1989,8 @@ void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
for (int i = 0; i < taosArrayGetSize(pTaskDb->chkpInUse); i++) {
int32_t size = taosArrayGetSize(pTaskDb->chkpInUse);
for (int i = 0; i < size; i++) {
int64_t* p = taosArrayGet(pTaskDb->chkpInUse, i);
if (*p == chkp) {
taosArrayRemove(pTaskDb->chkpInUse, i);

View File

@ -494,7 +494,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
if (pDataBlock->info.parTbName[0]) {
if(pTask->subtableWithoutMd5 != 1 &&
!isAutoTableName(pDataBlock->info.parTbName) &&
!alreadyAddGroupId(pDataBlock->info.parTbName) &&
!alreadyAddGroupId(pDataBlock->info.parTbName, groupId) &&
groupId != 0){
if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId);

View File

@ -184,16 +184,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return code;
}
static int32_t handleResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) {
static int32_t handleSanhistoryResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) {
int32_t code = TSDB_CODE_SUCCESS;
if (taosArrayGetSize(pRes) > 0) {
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes);
code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) {
stDebug("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code));
if (code != TSDB_CODE_SUCCESS) { // should not have error code
stError("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code));
}
} else {
taosArrayDestroy(pRes);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
}
return code;
}
@ -268,6 +268,17 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
}
// output queue is full, idle for 5 sec.
if (streamQueueIsFull(pTask->outputq.queue)) {
stWarn("s-task:%s outputQ is full, idle for 1sec and retry", id);
return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, STREAM_SCAN_HISTORY_TIMESLICE);
}
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
stWarn("s-task:%s downstream task inputQ blocked, idle for 5sec and retry", id);
return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL);
}
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -284,19 +295,13 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
}
// dispatch the generated results
/*int32_t code = */handleResultBlocks(pTask, pRes, size);
int64_t el = taosGetTimestampMs() - st;
// downstream task input queue is full, try in 5sec
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) {
return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL);
}
/*int32_t code = */handleSanhistoryResultBlocks(pTask, pRes, size);
if (finished) {
return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0);
}
int64_t el = taosGetTimestampMs() - st;
if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) {
stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id,
pTask->info.fillHistory, el / 1000.0);
@ -558,7 +563,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
if (streamQueueIsFull(pTask->outputq.queue)) {
stWarn("s-task:%s outputQ is full, idle for 500ms and retry", id);
streamTaskSetIdleInfo(pTask, 500);
streamTaskSetIdleInfo(pTask, 1000);
return 0;
}

View File

@ -381,7 +381,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
}
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,

View File

@ -144,7 +144,7 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
int32_t streamCreateTaskDbSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); }
int32_t streamDestroyTasdDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); }
int32_t streamDestroyTaskDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); }
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (qDebugFlag & DEBUG_DEBUG) {
@ -333,7 +333,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
}
taosArrayDestroy(handle->pDbSnapSet);
}
streamDestroyTasdDbSnapInfo(handle->pMeta, handle->pSnapInfoSet);
streamDestroyTaskDbSnapInfo(handle->pMeta, handle->pSnapInfoSet);
if (handle->pSnapInfoSet) {
for (int32_t i = 0; i < taosArrayGetSize(handle->pSnapInfoSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(handle->pSnapInfoSet, i);

View File

@ -759,7 +759,7 @@ void* getRowStateBuff(SStreamFileState* pFileState) { return pFileState->rowStat
void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; }
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
return pFileState->deleteMark > 0 && ts < (pFileState->maxTs - pFileState->deleteMark);
return pFileState->deleteMark != INT64_MAX && pFileState->maxTs > 0 && ts < (pFileState->maxTs - pFileState->deleteMark);
}
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }

View File

@ -2741,7 +2741,7 @@ int32_t tsDecompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut,
int32_t tsCompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
DEFINE_VAR(cmprAlg)
if (lvl != 0 && lossyFloat) {
if (l2 == L2_TSZ && lvl != 0 && lossyFloat) {
return tsCompressFloatLossyImp(pIn, nEle, pOut);
}
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 1);
@ -2760,7 +2760,7 @@ int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) {
DEFINE_VAR(cmprAlg)
if (lvl != 0 && lossyDouble) {
if (l2 == L2_TSZ && lvl != 0 && lossyDouble) {
// lossy mode
return tsCompressDoubleLossyImp(pIn, nEle, pOut);
}

View File

@ -259,7 +259,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_IN_CREATING, "Database in creating
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ENCRYPT_NOT_ALLOW_CHANGE, "Encryption is not allowed to be changed after database is created")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INCONSIST_ENCRYPT_KEY, "Inconsistent encryption key")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ENCRYPT_KEY, "The cluster has not been set properly for database encryption")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ENCRYPT_GRANT_EXPIRED, "The database encryption function grant expired")
// mnode-node
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_ALREADY_EXIST, "Mnode already exists")
@ -499,9 +498,20 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_LACK_OF_BASIC, "Lack of basic functio
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OBJ_NOT_EXIST, "Grant object not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_LAST_ACTIVE_NOT_FOUND, "The historial active code does not match")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_MACHINES_MISMATCH, "Cluster machines mismatch with active code")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE, "Expire time of optional grant item is too large")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE, "Expiration time of optional grant item is too large")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DUPLICATED_ACTIVE, "The active code can't be activated repeatedly")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_LIMITED, "Number of view has reached the licensed upper limit")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_LIMITED, "Number of views has reached the licensed upper limit")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_BASIC_EXPIRED, "License expired for basic functions")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_STREAM_EXPIRED, "License expired for stream function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_SUBSCRIPTION_EXPIRED, "License expired for subscription function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_EXPIRED, "License expired for view function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_AUDIT_EXPIRED, "License expired for audit function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CSV_EXPIRED, "License expired for CSV function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_MULTI_STORAGE_EXPIRED, "License expired for multi-tier storage function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OBJECT_STROAGE_EXPIRED, "License expired for object storage function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DUAL_REPLICA_HA_EXPIRED,"License expired for dual-replica HA function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DB_ENCRYPTION_EXPIRED, "License expired for database encryption function")
// sync
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout")

View File

@ -22,6 +22,7 @@ class TDTestCase(TBase):
self.child_table_num = 1
self.insert_round_num = 700
self.row_num_per_round = 15
self.start_ts = 1704082431000
def prepare_data(self):
# database
@ -39,7 +40,7 @@ class TDTestCase(TBase):
for j in range(self.insert_round_num):
sql = "insert into ct_binary%s values" % (i+1)
for k in range(self.row_num_per_round):
sql += "(now+%ss, '%s')," % (str(j * 10 + k + 1), 'a' * self.max_column_length)
sql += "(%s, '%s')," % (str(self.start_ts + (j * self.insert_round_num + k * self.row_num_per_round + 1)), 'a' * self.max_column_length)
tdSql.execute(sql)
tdLog.info(f"Insert {self.row_num_per_round} rows data into ct_binary{i+1} {j+1} times successfully")
tdSql.execute("flush database db;")
@ -63,7 +64,7 @@ class TDTestCase(TBase):
for j in range(self.insert_round_num):
sql = "insert into ct_varchar%s values" % (i+1)
for k in range(self.row_num_per_round):
sql += "(now+%ss, '%s')," % (str(j * 10 + k + 1), 'b' * self.max_column_length)
sql += "(%s, '%s')," % (str(self.start_ts + (j * self.insert_round_num + k * self.row_num_per_round + 1)), 'b' * self.max_column_length)
tdSql.execute(sql)
tdLog.info(f"Insert {self.row_num_per_round} rows data into ct_varchar{i+1} {j+1} times successfully")
tdSql.execute("flush database db;")
@ -98,7 +99,7 @@ class TDTestCase(TBase):
for j in range(self.insert_round_num):
sql = "insert into ct_nchar%s values" % (i+1)
for k in range(self.row_num_per_round):
sql += "(now+%ss, '%s')," % (str(j * 10 + k + 1), column)
sql += "(%s, '%s')," % (str(self.start_ts + (j * self.insert_round_num + k * self.row_num_per_round + 1)), column)
tdSql.execute(sql)
tdLog.info(f"Insert {self.row_num_per_round} rows data into ct_nchar{i+1} {j+1} times successfully")
tdSql.execute("flush database db;")
@ -124,7 +125,7 @@ class TDTestCase(TBase):
for j in range(self.insert_round_num):
sql = "insert into ct_varbinary%s values" % (i+1)
for k in range(row_num_per_round):
sql += "(now+%ss, '%s')," % (str(j * 10 + k + 1), '\\x' + column)
sql += "(%s, '%s')," % (str(self.start_ts + (j * self.insert_round_num + k * self.row_num_per_round + 1)), '\\x' + column)
tdSql.execute(sql)
tdLog.info(f"Insert {row_num_per_round} rows data into ct_varbinary{i+1} {j+1} times successfully")
tdSql.execute("flush database db;")
@ -153,7 +154,7 @@ class TDTestCase(TBase):
for j in range(self.insert_round_num):
sql = "insert into ct_json_tag%s values" % (i+1)
for k in range(row_num_per_round):
sql += "(now+%ss, '%s')," % (str(j * 10 + k + 1), '\\x' + column)
sql += "(%s, '%s')," % (str(self.start_ts + (j * self.insert_round_num + k * self.row_num_per_round + 1)), '\\x' + column)
tdSql.execute(sql)
tdLog.info(f"Insert {row_num_per_round} rows data into ct_json_tag{i+1} {j+1} times successfully")
tdSql.execute("flush database db;")

View File

@ -118,6 +118,22 @@ class TDTestCase(TBase):
tdSql.query(sql)
tdSql.checkRows(5)
# verification for TD-30030
tdSql.execute("create table t100 (ts timestamp, pk varchar(20) primary key, c1 varchar(100)) tags (id int)")
tdSql.execute("insert into ct1 using t100 tags(1) values('2024-05-17 14:58:52.902', 'a1', '100')")
tdSql.execute("insert into ct1 using t100 tags(1) values('2024-05-17 14:58:52.902', 'a2', '200')")
tdSql.execute("insert into ct1 using t100 tags(1) values('2024-05-17 14:58:52.902', 'a3', '300')")
tdSql.execute("insert into ct2 using t100 tags(2) values('2024-05-17 14:58:52.902', 'a2', '200')")
tdSql.execute("create view v100 as select * from t100")
tdSql.execute("create view v200 as select * from ct1")
tdSql.error("show tags from v100", expectErrInfo="Tags can only applied to super table and child table")
tdSql.error("show tags from v200", expectErrInfo="Tags can only applied to super table and child table")
tdSql.execute("create table t200 (ts timestamp, pk varchar(20) primary key, c1 varchar(100))")
tdSql.error("show tags from t200", expectErrInfo="Tags can only applied to super table and child table")
def checkShow(self):
# not support
sql = "show accounts;"

View File

@ -323,6 +323,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/delete_check.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/test_hot_refresh_configurations.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/subscribe_stream_privilege.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/empty_identifier.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_create.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_insert.py

View File

@ -315,19 +315,22 @@ function run_thread() {
fi
if [ -n "$corefile" ]; then
echo -e "\e[34m corefiles: $corefile \e[0m"
local build_dir=$log_dir/build_${hosts[index]}
local remote_build_dir="${workdirs[index]}/${DEBUGPATH}/build"
# if [ $ent -ne 0 ]; then
# remote_build_dir="${workdirs[index]}/{DEBUGPATH}/build"
# fi
mkdir "$build_dir" 2>/dev/null
if [ $? -eq 0 ]; then
# scp build binary
cmd="$scpcmd:${remote_build_dir}/* ${build_dir}/"
echo "$cmd"
$cmd >/dev/null
fi
fi
# scp build binary and unit test log
local build_dir=$log_dir/build_${hosts[index]}
local remote_build_dir="${workdirs[index]}/${DEBUGPATH}/build"
local remote_unit_test_log_dir="${workdirs[index]}/${DEBUGPATH}/Testing/Temporary/"
mkdir "$build_dir" 2>/dev/null
if [ $? -eq 0 ]; then
cmd="$scpcmd:${remote_build_dir}/* ${build_dir}/"
echo "$cmd"
$cmd >/dev/null
cmd="$scpcmd:${remote_unit_test_log_dir}/* ${build_dir}/"
echo "$cmd"
$cmd >/dev/null
fi
# get remote sim dir
local remote_sim_dir="${workdirs[index]}/tmp/thread_volume/$thread_no"
local tarcmd="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}"

View File

@ -0,0 +1,180 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from datetime import datetime
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
# from tmqCommon import *
COMPARE_DATA = 0
COMPARE_LEN = 1
class TDTestCase:
def __init__(self):
self.vgroups = 4
self.ctbNum = 10
self.rowsPerTbl = 10000
self.duraion = '1h'
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'):
if dropFlag == 1:
tsql.execute("drop database if exists %s"%(dbName))
tsql.execute("create database if not exists %s vgroups %d replica %d duration %s"%(dbName, vgroups, replica, duration))
tdLog.debug("complete to create database %s"%(dbName))
return
def create_stable(self,tsql, paraDict):
colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"])
tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"])
sqlString = f"create table if not exists %s.%s (%s) tags (%s)"%(paraDict["dbName"], paraDict["stbName"], colString, tagString)
tdLog.debug("%s"%(sqlString))
tsql.execute(sqlString)
return
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0):
for i in range(ctbNum):
sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % \
(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,(i+ctbStartIdx) % 5,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx)
tsql.execute(sqlString)
tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
return
def insert_data(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,tsStep):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
for i in range(ctbNum):
rowsBatched = 0
sql += " %s%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl):
if (i < ctbNum/2):
sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10, j%10, j%10)
else:
sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10)
rowsBatched += 1
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i)
else:
sql = "insert into "
if sql != pre_insert:
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'test',
'dropFlag': 1,
'vgroups': 2,
'stbName': 'meters',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},
{'type': 'BIGINT', 'count':1},
{'type': 'FLOAT', 'count':1},
{'type': 'DOUBLE', 'count':1},
{'type': 'smallint', 'count':1},
{'type': 'tinyint', 'count':1},
{'type': 'bool', 'count':1},
{'type': 'binary', 'len':10, 'count':1},
{'type': 'nchar', 'len':10, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}],
'ctbPrefix': 't',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 10000,
'batchNum': 3000,
'startTs': 1537146000000,
'tsStep': 600000}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("create database")
self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion)
tdLog.info("create stb")
self.create_stable(tsql=tdSql, paraDict=paraDict)
tdLog.info("create child tables")
self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"], \
stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],\
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict["ctbStartIdx"])
self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],\
ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],\
rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\
startTs=paraDict["startTs"],tsStep=paraDict["tsStep"])
return
def run(self):
self.prepareTestEnv()
self.test_empty_identifier()
def execute_sql_and_expect_err(self, sql: str, err: int):
tdSql.error(sql, err)
def test_empty_identifier(self):
## invalid identifier
sqls = [
'show create table ``',
'show create table test.``',
'create table `` (ts timestamp, c1 int)',
'drop table ``',
'alter table `` add column c2 int',
'select * from ``',
'alter table meters add column `` int',
'alter table meters drop column ``',
'alter table t0 set tag `` = ""',
'alter stable meters add tag `` int',
'alter stable meters rename tag cc ``',
'alter stable meters drop tag ``',
'insert into `` select * from t0',
'insert into t100 using `` tags('', '') values(1,1,1)',
'create view `` as select count(*) from meters interval(10s)',
'create view ``.view1 as select count(*) from meters'
'create tsma `` on meters function(count(c1)) interval(1m)',
'create tsma tsma1 on `` function(count(c1)) interval(1m)',
'create stream `` into st1 as select count(*) from meters interval(10s)',
'create stream stream1 into `` as select count(*) from meters interval(10s)',
'create stream stream1 into st1 as select count(*) from `` interval(10s)',
'create stream stream1 trigger max_delay 100s into st1 as select count(*) from `` interval(10s)',
'drop view ``',
'drop tsma ``',
'drop view ``.st1',
'create topic `` as select count(*) from meters interval(10s)',
'drop topic ``',
'insert into `` values(1,1,1)',
]
for sql in sqls:
self.execute_sql_and_expect_err(sql, -2147473897)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -237,6 +237,9 @@ class TDTestCase:
tdSql.query(f"select tbname, count(*) from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and t2=0;")
tdSql.checkRows(1)
tdSql.query(f"select tbname, count(*) from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and t2=0 having count(*) > 10;")
tdSql.checkRows(0)
tdSql.query(f"select tbname, count(*) from {self.dbname}.{self.stable} partition by tbname event_window start with c1 >= 0 end with c2 = 9 and _rowts>0;")
tdSql.checkRows(nonempty_tb_num)

View File

@ -39,7 +39,7 @@ class TDTestCase:
os.system("nohup taosBenchmark -y -B 1 -t 40 -S 1000 -n 10 -i 1000 -v 5 > /dev/null 2>&1 &")
time.sleep(10)
tdSql.query("use test")
tdSql.execute("use test", queryTimes=100)
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
tdLog.debug("========create stream and insert data ok========")
time.sleep(15)
@ -66,7 +66,7 @@ class TDTestCase:
os.system("taosBenchmark -d db -t 20 -v 6 -n 1000 -y > /dev/null 2>&1")
# create stream
tdSql.execute("use db")
tdSql.execute("use db", queryTimes=100)
tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
time.sleep(5)

View File

@ -89,8 +89,8 @@ else
export LD_PRELOAD="$(realpath "$(gcc -print-file-name=libasan.so)") $(realpath "$(gcc -print-file-name=libstdc++.so)")"
echo "Preload AsanSo:" $?
$* -a 2>$AsanFile
$* -a 2> $AsanFile
cat $AsanFile
unset LD_PRELOAD
for ((i = 1; i <= 20; i++)); do
AsanFileLen=$(cat $AsanFile | wc -l)

View File

@ -28,7 +28,7 @@ extern bool simExecSuccess;
int32_t simInitCfg() {
taosCreateLog("simlog", 1, configDir, NULL, NULL, NULL, NULL, 1);
taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1, true);
taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1);
SConfig *pCfg = taosGetCfg();
tstrncpy(simScriptDir, cfgGetItem(pCfg, "scriptDir")->str, PATH_MAX);