diff --git a/docs/en/12-taos-sql/24-show.md b/docs/en/12-taos-sql/24-show.md
index e2aff7a878..1f340cab30 100644
--- a/docs/en/12-taos-sql/24-show.md
+++ b/docs/en/12-taos-sql/24-show.md
@@ -189,7 +189,7 @@ show table distributed d0\G;
*************************** 1.row ***************************
-_block_dist: Total_Blocks=[5] Total_Size=[93.65 Kb] Average_size=[18.73 Kb] Compression_Ratio=[23.98 %]
+_block_dist: Total_Blocks=[5] Total_Size=[93.65 KB] Average_size=[18.73 KB] Compression_Ratio=[23.98 %]
Total_Blocks : Table `d0` contains total 5 blocks
diff --git a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs
index 7551ad46b1..d54bb60e93 100644
--- a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs
+++ b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs
@@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
taos.exec_many([
format!("DROP TOPIC IF EXISTS tmq_meters"),
format!("DROP DATABASE IF EXISTS `{db}`"),
- format!("CREATE DATABASE `{db}`"),
+ format!("CREATE DATABASE `{db}` WAL_RETENTION_PERIOD 3600"),
format!("USE `{db}`"),
// create super table
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),
diff --git a/docs/zh/12-taos-sql/24-show.md b/docs/zh/12-taos-sql/24-show.md
index c85efa2376..ab29a1ee50 100644
--- a/docs/zh/12-taos-sql/24-show.md
+++ b/docs/zh/12-taos-sql/24-show.md
@@ -189,7 +189,7 @@ SHOW TABLE DISTRIBUTED table_name;
*************************** 1.row ***************************
-_block_dist: Total_Blocks=[5] Total_Size=[93.65 Kb] Average_size=[18.73 Kb] Compression_Ratio=[23.98 %]
+_block_dist: Total_Blocks=[5] Total_Size=[93.65 KB] Average_size=[18.73 KB] Compression_Ratio=[23.98 %]
Total_Blocks: 表 d0 占用的 block 个数为 5 个
diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index 1fb00e743f..b7e6c42e3b 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -190,6 +190,8 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
+void verifyOffset(void *pWalReader, STqOffsetVal* pOffset);
+
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
void qStreamSetOpen(qTaskInfo_t tinfo);
diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h
index d3e2bbb1be..46dc179295 100644
--- a/include/libs/wal/wal.h
+++ b/include/libs/wal/wal.h
@@ -132,7 +132,7 @@ typedef struct {
} SWalRef;
typedef struct {
- int8_t scanUncommited;
+// int8_t scanUncommited;
int8_t scanNotApplied;
int8_t scanMeta;
int8_t enableRef;
diff --git a/packaging/docker/bin/entrypoint.sh b/packaging/docker/bin/entrypoint.sh
index f2811de7bd..a60254d7ef 100755
--- a/packaging/docker/bin/entrypoint.sh
+++ b/packaging/docker/bin/entrypoint.sh
@@ -55,7 +55,7 @@ else
exit $?
fi
while true; do
- es=$(taos -h $FIRST_EP_HOST -P $FIRST_EP_PORT --check)
+ es=$(taos -h $FIRST_EP_HOST -P $FIRST_EP_PORT --check | grep "^[0-9]*:")
echo ${es}
if [ "${es%%:*}" -eq 2 ]; then
echo "execute create dnode"
diff --git a/packaging/docker/bin/taos-check b/packaging/docker/bin/taos-check
index 5dc06b6018..349187da9b 100755
--- a/packaging/docker/bin/taos-check
+++ b/packaging/docker/bin/taos-check
@@ -1,5 +1,5 @@
#!/bin/sh
-es=$(taos --check)
+es=$(taos --check | grep "^[0-9]*:")
code=${es%%:*}
if [ "$code" -ne "0" ] && [ "$code" -ne "4" ]; then
exit 0
diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c
index 19d08ad66d..30c0008f3d 100644
--- a/source/client/src/clientSml.c
+++ b/source/client/src/clientSml.c
@@ -1351,7 +1351,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
}
taosArrayPush(info->pRequest->tableList, &pName);
- tstrncpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName) + 1);
+ strcpy(pName.tname, tableData->childTableName);
SRequestConnInfo conn = {0};
conn.pTrans = info->taos->pAppInfo->pTransporter;
@@ -1580,7 +1580,9 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
code = smlModifyDBSchemas(info);
if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS
|| code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH
- || code == TSDB_CODE_PAR_INVALID_ROW_LENGTH) break;
+ || code == TSDB_CODE_PAR_INVALID_ROW_LENGTH || code == TSDB_CODE_MND_FIELD_VALUE_OVERFLOW) {
+ break;
+ }
taosMsleep(100);
uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index aa35b298e6..b85dfa80b0 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -84,7 +84,7 @@ bool tsMonitorComp = false;
// telem
bool tsEnableTelem = true;
int32_t tsTelemInterval = 43200;
-char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com";
+char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.tdengine.com";
uint16_t tsTelemPort = 80;
char *tsTelemUri = "/report";
diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c
index f0e020edfe..989bff3984 100644
--- a/source/dnode/mgmt/exe/dmMain.c
+++ b/source/dnode/mgmt/exe/dmMain.c
@@ -87,18 +87,6 @@ static void dmStopDnode(int signum, void *sigInfo, void *context) {
}
void dmLogCrash(int signum, void *sigInfo, void *context) {
- taosIgnSignal(SIGTERM);
- taosIgnSignal(SIGHUP);
- taosIgnSignal(SIGINT);
- taosIgnSignal(SIGBREAK);
-
-#ifndef WINDOWS
- taosIgnSignal(SIGBUS);
-#endif
- taosIgnSignal(SIGABRT);
- taosIgnSignal(SIGFPE);
- taosIgnSignal(SIGSEGV);
-
char *pMsg = NULL;
const char *flags = "UTL FATAL ";
ELogLevel level = DEBUG_FATAL;
diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c
index 50e502f4ab..ff6a2f460a 100644
--- a/source/dnode/mnode/impl/src/mndProfile.c
+++ b/source/dnode/mnode/impl/src/mndProfile.c
@@ -256,10 +256,13 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
pDb = mndAcquireDb(pMnode, db);
if (pDb == NULL) {
- terrno = TSDB_CODE_MND_INVALID_DB;
- mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
- terrstr());
- goto _OVER;
+ if (0 != strcmp(connReq.db, TSDB_INFORMATION_SCHEMA_DB) &&
+ (0 != strcmp(connReq.db, TSDB_PERFORMANCE_SCHEMA_DB))) {
+ terrno = TSDB_CODE_MND_INVALID_DB;
+ mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
+ terrstr());
+ goto _OVER;
+ }
}
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) != 0) {
diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c
index a73c08e69a..8b708c3e0f 100644
--- a/source/dnode/mnode/impl/src/mndStb.c
+++ b/source/dnode/mnode/impl/src/mndStb.c
@@ -932,7 +932,7 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
return -1;
}
- if(pDst->nextColId < 0 && pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){
+ if(pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
return -1;
}
@@ -1163,8 +1163,8 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
-
- if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ntags){
+
+ if(pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ntags){
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
return -1;
}
@@ -1476,7 +1476,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
return -1;
}
- if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ncols){
+ if(pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ncols){
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
return -1;
}
diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c
index f4d6e27dea..e62102fa77 100644
--- a/source/dnode/mnode/impl/src/mndSubscribe.c
+++ b/source/dnode/mnode/impl/src/mndSubscribe.c
@@ -133,10 +133,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
const SMqRebOutputVg *pRebVg) {
- if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
- terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
- return -1;
- }
+// if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
+// terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
+// return -1;
+// }
void *buf;
int32_t tlen;
@@ -269,6 +269,18 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
}
}
+static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){
+ for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){
+ SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i);
+ SMqRebOutputVg outputVg = {
+ .oldConsumerId = pConsumerEp->consumerId,
+ .newConsumerId = pConsumerEp->consumerId,
+ .pVgEp = pVgEp,
+ };
+ taosArrayPush(pOutput->rebVgs, &outputVg);
+ }
+}
+
static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
int32_t imbConsumerNum) {
const char *pSubKey = pOutput->pSub->key;
@@ -290,24 +302,19 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId);
if (consumerVgNum > minVgCnt) {
if (imbCnt < imbConsumerNum) {
- if (consumerVgNum == minVgCnt + 1) {
- imbCnt++;
- continue;
- } else {
- // pop until equal minVg + 1
- while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {
- SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
- SMqRebOutputVg outputVg = {
- .oldConsumerId = pConsumerEp->consumerId,
- .newConsumerId = -1,
- .pVgEp = pVgEp,
- };
- taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
- mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
- pConsumerEp->consumerId);
- }
- imbCnt++;
+ // pop until equal minVg + 1
+ while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {
+ SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
+ SMqRebOutputVg outputVg = {
+ .oldConsumerId = pConsumerEp->consumerId,
+ .newConsumerId = -1,
+ .pVgEp = pVgEp,
+ };
+ taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
+ mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
+ pConsumerEp->consumerId);
}
+ imbCnt++;
} else {
// all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
@@ -323,6 +330,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
}
}
}
+ putNoTransferToOutput(pOutput, pConsumerEp);
}
}
diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c
index 523753d7c6..2a0d753722 100644
--- a/source/dnode/mnode/impl/src/mndUser.c
+++ b/source/dnode/mnode/impl/src/mndUser.c
@@ -236,7 +236,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
SDB_SET_BINARY(pRaw, dataPos, key, keyLen, _OVER);
SDB_SET_INT32(pRaw, dataPos, *useDb, _OVER)
- useDb = taosHashIterate(pUser->writeTbs, useDb);
+ useDb = taosHashIterate(pUser->useDbs, useDb);
}
SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)
diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h
index 30b2fb74ca..e431ca4a01 100644
--- a/source/dnode/vnode/src/inc/tq.h
+++ b/source/dnode/vnode/src/inc/tq.h
@@ -101,6 +101,7 @@ typedef struct {
STqPushHandle pushHandle; // push
STqExecHandle execHandle; // exec
SRpcMsg* msg;
+ int32_t noDataPollCnt;
} STqHandle;
typedef struct {
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 6a52d0fc39..311d637be8 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -445,6 +445,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t
}
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
+ int ret = 0;
SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req);
@@ -463,8 +464,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (req.newConsumerId == -1) {
tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
- taosMemoryFree(req.qmsg);
- return 0;
+ goto end;
}
STqHandle tqHandle = {0};
@@ -481,8 +481,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
// TODO version should be assigned and refed during preprocess
SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
if (pRef == NULL) {
- taosMemoryFree(req.qmsg);
- return -1;
+ ret = -1;
+ goto end;
}
int64_t ver = pRef->refVer;
@@ -534,49 +534,42 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
pHandle->consumerId, oldConsumerId);
- if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
- taosMemoryFree(req.qmsg);
- return -1;
- }
+ ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
+ goto end;
} else {
if (pHandle->consumerId == req.newConsumerId) { // do nothing
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
- atomic_store_32(&pHandle->epoch, -1);
atomic_add_fetch_32(&pHandle->epoch, 1);
- taosMemoryFree(req.qmsg);
- return tqMetaSaveHandle(pTq, req.subKey, pHandle);
+
} else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
req.newConsumerId);
-
- // kill executing task
- qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
- if (pTaskInfo != NULL) {
- qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
- }
-
- taosWLockLatch(&pTq->lock);
- atomic_store_32(&pHandle->epoch, 0);
-
- // remove if it has been register in the push manager, and return one empty block to consumer
- tqUnregisterPushHandle(pTq, pHandle);
-
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
-
- if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
- qStreamCloseTsdbReader(pTaskInfo);
- }
-
- taosWUnLockLatch(&pTq->lock);
- if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
- taosMemoryFree(req.qmsg);
- return -1;
- }
+ atomic_store_32(&pHandle->epoch, 0);
}
+ // kill executing task
+ qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
+ if (pTaskInfo != NULL) {
+ qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
+ }
+
+ taosWLockLatch(&pTq->lock);
+ // remove if it has been register in the push manager, and return one empty block to consumer
+ tqUnregisterPushHandle(pTq, pHandle);
+
+
+ if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
+ qStreamCloseTsdbReader(pTaskInfo);
+ }
+
+ taosWUnLockLatch(&pTq->lock);
+ ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
+ goto end;
}
+end:
taosMemoryFree(req.qmsg);
- return 0;
+ return ret;
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c
index 1e45f578f6..0bb33b1215 100644
--- a/source/dnode/vnode/src/tq/tqRestore.c
+++ b/source/dnode/vnode/src/tq/tqRestore.c
@@ -109,6 +109,15 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
// seek the stored version and extract data from WAL
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
+ SWal *pWal = pTask->exec.pWalReader->pWal;
+ if (pTask->chkInfo.currentVer < pWal->vers.firstVer ) {
+ pTask->chkInfo.currentVer = pWal->vers.firstVer;
+ code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
+ if (code != TSDB_CODE_SUCCESS) {
+ streamMetaReleaseTask(pStreamMeta, pTask);
+ continue;
+ }
+ }
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c
index d83345ad59..94803ef438 100644
--- a/source/dnode/vnode/src/tq/tqUtil.c
+++ b/source/dnode/vnode/src/tq/tqUtil.c
@@ -16,6 +16,7 @@
#include "tq.h"
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
+#define NO_POLL_CNT 5
static int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp);
@@ -165,12 +166,19 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
+ int code = 0;
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
+ qTaskInfo_t task = pHandle->execHandle.task;
+ if(qTaskIsExecuting(task)){
+ code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
+ tDeleteSMqDataRsp(&dataRsp);
+ return code;
+ }
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
- int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
+ code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
if(code != 0) {
goto end;
}
@@ -178,12 +186,18 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
- // lock
- taosWLockLatch(&pTq->lock);
- code = tqRegisterPushHandle(pTq, pHandle, pMsg);
- taosWUnLockLatch(&pTq->lock);
- tDeleteSMqDataRsp(&dataRsp);
- return code;
+ if(pHandle->noDataPollCnt >= NO_POLL_CNT){ // send poll result to client if no data 5 times to avoid lost data
+ pHandle->noDataPollCnt = 0;
+ // lock
+ taosWLockLatch(&pTq->lock);
+ code = tqRegisterPushHandle(pTq, pHandle, pMsg);
+ taosWUnLockLatch(&pTq->lock);
+ tDeleteSMqDataRsp(&dataRsp);
+ return code;
+ }
+ else{
+ pHandle->noDataPollCnt++;
+ }
}
@@ -246,6 +260,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (offset->type == TMQ_OFFSET__LOG) {
+ verifyOffset(pHandle->pWalReader, offset);
int64_t fetchVer = offset->version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index d0a0ea7947..2fc2b8cdd6 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -4851,7 +4851,11 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) {
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);
if (pReader->flag == READER_STATUS_SUSPEND) {
- tsdbReaderResume(pReader);
+ code = tsdbReaderResume(pReader);
+ if (code != TSDB_CODE_SUCCESS) {
+ tsdbReleaseReader(pReader);
+ return code;
+ }
}
if (pReader->innerReader[0] != NULL && pReader->step == 0) {
@@ -5124,11 +5128,17 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
}
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
+ int32_t code = TSDB_CODE_SUCCESS;
+
qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
tsdbAcquireReader(pReader);
if (pReader->flag == READER_STATUS_SUSPEND) {
- tsdbReaderResume(pReader);
+ code = tsdbReaderResume(pReader);
+ if (code != TSDB_CODE_SUCCESS) {
+ tsdbReleaseReader(pReader);
+ return code;
+ }
}
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
@@ -5163,8 +5173,6 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
int64_t ts = asc ? pReader->window.skey - 1 : pReader->window.ekey + 1;
resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);
- int32_t code = 0;
-
// no data in files, let's try buffer in memory
if (pStatus->fileIter.numOfFiles == 0) {
pStatus->loadFromFile = false;
@@ -5209,7 +5217,11 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
// find the start data block in file
tsdbAcquireReader(pReader);
if (pReader->flag == READER_STATUS_SUSPEND) {
- tsdbReaderResume(pReader);
+ code = tsdbReaderResume(pReader);
+ if (code != TSDB_CODE_SUCCESS) {
+ tsdbReleaseReader(pReader);
+ return code;
+ }
}
SReaderStatus* pStatus = &pReader->status;
@@ -5277,12 +5289,17 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
}
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
+ int32_t code = TSDB_CODE_SUCCESS;
int64_t rows = 0;
SReaderStatus* pStatus = &pReader->status;
tsdbAcquireReader(pReader);
if (pReader->flag == READER_STATUS_SUSPEND) {
- tsdbReaderResume(pReader);
+ code = tsdbReaderResume(pReader);
+ if (code != TSDB_CODE_SUCCESS) {
+ tsdbReleaseReader(pReader);
+ return code;
+ }
}
int32_t iter = 0;
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index 2d5830e4a9..8bbbd3524d 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -1058,6 +1058,14 @@ void qStreamSetOpen(qTaskInfo_t tinfo) {
pOperator->status = OP_NOT_OPENED;
}
+void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){
+ // if offset version is small than first version , let's seek to first version
+ int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal);
+ if (pOffset->version + 1 < firstVer){
+ pOffset->version = firstVer - 1;
+ }
+}
+
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
@@ -1084,12 +1092,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
tsdbReaderClose(pScanBaseInfo->dataReader);
pScanBaseInfo->dataReader = NULL;
- // let's seek to the next version in wal file
- int64_t firstVer = walGetFirstVer(pInfo->tqReader->pWalReader->pWal);
- if (pOffset->version + 1 < firstVer){
- pOffset->version = firstVer - 1;
- }
-
+ verifyOffset(pInfo->tqReader->pWalReader, pOffset);
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) {
qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
return -1;
diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c
index f525f6728c..62ab2d9df2 100644
--- a/source/libs/executor/src/executorInt.c
+++ b/source/libs/executor/src/executorInt.c
@@ -82,7 +82,7 @@ static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SC
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
bool createDummyCol);
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
- SGroupResInfo* pGroupResInfo);
+ SGroupResInfo* pGroupResInfo, int32_t threshold);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
SFilePage* pData = NULL;
@@ -776,7 +776,7 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos
}
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
- SGroupResInfo* pGroupResInfo) {
+ SGroupResInfo* pGroupResInfo, int32_t threshold) {
SExprInfo* pExprInfo = pSup->pExprInfo;
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
@@ -825,6 +825,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
releaseBufPage(pBuf, page);
pBlock->info.rows += pRow->numOfRows;
+ if (pBlock->info.rows >= threshold) {
+ break;
+ }
}
qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
@@ -850,7 +853,7 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr
// clear the existed group id
pBlock->info.id.groupId = 0;
ASSERT(!pbInfo->mergeResultBlock);
- doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
+ doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);
void* tbname = NULL;
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
@@ -877,10 +880,10 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
// clear the existed group id
pBlock->info.id.groupId = 0;
if (!pbInfo->mergeResultBlock) {
- doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
+ doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);
} else {
while (hasRemainResults(pGroupResInfo)) {
- doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
+ doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);
if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
break;
}
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index a8ecd9b0a2..4c019b3e71 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -5572,7 +5572,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
}
int32_t len = sprintf(st + VARSTR_HEADER_SIZE,
- "Total_Blocks=[%d] Total_Size=[%.2f Kb] Average_size=[%.2f Kb] Compression_Ratio=[%.2f %c]",
+ "Total_Blocks=[%d] Total_Size=[%.2f KB] Average_size=[%.2f KB] Compression_Ratio=[%.2f %c]",
pData->numOfBlocks, pData->totalSize / 1024.0, averageSize / 1024.0, compRatio, '%');
varDataSetLen(st, len);
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index b598fffbc6..f4c86d4849 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -6118,17 +6118,50 @@ static bool isEventWindowQuery(SSelectStmt* pSelect) {
return NULL != pSelect->pWindow && QUERY_NODE_EVENT_WINDOW == nodeType(pSelect->pWindow);
}
+static bool hasJsonTypeProjection(SSelectStmt* pSelect) {
+ SNode* pProj = NULL;
+ FOREACH(pProj, pSelect->pProjectionList) {
+ if (TSDB_DATA_TYPE_JSON == ((SExprNode*)pProj)->resType.type) {
+ return true;
+ }
+ }
+ return false;
+}
+
+static EDealRes hasColumnOrPseudoColumn(SNode* pNode, void* pContext) {
+ if (QUERY_NODE_COLUMN == nodeType(pNode)) {
+ *(bool*)pContext = true;
+ return DEAL_RES_END;
+ }
+ if (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsPseudoColumnFunc(((SFunctionNode*)pNode)->funcId)) {
+ *(bool*)pContext = true;
+ return DEAL_RES_END;
+ }
+ return DEAL_RES_CONTINUE;
+}
+
+static int32_t subtableExprHasColumnOrPseudoColumn(SNode* pNode) {
+ bool hasColumn = false;
+ nodesWalkExprPostOrder(pNode, hasColumnOrPseudoColumn, &hasColumn);
+ return hasColumn;
+}
+
static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
- crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect)) {
+ crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect) || hasJsonTypeProjection(pSelect)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
}
if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"SUBTABLE expression must be of VARCHAR type");
}
+ if (NULL != pSelect->pSubtable && 0 == LIST_LENGTH(pSelect->pPartitionByList) && subtableExprHasColumnOrPseudoColumn(pSelect->pSubtable)) {
+ return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
+ "SUBTABLE expression must not has column when no partition by clause");
+ }
+
if (NULL == pSelect->pWindow && STREAM_TRIGGER_AT_ONCE != pStmt->pOptions->triggerType) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"The trigger mode of non window query can only be AT_ONCE");
@@ -8232,6 +8265,11 @@ static int32_t buildAddColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, S
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_DUPLICATED_COLUMN);
}
+ if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) ||
+ (TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) {
+ return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
+ }
+
if (TSDB_MAX_COLUMNS == pTableMeta->tableInfo.numOfColumns) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TOO_MANY_COLUMNS);
}
diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp
index b7ca944ebb..6a08193a39 100644
--- a/source/libs/parser/test/parInitialCTest.cpp
+++ b/source/libs/parser/test/parInitialCTest.cpp
@@ -920,6 +920,10 @@ TEST_F(ParserInitialCTest, createStreamSemanticCheck) {
run("CREATE STREAM s1 INTO st1 AS SELECT PERCENTILE(c1, 30) FROM t1 INTERVAL(10S)",
TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC);
+ run("CREATE STREAM s2 INTO st1 AS SELECT ts, to_json('{c1:1}') FROM st1 PARTITION BY TBNAME",
+ TSDB_CODE_PAR_INVALID_STREAM_QUERY);
+ run("CREATE STREAM s3 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tbname)) "
+ "AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 INTERVAL(10S)", TSDB_CODE_PAR_INVALID_STREAM_QUERY);
}
/*
diff --git a/source/libs/scalar/test/CMakeLists.txt b/source/libs/scalar/test/CMakeLists.txt
index 32f5e098c5..caaf86264c 100644
--- a/source/libs/scalar/test/CMakeLists.txt
+++ b/source/libs/scalar/test/CMakeLists.txt
@@ -1,4 +1,4 @@
enable_testing()
-#add_subdirectory(filter)
+add_subdirectory(filter)
add_subdirectory(scalar)
diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp
index b59e89fe0d..51ee9b6570 100644
--- a/source/libs/scalar/test/filter/filterTests.cpp
+++ b/source/libs/scalar/test/filter/filterTests.cpp
@@ -33,6 +33,7 @@
#include "os.h"
#include "filter.h"
+#include "filterInt.h"
#include "nodes.h"
#include "scalar.h"
#include "stub.h"
@@ -344,6 +345,7 @@ TEST(timerangeTest, greater_and_lower_not_strict) {
nodesDestroyNode(logicNode1);
}
+#if 0
TEST(columnTest, smallint_column_greater_double_value) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int16_t leftv[5] = {1, 2, 3, 4, 5};
@@ -1337,6 +1339,127 @@ TEST(scalarModelogicTest, diff_columns_or_and_or) {
nodesDestroyNode(logicNode1);
blockDataDestroy(src);
}
+#endif
+
+template
+int32_t compareSignedWithUnsigned(SignedT l, UnsignedT r) {
+ if (l < 0) return -1;
+ auto l_uint64 = static_cast(l);
+ auto r_uint64 = static_cast(r);
+ if (l_uint64 < r_uint64) return -1;
+ if (l_uint64 > r_uint64) return 1;
+ return 0;
+}
+
+template
+int32_t compareUnsignedWithSigned(UnsignedT l, SignedT r) {
+ if (r < 0) return 1;
+ auto l_uint64 = static_cast(l);
+ auto r_uint64 = static_cast(r);
+ if (l_uint64 < r_uint64) return -1;
+ if (l_uint64 > r_uint64) return 1;
+ return 0;
+}
+
+template
+void doCompareWithValueRange_SignedWithUnsigned(__compar_fn_t fp) {
+ int32_t signedMin = -10, signedMax = 10;
+ int32_t unsignedMin = 0, unsignedMax = 10;
+ for (SignedT l = signedMin; l <= signedMax; ++l) {
+ for (UnsignedT r = unsignedMin; r <= unsignedMax; ++r) {
+ ASSERT_EQ(fp(&l, &r), compareSignedWithUnsigned(l, r));
+ }
+ }
+}
+
+template
+void doCompareWithValueRange_UnsignedWithSigned(__compar_fn_t fp) {
+ int32_t signedMin = -10, signedMax = 10;
+ int32_t unsignedMin = 0, unsignedMax = 10;
+ for (UnsignedT l = unsignedMin; l <= unsignedMax; ++l) {
+ for (SignedT r = signedMin; r <= signedMax; ++r) {
+ ASSERT_EQ(fp(&l, &r), compareUnsignedWithSigned(l, r));
+ }
+ }
+}
+
+template
+void doCompareWithValueRange_OnlyLeftType(__compar_fn_t fp, int32_t rType) {
+ switch (rType) {
+ case TSDB_DATA_TYPE_UTINYINT:
+ doCompareWithValueRange_SignedWithUnsigned(fp);
+ break;
+ case TSDB_DATA_TYPE_USMALLINT:
+ doCompareWithValueRange_SignedWithUnsigned(fp);
+ break;
+ case TSDB_DATA_TYPE_UINT:
+ doCompareWithValueRange_SignedWithUnsigned(fp);
+ break;
+ case TSDB_DATA_TYPE_UBIGINT:
+ doCompareWithValueRange_SignedWithUnsigned(fp);
+ break;
+ case TSDB_DATA_TYPE_TINYINT:
+ doCompareWithValueRange_UnsignedWithSigned(fp);
+ break;
+ case TSDB_DATA_TYPE_SMALLINT:
+ doCompareWithValueRange_UnsignedWithSigned(fp);
+ break;
+ case TSDB_DATA_TYPE_INT:
+ doCompareWithValueRange_UnsignedWithSigned(fp);
+ break;
+ case TSDB_DATA_TYPE_BIGINT:
+ doCompareWithValueRange_UnsignedWithSigned(fp);
+ break;
+ default:
+ FAIL();
+ }
+}
+
+void doCompare(const std::vector &lTypes, const std::vector &rTypes, int32_t oper) {
+ for (int i = 0; i < lTypes.size(); ++i) {
+ for (int j = 0; j < rTypes.size(); ++j) {
+ auto fp = filterGetCompFuncEx(lTypes[i], rTypes[j], oper);
+ switch (lTypes[i]) {
+ case TSDB_DATA_TYPE_TINYINT:
+ doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+ break;
+ case TSDB_DATA_TYPE_SMALLINT:
+ doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+ break;
+ case TSDB_DATA_TYPE_INT:
+ doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+ break;
+ case TSDB_DATA_TYPE_BIGINT:
+ doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+ break;
+ case TSDB_DATA_TYPE_UTINYINT:
+ doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+ break;
+ case TSDB_DATA_TYPE_USMALLINT:
+ doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+ break;
+ case TSDB_DATA_TYPE_UINT:
+ doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+ break;
+ case TSDB_DATA_TYPE_UBIGINT:
+ doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]);
+ break;
+ default:
+ FAIL();
+ }
+ }
+ }
+}
+
+TEST(dataCompareTest, signed_and_unsigned_int) {
+ std::vector lType = {TSDB_DATA_TYPE_TINYINT, TSDB_DATA_TYPE_SMALLINT, TSDB_DATA_TYPE_INT,
+ TSDB_DATA_TYPE_BIGINT};
+ std::vector rType = {TSDB_DATA_TYPE_UTINYINT, TSDB_DATA_TYPE_USMALLINT, TSDB_DATA_TYPE_UINT,
+ TSDB_DATA_TYPE_UBIGINT};
+
+ doCompare(lType, rType, OP_TYPE_GREATER_THAN);
+ doCompare(rType, lType, OP_TYPE_GREATER_THAN);
+}
int main(int argc, char **argv) {
taosSeedRand(taosGetTimestampSec());
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index aefe7885f9..af54904c43 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -188,6 +188,11 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
return -1;
}
+ if (streamMetaCommit(pMeta) < 0) {
+ tFreeStreamTask(pTask);
+ return -1;
+ }
+
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
return 0;
diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c
index f9f14c2e00..3506d477d3 100644
--- a/source/libs/sync/src/syncRespMgr.c
+++ b/source/libs/sync/src/syncRespMgr.c
@@ -171,6 +171,8 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
}
void syncRespCleanRsp(SSyncRespMgr *pObj) {
+ if (pObj == NULL) return;
+
SSyncNode *pNode = pObj->data;
sTrace("vgId:%d, clean all resp", pNode->vgId);
diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c
index c23d6d0a1f..ea35f1cfe5 100644
--- a/source/libs/transport/src/transCli.c
+++ b/source/libs/transport/src/transCli.c
@@ -587,12 +587,12 @@ void* destroyConnPool(SCliThrd* pThrd) {
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
void* pool = pThrd->pool;
- SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
+ SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
STrans* pTranInst = pThrd->pTransInst;
if (plist == NULL) {
SConnList list = {0};
- taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
- plist = taosHashGet(pool, key, strlen(key));
+ taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
+ plist = taosHashGet(pool, key, strlen(key) + 1);
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
QUEUE_INIT(&nList->msgQ);
@@ -627,11 +627,11 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
void* pool = pThrd->pool;
STrans* pTransInst = pThrd->pTransInst;
- SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
+ SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
if (plist == NULL) {
SConnList list = {0};
- taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
- plist = taosHashGet(pool, key, strlen(key));
+ taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
+ plist = taosHashGet(pool, key, strlen(key) + 1);
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
QUEUE_INIT(&nList->msgQ);
@@ -717,7 +717,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
cliDestroyConnMsgs(conn, false);
if (conn->list == NULL) {
- conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
+ conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip) + 1);
}
SConnList* pList = conn->list;
@@ -822,7 +822,8 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
return;
}
if (nread < 0) {
- tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), T_REF_VAL_GET(conn));
+ tDebug("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread),
+ T_REF_VAL_GET(conn));
conn->broken = true;
cliHandleExcept(conn);
}
@@ -875,8 +876,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
connList->list->numOfConn--;
connList->size--;
} else {
- SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip));
- connList->list->numOfConn--;
+ SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip) + 1);
+ if (connList != NULL) connList->list->numOfConn--;
}
conn->list = NULL;
pThrd->newConnCount--;
@@ -1269,7 +1270,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
(pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
- SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip));
+ SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1);
int64_t cTimestamp = taosGetTimestampMs();
if (item != NULL) {
int32_t elapse = cTimestamp - item->timestamp;
@@ -1281,7 +1282,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
}
} else {
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
- taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem));
+ taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1, &item, sizeof(SFailFastItem));
}
}
} else {
@@ -1459,7 +1460,7 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
}
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
uint32_t addr = 0;
- uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn));
+ uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1);
if (v == NULL) {
addr = taosGetIpv4FromFqdn(fqdn);
if (addr == 0xffffffff) {
@@ -1468,7 +1469,7 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn)
return addr;
}
- taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
+ taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr));
} else {
addr = *v;
}
diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c
index 28fb474972..269c7ecf9b 100644
--- a/source/libs/transport/src/transSvr.c
+++ b/source/libs/transport/src/transSvr.c
@@ -314,7 +314,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
return;
}
- tWarn("%s conn %p read error:%s", transLabel(pTransInst), conn, uv_err_name(nread));
+ tDebug("%s conn %p read error:%s", transLabel(pTransInst), conn, uv_err_name(nread));
if (nread < 0) {
conn->broken = true;
if (conn->status == ConnAcquire) {
diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c
index cda7e35b0f..a12f8051ba 100644
--- a/source/libs/wal/src/walMeta.c
+++ b/source/libs/wal/src/walMeta.c
@@ -295,6 +295,36 @@ void walAlignVersions(SWal* pWal) {
wInfo("vgId:%d, reset commitVer to %" PRId64, pWal->cfg.vgId, pWal->vers.commitVer);
}
+int walRepairLogFileTs(SWal* pWal, bool* updateMeta) {
+ int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
+ int32_t fileIdx = -1;
+ int32_t lastCloseTs = 0;
+ char fnameStr[WAL_FILE_LEN] = {0};
+
+ while (++fileIdx < sz - 1) {
+ SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
+ if (pFileInfo->closeTs != -1) {
+ lastCloseTs = pFileInfo->closeTs;
+ continue;
+ }
+
+ walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
+ int32_t mtime = 0;
+ if (taosStatFile(fnameStr, NULL, &mtime) < 0) {
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ wError("vgId:%d, failed to stat file due to %s, file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
+ return -1;
+ }
+
+ if (updateMeta != NULL) *updateMeta = true;
+ if (pFileInfo->createTs == -1) pFileInfo->createTs = lastCloseTs;
+ pFileInfo->closeTs = mtime;
+ lastCloseTs = pFileInfo->closeTs;
+ }
+
+ return 0;
+}
+
bool walLogEntriesComplete(const SWal* pWal) {
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
bool complete = true;
@@ -433,15 +463,8 @@ int walCheckAndRepairMeta(SWal* pWal) {
wError("failed to scan wal last ver since %s", terrstr());
return -1;
}
- // remove the empty wal log, and its idx
- wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr);
- taosRemoveFile(fnameStr);
- walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
- wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr);
- taosRemoveFile(fnameStr);
- // remove its meta entry
- taosArrayRemove(pWal->fileInfoSet, fileIdx);
- continue;
+ // empty log file
+ lastVer = pFileInfo->firstVer - 1;
}
// update lastVer
@@ -460,6 +483,11 @@ int walCheckAndRepairMeta(SWal* pWal) {
}
(void)walAlignVersions(pWal);
+ // repair ts of files
+ if (walRepairLogFileTs(pWal, &updateMeta) < 0) {
+ return -1;
+ }
+
// update meta file
if (updateMeta) {
(void)walSaveMeta(pWal);
diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c
index 4cc43a19a0..37d97b35a6 100644
--- a/source/libs/wal/src/walRead.c
+++ b/source/libs/wal/src/walRead.c
@@ -37,7 +37,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
if (cond) {
pReader->cond = *cond;
} else {
- pReader->cond.scanUncommited = 0;
+// pReader->cond.scanUncommited = 0;
pReader->cond.scanNotApplied = 0;
pReader->cond.scanMeta = 0;
pReader->cond.enableRef = 0;
@@ -74,11 +74,15 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t lastVer = walGetLastVer(pReader->pWal);
int64_t committedVer = walGetCommittedVer(pReader->pWal);
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
- int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
- endVer = TMIN(appliedVer, endVer);
+ if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
+ wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
+// taosMsleep(10);
+ }
+// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
+ int64_t endVer = TMIN(appliedVer, committedVer);
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
- ", applied index:%" PRId64 ", end index:%" PRId64,
+ ", applied index:%" PRId64", end index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
while (fetchVer <= endVer) {
if (walFetchHeadNew(pReader, fetchVer) < 0) {
diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c
index 848de4f36d..9b7b3dfd50 100644
--- a/source/libs/wal/src/walWrite.c
+++ b/source/libs/wal/src/walWrite.c
@@ -284,15 +284,15 @@ int32_t walEndSnapshot(SWal *pWal) {
if (ver == -1) {
code = -1;
goto END;
- };
+ }
pWal->vers.snapshotVer = ver;
int ts = taosGetTimestampSec();
-
ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1);
+ // compatible mode for refVer
bool hasTopic = false;
- int64_t refVer = ver;
+ int64_t refVer = INT64_MAX;
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pWal->pRefHash, pIter);
@@ -300,54 +300,40 @@ int32_t walEndSnapshot(SWal *pWal) {
SWalRef *pRef = *(SWalRef **)pIter;
if (pRef->refVer == -1) continue;
refVer = TMIN(refVer, pRef->refVer - 1);
- wDebug("vgId:%d, wal found ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId);
hasTopic = true;
}
- // compatible mode
if (pWal->cfg.retentionPeriod == 0 && hasTopic) {
+ wInfo("vgId:%d, wal found refVer:%" PRId64 " in compatible mode, ver:%" PRId64, pWal->cfg.vgId, refVer, ver);
ver = TMIN(ver, refVer);
}
+ // find files safe to delete
int deleteCnt = 0;
int64_t newTotSize = pWal->totSize;
- SWalFileInfo tmp;
+ SWalFileInfo tmp = {0};
tmp.firstVer = ver;
- // find files safe to delete
SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
+
if (pInfo) {
- SWalFileInfo *pLastFileInfo = taosArrayGetLast(pWal->fileInfoSet);
- wDebug("vgId:%d, wal search found file info: first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer,
- pInfo->lastVer);
- if (ver >= pInfo->lastVer) {
+ wDebug("vgId:%d, wal search found file info. ver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, ver,
+ pInfo->firstVer, pInfo->lastVer);
+ ASSERT(ver <= pInfo->lastVer);
+ if (ver == pInfo->lastVer) {
pInfo++;
- wDebug("vgId:%d, wal remove advance one file: first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer,
- pInfo->lastVer);
- }
- if (pInfo <= pLastFileInfo) {
- wDebug("vgId:%d, wal end remove for first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer,
- pInfo->lastVer);
- } else {
- wDebug("vgId:%d, wal no remove", pWal->cfg.vgId);
}
// iterate files, until the searched result
+ // delete according to file size or close time
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
- wDebug("vgId:%d, wal check remove file %" PRId64 "(file size %" PRId64 " close ts %" PRId64
- "), new tot size %" PRId64,
- pWal->cfg.vgId, iter->firstVer, iter->fileSize, iter->closeTs, newTotSize);
- if ((pWal->cfg.retentionSize != -1 && pWal->cfg.retentionSize != 0 && newTotSize > pWal->cfg.retentionSize) ||
- ((pWal->cfg.retentionPeriod == 0) || (pWal->cfg.retentionPeriod != -1 && iter->closeTs != -1 &&
- iter->closeTs + pWal->cfg.retentionPeriod < ts))) {
- // delete according to file size or close time
- wDebug("vgId:%d, check pass", pWal->cfg.vgId);
+ if ((pWal->cfg.retentionSize > 0 && newTotSize > pWal->cfg.retentionSize) ||
+ (pWal->cfg.retentionPeriod == 0 ||
+ pWal->cfg.retentionPeriod > 0 && iter->closeTs >= 0 && iter->closeTs + pWal->cfg.retentionPeriod < ts)) {
deleteCnt++;
newTotSize -= iter->fileSize;
taosArrayPush(pWal->toDeleteFiles, iter);
}
- wDebug("vgId:%d, check not pass", pWal->cfg.vgId);
}
- UPDATE_META:
// make new array, remove files
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
@@ -357,11 +343,12 @@ int32_t walEndSnapshot(SWal *pWal) {
pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
}
}
+
+ // update meta
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
pWal->totSize = newTotSize;
pWal->vers.verInSnapshotting = -1;
- // save snapshot ver, commit ver
code = walSaveMeta(pWal);
if (code < 0) {
goto END;
@@ -369,23 +356,27 @@ int32_t walEndSnapshot(SWal *pWal) {
// delete files
deleteCnt = taosArrayGetSize(pWal->toDeleteFiles);
- wDebug("vgId:%d, wal should delete %d files", pWal->cfg.vgId, deleteCnt);
- char fnameStr[WAL_FILE_LEN];
+ char fnameStr[WAL_FILE_LEN] = {0};
+ pInfo = NULL;
+
for (int i = 0; i < deleteCnt; i++) {
pInfo = taosArrayGet(pWal->toDeleteFiles, i);
+
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
- wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) {
wError("vgId:%d, failed to remove log file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno));
goto END;
}
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
- wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) {
wError("vgId:%d, failed to remove idx file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno));
goto END;
}
}
+ if (pInfo != NULL) {
+ wInfo("vgId:%d, wal log files recycled. count:%d, until ver:%" PRId64 ", closeTs:%" PRId64, pWal->cfg.vgId,
+ deleteCnt, pInfo->lastVer, pInfo->closeTs);
+ }
taosArrayClear(pWal->toDeleteFiles);
END:
diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c
index f8f78ae6a5..dc57ed97b2 100644
--- a/source/util/src/tcompare.c
+++ b/source/util/src/tcompare.c
@@ -308,17 +308,19 @@ int32_t compareInt8Uint16(const void *pLeft, const void *pRight) {
int32_t compareInt8Uint32(const void *pLeft, const void *pRight) {
int8_t left = GET_INT8_VAL(pLeft);
+ if (left < 0) return -1;
uint32_t right = GET_UINT32_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
+ if ((uint32_t)left > right) return 1;
+ if ((uint32_t)left < right) return -1;
return 0;
}
int32_t compareInt8Uint64(const void *pLeft, const void *pRight) {
int8_t left = GET_INT8_VAL(pLeft);
+ if (left < 0) return -1;
uint64_t right = GET_UINT64_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
+ if ((uint64_t)left > right) return 1;
+ if ((uint64_t)left < right) return -1;
return 0;
}
@@ -380,17 +382,19 @@ int32_t compareInt16Uint16(const void *pLeft, const void *pRight) {
int32_t compareInt16Uint32(const void *pLeft, const void *pRight) {
int16_t left = GET_INT16_VAL(pLeft);
+ if (left < 0) return -1;
uint32_t right = GET_UINT32_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
+ if ((uint32_t)left > right) return 1;
+ if ((uint32_t)left < right) return -1;
return 0;
}
int32_t compareInt16Uint64(const void *pLeft, const void *pRight) {
int16_t left = GET_INT16_VAL(pLeft);
+ if (left < 0) return -1;
uint64_t right = GET_UINT64_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
+ if ((uint64_t)left > right) return 1;
+ if ((uint64_t)left < right) return -1;
return 0;
}
@@ -452,17 +456,19 @@ int32_t compareInt32Uint16(const void *pLeft, const void *pRight) {
int32_t compareInt32Uint32(const void *pLeft, const void *pRight) {
int32_t left = GET_INT32_VAL(pLeft);
+ if (left < 0) return -1;
uint32_t right = GET_UINT32_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
+ if ((uint32_t)left > right) return 1;
+ if ((uint32_t)left < right) return -1;
return 0;
}
int32_t compareInt32Uint64(const void *pLeft, const void *pRight) {
int32_t left = GET_INT32_VAL(pLeft);
+ if (left < 0) return -1;
uint64_t right = GET_UINT64_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
+ if ((uint64_t)left > right) return 1;
+ if ((uint64_t)left < right) return -1;
return 0;
}
@@ -532,9 +538,10 @@ int32_t compareInt64Uint32(const void *pLeft, const void *pRight) {
int32_t compareInt64Uint64(const void *pLeft, const void *pRight) {
int64_t left = GET_INT64_VAL(pLeft);
+ if (left < 0) return -1;
uint64_t right = GET_UINT64_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
+ if ((uint64_t)left > right) return 1;
+ if ((uint64_t)left < right) return -1;
return 0;
}
@@ -857,24 +864,27 @@ int32_t compareUint16Uint64(const void *pLeft, const void *pRight) {
int32_t compareUint32Int8(const void *pLeft, const void *pRight) {
uint32_t left = GET_UINT32_VAL(pLeft);
int8_t right = GET_INT8_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
+ if (right < 0) return 1;
+ if (left > (uint32_t)right) return 1;
+ if (left < (uint32_t)right) return -1;
return 0;
}
int32_t compareUint32Int16(const void *pLeft, const void *pRight) {
uint32_t left = GET_UINT32_VAL(pLeft);
int16_t right = GET_INT16_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
+ if (right < 0) return 1;
+ if (left > (uint32_t)right) return 1;
+ if (left < (uint32_t)right) return -1;
return 0;
}
int32_t compareUint32Int32(const void *pLeft, const void *pRight) {
uint32_t left = GET_UINT32_VAL(pLeft);
int32_t right = GET_INT32_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
+ if (right < 0) return 1;
+ if (left > (uint32_t)right) return 1;
+ if (left < (uint32_t)right) return -1;
return 0;
}
@@ -929,32 +939,36 @@ int32_t compareUint32Uint64(const void *pLeft, const void *pRight) {
int32_t compareUint64Int8(const void *pLeft, const void *pRight) {
uint64_t left = GET_UINT64_VAL(pLeft);
int8_t right = GET_INT8_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
+ if (right < 0) return 1;
+ if (left > (uint64_t)right) return 1;
+ if (left < (uint64_t)right) return -1;
return 0;
}
int32_t compareUint64Int16(const void *pLeft, const void *pRight) {
uint64_t left = GET_UINT64_VAL(pLeft);
int16_t right = GET_INT16_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
+ if (right < 0) return 1;
+ if (left > (uint64_t)right) return 1;
+ if (left < (uint64_t)right) return -1;
return 0;
}
int32_t compareUint64Int32(const void *pLeft, const void *pRight) {
uint64_t left = GET_UINT64_VAL(pLeft);
int32_t right = GET_INT32_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
+ if (right < 0) return 1;
+ if (left > (uint64_t)right) return 1;
+ if (left < (uint64_t)right) return -1;
return 0;
}
int32_t compareUint64Int64(const void *pLeft, const void *pRight) {
uint64_t left = GET_UINT64_VAL(pLeft);
int64_t right = GET_INT64_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
+ if (right < 0) return 1;
+ if (left > (uint64_t)right) return 1;
+ if (left < (uint64_t)right) return -1;
return 0;
}
diff --git a/test1 b/test1
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/pytest/auto_crash_gen.py b/tests/pytest/auto_crash_gen.py
index 5af2f055cd..00e1786399 100755
--- a/tests/pytest/auto_crash_gen.py
+++ b/tests/pytest/auto_crash_gen.py
@@ -342,12 +342,29 @@ def main():
print('======== crash_gen run sucess and exit as expected ========')
try:
- text = f'''exit status: {msg_dict[status]}
- git commit : {git_commit}
- hostname: {hostname}
- start time: {starttime}
- end time: {endtime}
- cmd: {crash_cmds}'''
+ cmd = crash_cmds.split('&')[2]
+ if status == 0:
+ log_dir = "none"
+ else:
+ log_dir= "/root/pxiao/crash_gen_logs"
+
+ if status == 3:
+ core_dir = "/root/pxiao/crash_gen_logs"
+ else:
+ core_dir = "none"
+
+ text = f'''
+ exit status: {msg_dict[status]}
+ test scope: crash_gen
+ owner: pxiao
+ hostname: {hostname}
+ start time: {starttime}
+ end time: {endtime}
+ git commit : {git_commit}
+ log dir: {log_dir}
+ core dir: {core_dir}
+ cmd: {cmd}'''
+
send_msg(get_msg(text))
except Exception as e:
print("exception:", e)
diff --git a/tests/pytest/auto_crash_gen_valgrind.py b/tests/pytest/auto_crash_gen_valgrind.py
index 49e2c43f84..e37cda0a27 100755
--- a/tests/pytest/auto_crash_gen_valgrind.py
+++ b/tests/pytest/auto_crash_gen_valgrind.py
@@ -377,13 +377,30 @@ def main():
print('======== crash_gen run sucess and exit as expected ========')
try:
- text = f'''exit status: {msg_dict[status]}
- git commit : {git_commit}
- hostname: {hostname}
- start time: {starttime}
- end time: {endtime}
- cmd: {crash_cmds}'''
- send_msg(get_msg(text))
+ cmd = crash_cmds.split('&')[2]
+ if status == 0:
+ log_dir = "none"
+ else:
+ log_dir= "/root/pxiao/crash_gen_logs"
+
+ if status == 3:
+ core_dir = "/root/pxiao/crash_gen_logs"
+ else:
+ core_dir = "none"
+
+ text = f'''
+ exit status: {msg_dict[status]}
+ test scope: crash_gen
+ owner: pxiao
+ hostname: {hostname}
+ start time: {starttime}
+ end time: {endtime}
+ git commit : {git_commit}
+ log dir: {log_dir}
+ core dir: {core_dir}
+ cmd: {cmd}'''
+
+ send_msg(get_msg(text))
except Exception as e:
print("exception:", e)
exit(status)
diff --git a/tests/pytest/auto_crash_gen_valgrind_cluster.py b/tests/pytest/auto_crash_gen_valgrind_cluster.py
index 5189ff4262..af19836a83 100755
--- a/tests/pytest/auto_crash_gen_valgrind_cluster.py
+++ b/tests/pytest/auto_crash_gen_valgrind_cluster.py
@@ -377,12 +377,29 @@ def main():
print('======== crash_gen run sucess and exit as expected ========')
try:
- text = f'''exit status: {msg_dict[status]}
- git commit : {git_commit}
- hostname: {hostname}
- start time: {starttime}
- end time: {endtime}
- cmd: {crash_cmds}'''
+ cmd = crash_cmds.split('&')[2]
+ if status == 0:
+ log_dir = "none"
+ else:
+ log_dir= "/root/pxiao/crash_gen_logs"
+
+ if status == 3:
+ core_dir = "/root/pxiao/crash_gen_logs"
+ else:
+ core_dir = "none"
+
+ text = f'''
+ exit status: {msg_dict[status]}
+ test scope: crash_gen
+ owner: pxiao
+ hostname: {hostname}
+ start time: {starttime}
+ end time: {endtime}
+ git commit : {git_commit}
+ log dir: {log_dir}
+ core dir: {core_dir}
+ cmd: {cmd}'''
+
send_msg(get_msg(text))
except Exception as e:
print("exception:", e)
diff --git a/tests/pytest/util/cluster.py b/tests/pytest/util/cluster.py
index 2607cf63c2..a6e3530dc9 100644
--- a/tests/pytest/util/cluster.py
+++ b/tests/pytest/util/cluster.py
@@ -52,8 +52,9 @@ class ConfigureyCluster:
dnode.addExtraCfg("secondEp", f"{hostname}:{startPort_sec}")
# configure dnoe of independent mnodes
- if num <= self.mnodeNums and self.mnodeNums != 0 and independentMnode == True :
- dnode.addExtraCfg("supportVnodes", 1024)
+ if num <= self.mnodeNums and self.mnodeNums != 0 and independentMnode == "True" :
+ tdLog.info("set mnode supportVnodes 0")
+ dnode.addExtraCfg("supportVnodes", 0)
# print(dnode)
self.dnodes.append(dnode)
return self.dnodes
@@ -71,6 +72,7 @@ class ConfigureyCluster:
tdSql.init(conn.cursor())
mnodeNums=int(mnodeNums)
for i in range(2,mnodeNums+1):
+ tdLog.info("create mnode on dnode %d"%i)
tdSql.execute(" create mnode on dnode %d;"%i)
diff --git a/tests/script/tsim/alter/table.sim b/tests/script/tsim/alter/table.sim
index dccfc7f5d6..ded5d6f78a 100644
--- a/tests/script/tsim/alter/table.sim
+++ b/tests/script/tsim/alter/table.sim
@@ -657,6 +657,17 @@ if $data20 != null then
return -1
endi
+print =============== error
+sql create table tb2023(ts timestamp, f int);
+sql_error alter table tb2023 add column v varchar(16375);
+sql_error alter table tb2023 add column v varchar(16385);
+sql_error alter table tb2023 add column v varchar(33100);
+sql alter table tb2023 add column v varchar(16374);
+sql desc tb2023
+sql alter table tb2023 drop column v
+sql_error alter table tb2023 add column v nchar(4094);
+sql alter table tb2023 add column v nchar(4093);
+sql desc tb2023
print ======= over
sql drop database d1
sql select * from information_schema.ins_databases
diff --git a/tests/script/tsim/db/error1.sim b/tests/script/tsim/db/error1.sim
index 32dbe826cc..64b17125aa 100644
--- a/tests/script/tsim/db/error1.sim
+++ b/tests/script/tsim/db/error1.sim
@@ -58,16 +58,16 @@ if $data23 != 0 then
return -1
endi
-print ========== stop dnode2
-system sh/exec.sh -n dnode2 -s stop -x SIGKILL
+#print ========== stop dnode2
+#system sh/exec.sh -n dnode2 -s stop -x SIGKILL
-sleep 1000
-print =============== drop database
-sql_error drop database d1
+#sleep 1000
+#print =============== drop database
+sql drop database d1
-print ========== start dnode2
-system sh/exec.sh -n dnode2 -s start
-sleep 1000
+#print ========== start dnode2
+#system sh/exec.sh -n dnode2 -s start
+#sleep 1000
print =============== re-create database
$x = 0
diff --git a/tests/system-test/6-cluster/clusterCommonCheck.py b/tests/system-test/6-cluster/clusterCommonCheck.py
index 149c6d8ded..f5926321da 100644
--- a/tests/system-test/6-cluster/clusterCommonCheck.py
+++ b/tests/system-test/6-cluster/clusterCommonCheck.py
@@ -207,7 +207,7 @@ class ClusterComCheck:
count+=1
else:
tdLog.debug(tdSql.queryResult)
- tdLog.exit("stop mnodes on dnode %d failed in 10s ")
+ tdLog.exit(f"stop mnodes on dnode {offlineDnodeNo} failed in 10s ")
def check3mnode2off(self,mnodeNums=3):
count=0
@@ -226,7 +226,45 @@ class ClusterComCheck:
count+=1
else:
tdLog.debug(tdSql.queryResult)
- tdLog.exit("stop mnodes on dnode %d failed in 10s ")
+ tdLog.exit("stop mnodes on dnode 2 or 3 failed in 10s")
+
+ def check_vgroups_status(self,vgroup_numbers=2,db_replica=3,count_number=10,db_name="db"):
+ """ check vgroups status in 10s after db vgroups status is changed """
+ vgroup_numbers = int(vgroup_numbers)
+ self.db_replica = int(db_replica)
+ tdLog.debug("start to check status of vgroups")
+ count=0
+ last_number=vgroup_numbers-1
+ while count < count_number:
+ time.sleep(1)
+ tdSql.query(f"show {db_name}.vgroups;")
+ if count == 0 :
+ if tdSql.checkRows(vgroup_numbers) :
+ tdLog.success(f"{db_name} has {vgroup_numbers} vgroups" )
+ else:
+ tdLog.exit(f"vgroup number of {db_name} is not correct")
+ if self.db_replica == 1 :
+ if tdSql.queryResult[0][4] == 'leader' and tdSql.queryResult[1][4] == 'leader' and tdSql.queryResult[last_number][4] == 'leader':
+ ready_time= (count + 1)
+ tdLog.success(f"all vgroups of {db_name} are leaders in {count + 1} s")
+ return True
+ count+=1
+ elif self.db_replica == 3 :
+ vgroup_status_first=[tdSql.queryResult[0][4],tdSql.queryResult[0][6],tdSql.queryResult[0][8]]
+
+ vgroup_status_last=[tdSql.queryResult[last_number][4],tdSql.queryResult[last_number][6],tdSql.queryResult[last_number][8]]
+ if vgroup_status_first.count('leader') == 1 and vgroup_status_first.count('follower') == 2:
+ if vgroup_status_last.count('leader') == 1 and vgroup_status_last.count('follower') == 2:
+ ready_time= (count + 1)
+ tdLog.success(f"all vgroups of {db_name} are ready in {ready_time} s")
+ return True
+ count+=1
+ else:
+ tdLog.debug(tdSql.queryResult)
+ tdLog.notice(f"all vgroups leader of {db_name} is selected {count}s ")
+ caller = inspect.getframeinfo(inspect.stack()[1][0])
+ args = (caller.filename, caller.lineno)
+ tdLog.exit("%s(%d) failed " % args)
diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep1to3.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep1to3.py
new file mode 100644
index 0000000000..7d46b3143d
--- /dev/null
+++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep1to3.py
@@ -0,0 +1,206 @@
+import taos
+import sys
+import time
+import os
+
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import TDDnodes
+from util.dnodes import TDDnode
+from util.cluster import *
+sys.path.append("./6-cluster")
+from clusterCommonCreate import *
+from clusterCommonCheck import clusterComCheck
+
+import time
+import socket
+import subprocess
+from multiprocessing import Process
+import threading
+import time
+import inspect
+import ctypes
+
+class TDTestCase:
+
+ def init(self, conn, logSql, replicaVar=1):
+ tdLog.debug(f"start to excute {__file__}")
+ self.TDDnodes = None
+ tdSql.init(conn.cursor())
+ self.host = socket.gethostname()
+
+
+ def getBuildPath(self):
+ selfPath = os.path.dirname(os.path.realpath(__file__))
+
+ if ("community" in selfPath):
+ projPath = selfPath[:selfPath.find("community")]
+ else:
+ projPath = selfPath[:selfPath.find("tests")]
+
+ for root, dirs, files in os.walk(projPath):
+ if ("taosd" in files):
+ rootRealPath = os.path.dirname(os.path.realpath(root))
+ if ("packaging" not in rootRealPath):
+ buildPath = root[:len(root) - len("/build/bin")]
+ break
+ return buildPath
+
+ def _async_raise(self, tid, exctype):
+ """raises the exception, performs cleanup if needed"""
+ if not inspect.isclass(exctype):
+ exctype = type(exctype)
+ res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
+ if res == 0:
+ raise ValueError("invalid thread id")
+ elif res != 1:
+ # """if it returns a number greater than one, you're in trouble,
+ # and you should call it again with exc=NULL to revert the effect"""
+ ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
+ raise SystemError("PyThreadState_SetAsyncExc failed")
+
+ def stopThread(self,thread):
+ self._async_raise(thread.ident, SystemExit)
+
+
+ def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
+ tdLog.printNoPrefix("======== test case 1: ")
+ paraDict = {'dbName': 'db0_0',
+ 'dropFlag': 1,
+ 'event': '',
+ 'vgroups': 4,
+ 'replica': 1,
+ 'stbName': 'stb',
+ 'stbNumbers': 2,
+ 'colPrefix': 'c',
+ 'tagPrefix': 't',
+ 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
+ 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
+ 'ctbPrefix': 'ctb',
+ 'ctbNum': 200,
+ 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
+ "rowsPerTbl": 1000,
+ "batchNum": 5000
+ }
+
+ dnodeNumbers=int(dnodeNumbers)
+ mnodeNums=int(mnodeNums)
+ vnodeNumbers = int(dnodeNumbers-mnodeNums)
+ allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"])
+ rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"]
+ rowsall=rowsPerStb*paraDict['stbNumbers']
+ dbNumbers = 1
+
+ tdLog.info("first check dnode and mnode")
+ tdSql.query("select * from information_schema.ins_dnodes;")
+ tdSql.checkData(0,1,'%s:6030'%self.host)
+ tdSql.checkData(4,1,'%s:6430'%self.host)
+ clusterComCheck.checkDnodes(dnodeNumbers)
+
+ #check mnode status
+ tdLog.info("check mnode status")
+ clusterComCheck.checkMnodeStatus(mnodeNums)
+
+ # add some error operations and
+ tdLog.info("Confirm the status of the dnode again")
+ tdSql.error("create mnode on dnode 2")
+ tdSql.query("select * from information_schema.ins_dnodes;")
+ print(tdSql.queryResult)
+ clusterComCheck.checkDnodes(dnodeNumbers)
+
+ # create database and stable
+ clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica'])
+ tdLog.info("Take turns stopping Mnodes ")
+
+ tdDnodes=cluster.dnodes
+ stopcount =0
+ threads=[]
+
+ # create stable:stb_0
+ stableName= paraDict['stbName']
+ newTdSql=tdCom.newTdSql()
+ clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers'])
+ #create child table:ctb_0
+ for i in range(paraDict['stbNumbers']):
+ stableName= '%s_%d'%(paraDict['stbName'],i)
+ newTdSql=tdCom.newTdSql()
+ clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum'])
+ #insert date
+ for i in range(paraDict['stbNumbers']):
+ stableName= '%s_%d'%(paraDict['stbName'],i)
+ newTdSql=tdCom.newTdSql()
+ threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])))
+ for tr in threads:
+ tr.start()
+ for tr in threads:
+ tr.join()
+
+ while stopcount < restartNumbers:
+ tdLog.info(" restart loop: %d"%stopcount )
+ if stopRole == "mnode":
+ for i in range(mnodeNums):
+ tdDnodes[i].stoptaosd()
+ # sleep(10)
+ tdDnodes[i].starttaosd()
+ # sleep(10)
+ elif stopRole == "vnode":
+ for i in range(vnodeNumbers):
+ tdDnodes[i+mnodeNums].stoptaosd()
+ # sleep(10)
+ tdDnodes[i+mnodeNums].starttaosd()
+ # sleep(10)
+ elif stopRole == "dnode":
+ for i in range(dnodeNumbers):
+ if i == 0 :
+ stableName= '%s_%d'%(paraDict['stbName'],0)
+ newTdSql=tdCom.newTdSql()
+ # newTdSql.execute('alter database db0_0 replica 3')
+ clusterComCreate.alterStbMetaData(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
+ tdDnodes[i].stoptaosd()
+ clusterComCheck.checkDbRows(dbNumbers)
+ # sleep(10)
+ tdDnodes[i].starttaosd()
+ if i == 3 :
+ TdSqlEx=tdCom.newTdSql()
+ tdLog.info("alter database db0_0 replica 3")
+ TdSqlEx.execute('alter database db0_0 replica 3')
+
+
+ # dnodeNumbers don't include database of schema
+ if clusterComCheck.checkDnodes(dnodeNumbers):
+ tdLog.info("123")
+ else:
+ print("456")
+
+ self.stopThread(threads)
+ tdLog.exit("one or more of dnodes failed to start ")
+ # self.check3mnode()
+ stopcount+=1
+
+
+ clusterComCheck.checkDnodes(dnodeNumbers)
+ clusterComCheck.checkDbRows(dbNumbers)
+ # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
+
+ # tdSql.execute("use %s" %(paraDict["dbName"]))
+ tdSql.query("show %s.stables"%(paraDict["dbName"]))
+ tdSql.checkRows(paraDict["stbNumbers"])
+ for i in range(paraDict['stbNumbers']):
+ stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i)
+ tdSql.query("select count(*) from %s"%stableName)
+ if i == 0 :
+ tdSql.checkData(0,0,rowsPerStb*2)
+ else:
+ tdSql.checkData(0,0,rowsPerStb)
+ clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=3,db_name=paraDict["dbName"],count_number=150)
+ def run(self):
+ # print(self.master_dnode.cfgDict)
+ self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=1,stopRole='dnode')
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success(f"{__file__} successfully executed")
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())
diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep3to1.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep3to1.py
new file mode 100644
index 0000000000..5b5fb04969
--- /dev/null
+++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDataRebootModifyMetaAlterRep3to1.py
@@ -0,0 +1,206 @@
+import taos
+import sys
+import time
+import os
+
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import TDDnodes
+from util.dnodes import TDDnode
+from util.cluster import *
+sys.path.append("./6-cluster")
+from clusterCommonCreate import *
+from clusterCommonCheck import clusterComCheck
+
+import time
+import socket
+import subprocess
+from multiprocessing import Process
+import threading
+import time
+import inspect
+import ctypes
+
+class TDTestCase:
+
+ def init(self, conn, logSql, replicaVar=1):
+ tdLog.debug(f"start to excute {__file__}")
+ self.TDDnodes = None
+ tdSql.init(conn.cursor())
+ self.host = socket.gethostname()
+
+
+ def getBuildPath(self):
+ selfPath = os.path.dirname(os.path.realpath(__file__))
+
+ if ("community" in selfPath):
+ projPath = selfPath[:selfPath.find("community")]
+ else:
+ projPath = selfPath[:selfPath.find("tests")]
+
+ for root, dirs, files in os.walk(projPath):
+ if ("taosd" in files):
+ rootRealPath = os.path.dirname(os.path.realpath(root))
+ if ("packaging" not in rootRealPath):
+ buildPath = root[:len(root) - len("/build/bin")]
+ break
+ return buildPath
+
+ def _async_raise(self, tid, exctype):
+ """raises the exception, performs cleanup if needed"""
+ if not inspect.isclass(exctype):
+ exctype = type(exctype)
+ res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
+ if res == 0:
+ raise ValueError("invalid thread id")
+ elif res != 1:
+ # """if it returns a number greater than one, you're in trouble,
+ # and you should call it again with exc=NULL to revert the effect"""
+ ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
+ raise SystemError("PyThreadState_SetAsyncExc failed")
+
+ def stopThread(self,thread):
+ self._async_raise(thread.ident, SystemExit)
+
+
+ def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
+ tdLog.printNoPrefix("======== test case 1: ")
+ paraDict = {'dbName': 'db0_0',
+ 'dropFlag': 1,
+ 'event': '',
+ 'vgroups': 4,
+ 'replica': 3,
+ 'stbName': 'stb',
+ 'stbNumbers': 2,
+ 'colPrefix': 'c',
+ 'tagPrefix': 't',
+ 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
+ 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
+ 'ctbPrefix': 'ctb',
+ 'ctbNum': 200,
+ 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
+ "rowsPerTbl": 1000,
+ "batchNum": 5000
+ }
+
+ dnodeNumbers=int(dnodeNumbers)
+ mnodeNums=int(mnodeNums)
+ vnodeNumbers = int(dnodeNumbers-mnodeNums)
+ allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"])
+ rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"]
+ rowsall=rowsPerStb*paraDict['stbNumbers']
+ dbNumbers = 1
+
+ tdLog.info("first check dnode and mnode")
+ tdSql.query("select * from information_schema.ins_dnodes;")
+ tdSql.checkData(0,1,'%s:6030'%self.host)
+ tdSql.checkData(4,1,'%s:6430'%self.host)
+ clusterComCheck.checkDnodes(dnodeNumbers)
+
+ #check mnode status
+ tdLog.info("check mnode status")
+ clusterComCheck.checkMnodeStatus(mnodeNums)
+
+ # add some error operations and
+ tdLog.info("Confirm the status of the dnode again")
+ tdSql.error("create mnode on dnode 2")
+ tdSql.query("select * from information_schema.ins_dnodes;")
+ print(tdSql.queryResult)
+ clusterComCheck.checkDnodes(dnodeNumbers)
+
+ # create database and stable
+ clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica'])
+ tdLog.info("Take turns stopping Mnodes ")
+
+ tdDnodes=cluster.dnodes
+ stopcount =0
+ threads=[]
+
+ # create stable:stb_0
+ stableName= paraDict['stbName']
+ newTdSql=tdCom.newTdSql()
+ clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers'])
+ #create child table:ctb_0
+ for i in range(paraDict['stbNumbers']):
+ stableName= '%s_%d'%(paraDict['stbName'],i)
+ newTdSql=tdCom.newTdSql()
+ clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum'])
+ #insert date
+ for i in range(paraDict['stbNumbers']):
+ stableName= '%s_%d'%(paraDict['stbName'],i)
+ newTdSql=tdCom.newTdSql()
+ threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])))
+ for tr in threads:
+ tr.start()
+ for tr in threads:
+ tr.join()
+
+ while stopcount < restartNumbers:
+ tdLog.info(" restart loop: %d"%stopcount )
+ if stopRole == "mnode":
+ for i in range(mnodeNums):
+ tdDnodes[i].stoptaosd()
+ # sleep(10)
+ tdDnodes[i].starttaosd()
+ # sleep(10)
+ elif stopRole == "vnode":
+ for i in range(vnodeNumbers):
+ tdDnodes[i+mnodeNums].stoptaosd()
+ # sleep(10)
+ tdDnodes[i+mnodeNums].starttaosd()
+ # sleep(10)
+ elif stopRole == "dnode":
+ for i in range(dnodeNumbers):
+ tdDnodes[i].stoptaosd()
+ clusterComCheck.checkDbRows(dbNumbers)
+ if i == 0 :
+ stableName= '%s_%d'%(paraDict['stbName'],0)
+ newTdSql=tdCom.newTdSql()
+ # newTdSql.execute('alter database db0_0 replica 3')
+ clusterComCreate.alterStbMetaData(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
+ # sleep(10)
+ tdDnodes[i].starttaosd()
+ if i == 3 :
+ TdSqlEx=tdCom.newTdSql()
+ tdLog.info("alter database db0_0 replica 1")
+ TdSqlEx.execute('alter database db0_0 replica 1')
+
+
+ # dnodeNumbers don't include database of schema
+ if clusterComCheck.checkDnodes(dnodeNumbers):
+ tdLog.info("123")
+ else:
+ print("456")
+
+ self.stopThread(threads)
+ tdLog.exit("one or more of dnodes failed to start ")
+ # self.check3mnode()
+ stopcount+=1
+
+
+ clusterComCheck.checkDnodes(dnodeNumbers)
+ clusterComCheck.checkDbRows(dbNumbers)
+ # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
+
+ # tdSql.execute("use %s" %(paraDict["dbName"]))
+ tdSql.query("show %s.stables"%(paraDict["dbName"]))
+ tdSql.checkRows(paraDict["stbNumbers"])
+ for i in range(paraDict['stbNumbers']):
+ stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i)
+ tdSql.query("select count(*) from %s"%stableName)
+ if i == 0 :
+ tdSql.checkData(0,0,rowsPerStb*2)
+ else:
+ tdSql.checkData(0,0,rowsPerStb)
+ clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=1,db_name=paraDict["dbName"],count_number=150)
+ def run(self):
+ # print(self.master_dnode.cfgDict)
+ self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=1,stopRole='dnode')
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success(f"{__file__} successfully executed")
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())
diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDatarRebootAlterRep1-3.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDatarRebootAlterRep1-3.py
new file mode 100644
index 0000000000..aa3ed8e3fd
--- /dev/null
+++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertDatarRebootAlterRep1-3.py
@@ -0,0 +1,222 @@
+import taos
+import sys
+import time
+import os
+
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import TDDnodes
+from util.dnodes import TDDnode
+from util.cluster import *
+sys.path.append("./6-cluster")
+from clusterCommonCreate import *
+from clusterCommonCheck import clusterComCheck
+
+import time
+import socket
+import subprocess
+from multiprocessing import Process
+import threading
+import time
+import inspect
+import ctypes
+
+class TDTestCase:
+
+ def init(self, conn, logSql, replicaVar=1):
+ tdLog.debug(f"start to excute {__file__}")
+ self.TDDnodes = None
+ tdSql.init(conn.cursor())
+ self.host = socket.gethostname()
+
+
+ def getBuildPath(self):
+ selfPath = os.path.dirname(os.path.realpath(__file__))
+
+ if ("community" in selfPath):
+ projPath = selfPath[:selfPath.find("community")]
+ else:
+ projPath = selfPath[:selfPath.find("tests")]
+
+ for root, dirs, files in os.walk(projPath):
+ if ("taosd" in files):
+ rootRealPath = os.path.dirname(os.path.realpath(root))
+ if ("packaging" not in rootRealPath):
+ buildPath = root[:len(root) - len("/build/bin")]
+ break
+ return buildPath
+
+ def _async_raise(self, tid, exctype):
+ """raises the exception, performs cleanup if needed"""
+ if not inspect.isclass(exctype):
+ exctype = type(exctype)
+ res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
+ if res == 0:
+ raise ValueError("invalid thread id")
+ elif res != 1:
+ # """if it returns a number greater than one, you're in trouble,
+ # and you should call it again with exc=NULL to revert the effect"""
+ ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
+ raise SystemError("PyThreadState_SetAsyncExc failed")
+
+ def stopThread(self,thread):
+ self._async_raise(thread.ident, SystemExit)
+
+
+ def insertData(self,countstart,countstop):
+ # fisrt add data : db\stable\childtable\general table
+
+ for couti in range(countstart,countstop):
+ tdLog.debug("drop database if exists db%d" %couti)
+ tdSql.execute("drop database if exists db%d" %couti)
+ print("create database if not exists db%d replica 1 duration 300" %couti)
+ tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti)
+ tdSql.execute("use db%d" %couti)
+ tdSql.execute(
+ '''create table stb1
+ (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
+ tags (t1 int)
+ '''
+ )
+ tdSql.execute(
+ '''
+ create table t1
+ (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
+ '''
+ )
+ for i in range(4):
+ tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
+
+
+ def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
+ tdLog.printNoPrefix("======== test case 1: ")
+ paraDict = {'dbName': 'db0_0',
+ 'dropFlag': 1,
+ 'event': '',
+ 'vgroups': 4,
+ 'replica': 1,
+ 'stbName': 'stb',
+ 'stbNumbers': 2,
+ 'colPrefix': 'c',
+ 'tagPrefix': 't',
+ 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
+ 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
+ 'ctbPrefix': 'ctb',
+ 'ctbNum': 1000,
+ 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
+ "rowsPerTbl": 100,
+ "batchNum": 5000
+ }
+
+ dnodeNumbers = int(dnodeNumbers)
+ mnodeNums = int(mnodeNums)
+ vnodeNumbers = int(dnodeNumbers-mnodeNums)
+ allctbNumbers = (paraDict['stbNumbers']*paraDict["ctbNum"])
+ rowsPerStb = paraDict["ctbNum"]*paraDict["rowsPerTbl"]
+ rowsall = rowsPerStb*paraDict['stbNumbers']
+ dbNumbers = 1
+ replica3 = 3
+ tdLog.info("first check dnode and mnode")
+ tdSql.query("select * from information_schema.ins_dnodes;")
+ tdSql.checkData(0,1,'%s:6030'%self.host)
+ tdSql.checkData(4,1,'%s:6430'%self.host)
+ clusterComCheck.checkDnodes(dnodeNumbers)
+
+ #check mnode status
+ tdLog.info("check mnode status")
+ clusterComCheck.checkMnodeStatus(mnodeNums)
+
+ # add some error operations and
+ tdLog.info("Confirm the status of the dnode again")
+ tdSql.error("create mnode on dnode 2")
+ tdSql.query("select * from information_schema.ins_dnodes;")
+ print(tdSql.queryResult)
+ clusterComCheck.checkDnodes(dnodeNumbers)
+
+ # create database and stable
+ clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica'])
+ tdLog.info("Take turns stopping Mnodes ")
+
+ tdDnodes=cluster.dnodes
+ stopcount =0
+ threads=[]
+
+ # create stable:stb_0
+ stableName= paraDict['stbName']
+ newTdSql=tdCom.newTdSql()
+ clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers'])
+ #create child table:ctb_0
+ for i in range(paraDict['stbNumbers']):
+ stableName= '%s_%d'%(paraDict['stbName'],i)
+ newTdSql=tdCom.newTdSql()
+ clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum'])
+ #insert date
+ for i in range(paraDict['stbNumbers']):
+ stableName= '%s_%d'%(paraDict['stbName'],i)
+ newTdSql=tdCom.newTdSql()
+ threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])))
+ for tr in threads:
+ tr.start()
+ TdSqlEx=tdCom.newTdSql()
+ tdLog.info("alter database db0_0 replica 3")
+ TdSqlEx.execute('alter database db0_0 replica 3')
+ while stopcount < restartNumbers:
+ tdLog.info(" restart loop: %d"%stopcount )
+ if stopRole == "mnode":
+ for i in range(mnodeNums):
+ tdDnodes[i].stoptaosd()
+ # sleep(10)
+ tdDnodes[i].starttaosd()
+ # sleep(10)
+ elif stopRole == "vnode":
+ for i in range(vnodeNumbers):
+ tdDnodes[i+mnodeNums].stoptaosd()
+ # sleep(10)
+ tdDnodes[i+mnodeNums].starttaosd()
+ # sleep(10)
+ elif stopRole == "dnode":
+ for i in range(dnodeNumbers):
+ tdDnodes[i].stoptaosd()
+ # tdLog.info('select cast(c2 as nchar(10)) from db0_0.stb_1;')
+ # TdSqlEx.execute('select cast(c2 as nchar(10)) from db0_0.stb_1;')
+ # tdLog.info('select avg(c1) from db0_0.stb_0 interval(10s);')
+ # TdSqlEx.execute('select avg(c1) from db0_0.stb_0 interval(10s);')
+ # sleep(10)
+ tdDnodes[i].starttaosd()
+ # sleep(10)
+ # dnodeNumbers don't include database of schema
+ if clusterComCheck.checkDnodes(dnodeNumbers):
+ tdLog.info("123")
+ else:
+ print("456")
+
+ self.stopThread(threads)
+ tdLog.exit("one or more of dnodes failed to start ")
+ # self.check3mnode()
+ stopcount+=1
+
+ for tr in threads:
+ tr.join()
+ clusterComCheck.checkDnodes(dnodeNumbers)
+ clusterComCheck.checkDbRows(dbNumbers)
+ # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
+
+ # tdSql.execute("use %s" %(paraDict["dbName"]))
+ tdSql.query("show %s.stables"%(paraDict["dbName"]))
+ tdSql.checkRows(paraDict["stbNumbers"])
+ # for i in range(paraDict['stbNumbers']):
+ # stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i)
+ # tdSql.query("select count(*) from %s"%stableName)
+ # tdSql.checkData(0,0,rowsPerStb)
+ clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica3,db_name=paraDict["dbName"],count_number=240)
+ def run(self):
+ # print(self.master_dnode.cfgDict)
+ self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='dnode')
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success(f"{__file__} successfully executed")
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())
diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py
new file mode 100644
index 0000000000..ed7b99a880
--- /dev/null
+++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py
@@ -0,0 +1,196 @@
+import taos
+import sys
+import time
+import os
+
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import TDDnodes
+from util.dnodes import TDDnode
+from util.cluster import *
+sys.path.append("./6-cluster")
+from clusterCommonCreate import *
+from clusterCommonCheck import clusterComCheck
+
+import time
+import socket
+import subprocess
+from multiprocessing import Process
+import threading
+import time
+import inspect
+import ctypes
+
+class TDTestCase:
+
+ def init(self, conn, logSql, replicaVar=1):
+ tdLog.debug(f"start to excute {__file__}")
+ self.TDDnodes = None
+ tdSql.init(conn.cursor())
+ self.host = socket.gethostname()
+
+
+ def getBuildPath(self):
+ selfPath = os.path.dirname(os.path.realpath(__file__))
+
+ if ("community" in selfPath):
+ projPath = selfPath[:selfPath.find("community")]
+ else:
+ projPath = selfPath[:selfPath.find("tests")]
+
+ for root, dirs, files in os.walk(projPath):
+ if ("taosd" in files):
+ rootRealPath = os.path.dirname(os.path.realpath(root))
+ if ("packaging" not in rootRealPath):
+ buildPath = root[:len(root) - len("/build/bin")]
+ break
+ return buildPath
+
+ def _async_raise(self, tid, exctype):
+ """raises the exception, performs cleanup if needed"""
+ if not inspect.isclass(exctype):
+ exctype = type(exctype)
+ res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
+ if res == 0:
+ raise ValueError("invalid thread id")
+ elif res != 1:
+ # """if it returns a number greater than one, you're in trouble,
+ # and you should call it again with exc=NULL to revert the effect"""
+ ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
+ raise SystemError("PyThreadState_SetAsyncExc failed")
+
+ def stopThread(self,thread):
+ self._async_raise(thread.ident, SystemExit)
+
+
+ def insertData(self,countstart,countstop):
+ # fisrt add data : db\stable\childtable\general table
+
+ for couti in range(countstart,countstop):
+ tdLog.debug("drop database if exists db%d" %couti)
+ tdSql.execute("drop database if exists db%d" %couti)
+ print("create database if not exists db%d replica 1 duration 300" %couti)
+ tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti)
+ tdSql.execute("use db%d" %couti)
+ tdSql.execute(
+ '''create table stb1
+ (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
+ tags (t1 int)
+ '''
+ )
+ tdSql.execute(
+ '''
+ create table t1
+ (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
+ '''
+ )
+ for i in range(4):
+ tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
+
+
+ def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
+ tdLog.printNoPrefix("======== test case 1: ")
+ paraDict = {'dbName': 'db0_0',
+ 'dropFlag': 1,
+ 'event': '',
+ 'vgroups': 4,
+ 'replica': 3,
+ 'stbName': 'stb',
+ 'stbNumbers': 2,
+ 'colPrefix': 'c',
+ 'tagPrefix': 't',
+ 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
+ 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
+ 'ctbPrefix': 'ctb',
+ 'ctbNum': 1,
+ 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
+ "rowsPerTbl": 1,
+ "batchNum": 5000
+ }
+
+ dnodeNumbers=int(dnodeNumbers)
+ mnodeNums=int(mnodeNums)
+ vnodeNumbers = int(dnodeNumbers-mnodeNums)
+ replica1 = 1
+ replica3 = 3
+ allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"])
+ rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"]
+ rowsall=rowsPerStb*paraDict['stbNumbers']
+ dbNumbers = 1
+
+ tdLog.info("first check dnode and mnode")
+ tdSql.query("select * from information_schema.ins_dnodes;")
+ tdSql.checkData(0,1,'%s:6030'%self.host)
+ tdSql.checkData(4,1,'%s:6430'%self.host)
+ clusterComCheck.checkDnodes(dnodeNumbers)
+
+ #check mnode status
+ tdLog.info("check mnode status")
+ clusterComCheck.checkMnodeStatus(mnodeNums)
+
+ # add some error operations and
+ tdLog.info("Confirm the status of the dnode again")
+ tdSql.error("create mnode on dnode 2")
+ tdSql.query("select * from information_schema.ins_dnodes;")
+ print(tdSql.queryResult)
+ clusterComCheck.checkDnodes(dnodeNumbers)
+
+ # create database and stable
+ clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica'])
+ tdLog.info("Take turns stopping Mnodes ")
+
+ tdDnodes=cluster.dnodes
+ stopcount =0
+ threads=[]
+
+ # create stable:stb_0
+ stableName= paraDict['stbName']
+ newTdSql=tdCom.newTdSql()
+ clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers'])
+ #create child table:ctb_0
+ for i in range(paraDict['stbNumbers']):
+ stableName= '%s_%d'%(paraDict['stbName'],i)
+ newTdSql=tdCom.newTdSql()
+ clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum'])
+ #insert date
+ for i in range(paraDict['stbNumbers']):
+ stableName= '%s_%d'%(paraDict['stbName'],i)
+ newTdSql=tdCom.newTdSql()
+ threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])))
+ for tr in threads:
+ tr.start()
+ TdSqlEx=tdCom.newTdSql()
+ tdLog.info(f"alter database db0_0 replica {replica1}")
+ TdSqlEx.execute(f'alter database db0_0 replica {replica1}')
+ for tr in threads:
+ tr.join()
+ clusterComCheck.checkDnodes(dnodeNumbers)
+ clusterComCheck.checkDbRows(dbNumbers)
+ # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
+
+ # tdSql.execute("use %s" %(paraDict["dbName"]))
+ tdSql.query("show %s.stables"%(paraDict["dbName"]))
+ tdSql.checkRows(paraDict["stbNumbers"])
+ for i in range(paraDict['stbNumbers']):
+ stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i)
+ tdSql.query("select count(*) from %s"%stableName)
+ tdSql.checkData(0,0,rowsPerStb)
+
+ clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica1,db_name=paraDict["dbName"],count_number=20)
+ sleep(5)
+ tdLog.info(f"show transactions;alter database db0_0 replica {replica3};")
+ TdSqlEx.execute(f'show transactions;')
+ TdSqlEx.execute(f'alter database db0_0 replica {replica3};')
+ clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica3,db_name=paraDict["dbName"],count_number=120)
+
+ def run(self):
+ # print(self.master_dnode.cfgDict)
+ self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='dnode')
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success(f"{__file__} successfully executed")
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())
diff --git a/tests/system-test/6-cluster/manually-test/6dnode3mnodeStopDnodeInsertDatatb.py b/tests/system-test/6-cluster/manually-test/6dnode3mnodeStopDnodeInsertDatatb.py
new file mode 100644
index 0000000000..e02af29a05
--- /dev/null
+++ b/tests/system-test/6-cluster/manually-test/6dnode3mnodeStopDnodeInsertDatatb.py
@@ -0,0 +1,191 @@
+import taos
+import sys
+import time
+import os
+
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import TDDnodes
+from util.dnodes import TDDnode
+from util.cluster import *
+sys.path.append("./6-cluster")
+from clusterCommonCreate import *
+from clusterCommonCheck import clusterComCheck
+
+import time
+import socket
+import subprocess
+from multiprocessing import Process
+import threading
+import time
+import inspect
+import ctypes
+
+class TDTestCase:
+
+ def init(self, conn, logSql, replicaVar=1):
+ tdLog.debug(f"start to excute {__file__}")
+ self.TDDnodes = None
+ tdSql.init(conn.cursor())
+ self.host = socket.gethostname()
+
+
+ def getBuildPath(self):
+ selfPath = os.path.dirname(os.path.realpath(__file__))
+
+ if ("community" in selfPath):
+ projPath = selfPath[:selfPath.find("community")]
+ else:
+ projPath = selfPath[:selfPath.find("tests")]
+
+ for root, dirs, files in os.walk(projPath):
+ if ("taosd" in files):
+ rootRealPath = os.path.dirname(os.path.realpath(root))
+ if ("packaging" not in rootRealPath):
+ buildPath = root[:len(root) - len("/build/bin")]
+ break
+ return buildPath
+
+ def _async_raise(self, tid, exctype):
+ """raises the exception, performs cleanup if needed"""
+ if not inspect.isclass(exctype):
+ exctype = type(exctype)
+ res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
+ if res == 0:
+ raise ValueError("invalid thread id")
+ elif res != 1:
+ # """if it returns a number greater than one, you're in trouble,
+ # and you should call it again with exc=NULL to revert the effect"""
+ ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
+ raise SystemError("PyThreadState_SetAsyncExc failed")
+
+ def stopThread(self,thread):
+ self._async_raise(thread.ident, SystemExit)
+
+
+ def insertData(self,dbname,tableCount,rowsPerCount):
+ # tableCount : create table number
+ # rowsPerCount : rows per table
+ # fisrt add data : db\stable\childtable\general table
+ os.system(f"taosBenchmark -d {dbname} -n {tableCount} -t {rowsPerCount} -z 1 -k 10000 -y ")
+
+
+ def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
+ tdLog.printNoPrefix("======== test case 1: ")
+ paraDict = {'dbName': 'db0_0',
+ 'dropFlag': 1,
+ 'event': '',
+ 'vgroups': 4,
+ 'replica': 1,
+ 'stbName': 'stb',
+ 'stbNumbers': 2,
+ 'colPrefix': 'c',
+ 'tagPrefix': 't',
+ 'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
+ 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
+ 'ctbPrefix': 'ctb',
+ 'ctbNum': 10000,
+ 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
+ "rowsPerTbl": 10000,
+ "batchNum": 5000
+ }
+
+ dnodeNumbers=int(dnodeNumbers)
+ mnodeNums=int(mnodeNums)
+ vnodeNumbers = int(dnodeNumbers-mnodeNums)
+ allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"])
+ rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"]
+ rowsall=rowsPerStb*paraDict['stbNumbers']
+ dbNumbers = 1
+
+ tdLog.info("first check dnode and mnode")
+ tdSql.query("select * from information_schema.ins_dnodes;")
+ tdSql.checkData(0,1,'%s:6030'%self.host)
+ tdSql.checkData(4,1,'%s:6430'%self.host)
+ clusterComCheck.checkDnodes(dnodeNumbers)
+
+ #check mnode status
+ tdLog.info("check mnode status")
+ clusterComCheck.checkMnodeStatus(mnodeNums)
+
+ # add some error operations and
+ tdLog.info("Confirm the status of the dnode again")
+ tdSql.error("create mnode on dnode 2")
+ tdSql.query("select * from information_schema.ins_dnodes;")
+ print(tdSql.queryResult)
+ clusterComCheck.checkDnodes(dnodeNumbers)
+
+ # create database and stable
+ tdLog.info("Take turns stopping Mnodes ")
+
+ tdDnodes=cluster.dnodes
+ stopcount =0
+ threads=[]
+
+ # create stable:stb_0
+ threads.append(threading.Thread(target=self.insertData, args=(paraDict["dbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"])))
+ for tr in threads:
+ tr.start()
+ TdSqlEx=tdCom.newTdSql()
+ tdLog.info("alter database db0_0 replica 3")
+ TdSqlEx.execute('alter database db0_0 replica 3')
+ while stopcount < restartNumbers:
+ tdLog.info(" restart loop: %d"%stopcount )
+ if stopRole == "mnode":
+ for i in range(mnodeNums):
+ tdDnodes[i].stoptaosd()
+ # sleep(10)
+ tdDnodes[i].starttaosd()
+ # sleep(10)
+ elif stopRole == "vnode":
+ for i in range(vnodeNumbers):
+ tdDnodes[i+mnodeNums].stoptaosd()
+ # sleep(10)
+ tdDnodes[i+mnodeNums].starttaosd()
+ # sleep(10)
+ elif stopRole == "dnode":
+ for i in range(dnodeNumbers):
+ tdDnodes[i].stoptaosd()
+ # tdLog.info('select cast(c2 as nchar(10)) from db0_0.stb_1;')
+ # TdSqlEx.execute('select cast(c2 as nchar(10)) from db0_0.stb_1;')
+ # tdLog.info('select avg(c1) from db0_0.stb_0 interval(10s);')
+ # TdSqlEx.execute('select avg(c1) from db0_0.stb_0 interval(10s);')
+ # sleep(10)
+ tdDnodes[i].starttaosd()
+ # sleep(10)
+ # dnodeNumbers don't include database of schema
+ if clusterComCheck.checkDnodes(dnodeNumbers):
+ tdLog.info("123")
+ else:
+ print("456")
+
+ self.stopThread(threads)
+ tdLog.exit("one or more of dnodes failed to start ")
+ # self.check3mnode()
+ stopcount+=1
+
+ for tr in threads:
+ tr.join()
+ clusterComCheck.checkDnodes(dnodeNumbers)
+ clusterComCheck.checkDbRows(dbNumbers)
+ # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
+
+ # tdSql.execute("use %s" %(paraDict["dbName"]))
+ tdSql.query("show %s.stables"%(paraDict["dbName"]))
+ tdSql.checkRows(paraDict["stbNumbers"])
+ for i in range(paraDict['stbNumbers']):
+ stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i)
+ tdSql.query("select count(*) from %s"%stableName)
+ tdSql.checkData(0,0,rowsPerStb)
+ clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=3,db_name=paraDict["dbName"],count_number=240)
+ def run(self):
+ # print(self.master_dnode.cfgDict)
+ self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='dnode')
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success(f"{__file__} successfully executed")
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())
diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py
index 5b5326cfba..b66334a6a6 100644
--- a/tests/system-test/7-tmq/subscribeDb3.py
+++ b/tests/system-test/7-tmq/subscribeDb3.py
@@ -336,7 +336,7 @@ class TDTestCase:
for i in range(expectRows):
totalConsumeRows += resultList[i]
- if totalConsumeRows >= expectrowcnt or totalConsumeRows <= 0:
+ if totalConsumeRows > expectrowcnt or totalConsumeRows <= 0:
tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
diff --git a/tests/system-test/7-tmq/subscribeStb.py b/tests/system-test/7-tmq/subscribeStb.py
index 9dcbf5b351..53f1a34d58 100644
--- a/tests/system-test/7-tmq/subscribeStb.py
+++ b/tests/system-test/7-tmq/subscribeStb.py
@@ -226,12 +226,11 @@ class TDTestCase:
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
- pollDelay = 5
+ pollDelay = 10
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
- time.sleep(5)
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
self.insert_data(tdSql,\
parameterDict["dbName"],\
@@ -307,7 +306,7 @@ class TDTestCase:
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
- pollDelay = 5
+ pollDelay = 10
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file
index 214e01f1a8..7e68c40fd8 100644
--- a/tests/system-test/win-test-file
+++ b/tests/system-test/win-test-file
@@ -279,7 +279,7 @@ python3 ./test.py -f 7-tmq/subscribeDb1.py
python3 ./test.py -f 7-tmq/subscribeDb2.py
python3 ./test.py -f 7-tmq/subscribeDb3.py
python3 ./test.py -f 7-tmq/subscribeDb4.py
-#python3 ./test.py -f 7-tmq/subscribeStb.py
+python3 ./test.py -f 7-tmq/subscribeStb.py
python3 ./test.py -f 7-tmq/subscribeStb0.py
python3 ./test.py -f 7-tmq/subscribeStb1.py
python3 ./test.py -f 7-tmq/subscribeStb2.py