Merge remote-tracking branch 'origin/3.0' into feature/3.0_debug_wxy
This commit is contained in:
commit
b7b64fda84
|
@ -16,3 +16,6 @@
|
|||
[submodule "tools/taos-tools"]
|
||||
path = tools/taos-tools
|
||||
url = https://github.com/taosdata/taos-tools
|
||||
[submodule "tools/taosadapter"]
|
||||
path = tools/taosadapter
|
||||
url = https://github.com/taosdata/taosadapter.git
|
||||
|
|
14
Jenkinsfile2
14
Jenkinsfile2
|
@ -38,6 +38,7 @@ def pre_test(){
|
|||
sh '''
|
||||
hostname
|
||||
date
|
||||
env
|
||||
'''
|
||||
sh '''
|
||||
cd ${WK}
|
||||
|
@ -82,23 +83,33 @@ def pre_test(){
|
|||
sh '''
|
||||
cd ${WKC}
|
||||
git pull >/dev/null
|
||||
git log -5
|
||||
echo "`date "+%Y%m%d-%H%M%S"` ${JOB_NAME}:${BRANCH_NAME}:${BUILD_ID}:${CHANGE_TARGET}" >>${WKDIR}/jenkins.log
|
||||
echo "community log: `git log -5`" >>${WKDIR}/jenkins.log
|
||||
git fetch origin +refs/pull/${CHANGE_ID}/merge
|
||||
git checkout -qf FETCH_HEAD
|
||||
git log -5
|
||||
echo "community log merged: `git log -5`" >>${WKDIR}/jenkins.log
|
||||
cd ${WK}
|
||||
git pull >/dev/null
|
||||
git log -5
|
||||
echo "tdinternal log: `git log -5`" >>${WKDIR}/jenkins.log
|
||||
'''
|
||||
} else if (env.CHANGE_URL =~ /\/TDinternal\//) {
|
||||
sh '''
|
||||
cd ${WK}
|
||||
git pull >/dev/null
|
||||
git log -5
|
||||
echo "`date "+%Y%m%d-%H%M%S"` ${JOB_NAME}:${BRANCH_NAME}:${BUILD_ID}:${CHANGE_TARGET}" >>${WKDIR}/jenkins.log
|
||||
echo "tdinternal log: `git log -5`" >>${WKDIR}/jenkins.log
|
||||
git fetch origin +refs/pull/${CHANGE_ID}/merge
|
||||
git checkout -qf FETCH_HEAD
|
||||
git log -5
|
||||
echo "tdinternal log merged: `git log -5`" >>${WKDIR}/jenkins.log
|
||||
cd ${WKC}
|
||||
git pull >/dev/null
|
||||
git log -5
|
||||
echo "community log: `git log -5`" >>${WKDIR}/jenkins.log
|
||||
'''
|
||||
} else {
|
||||
sh '''
|
||||
|
@ -113,6 +124,9 @@ def pre_test(){
|
|||
cd ${WKPY}
|
||||
git reset --hard
|
||||
git pull
|
||||
git log -5
|
||||
echo "python connector log: `git log -5`" >>${WKDIR}/jenkins.log
|
||||
echo >>${WKDIR}/jenkins.log
|
||||
'''
|
||||
return 1
|
||||
}
|
||||
|
|
|
@ -18,6 +18,33 @@ if (NOT DEFINED TD_GRANT)
|
|||
SET(TD_GRANT FALSE)
|
||||
endif()
|
||||
|
||||
IF ("${BUILD_HTTP}" STREQUAL "")
|
||||
IF (TD_LINUX)
|
||||
IF (TD_ARM_32)
|
||||
SET(TD_BUILD_HTTP TRUE)
|
||||
ELSE ()
|
||||
SET(TD_BUILD_HTTP TRUE)
|
||||
ENDIF ()
|
||||
ELSEIF (TD_DARWIN)
|
||||
SET(TD_BUILD_HTTP TRUE)
|
||||
ELSE ()
|
||||
SET(TD_BUILD_HTTP TRUE)
|
||||
ENDIF ()
|
||||
ELSEIF (${BUILD_HTTP} MATCHES "false")
|
||||
SET(TD_BUILD_HTTP FALSE)
|
||||
ELSEIF (${BUILD_HTTP} MATCHES "true")
|
||||
SET(TD_BUILD_HTTP TRUE)
|
||||
ELSEIF (${BUILD_HTTP} MATCHES "internal")
|
||||
SET(TD_BUILD_HTTP FALSE)
|
||||
SET(TD_BUILD_TAOSA_INTERNAL TRUE)
|
||||
ELSE ()
|
||||
SET(TD_BUILD_HTTP TRUE)
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_BUILD_HTTP)
|
||||
ADD_DEFINITIONS(-DHTTP_EMBEDDED)
|
||||
ENDIF ()
|
||||
|
||||
IF ("${BUILD_TOOLS}" STREQUAL "")
|
||||
IF (TD_LINUX)
|
||||
IF (TD_ARM_32)
|
||||
|
|
|
@ -52,6 +52,7 @@ extern "C" {
|
|||
#define TSDB_PERFS_TABLE_OFFSETS "offsets"
|
||||
#define TSDB_PERFS_TABLE_TRANS "trans"
|
||||
#define TSDB_PERFS_TABLE_STREAMS "streams"
|
||||
#define TSDB_PERFS_TABLE_APPS "apps"
|
||||
|
||||
typedef struct SSysDbTableSchema {
|
||||
const char* name;
|
||||
|
|
|
@ -472,7 +472,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
|
|||
}
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
|
||||
|
|
|
@ -44,6 +44,7 @@ static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
|
|||
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq);
|
||||
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
|
||||
static void mndDestroySmaObj(SSmaObj *pSmaObj);
|
||||
|
||||
int32_t mndInitSma(SMnode *pMnode) {
|
||||
SSdbTable table = {
|
||||
|
@ -390,7 +391,9 @@ static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
|
|||
taosRLockLatch(&pStb->lock);
|
||||
memcpy(&stbObj, pStb, sizeof(SStbObj));
|
||||
taosRUnLockLatch(&pStb->lock);
|
||||
stbObj.numOfColumns = 0;
|
||||
stbObj.pColumns = NULL;
|
||||
stbObj.numOfTags = 0;
|
||||
stbObj.pTags = NULL;
|
||||
stbObj.updateTime = taosGetTimestampMs();
|
||||
stbObj.lock = 0;
|
||||
|
@ -404,6 +407,7 @@ static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
|
@ -442,6 +446,7 @@ static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
|
||||
SSmaObj *pSma) {
|
||||
|
@ -501,6 +506,13 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void mndDestroySmaObj(SSmaObj *pSmaObj) {
|
||||
if (pSmaObj) {
|
||||
taosMemoryFreeClear(pSmaObj->schemaRow.pSchema);
|
||||
taosMemoryFreeClear(pSmaObj->schemaTag.pSchema);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb) {
|
||||
SSmaObj smaObj = {0};
|
||||
memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
|
@ -524,29 +536,17 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
smaObj.tagsFilterLen = pCreate->tagsFilterLen;
|
||||
smaObj.sqlLen = pCreate->sqlLen;
|
||||
smaObj.astLen = pCreate->astLen;
|
||||
|
||||
if (smaObj.exprLen > 0) {
|
||||
smaObj.expr = taosMemoryMalloc(smaObj.exprLen);
|
||||
if (smaObj.expr == NULL) goto _OVER;
|
||||
memcpy(smaObj.expr, pCreate->expr, smaObj.exprLen);
|
||||
smaObj.expr = pCreate->expr;
|
||||
}
|
||||
|
||||
if (smaObj.tagsFilterLen > 0) {
|
||||
smaObj.tagsFilter = taosMemoryMalloc(smaObj.tagsFilterLen);
|
||||
if (smaObj.tagsFilter == NULL) goto _OVER;
|
||||
memcpy(smaObj.tagsFilter, pCreate->tagsFilter, smaObj.tagsFilterLen);
|
||||
smaObj.tagsFilter = pCreate->tagsFilter;
|
||||
}
|
||||
|
||||
if (smaObj.sqlLen > 0) {
|
||||
smaObj.sql = taosMemoryMalloc(smaObj.sqlLen);
|
||||
if (smaObj.sql == NULL) goto _OVER;
|
||||
memcpy(smaObj.sql, pCreate->sql, smaObj.sqlLen);
|
||||
smaObj.sql = pCreate->sql;
|
||||
}
|
||||
|
||||
if (smaObj.astLen > 0) {
|
||||
smaObj.ast = taosMemoryMalloc(smaObj.astLen);
|
||||
if (smaObj.ast == NULL) goto _OVER;
|
||||
memcpy(smaObj.ast, pCreate->ast, smaObj.astLen);
|
||||
smaObj.ast = pCreate->ast;
|
||||
}
|
||||
|
||||
SStreamObj streamObj = {0};
|
||||
|
@ -581,7 +581,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
||||
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
|
||||
// if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
|
||||
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
@ -589,6 +589,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
code = 0;
|
||||
|
||||
_OVER:
|
||||
mndDestroySmaObj(&smaObj);
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
}
|
||||
|
@ -735,6 +736,7 @@ static int32_t mndSetDropSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVg
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t mndSetDropSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
|
@ -775,6 +777,7 @@ static int32_t mndSetDropSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
||||
SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
|
||||
|
@ -825,7 +828,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
|
|||
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
|
||||
if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
|
||||
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
||||
if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER;
|
||||
// if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER;
|
||||
if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
|
@ -855,7 +858,7 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
|
|||
if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
|
||||
if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER;
|
||||
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
|
||||
if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER;
|
||||
// if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER;
|
||||
mndReleaseVgroup(pMnode, pVgroup);
|
||||
pVgroup = NULL;
|
||||
}
|
||||
|
@ -1012,7 +1015,6 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
|
|||
rsp->suid = pStb->uid;
|
||||
rsp->version = pStb->smaVer;
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||
|
|
|
@ -323,10 +323,14 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
|||
pOld->smaVer = pNew->smaVer;
|
||||
pOld->nextColId = pNew->nextColId;
|
||||
pOld->ttl = pNew->ttl;
|
||||
pOld->numOfColumns = pNew->numOfColumns;
|
||||
pOld->numOfTags = pNew->numOfTags;
|
||||
memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema));
|
||||
memcpy(pOld->pTags, pNew->pTags, pOld->numOfTags * sizeof(SSchema));
|
||||
if (pNew->numOfColumns > 0) {
|
||||
pOld->numOfColumns = pNew->numOfColumns;
|
||||
memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema));
|
||||
}
|
||||
if (pNew->numOfTags > 0) {
|
||||
pOld->numOfTags = pNew->numOfTags;
|
||||
memcpy(pOld->pTags, pNew->pTags, pOld->numOfTags * sizeof(SSchema));
|
||||
}
|
||||
if (pNew->commentLen != 0) {
|
||||
memcpy(pOld->comment, pNew->comment, pNew->commentLen);
|
||||
}
|
||||
|
|
|
@ -1347,13 +1347,11 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
|
|||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||
STransAction *pAction = taosArrayGet(pArray, i);
|
||||
if (pAction->errCode != 0) {
|
||||
mInfo("trans:%d, %s:%d set processed for kill msg received, errCode from %s to success", pTrans->id,
|
||||
mndTransStr(pAction->stage), i, tstrerror(pAction->errCode));
|
||||
pAction->msgSent = 1;
|
||||
pAction->msgReceived = 1;
|
||||
pAction->errCode = 0;
|
||||
}
|
||||
mInfo("trans:%d, %s:%d set processed for kill msg received, errCode from %s to success", pTrans->id,
|
||||
mndTransStr(pAction->stage), i, tstrerror(pAction->errCode));
|
||||
pAction->msgSent = 1;
|
||||
pAction->msgReceived = 1;
|
||||
pAction->errCode = 0;
|
||||
}
|
||||
|
||||
mndTransExecute(pMnode, pTrans);
|
||||
|
|
|
@ -940,7 +940,7 @@ static int32_t mndAddSetVnodeStandByAction(SMnode *pMnode, STrans *pTrans, SDbOb
|
|||
|
||||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_DND_DROP_VNODE;
|
||||
action.msgType = TDMT_SYNC_SET_VNODE_STANDBY;
|
||||
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||
|
||||
if (isRedo) {
|
||||
|
|
|
@ -66,7 +66,13 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort);
|
||||
}
|
||||
|
||||
return syncReconfig(pVnode->sync, &cfg);
|
||||
SRpcMsg rpcMsg = {.info = pMsg->info};
|
||||
if (syncReconfigBuild(pVnode->sync, &cfg, &rpcMsg) != 0) {
|
||||
vError("vgId:%d, failed to build reconfig msg since %s", TD_VID(pVnode), terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
return syncPropose(pVnode->sync, &rpcMsg, false);
|
||||
}
|
||||
|
||||
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
|
@ -241,6 +247,30 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
|
|||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
}
|
||||
|
||||
int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||
pFsm->data = pVnode;
|
||||
|
@ -250,6 +280,14 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
|||
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
|
||||
pFsm->FpRestoreFinishCb = NULL;
|
||||
pFsm->FpReConfigCb = vnodeSyncReconfig;
|
||||
|
||||
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
|
||||
pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
|
||||
pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
|
||||
pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
|
||||
pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
|
||||
pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;
|
||||
|
||||
return pFsm;
|
||||
}
|
||||
|
||||
|
|
|
@ -261,54 +261,48 @@ int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
|
|||
}
|
||||
|
||||
|
||||
int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* pReq) {
|
||||
int32_t dbNum = pJob->dbCfgNum + pJob->dbVgNum + pJob->dbInfoNum;
|
||||
if (dbNum > 0) {
|
||||
if (dbNum > pJob->dbCfgNum && dbNum > pJob->dbVgNum && dbNum > pJob->dbInfoNum) {
|
||||
SHashObj* pDb = taosHashInit(dbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
if (NULL == pDb) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->dbVgNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
|
||||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->dbCfgNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbCfg, i);
|
||||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->dbInfoNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbInfo, i);
|
||||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
char* dbFName = taosHashIterate(pDb, NULL);
|
||||
while (dbFName) {
|
||||
ctgDropDbVgroupEnqueue(pCtg, dbFName, true);
|
||||
dbFName = taosHashIterate(pDb, dbFName);
|
||||
}
|
||||
|
||||
taosHashCleanup(pDb);
|
||||
} else {
|
||||
for (int32_t i = 0; i < pJob->dbVgNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
|
||||
CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->dbCfgNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbCfg, i);
|
||||
CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->dbInfoNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbInfo, i);
|
||||
CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true));
|
||||
}
|
||||
}
|
||||
int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, const SCatalogReq* pReq) {
|
||||
SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
if (NULL == pDb) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->dbVgNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
|
||||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->dbCfgNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbCfg, i);
|
||||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->dbInfoNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbInfo, i);
|
||||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->tbMetaNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableMeta, i);
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(name, dbFName);
|
||||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->tbHashNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableHash, i);
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(name, dbFName);
|
||||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
char* dbFName = taosHashIterate(pDb, NULL);
|
||||
while (dbFName) {
|
||||
ctgDropDbVgroupEnqueue(pCtg, dbFName, true);
|
||||
dbFName = taosHashIterate(pDb, dbFName);
|
||||
}
|
||||
|
||||
taosHashCleanup(pDb);
|
||||
|
||||
int32_t tbNum = pJob->tbMetaNum + pJob->tbHashNum;
|
||||
if (tbNum > 0) {
|
||||
|
@ -404,7 +398,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
|
|||
}
|
||||
|
||||
if (pReq->forceUpdate) {
|
||||
CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, pJob, pReq));
|
||||
CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, *taskNum, pJob, pReq));
|
||||
}
|
||||
|
||||
int32_t taskIdx = 0;
|
||||
|
@ -790,8 +784,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
|
|||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgRUnlockVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
ctgHandleTaskEnd(pTask, code);
|
||||
|
@ -870,7 +863,7 @@ _return:
|
|||
|
||||
int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
|
||||
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
|
||||
|
||||
STableIndex* pOut = (STableIndex*)pTask->msgCtx.out;
|
||||
SArray* pInfo = NULL;
|
||||
|
@ -949,7 +942,6 @@ _return:
|
|||
|
||||
int32_t ctgHandleGetUserRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
SCtgUserCtx* ctx = (SCtgUserCtx*)pTask->taskCtx;
|
||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||
bool pass = false;
|
||||
|
@ -1018,7 +1010,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
|
|||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
|
||||
if (dbCache) {
|
||||
SVgroupInfo vgInfo = {0};
|
||||
CTG_ERR_RET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo));
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo));
|
||||
|
||||
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
|
||||
|
||||
|
@ -1067,6 +1059,9 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
|
|||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
|
||||
if (NULL != dbCache) {
|
||||
CTG_ERR_JRET(ctgGenerateVgList(pCtg, dbCache->vgCache.vgInfo->vgHash, (SArray**)&pTask->res));
|
||||
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
dbCache = NULL;
|
||||
|
||||
CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));
|
||||
} else {
|
||||
|
@ -1101,6 +1096,9 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
|
|||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pCtx->pName, (SVgroupInfo*)pTask->res));
|
||||
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
dbCache = NULL;
|
||||
|
||||
CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));
|
||||
} else {
|
||||
|
@ -1176,6 +1174,9 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
|
|||
pInfo->vgVer = dbCache->vgCache.vgInfo->vgVersion;
|
||||
pInfo->dbId = dbCache->dbId;
|
||||
pInfo->tbNum = dbCache->vgCache.vgInfo->numOfTable;
|
||||
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
dbCache = NULL;
|
||||
} else {
|
||||
pInfo->vgVer = CTG_DEFAULT_INVALID_VERSION;
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#include "catalogInt.h"
|
||||
|
||||
extern SCatalogMgmt gCtgMgmt;
|
||||
SCtgDebug gCTGDebug = {0};
|
||||
SCtgDebug gCTGDebug = {.lockEnable = true, .apiEnable = true};
|
||||
|
||||
void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
|
||||
ASSERT(*(int32_t*)param == 1);
|
||||
|
|
|
@ -675,7 +675,8 @@ int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes) {
|
|||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
STableIndexInfo *pInfo = taosArrayGet(pIndex, i);
|
||||
taosArrayPush(*pRes, pInfo);
|
||||
pInfo = taosArrayPush(*pRes, pInfo);
|
||||
pInfo->expr = strdup(pInfo->expr);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -391,7 +391,9 @@ typedef struct SStreamBlockScanInfo {
|
|||
void* streamBlockReader;// stream block reader handle
|
||||
SArray* pColMatchInfo; //
|
||||
SNode* pCondition;
|
||||
int32_t tsArrayIndex;
|
||||
SArray* tsArray;
|
||||
uint64_t groupId;
|
||||
SUpdateInfo* pUpdateInfo;
|
||||
|
||||
SExprInfo* pPseudoExpr;
|
||||
|
@ -582,6 +584,7 @@ typedef struct SPartitionOperatorInfo {
|
|||
int32_t* columnOffset; // start position for each column data
|
||||
void* pGroupIter; // group iterator
|
||||
int32_t pageIndex; // page index of current group
|
||||
SSDataBlock* pUpdateRes;
|
||||
} SPartitionOperatorInfo;
|
||||
|
||||
typedef struct SWindowRowsSup {
|
||||
|
@ -907,6 +910,7 @@ int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
|
|||
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
||||
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
|
||||
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -750,16 +750,103 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRowId) {
|
||||
for (int32_t j = 0; j < source->info.numOfCols; j++) {
|
||||
SColumnInfoData* pDestCol = (SColumnInfoData*)taosArrayGet(dest->pDataBlock, j);
|
||||
SColumnInfoData* pSourceCol = (SColumnInfoData*)taosArrayGet(source->pDataBlock, j);
|
||||
if (colDataIsNull_s(pSourceCol, sourceRowId)) {
|
||||
colDataAppendNULL(pDestCol, dest->info.rows);
|
||||
} else {
|
||||
colDataAppend(pDestCol, dest->info.rows, colDataGetData(pSourceCol, sourceRowId), false);
|
||||
}
|
||||
}
|
||||
dest->info.rows++;
|
||||
}
|
||||
|
||||
static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t rowId) {
|
||||
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||
if (groupId) {
|
||||
return *groupId;
|
||||
}
|
||||
return 0;
|
||||
/* Todo(liuyao) for partition by column
|
||||
recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, rowId);
|
||||
int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
|
||||
uint64_t resId = 0;
|
||||
uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
|
||||
if (groupId) {
|
||||
return *groupId;
|
||||
} else if (len != 0) {
|
||||
resId = calcGroupId(pTableScanInfo->keyBuf, len);
|
||||
taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &resId, sizeof(uint64_t));
|
||||
}
|
||||
return resId;
|
||||
*/
|
||||
}
|
||||
|
||||
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
|
||||
SSDataBlock* pResult = NULL;
|
||||
pResult = doTableScan(pInfo->pOperatorDumy);
|
||||
if (pResult == NULL) {
|
||||
if (prepareDataScan(pInfo)) {
|
||||
// scan next window data
|
||||
pResult = doTableScan(pInfo->pOperatorDumy);
|
||||
while (1) {
|
||||
SSDataBlock* pResult = NULL;
|
||||
pResult = doTableScan(pInfo->pOperatorDumy);
|
||||
if (pResult == NULL) {
|
||||
if (prepareDataScan(pInfo)) {
|
||||
// scan next window data
|
||||
pResult = doTableScan(pInfo->pOperatorDumy);
|
||||
}
|
||||
}
|
||||
if (!pResult) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (pResult->info.groupId == pInfo->groupId) {
|
||||
return pResult;
|
||||
}
|
||||
}
|
||||
|
||||
/* Todo(liuyao) for partition by column
|
||||
SSDataBlock* pBlock = createOneDataBlock(pResult, true);
|
||||
blockDataCleanup(pResult);
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i);
|
||||
if (id == pInfo->groupId) {
|
||||
copyOneRow(pResult, pBlock, i);
|
||||
}
|
||||
}
|
||||
return pResult;
|
||||
*/
|
||||
}
|
||||
|
||||
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
|
||||
blockDataCleanup(pUpdateBlock);
|
||||
int32_t size = taosArrayGetSize(pInfo->tsArray);
|
||||
if (pInfo->tsArrayIndex < size) {
|
||||
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
blockDataEnsureCapacity(pUpdateBlock, size);
|
||||
ASSERT(pBlock->info.numOfCols == pUpdateBlock->info.numOfCols);
|
||||
|
||||
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
|
||||
pInfo->groupId = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
|
||||
int32_t i = 0;
|
||||
for ( ; i < size; i++) {
|
||||
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
|
||||
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
|
||||
if (pInfo->groupId != id) {
|
||||
break;
|
||||
}
|
||||
copyOneRow(pUpdateBlock, pBlock, rowId);
|
||||
}
|
||||
pUpdateBlock->info.rows = i;
|
||||
pInfo->tsArrayIndex += i;
|
||||
pUpdateBlock->info.groupId = pInfo->groupId;
|
||||
pUpdateBlock->info.type = STREAM_REPROCESS;
|
||||
blockDataUpdateTsWindow(pUpdateBlock, 0);
|
||||
}
|
||||
// all rows have same group id
|
||||
ASSERT(pInfo->tsArrayIndex >= size);
|
||||
if (size > 0 && pInfo->tsArrayIndex == size) {
|
||||
taosArrayClear(pInfo->tsArray);
|
||||
}
|
||||
}
|
||||
|
||||
static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock,
|
||||
|
@ -767,41 +854,21 @@ static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, SSD
|
|||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[i])) {
|
||||
taosArrayPush(pInfo->tsArray, ts + i);
|
||||
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
|
||||
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[rowId])) {
|
||||
taosArrayPush(pInfo->tsArray, &rowId);
|
||||
}
|
||||
}
|
||||
if (!pUpdateBlock) {
|
||||
taosArrayClear(pInfo->tsArray);
|
||||
return;
|
||||
}
|
||||
int32_t size = taosArrayGetSize(pInfo->tsArray);
|
||||
if (size > 0 && invertible) {
|
||||
// Todo(liuyao) get from tsdb
|
||||
// SSDataBlock* p = createOneDataBlock(pBlock, true);
|
||||
// p->info.type = STREAM_INVERT;
|
||||
// taosArrayClear(pInfo->tsArray);
|
||||
// return p;
|
||||
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
blockDataEnsureCapacity(pUpdateBlock, size);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->tsArray, i);
|
||||
colDataAppend(pCol, i, (char*)pTs, false);
|
||||
}
|
||||
for (int32_t i = 0; i < pUpdateBlock->info.numOfCols; i++) {
|
||||
if (i == pInfo->primaryTsIndex) {
|
||||
continue;
|
||||
}
|
||||
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pUpdateBlock->pDataBlock, i);
|
||||
colDataAppendNNULL(pCol, 0, size);
|
||||
}
|
||||
pUpdateBlock->info.rows = size;
|
||||
pUpdateBlock->info.type = STREAM_REPROCESS;
|
||||
blockDataUpdateTsWindow(pUpdateBlock, 0);
|
||||
taosArrayClear(pInfo->tsArray);
|
||||
}
|
||||
setUpdateData(pInfo, pBlock, pUpdateBlock);
|
||||
// Todo(liuyao) get from tsdb
|
||||
// SSDataBlock* p = createOneDataBlock(pBlock, true);
|
||||
// p->info.type = STREAM_INVERT;
|
||||
// taosArrayClear(pInfo->tsArray);
|
||||
// return p;
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
||||
|
@ -833,7 +900,6 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
return pInfo->pRes;
|
||||
} else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
|
||||
if (!isStateWindow(pInfo)) {
|
||||
prepareDataScan(pInfo);
|
||||
|
@ -848,7 +914,15 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
|
||||
SSDataBlock* pSDB = doDataScan(pInfo);
|
||||
if (pSDB == NULL) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
|
||||
if (pInfo->pUpdateRes->info.rows > 0) {
|
||||
if (!isStateWindow(pInfo)) {
|
||||
prepareDataScan(pInfo);
|
||||
}
|
||||
return pInfo->pUpdateRes;
|
||||
} else {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
}
|
||||
} else {
|
||||
getUpdateDataBlock(pInfo, true, pSDB, NULL);
|
||||
return pSDB;
|
||||
|
@ -941,7 +1015,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
if (rows == 0) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
} else if (pInfo->pUpdateInfo) {
|
||||
blockDataCleanup(pInfo->pUpdateRes);
|
||||
pInfo->tsArrayIndex = 0;
|
||||
getUpdateDataBlock(pInfo, true, pInfo->pRes, pInfo->pUpdateRes);
|
||||
if (pInfo->pUpdateRes->info.rows > 0) {
|
||||
if (pInfo->pUpdateRes->info.type == STREAM_REPROCESS) {
|
||||
|
@ -1020,7 +1094,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->tsArray = taosArrayInit(4, sizeof(TSKEY));
|
||||
pInfo->tsArray = taosArrayInit(4, sizeof(int32_t));
|
||||
if (pInfo->tsArray == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -1047,6 +1121,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
|||
pInfo->pOperatorDumy = pTableScanDummy;
|
||||
pInfo->interval = pSTInfo->interval;
|
||||
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
|
||||
pInfo->groupId = 0;
|
||||
|
||||
pOperator->name = "StreamBlockScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
||||
pOperator->blocking = false;
|
||||
|
|
|
@ -1985,7 +1985,7 @@ static void clearUpdateDataBlock(SSDataBlock* pBlock) {
|
|||
blockDataCleanup(pBlock);
|
||||
}
|
||||
|
||||
static void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) {
|
||||
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) {
|
||||
ASSERT(pDest->info.capacity >= pSource->info.rows);
|
||||
clearUpdateDataBlock(pDest);
|
||||
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0);
|
||||
|
@ -1997,6 +1997,8 @@ static void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_
|
|||
colDataAppendNNULL(pCol, 0, pSource->info.rows);
|
||||
}
|
||||
pDest->info.rows = pSource->info.rows;
|
||||
pDest->info.groupId = pSource->info.groupId;
|
||||
pDest->info.type = pSource->info.type;
|
||||
blockDataUpdateTsWindow(pDest, 0);
|
||||
}
|
||||
|
||||
|
|
|
@ -2345,14 +2345,7 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
|
|||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||
SAPercentileInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||
ASSERT(pDBuf->algo == pSBuf->algo);
|
||||
if (pDBuf->algo == APERCT_ALGO_TDIGEST) {
|
||||
tdigestMerge(pDBuf->pTDigest, pSBuf->pTDigest);
|
||||
} else {
|
||||
SHistogramInfo* pTmp = tHistogramMerge(pDBuf->pHisto, pSBuf->pHisto, MAX_HISTOGRAM_BIN);
|
||||
memcpy(pDBuf->pHisto, pTmp, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
||||
pDBuf->pHisto->elems = (SHistBin*)((char*)pDBuf->pHisto + sizeof(SHistogramInfo));
|
||||
tHistogramDestroy(&pTmp);
|
||||
}
|
||||
apercentileTransferInfo(pSBuf, pDBuf);
|
||||
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -4128,12 +4128,14 @@ static const char* getSysDbName(ENodeType type) {
|
|||
case QUERY_NODE_SHOW_SNODES_STMT:
|
||||
case QUERY_NODE_SHOW_LICENCE_STMT:
|
||||
case QUERY_NODE_SHOW_CLUSTER_STMT:
|
||||
case QUERY_NODE_SHOW_VARIABLE_STMT:
|
||||
return TSDB_INFORMATION_SCHEMA_DB;
|
||||
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||
case QUERY_NODE_SHOW_TOPICS_STMT:
|
||||
case QUERY_NODE_SHOW_STREAMS_STMT:
|
||||
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
|
||||
case QUERY_NODE_SHOW_APPS_STMT:
|
||||
return TSDB_PERFORMANCE_SCHEMA_DB;
|
||||
default:
|
||||
break;
|
||||
|
@ -4183,6 +4185,10 @@ static const char* getSysTableName(ENodeType type) {
|
|||
return TSDB_PERFS_TABLE_TOPICS;
|
||||
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
|
||||
return TSDB_PERFS_TABLE_TRANS;
|
||||
case QUERY_NODE_SHOW_VARIABLE_STMT:
|
||||
return TSDB_INS_TABLE_CONFIGS;
|
||||
case QUERY_NODE_SHOW_APPS_STMT:
|
||||
return TSDB_PERFS_TABLE_APPS;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -5237,6 +5243,8 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
case QUERY_NODE_SHOW_CLUSTER_STMT:
|
||||
case QUERY_NODE_SHOW_TOPICS_STMT:
|
||||
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
|
||||
case QUERY_NODE_SHOW_VARIABLE_STMT:
|
||||
case QUERY_NODE_SHOW_APPS_STMT:
|
||||
code = rewriteShow(pCxt, pQuery);
|
||||
break;
|
||||
case QUERY_NODE_CREATE_TABLE_STMT:
|
||||
|
|
|
@ -104,10 +104,14 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
|
|||
return false;
|
||||
}
|
||||
if (NULL == pNode->pParent || (QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) &&
|
||||
QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode->pParent))) {
|
||||
QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode->pParent) &&
|
||||
QUERY_NODE_LOGIC_PLAN_PARTITION != nodeType(pNode->pParent))) {
|
||||
return false;
|
||||
}
|
||||
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent) ||
|
||||
(QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode->pParent) &&
|
||||
pNode->pParent->pParent &&
|
||||
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent)) ) {
|
||||
return true;
|
||||
}
|
||||
return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
|
||||
|
@ -217,16 +221,22 @@ static int32_t osdGetDataRequired(SNodeList* pFuncs) {
|
|||
}
|
||||
|
||||
static void setScanWindowInfo(SScanLogicNode* pScan) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pScan->node.pParent)) {
|
||||
pScan->interval = ((SWindowLogicNode*)pScan->node.pParent)->interval;
|
||||
pScan->offset = ((SWindowLogicNode*)pScan->node.pParent)->offset;
|
||||
pScan->sliding = ((SWindowLogicNode*)pScan->node.pParent)->sliding;
|
||||
pScan->intervalUnit = ((SWindowLogicNode*)pScan->node.pParent)->intervalUnit;
|
||||
pScan->slidingUnit = ((SWindowLogicNode*)pScan->node.pParent)->slidingUnit;
|
||||
pScan->triggerType = ((SWindowLogicNode*)pScan->node.pParent)->triggerType;
|
||||
pScan->watermark = ((SWindowLogicNode*)pScan->node.pParent)->watermark;
|
||||
pScan->tsColId = ((SColumnNode*)((SWindowLogicNode*)pScan->node.pParent)->pTspk)->colId;
|
||||
pScan->filesFactor = ((SWindowLogicNode*)pScan->node.pParent)->filesFactor;
|
||||
SLogicNode* pParent = pScan->node.pParent;
|
||||
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pParent) &&
|
||||
pParent->pParent &&
|
||||
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent->pParent)) {
|
||||
pParent = pParent->pParent;
|
||||
}
|
||||
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent)) {
|
||||
pScan->interval = ((SWindowLogicNode*)pParent)->interval;
|
||||
pScan->offset = ((SWindowLogicNode*)pParent)->offset;
|
||||
pScan->sliding = ((SWindowLogicNode*)pParent)->sliding;
|
||||
pScan->intervalUnit = ((SWindowLogicNode*)pParent)->intervalUnit;
|
||||
pScan->slidingUnit = ((SWindowLogicNode*)pParent)->slidingUnit;
|
||||
pScan->triggerType = ((SWindowLogicNode*)pParent)->triggerType;
|
||||
pScan->watermark = ((SWindowLogicNode*)pParent)->watermark;
|
||||
pScan->tsColId = ((SColumnNode*)((SWindowLogicNode*)pParent)->pTspk)->colId;
|
||||
pScan->filesFactor = ((SWindowLogicNode*)pParent)->filesFactor;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -41,5 +41,16 @@ fi
|
|||
cat ../script/jenkins/basic.txt |grep -v "^#"|grep -v "^$"|sed "s/^/,,script,/" >>$case_file
|
||||
grep "^python" ../system-test/fulltest.sh |sed "s/^/,,system-test,/" >>$case_file
|
||||
|
||||
# tar source code for run.sh to use
|
||||
# if [ $ent -eq 0 ]; then
|
||||
# cd ../../../
|
||||
# rm -rf TDengine.tar.gz
|
||||
# tar --exclude=TDengine/debug --exclude=TDengine/sim --exclude=TDengine/release -czf TDengine.tar.gz TDengine taos-connector-python
|
||||
# else
|
||||
# cd ../../../../
|
||||
# rm -rf TDinternal.tar.gz
|
||||
# tar --exclude=TDinternal/debug --exclude=TDinternal/sim --exclude=TDinternal/community/debug --exclude=TDinternal/community/release --exclude=TDinternal/community/sim -czf TDinternal.tar.gz TDinternal taos-connector-python
|
||||
# fi
|
||||
|
||||
exit 0
|
||||
|
||||
|
|
|
@ -255,7 +255,6 @@ function run_thread() {
|
|||
$cmd # 2>/dev/null
|
||||
local case_info=`echo "$line"|cut -d, -f 3,4`
|
||||
local corefile=`ls $log_dir/${case_file}.coredump/`
|
||||
corefile=`find $log_dir/${case_file}.coredump/ -name "core.*"`
|
||||
echo -e "$case_info \e[31m failed\e[0m"
|
||||
echo "=========================log============================"
|
||||
cat $log_dir/$case_file.log
|
||||
|
@ -291,6 +290,19 @@ function run_thread() {
|
|||
fi
|
||||
cmd="$scpcmd:${remote_sim_tar} $log_dir/${case_file}.sim.tar.gz"
|
||||
$cmd
|
||||
# backup source code (disabled)
|
||||
source_tar_dir=$log_dir/TDengine_${hosts[index]}
|
||||
source_tar_file=TDengine.tar.gz
|
||||
if [ $ent -ne 0 ]; then
|
||||
source_tar_dir=$log_dir/TDinternal_${hosts[index]}
|
||||
source_tar_file=TDinternal.tar.gz
|
||||
fi
|
||||
mkdir $source_tar_dir 2>/dev/null
|
||||
if [ $? -eq 0 ]; then
|
||||
cmd="$scpcmd:${workdirs[index]}/$source_tar_file $source_tar_dir"
|
||||
# echo "$cmd"
|
||||
# $cmd
|
||||
fi
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@
|
|||
# ---- mnode
|
||||
./test.sh -f tsim/mnode/basic1.sim
|
||||
./test.sh -f tsim/mnode/basic2.sim
|
||||
#./test.sh -f tsim/mnode/basic3.sim
|
||||
./test.sh -f tsim/mnode/basic3.sim
|
||||
./test.sh -f tsim/mnode/basic4.sim
|
||||
./test.sh -f tsim/mnode/basic5.sim
|
||||
|
||||
|
@ -132,7 +132,7 @@
|
|||
#./test.sh -f tsim/mnode/basic1.sim -m
|
||||
|
||||
# --- sma
|
||||
#./test.sh -f tsim/sma/tsmaCreateInsertData.sim
|
||||
./test.sh -f tsim/sma/tsmaCreateInsertData.sim
|
||||
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
|
||||
|
||||
# --- valgrind
|
||||
|
|
|
@ -163,26 +163,22 @@ endi
|
|||
print =============== step32: move follower2
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
|
||||
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
=======
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
return
|
||||
|
||||
print =============== step33: move follower1
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
|
||||
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
=======
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
|
@ -191,12 +187,9 @@ endi
|
|||
print =============== step34: move follower2
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
|
||||
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
=======
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
|
@ -205,15 +198,8 @@ endi
|
|||
print =============== step35: move follower1
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
|
||||
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
=======
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
|
@ -242,8 +228,6 @@ if $rows != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
=======
|
||||
print =============== step38: move follower2
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
|
||||
|
@ -254,7 +238,6 @@ sql show d1.tables
|
|||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
|
||||
print =============== step39: move follower1
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
|
||||
|
|
|
@ -37,6 +37,14 @@ print =============== trigger stream to execute sma aggr task and insert sma dat
|
|||
sql insert into ct1 values(now+5s, 20, 20.0, 30.0)
|
||||
#===================================================================
|
||||
|
||||
print =============== show streams ================================
|
||||
sql show streams;
|
||||
print $data00 $data01 $data02
|
||||
|
||||
if $data00 != d1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== select * from ct1 from memory
|
||||
sql select * from ct1;
|
||||
print $data00 $data01
|
||||
|
|
|
@ -7,44 +7,28 @@ system sh/exec.sh -n dnode1 -s start
|
|||
system sh/exec.sh -n dnode2 -s start
|
||||
sql connect
|
||||
|
||||
print =============== show dnodes
|
||||
sql show dnodes;
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show mnodes;
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != leader then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== create dnodes
|
||||
sql create dnode $hostname port 7200
|
||||
sleep 2000
|
||||
|
||||
sql show dnodes;
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 1 then
|
||||
$x = 0
|
||||
step1:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != 2 then
|
||||
return -1
|
||||
if $data(1)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
if $data(2)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
|
||||
print =============== kill dnode2
|
||||
|
@ -68,7 +52,7 @@ if $data[0][0] != 7 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][2] != undoAction then
|
||||
if $data[0][2] != redoAction then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -80,14 +64,34 @@ sql_error create database d1 vgroups 2;
|
|||
|
||||
print =============== start dnode2
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
sleep 3000
|
||||
|
||||
$x = 0
|
||||
step2:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data(1)[4] != ready then
|
||||
goto step2
|
||||
endi
|
||||
if $data(2)[4] != ready then
|
||||
goto step2
|
||||
endi
|
||||
|
||||
sql show transactions
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql create database d1 vgroups 2;
|
||||
sql_error create database d1 vgroups 2;
|
||||
|
||||
print =============== kill dnode2
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
|
@ -106,22 +110,31 @@ if $rows != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][0] != 9 then
|
||||
if $data[0][0] != 8 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][2] != undoAction then
|
||||
if $data[0][2] != redoAction then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][3] != d2 then
|
||||
return -1
|
||||
endi
|
||||
return
|
||||
|
||||
sql show databases ;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
print d2 ==> $data(d2)[19]
|
||||
if $data(d2)[19] != creating then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql_error create database d2 vgroups 2;
|
||||
|
||||
print =============== kill transaction
|
||||
sql kill transaction 9;
|
||||
sql kill transaction 8;
|
||||
sleep 2000
|
||||
|
||||
sql show transactions
|
||||
|
@ -131,7 +144,34 @@ endi
|
|||
|
||||
print =============== start dnode2
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
sleep 3000
|
||||
|
||||
$x = 0
|
||||
step3:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data(1)[4] != ready then
|
||||
goto step3
|
||||
endi
|
||||
if $data(2)[4] != ready then
|
||||
goto step3
|
||||
endi
|
||||
|
||||
sql show transactions
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql drop database d2;
|
||||
|
||||
sql show transactions
|
||||
if $rows != 0 then
|
||||
|
@ -145,6 +185,5 @@ sql_error kill transaction 3;
|
|||
sql_error kill transaction 4;
|
||||
sql_error kill transaction 5;
|
||||
|
||||
return
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
|
@ -48,21 +48,21 @@ def taos_command (buildPath, key, value, expectString, cfgDir, sqlString='', key
|
|||
#output = child.readline()
|
||||
#print (output.decode())
|
||||
if len(expectString) != 0:
|
||||
i = child.expect([expectString, taosExpect.TIMEOUT, taosExpect.EOF], timeout=6)
|
||||
i = child.expect([expectString, taosExpect.TIMEOUT, taosExpect.EOF], timeout=20)
|
||||
else:
|
||||
i = child.expect([taosExpect.TIMEOUT, taosExpect.EOF], timeout=6)
|
||||
i = child.expect([taosExpect.TIMEOUT, taosExpect.EOF], timeout=20)
|
||||
|
||||
if platform.system().lower() == 'windows':
|
||||
retResult = child.before
|
||||
else:
|
||||
retResult = child.before.decode()
|
||||
print("cmd return result:\n%s\n"%retResult)
|
||||
#print(child.after.decode())
|
||||
# print(child.after.decode())
|
||||
if i == 0:
|
||||
print ('taos login success! Here can run sql, taos> ')
|
||||
if len(sqlString) != 0:
|
||||
child.sendline (sqlString)
|
||||
w = child.expect(["Query OK", taosExpect.TIMEOUT, taosExpect.EOF], timeout=1)
|
||||
w = child.expect(["Query OK", taosExpect.TIMEOUT, taosExpect.EOF], timeout=10)
|
||||
if platform.system().lower() == 'windows':
|
||||
retResult = child.before
|
||||
else:
|
||||
|
|
|
@ -243,9 +243,8 @@ class TDTestCase:
|
|||
tdSql.checkRows(2)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag2'!='beijing'")
|
||||
tdSql.checkRows(5)
|
||||
#open
|
||||
#tdSql.query("select * from jsons1 where jtag->'tag2'=''")
|
||||
#tdSql.checkRows(2)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag2'=''")
|
||||
tdSql.checkRows(2)
|
||||
#
|
||||
# # where json value is int
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1'=5")
|
||||
|
@ -253,11 +252,10 @@ class TDTestCase:
|
|||
tdSql.checkData(0, 1, 2)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1'=10")
|
||||
tdSql.checkRows(0)
|
||||
# open
|
||||
#tdSql.query("select * from jsons1 where jtag->'tag1'<54")
|
||||
#tdSql.checkRows(3)
|
||||
#tdSql.query("select * from jsons1 where jtag->'tag1'<=11")
|
||||
#tdSql.checkRows(3)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1'<54")
|
||||
tdSql.checkRows(4)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1'<=11")
|
||||
tdSql.checkRows(4)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1'>4")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1'>=5")
|
||||
|
@ -270,31 +268,28 @@ class TDTestCase:
|
|||
# # where json value is double
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1'=1.232")
|
||||
tdSql.checkRows(1)
|
||||
# open
|
||||
#tdSql.query("select * from jsons1 where jtag->'tag1'<1.232")
|
||||
#tdSql.checkRows(0)
|
||||
#tdSql.query("select * from jsons1 where jtag->'tag1'<=1.232")
|
||||
#tdSql.checkRows(1)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1'<1.232")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1'<=1.232")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1'>1.23")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1'>=1.232")
|
||||
tdSql.checkRows(3)
|
||||
# open
|
||||
#tdSql.query("select * from jsons1 where jtag->'tag1'!=1.232")
|
||||
#tdSql.checkRows(2)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1'!=1.232")
|
||||
tdSql.checkRows(6)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1'!=3.232")
|
||||
tdSql.checkRows(7)
|
||||
#tdSql.error("select * from jsons1 where jtag->'tag1'/0=3")
|
||||
#tdSql.error("select * from jsons1 where jtag->'tag1'/5=1")
|
||||
#
|
||||
# # where json value is bool
|
||||
#tdSql.query("select * from jsons1 where jtag->'tag1'=true")
|
||||
# open
|
||||
#tdSql.checkRows(0)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1'=true")
|
||||
tdSql.checkRows(0)
|
||||
#tdSql.query("select * from jsons1 where jtag->'tag1'=false")
|
||||
#tdSql.checkRows(1)
|
||||
#tdSql.query("select * from jsons1 where jtag->'tag1'!=false")
|
||||
#tdSql.checkRows(0)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1'!=false")
|
||||
tdSql.checkRows(3)
|
||||
#tdSql.error("select * from jsons1 where jtag->'tag1'>false")
|
||||
#
|
||||
# # where json value is null
|
||||
|
@ -303,18 +298,17 @@ class TDTestCase:
|
|||
#tdSql.checkRows(1)
|
||||
#
|
||||
# # where json key is null
|
||||
# open
|
||||
#tdSql.query("select * from jsons1 where jtag->'tag_no_exist'=3")
|
||||
#tdSql.checkRows(0)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag_no_exist'=3")
|
||||
tdSql.checkRows(0)
|
||||
#
|
||||
# # where json value is not exist
|
||||
#tdSql.query("select * from jsons1 where jtag->'tag1' is null")
|
||||
#tdSql.checkData(0, 0, 'jsons1_9')
|
||||
#tdSql.checkRows(1)
|
||||
#tdSql.query("select * from jsons1 where jtag->'tag4' is null")
|
||||
#tdSql.checkRows(9)
|
||||
#tdSql.query("select * from jsons1 where jtag->'tag3' is not null")
|
||||
#tdSql.checkRows(4)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1' is null")
|
||||
tdSql.checkData(0, 0, 'jsons1_9')
|
||||
tdSql.checkRows(2)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag4' is null")
|
||||
tdSql.checkRows(9)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag3' is not null")
|
||||
tdSql.checkRows(3)
|
||||
#
|
||||
# # test contains
|
||||
tdSql.query("select * from jsons1 where jtag contains 'tag1'")
|
||||
|
@ -344,10 +338,10 @@ class TDTestCase:
|
|||
#
|
||||
#
|
||||
# # test with between and
|
||||
#tdSql.query("select * from jsons1 where jtag->'tag1' between 1 and 30")
|
||||
#tdSql.checkRows(3)
|
||||
#tdSql.query("select * from jsons1 where jtag->'tag1' between 'femail' and 'beijing'")
|
||||
#tdSql.checkRows(2)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1' between 1 and 30")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.query("select * from jsons1 where jtag->'tag1' between 'femail' and 'beijing'")
|
||||
tdSql.checkRows(2)
|
||||
#
|
||||
# # test with tbname/normal column
|
||||
tdSql.query("select * from jsons1 where tbname = 'jsons1_1'")
|
||||
|
@ -362,6 +356,7 @@ class TDTestCase:
|
|||
#
|
||||
# # test where condition like
|
||||
# open
|
||||
# syntax error
|
||||
#tdSql.query("select *,tbname from jsons1 where jtag->'tag2' like 'bei%'")
|
||||
#tdSql.checkRows(2)
|
||||
#tdSql.query("select *,tbname from jsons1 where jtag->'tag1' like 'fe%' and jtag->'tag2' is not null")
|
||||
|
|
|
@ -230,7 +230,7 @@ class TDTestCase:
|
|||
|
||||
|
||||
|
||||
def five_dnode_three_mnode(self,dnodenumber):
|
||||
def five_dnode_three_mnode(self):
|
||||
tdSql.query("show dnodes;")
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(4,1,'%s:6430'%self.host)
|
||||
|
@ -260,7 +260,9 @@ class TDTestCase:
|
|||
dropcount =0
|
||||
while dropcount <= 10:
|
||||
for i in range(1,3):
|
||||
tdLog.debug("drop mnode on dnode %d"%(i+1))
|
||||
tdSql.execute("drop mnode on dnode %d"%(i+1))
|
||||
tdLog.debug("create mnode on dnode %d"%(i+1))
|
||||
tdSql.execute("create mnode on dnode %d"%(i+1))
|
||||
dropcount+=1
|
||||
self.check3mnode()
|
||||
|
@ -276,7 +278,7 @@ class TDTestCase:
|
|||
def run(self):
|
||||
# print(self.master_dnode.cfgDict)
|
||||
self.buildcluster(5)
|
||||
self.five_dnode_three_mnode(5)
|
||||
self.five_dnode_three_mnode()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
|
|
@ -145,6 +145,7 @@ class TDTestCase:
|
|||
tdSql.checkData(2,3,'ready')
|
||||
|
||||
def check3mnode1off(self):
|
||||
tdSql.error("drop mnode on dnode 1;")
|
||||
count=0
|
||||
while count < 10:
|
||||
time.sleep(1)
|
||||
|
@ -174,6 +175,7 @@ class TDTestCase:
|
|||
tdSql.checkData(2,3,'ready')
|
||||
|
||||
def check3mnode2off(self):
|
||||
tdSql.error("drop mnode on dnode 2;")
|
||||
count=0
|
||||
while count < 10:
|
||||
time.sleep(1)
|
||||
|
@ -201,6 +203,7 @@ class TDTestCase:
|
|||
tdSql.checkData(2,3,'ready')
|
||||
|
||||
def check3mnode3off(self):
|
||||
tdSql.error("drop mnode on dnode 3;")
|
||||
count=0
|
||||
while count < 10:
|
||||
time.sleep(1)
|
||||
|
@ -255,17 +258,17 @@ class TDTestCase:
|
|||
print(tdSql.queryResult)
|
||||
|
||||
tdLog.debug("stop and follower of mnode")
|
||||
# self.TDDnodes.stoptaosd(2)
|
||||
# self.check3mnode2off()
|
||||
# self.TDDnodes.starttaosd(2)
|
||||
self.TDDnodes.stoptaosd(2)
|
||||
self.check3mnode2off()
|
||||
self.TDDnodes.starttaosd(2)
|
||||
|
||||
# self.TDDnodes.stoptaosd(3)
|
||||
# self.check3mnode3off()
|
||||
# self.TDDnodes.starttaosd(2)
|
||||
self.TDDnodes.stoptaosd(3)
|
||||
self.check3mnode3off()
|
||||
self.TDDnodes.starttaosd(2)
|
||||
|
||||
# self.TDDnodes.stoptaosd(1)
|
||||
# self.check3mnode1off()
|
||||
# self.TDDnodes.starttaosd(1)
|
||||
self.TDDnodes.stoptaosd(1)
|
||||
self.check3mnode1off()
|
||||
self.TDDnodes.starttaosd(1)
|
||||
|
||||
# self.check3mnode()
|
||||
stopcount =0
|
||||
|
|
|
@ -13,6 +13,7 @@ import time
|
|||
import socket
|
||||
import subprocess
|
||||
from multiprocessing import Process
|
||||
import threading as thd
|
||||
class MyDnodes(TDDnodes):
|
||||
def __init__(self ,dnodes_lists):
|
||||
super(MyDnodes,self).__init__()
|
||||
|
@ -29,7 +30,6 @@ class TDTestCase:
|
|||
self.depoly_cluster(dnodenumber)
|
||||
self.master_dnode = self.TDDnodes.dnodes[0]
|
||||
self.host=self.master_dnode.cfgDict["fqdn"]
|
||||
|
||||
conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir)
|
||||
tdSql.init(conn1.cursor())
|
||||
|
||||
|
@ -50,9 +50,9 @@ class TDTestCase:
|
|||
break
|
||||
return buildPath
|
||||
|
||||
def insert_data(self,count):
|
||||
def insert_data(self,countstart,countstop):
|
||||
# fisrt add data : db\stable\childtable\general table
|
||||
for couti in count:
|
||||
for couti in range(countstart,countstop):
|
||||
tdSql.execute("drop database if exists db%d" %couti)
|
||||
tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti)
|
||||
tdSql.execute("use db%d" %couti)
|
||||
|
@ -138,14 +138,15 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("show mnodes;")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(0,1,'chenhaoran02:6030')
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(0,3,'ready')
|
||||
tdSql.checkData(1,1,'chenhaoran02:6130')
|
||||
tdSql.checkData(1,1,'%s:6130'%self.host)
|
||||
tdSql.checkData(1,3,'ready')
|
||||
tdSql.checkData(2,1,'chenhaoran02:6230')
|
||||
tdSql.checkData(2,1,'%s:6230'%self.host)
|
||||
tdSql.checkData(2,3,'ready')
|
||||
|
||||
def check3mnode1off(self):
|
||||
tdSql.error("drop mnode on dnode 1;")
|
||||
count=0
|
||||
while count < 10:
|
||||
time.sleep(1)
|
||||
|
@ -166,17 +167,18 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("show mnodes;")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(0,1,'chenhaoran02:6030')
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(0,2,'offline')
|
||||
tdSql.checkData(0,3,'ready')
|
||||
tdSql.checkData(1,1,'chenhaoran02:6130')
|
||||
tdSql.checkData(1,1,'%s:6130'%self.host)
|
||||
tdSql.checkData(1,3,'ready')
|
||||
tdSql.checkData(2,1,'chenhaoran02:6230')
|
||||
tdSql.checkData(2,1,'%s:6230'%self.host)
|
||||
tdSql.checkData(2,3,'ready')
|
||||
|
||||
def check3mnode2off(self):
|
||||
tdSql.error("drop mnode on dnode 2;")
|
||||
count=0
|
||||
while count < 10:
|
||||
while count < 40:
|
||||
time.sleep(1)
|
||||
tdSql.query("show mnodes;")
|
||||
if tdSql.checkRows(3) :
|
||||
|
@ -191,17 +193,18 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("show mnodes;")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(0,1,'chenhaoran02:6030')
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(0,2,'leader')
|
||||
tdSql.checkData(0,3,'ready')
|
||||
tdSql.checkData(1,1,'chenhaoran02:6130')
|
||||
tdSql.checkData(1,1,'%s:6130'%self.host)
|
||||
tdSql.checkData(1,2,'offline')
|
||||
tdSql.checkData(1,3,'ready')
|
||||
tdSql.checkData(2,1,'chenhaoran02:6230')
|
||||
tdSql.checkData(2,1,'%s:6230'%self.host)
|
||||
tdSql.checkData(2,2,'follower')
|
||||
tdSql.checkData(2,3,'ready')
|
||||
|
||||
def check3mnode3off(self):
|
||||
tdSql.error("drop mnode on dnode 3;")
|
||||
count=0
|
||||
while count < 10:
|
||||
time.sleep(1)
|
||||
|
@ -218,13 +221,13 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("show mnodes;")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(0,1,'chenhaoran02:6030')
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(0,2,'leader')
|
||||
tdSql.checkData(0,3,'ready')
|
||||
tdSql.checkData(1,1,'chenhaoran02:6130')
|
||||
tdSql.checkData(1,1,'%s:6130'%self.host)
|
||||
tdSql.checkData(1,2,'follower')
|
||||
tdSql.checkData(1,3,'ready')
|
||||
tdSql.checkData(2,1,'chenhaoran02:6230')
|
||||
tdSql.checkData(2,1,'%s:6230'%self.host)
|
||||
tdSql.checkData(2,2,'offline')
|
||||
tdSql.checkData(2,3,'ready')
|
||||
|
||||
|
@ -232,13 +235,13 @@ class TDTestCase:
|
|||
|
||||
def five_dnode_three_mnode(self,dnodenumber):
|
||||
tdSql.query("show dnodes;")
|
||||
tdSql.checkData(0,1,'chenhaoran02:6030')
|
||||
tdSql.checkData(4,1,'chenhaoran02:6430')
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(4,1,'%s:6430'%self.host)
|
||||
tdSql.checkData(0,4,'ready')
|
||||
tdSql.checkData(4,4,'ready')
|
||||
tdSql.query("show mnodes;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0,1,'chenhaoran02:6030')
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(0,2,'leader')
|
||||
tdSql.checkData(0,3,'ready')
|
||||
|
||||
|
@ -254,10 +257,27 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("show dnodes;")
|
||||
print(tdSql.queryResult)
|
||||
# stop and follower of mnode
|
||||
|
||||
tdLog.debug("stop and follower of mnode")
|
||||
self.TDDnodes.stoptaosd(2)
|
||||
self.check3mnode2off()
|
||||
self.TDDnodes.starttaosd(2)
|
||||
|
||||
self.TDDnodes.stoptaosd(3)
|
||||
self.check3mnode3off()
|
||||
self.TDDnodes.starttaosd(3)
|
||||
|
||||
self.TDDnodes.stoptaosd(1)
|
||||
self.check3mnode1off()
|
||||
self.TDDnodes.starttaosd(1)
|
||||
|
||||
# self.check3mnode()
|
||||
stopcount =0
|
||||
while stopcount <= 2:
|
||||
for i in range(dnodenumber):
|
||||
threads = []
|
||||
threads.append(thd.Thread(target=self.insert_data, args=(i*2,i*2+2)))
|
||||
threads[0].start()
|
||||
self.TDDnodes.stoptaosd(i+1)
|
||||
# if i == 1 :
|
||||
# self.check3mnode2off()
|
||||
|
@ -267,6 +287,8 @@ class TDTestCase:
|
|||
# self.check3mnode1off()
|
||||
|
||||
self.TDDnodes.starttaosd(i+1)
|
||||
threads[0].join()
|
||||
|
||||
# self.check3mnode()
|
||||
stopcount+=1
|
||||
self.check3mnode()
|
||||
|
|
|
@ -358,8 +358,8 @@ class TDTestCase:
|
|||
tdSql.error("alter table %s.%s modify column c2 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s modify tag t2 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.error("alter table %s.%s set tag t1 10"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s set tag t2 '20'"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s set tag t1=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.error("alter table %s.%s set tag t2='20'"%(parameterDict['dbName'], ctbName))
|
||||
|
||||
tdSql.error("alter table %s.%s rename column c1 c1new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c2 c2new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
@ -370,9 +370,9 @@ class TDTestCase:
|
|||
tdSql.query("alter table %s.%s modify column c4 binary(60)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s modify tag t4 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.query("alter table %s.%s set tag t3 30"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s set tag t4 '40'"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s set tag t5 '50'"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s set tag t3=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t4='20'"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t5='20'"%(parameterDict['dbName'], ctbName))
|
||||
|
||||
tdSql.query("alter table %s.%s rename column c3 c3new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename column c4 c4new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
@ -395,7 +395,7 @@ class TDTestCase:
|
|||
tdLog.printNoPrefix("======== test case 2: ")
|
||||
parameterDict = {'cfg': '', \
|
||||
'actionType': 0, \
|
||||
'dbName': 'db1', \
|
||||
'dbName': 'db2', \
|
||||
'dropFlag': 1, \
|
||||
'vgroups': 4, \
|
||||
'replica': 1, \
|
||||
|
@ -407,8 +407,8 @@ class TDTestCase:
|
|||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
# tdLog.info("create database, super table, child table, normal table")
|
||||
# self.create_database(tdSql, parameterDict["dbName"])
|
||||
tdLog.info("create database, super table, child table, normal table")
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
ntbName = 'ntb2'
|
||||
tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10)) tags (t1 int, t2 binary(32), t3 double, t4 binary(32), t5 nchar(10))"%(parameterDict["dbName"],parameterDict["stbName"]))
|
||||
tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10))"%(parameterDict["dbName"],ntbName))
|
||||
|
@ -449,10 +449,10 @@ class TDTestCase:
|
|||
tdSql.query("alter table %s.%s modify column c5 nchar(60)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s modify tag t5 nchar(60)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.query("alter table %s.%s rename column c5 c5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c5 c5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename tag t5 t5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.query("alter table %s.%s drop column c5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop column c5"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop tag t5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.query("alter table %s.%s add column c5 int"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
@ -508,10 +508,10 @@ class TDTestCase:
|
|||
tdSql.error("alter table %s.%s modify tag t2 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s modify tag t4 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.error("alter table %s.%s set tag t1 11"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s set tag t2 '22'"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s set tag t3 33"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s set tag t4 '44'"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s set tag t1=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.error("alter table %s.%s set tag t2='20'"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.error("alter table %s.%s set tag t3=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.error("alter table %s.%s set tag t4='20'"%(parameterDict['dbName'], ctbName))
|
||||
|
||||
tdSql.error("alter table %s.%s rename column c1 c1new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c2 c2new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
@ -526,7 +526,7 @@ class TDTestCase:
|
|||
tdSql.query("alter table %s.%s modify column c5 nchar(60)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s modify tag t5 nchar(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.query("alter table %s.%s set tag t5 '50'"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s set tag t5='50'"%(parameterDict['dbName'], ctbName))
|
||||
|
||||
tdSql.query("alter table %s.%s rename column c5 c5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename tag t5 t5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
@ -543,7 +543,7 @@ class TDTestCase:
|
|||
tdLog.printNoPrefix("======== test case 3: ")
|
||||
parameterDict = {'cfg': '', \
|
||||
'actionType': 0, \
|
||||
'dbName': 'db1', \
|
||||
'dbName': 'db3', \
|
||||
'dropFlag': 1, \
|
||||
'vgroups': 4, \
|
||||
'replica': 1, \
|
||||
|
@ -555,7 +555,8 @@ class TDTestCase:
|
|||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
# tdLog.info("create database, super table, child table, normal table")
|
||||
tdLog.info("create database, super table, child table, normal table")
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
ntbName = 'ntb3'
|
||||
tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10)) tags (t1 int, t2 binary(32), t3 double, t4 binary(32), t5 nchar(10))"%(parameterDict["dbName"],parameterDict["stbName"]))
|
||||
tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10))"%(parameterDict["dbName"],ntbName))
|
||||
|
@ -627,7 +628,7 @@ class TDTestCase:
|
|||
parameterDict['stbName'] = 'stb31'
|
||||
ctbName = 'stb31_0'
|
||||
tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10)) tags (t1 int, t2 binary(32), t3 double, t4 binary(32), t5 nchar(10))"%(parameterDict["dbName"],parameterDict['stbName']))
|
||||
tdSql.query("create table %s.%s using %s.%s tags (10, 100, '1000')"%(parameterDict["dbName"],ctbName,parameterDict["dbName"],parameterDict['stbName']))
|
||||
tdSql.query("create table %s.%s using %s.%s tags (10, '10', 10, '10', '10')"%(parameterDict["dbName"],ctbName,parameterDict["dbName"],parameterDict['stbName']))
|
||||
|
||||
tdLog.info("create topics from child table")
|
||||
columnTopicFromCtb = 'column_topic_from_ctb3'
|
||||
|
@ -653,11 +654,11 @@ class TDTestCase:
|
|||
tdSql.error("alter table %s.%s modify tag t4 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s modify tag t5 nchar(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.error("alter table %s.%s set tag t1 10"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s set tag t2 '20'"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s set tag t3 30"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s set tag t4 '40'"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s set tag t5 '50'"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s set tag t1=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.error("alter table %s.%s set tag t2='20'"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.error("alter table %s.%s set tag t3=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.error("alter table %s.%s set tag t4='20'"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.error("alter table %s.%s set tag t5='20'"%(parameterDict['dbName'], ctbName))
|
||||
|
||||
tdSql.error("alter table %s.%s rename column c1 c1new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c2 c2new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
@ -676,6 +677,148 @@ class TDTestCase:
|
|||
|
||||
tdLog.printNoPrefix("======== test case 3 end ...... ")
|
||||
|
||||
def tmqCase4(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 4: ")
|
||||
parameterDict = {'cfg': '', \
|
||||
'actionType': 0, \
|
||||
'dbName': 'db4', \
|
||||
'dropFlag': 1, \
|
||||
'vgroups': 4, \
|
||||
'replica': 1, \
|
||||
'stbName': 'stb4', \
|
||||
'ctbPrefix': 'stb4', \
|
||||
'ctbNum': 10, \
|
||||
'rowsPerTbl': 10000, \
|
||||
'batchNum': 23, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
ctbName = 'stb4_0'
|
||||
|
||||
tdLog.info("create database, super table, child table, normal table")
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10)) tags (t1 int, t2 binary(32), t3 double, t4 binary(32), t5 nchar(10))"%(parameterDict["dbName"],parameterDict["stbName"]))
|
||||
tdSql.query("create table %s.%s using %s.%s tags (10, '10', 10, '10', '10')"%(parameterDict["dbName"],ctbName,parameterDict["dbName"],parameterDict['stbName']))
|
||||
|
||||
tdLog.info("create topics from super table")
|
||||
columnTopicFromStb = 'star_topic_from_stb4'
|
||||
|
||||
tdSql.execute("create topic %s as stable %s.%s" %(columnTopicFromStb, parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdLog.info("======== child table test:")
|
||||
tdSql.query("alter table %s.%s set tag t1=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t2='20'"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t3=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t4='20'"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t5='20'"%(parameterDict['dbName'], ctbName))
|
||||
|
||||
tdLog.info("======== super table test:")
|
||||
# all alter actions allow
|
||||
tdSql.query("alter table %s.%s add column c6 int"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s add tag t6 float"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.query("alter table %s.%s modify column c2 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s modify column c4 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s modify column c5 nchar(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s modify tag t2 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s modify tag t4 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s modify tag t5 nchar(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.error("alter table %s.%s rename column c1 c1new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c2 c2new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c3 c3new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c4 c4new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c5 c5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename tag t1 t1new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename tag t2 t2new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename tag t3 t3new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename tag t4 t4new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename tag t5 t5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.query("alter table %s.%s drop column c1"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop column c2"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop column c3"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop column c4"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop column c5"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop tag t1new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop tag t2new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop tag t3new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop tag t4new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop tag t5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdLog.printNoPrefix("======== test case 4 end ...... ")
|
||||
|
||||
def tmqCase5(self, cfgPath, buildPath):
|
||||
tdLog.printNoPrefix("======== test case 5: ")
|
||||
parameterDict = {'cfg': '', \
|
||||
'actionType': 0, \
|
||||
'dbName': 'db5', \
|
||||
'dropFlag': 1, \
|
||||
'vgroups': 4, \
|
||||
'replica': 1, \
|
||||
'stbName': 'stb5', \
|
||||
'ctbPrefix': 'stb5', \
|
||||
'ctbNum': 10, \
|
||||
'rowsPerTbl': 10000, \
|
||||
'batchNum': 23, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
ctbName = 'stb5_0'
|
||||
|
||||
tdLog.info("create database, super table, child table, normal table")
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
tdSql.query("create table %s.%s (ts timestamp, c1 int, c2 binary(32), c3 double, c4 binary(32), c5 nchar(10)) tags (t1 int, t2 binary(32), t3 double, t4 binary(32), t5 nchar(10))"%(parameterDict["dbName"],parameterDict["stbName"]))
|
||||
tdSql.query("create table %s.%s using %s.%s tags (10, '10', 10, '10', '10')"%(parameterDict["dbName"],ctbName,parameterDict["dbName"],parameterDict['stbName']))
|
||||
|
||||
tdLog.info("create topics from super table")
|
||||
columnTopicFromStb = 'star_topic_from_db5'
|
||||
|
||||
tdSql.execute("create topic %s as database %s" %(columnTopicFromStb, parameterDict['dbName']))
|
||||
|
||||
tdLog.info("======== child table test:")
|
||||
tdSql.query("alter table %s.%s set tag t1=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t2='20'"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t3=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t4='20'"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t5='20'"%(parameterDict['dbName'], ctbName))
|
||||
|
||||
tdLog.info("======== super table test:")
|
||||
# all alter actions allow
|
||||
tdSql.query("alter table %s.%s add column c6 int"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s add tag t6 float"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.query("alter table %s.%s modify column c2 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s modify column c4 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s modify column c5 nchar(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s modify tag t2 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s modify tag t4 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s modify tag t5 nchar(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.error("alter table %s.%s rename column c1 c1new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c2 c2new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c3 c3new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c4 c4new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c5 c5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename tag t1 t1new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename tag t2 t2new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename tag t3 t3new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename tag t4 t4new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename tag t5 t5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.query("alter table %s.%s drop column c1"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop column c2"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop column c3"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop column c4"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop column c5"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop tag t1new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop tag t2new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop tag t3new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop tag t4new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop tag t5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdLog.printNoPrefix("======== test case 5 end ...... ")
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
|
||||
|
@ -687,9 +830,11 @@ class TDTestCase:
|
|||
cfgPath = buildPath + "/../sim/psim/cfg"
|
||||
tdLog.info("cfgPath: %s" % cfgPath)
|
||||
|
||||
self.tmqCase1(cfgPath, buildPath)
|
||||
self.tmqCase2(cfgPath, buildPath)
|
||||
# self.tmqCase1(cfgPath, buildPath)
|
||||
# self.tmqCase2(cfgPath, buildPath)
|
||||
# self.tmqCase3(cfgPath, buildPath)
|
||||
self.tmqCase4(cfgPath, buildPath)
|
||||
self.tmqCase5(cfgPath, buildPath)
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
|
|
@ -0,0 +1,315 @@
|
|||
|
||||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
from enum import Enum
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
|
||||
class actionType(Enum):
|
||||
CREATE_DATABASE = 0
|
||||
CREATE_STABLE = 1
|
||||
CREATE_CTABLE = 2
|
||||
INSERT_DATA = 3
|
||||
|
||||
class TDTestCase:
|
||||
hostname = socket.gethostname()
|
||||
#rpcDebugFlagVal = '143'
|
||||
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||
#print ("===================: ", updatecfgDict)
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||
|
||||
def getBuildPath(self):
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
if ("community" in selfPath):
|
||||
projPath = selfPath[:selfPath.find("community")]
|
||||
else:
|
||||
projPath = selfPath[:selfPath.find("tests")]
|
||||
|
||||
for root, dirs, files in os.walk(projPath):
|
||||
if ("taosd" in files or "taosd.exe" in files):
|
||||
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||
if ("packaging" not in rootRealPath):
|
||||
buildPath = root[:len(root) - len("/build/bin")]
|
||||
break
|
||||
return buildPath
|
||||
|
||||
def newcur(self,cfg,host,port):
|
||||
user = "root"
|
||||
password = "taosdata"
|
||||
con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
|
||||
cur=con.cursor()
|
||||
print(cur)
|
||||
return cur
|
||||
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
||||
# tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
# tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
|
||||
|
||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||
|
||||
def initConsumerInfoTable(self,cdbName='cdb'):
|
||||
tdLog.info("drop consumeinfo table")
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
sql = "insert into %s.consumeinfo values "%cdbName
|
||||
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||
tdLog.info("consume info sql: %s"%sql)
|
||||
tdSql.query(sql)
|
||||
|
||||
def selectConsumeResult(self,expectRows,cdbName='cdb'):
|
||||
resultList=[]
|
||||
while 1:
|
||||
tdSql.query("select * from %s.consumeresult"%cdbName)
|
||||
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
||||
if tdSql.getRows() == expectRows:
|
||||
break
|
||||
else:
|
||||
time.sleep(5)
|
||||
|
||||
for i in range(expectRows):
|
||||
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||
resultList.append(tdSql.getData(i , 3))
|
||||
|
||||
return resultList
|
||||
|
||||
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||
if valgrind == 1:
|
||||
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
||||
shellCmd = 'nohup valgrind --log-file=' + logFile
|
||||
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
|
||||
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
||||
def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1):
|
||||
if dropFlag == 1:
|
||||
tsql.execute("drop database if exists %s"%(dbName))
|
||||
|
||||
tsql.execute("create database if not exists %s vgroups %d replica %d"%(dbName, vgroups, replica))
|
||||
tdLog.debug("complete to create database %s"%(dbName))
|
||||
return
|
||||
|
||||
def create_stable(self,tsql, dbName,stbName):
|
||||
tsql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(dbName, stbName))
|
||||
tdLog.debug("complete to create %s.%s" %(dbName, stbName))
|
||||
return
|
||||
|
||||
def create_ctables(self,tsql, dbName,stbName,ctbNum):
|
||||
tsql.execute("use %s" %dbName)
|
||||
pre_create = "create table"
|
||||
sql = pre_create
|
||||
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
|
||||
for i in range(ctbNum):
|
||||
sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1)
|
||||
if (i > 0) and (i%100 == 0):
|
||||
tsql.execute(sql)
|
||||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
|
||||
return
|
||||
|
||||
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0):
|
||||
tdLog.debug("start to insert data ............")
|
||||
tsql.execute("use %s" %dbName)
|
||||
pre_insert = "insert into "
|
||||
sql = pre_insert
|
||||
|
||||
if startTs == 0:
|
||||
t = time.time()
|
||||
startTs = int(round(t * 1000))
|
||||
|
||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||
rowsOfSql = 0
|
||||
for i in range(ctbNum):
|
||||
sql += " %s_%d values "%(stbName,i)
|
||||
for j in range(rowsPerTbl):
|
||||
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
|
||||
rowsOfSql += 1
|
||||
if (j > 0) and ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)):
|
||||
tsql.execute(sql)
|
||||
rowsOfSql = 0
|
||||
if j < rowsPerTbl - 1:
|
||||
sql = "insert into %s_%d values " %(stbName,i)
|
||||
else:
|
||||
sql = "insert into "
|
||||
#end sql
|
||||
if sql != pre_insert:
|
||||
#print("insert sql:%s"%sql)
|
||||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
# create new connector for my thread
|
||||
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
|
||||
|
||||
if parameterDict["actionType"] == actionType.CREATE_DATABASE:
|
||||
self.create_database(tsql, parameterDict["dbName"])
|
||||
elif parameterDict["actionType"] == actionType.CREATE_STABLE:
|
||||
self.create_stable(tsql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
elif parameterDict["actionType"] == actionType.CREATE_CTABLE:
|
||||
self.create_ctables(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
elif parameterDict["actionType"] == actionType.INSERT_DATA:
|
||||
self.insert_data(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"],parameterDict["batchNum"])
|
||||
else:
|
||||
tdLog.exit("not support's action: ", parameterDict["actionType"])
|
||||
|
||||
return
|
||||
|
||||
def tmqCase1(self, cfgPath, buildPath):
|
||||
'''
|
||||
Leave a TMQ process. Stop taosd, delete the data directory, restart taosd,
|
||||
and restart a consumption process to complete a consumption
|
||||
'''
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
parameterDict = {'cfg': '', \
|
||||
'actionType': 0, \
|
||||
'dbName': 'db3', \
|
||||
'dropFlag': 1, \
|
||||
'vgroups': 4, \
|
||||
'replica': 1, \
|
||||
'stbName': 'stb1', \
|
||||
'ctbNum': 10, \
|
||||
'rowsPerTbl': 20000, \
|
||||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
self.insert_data(tdSql,parameterDict["dbName"],parameterDict["stbName"],parameterDict["ctbNum"],parameterDict["rowsPerTbl"],parameterDict["batchNum"])
|
||||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
# expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
expectrowcnt = 90000000000
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 0
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:false,\
|
||||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 9000000 # Forever loop
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
time.sleep(3)
|
||||
tdLog.info("================= stop dnode, and remove data file, then start dnode ===========================")
|
||||
tdDnodes.stop(1)
|
||||
# time.sleep(5)
|
||||
dataPath = buildPath + "/../sim/dnode1/data/*"
|
||||
shellCmd = 'rm -rf ' + dataPath
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
tdDnodes.start(1)
|
||||
time.sleep(2)
|
||||
|
||||
######### redo to consume
|
||||
self.initConsumerTable()
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
self.insert_data(tdSql,parameterDict["dbName"],parameterDict["stbName"],parameterDict["ctbNum"],parameterDict["rowsPerTbl"],parameterDict["batchNum"])
|
||||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 0
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:false,\
|
||||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 20
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
expectRows = 1
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
if not (totalConsumeRows == expectrowcnt):
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
os.system('pkill tmq_sim')
|
||||
|
||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
|
||||
buildPath = self.getBuildPath()
|
||||
if (buildPath == ""):
|
||||
tdLog.exit("taosd not found!")
|
||||
else:
|
||||
tdLog.info("taosd found in %s" % buildPath)
|
||||
cfgPath = buildPath + "/../sim/psim/cfg"
|
||||
tdLog.info("cfgPath: %s" % cfgPath)
|
||||
|
||||
self.tmqCase1(cfgPath, buildPath)
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
event = threading.Event()
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -18,7 +18,7 @@ python3 ./test.py -f 0-others/fsync.py
|
|||
python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
|
||||
python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
|
||||
python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
|
||||
# python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
|
||||
# BUG python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
|
||||
python3 ./test.py -f 1-insert/alter_stable.py
|
||||
python3 ./test.py -f 1-insert/alter_table.py
|
||||
python3 ./test.py -f 1-insert/insertWithMoreVgroup.py
|
||||
|
@ -101,7 +101,9 @@ python3 ./test.py -f 2-query/tail.py
|
|||
|
||||
python3 ./test.py -f 6-cluster/5dnode1mnode.py
|
||||
python3 ./test.py -f 6-cluster/5dnode2mnode.py
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py
|
||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py
|
||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
|
||||
|
||||
python3 ./test.py -f 7-tmq/basic5.py
|
||||
python3 ./test.py -f 7-tmq/subscribeDb.py
|
||||
|
@ -114,3 +116,5 @@ python3 ./test.py -f 7-tmq/subscribeStb2.py
|
|||
python3 ./test.py -f 7-tmq/subscribeStb3.py
|
||||
python3 ./test.py -f 7-tmq/subscribeStb4.py
|
||||
python3 ./test.py -f 7-tmq/db.py
|
||||
python3 ./test.py -f 7-tmq/tmqError.py
|
||||
python3 ./test.py -f 7-tmq/schema.py
|
||||
|
|
|
@ -9,3 +9,95 @@ IF (TD_TAOS_TOOLS)
|
|||
ENDIF ()
|
||||
|
||||
add_subdirectory(shell)
|
||||
IF (TD_BUILD_HTTP)
|
||||
MESSAGE("")
|
||||
MESSAGE("${Yellow} use original embedded httpd ${ColourReset}")
|
||||
MESSAGE("")
|
||||
# ADD_SUBDIRECTORY(http)
|
||||
ELSEIF(TD_BUILD_TAOSA_INTERNAL)
|
||||
MESSAGE("${Yellow} use taosa internal as httpd ${ColourReset}")
|
||||
ELSE ()
|
||||
MESSAGE("")
|
||||
MESSAGE("${Green} use taosadapter as httpd, platform is ${PLATFORM_ARCH_STR} ${ColourReset}")
|
||||
|
||||
EXECUTE_PROCESS(
|
||||
COMMAND git rev-parse --abbrev-ref HEAD
|
||||
RESULT_VARIABLE result_taos_version
|
||||
OUTPUT_VARIABLE taos_version
|
||||
)
|
||||
|
||||
STRING(FIND ${taos_version} release is_release_branch)
|
||||
|
||||
IF ("${is_release_branch}" STREQUAL "0")
|
||||
STRING(SUBSTRING "${taos_version}" 12 -1 taos_version)
|
||||
STRING(STRIP "${taos_version}" taos_version)
|
||||
ELSE ()
|
||||
STRING(CONCAT taos_version "_branch_" "${taos_version}")
|
||||
STRING(STRIP "${taos_version}" taos_version)
|
||||
ENDIF ()
|
||||
EXECUTE_PROCESS(
|
||||
COMMAND cd ${CMAKE_CURRENT_SOURCE_DIR}/taosadapter
|
||||
)
|
||||
EXECUTE_PROCESS(
|
||||
COMMAND git rev-parse --short HEAD
|
||||
RESULT_VARIABLE commit_sha1
|
||||
OUTPUT_VARIABLE taosadapter_commit_sha1
|
||||
)
|
||||
IF ("${taosadapter_commit_sha1}" STREQUAL "")
|
||||
SET(taosadapter_commit_sha1 "unknown")
|
||||
ELSE ()
|
||||
STRING(SUBSTRING "${taosadapter_commit_sha1}" 0 7 taosadapter_commit_sha1)
|
||||
STRING(STRIP "${taosadapter_commit_sha1}" taosadapter_commit_sha1)
|
||||
ENDIF ()
|
||||
MESSAGE("${Green} taosAdapter will use ${taos_version} and commit ${taosadapter_commit_sha1} as version ${ColourReset}")
|
||||
EXECUTE_PROCESS(
|
||||
COMMAND cd ..
|
||||
)
|
||||
MESSAGE("CURRENT SOURCE DIR ${CMAKE_CURRENT_SOURCE_DIR}")
|
||||
IF (TD_LINUX)
|
||||
include(ExternalProject)
|
||||
ExternalProject_Add(taosadapter
|
||||
PREFIX "taosadapter"
|
||||
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/taosadapter
|
||||
BUILD_ALWAYS off
|
||||
DEPENDS taos
|
||||
BUILD_IN_SOURCE 1
|
||||
CONFIGURE_COMMAND cmake -E echo "taosadapter no need cmake to config"
|
||||
PATCH_COMMAND
|
||||
COMMAND git clean -f -d
|
||||
BUILD_COMMAND
|
||||
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
|
||||
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
|
||||
INSTALL_COMMAND
|
||||
COMMAND curl -sL https://github.com/upx/upx/releases/download/v3.96/upx-3.96-${PLATFORM_ARCH_STR}_linux.tar.xz -o upx.tar.xz && tar -xvJf upx.tar.xz -C ${CMAKE_BINARY_DIR} --strip-components 1 > /dev/null && ${CMAKE_BINARY_DIR}/upx taosadapter || :
|
||||
COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin
|
||||
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/
|
||||
COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/
|
||||
COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/
|
||||
COMMAND cmake -E copy taosadapter-debug ${CMAKE_BINARY_DIR}/build/bin
|
||||
)
|
||||
ELSEIF (TD_DARWIN)
|
||||
include(ExternalProject)
|
||||
ExternalProject_Add(taosadapter
|
||||
PREFIX "taosadapter"
|
||||
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/taosadapter
|
||||
BUILD_ALWAYS off
|
||||
DEPENDS taos
|
||||
BUILD_IN_SOURCE 1
|
||||
CONFIGURE_COMMAND cmake -E echo "taosadapter no need cmake to config"
|
||||
PATCH_COMMAND
|
||||
COMMAND git clean -f -d
|
||||
BUILD_COMMAND
|
||||
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
|
||||
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
|
||||
INSTALL_COMMAND
|
||||
COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin
|
||||
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/
|
||||
COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/
|
||||
COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/
|
||||
COMMAND cmake -E copy taosadapter-debug ${CMAKE_BINARY_DIR}/build/bin
|
||||
)
|
||||
ELSE ()
|
||||
MESSAGE("${Yellow} Windows system still use original embedded httpd ${ColourReset}")
|
||||
ENDIF ()
|
||||
ENDIF ()
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Subproject commit 9ce3f5c98ef95d9c7c596c4ed7302b0ed69a92b2
|
Loading…
Reference in New Issue