Merge pull request #22962 from taosdata/feat/TD-26052
feat:vnode tranform support in tmq
This commit is contained in:
commit
8d5ea720e4
|
@ -1415,6 +1415,8 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
||||||
STqOffsetVal offsetNew = {0};
|
STqOffsetVal offsetNew = {0};
|
||||||
offsetNew.type = tmq->resetOffsetCfg;
|
offsetNew.type = tmq->resetOffsetCfg;
|
||||||
|
|
||||||
|
tscInfo("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId, pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps,pVgEp->epSet.eps[pVgEp->epSet.inUse].port);
|
||||||
|
|
||||||
SMqClientVg clientVg = {
|
SMqClientVg clientVg = {
|
||||||
.pollCnt = 0,
|
.pollCnt = 0,
|
||||||
.vgId = pVgEp->vgId,
|
.vgId = pVgEp->vgId,
|
||||||
|
|
|
@ -771,6 +771,29 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t sendDeleteSubToVnode(SMqSubscribeObj *pSub, STrans *pTrans){
|
||||||
|
// iter all vnode to delete handle
|
||||||
|
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
||||||
|
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
|
||||||
|
pReq->head.vgId = htonl(pVgEp->vgId);
|
||||||
|
pReq->vgId = pVgEp->vgId;
|
||||||
|
pReq->consumerId = -1;
|
||||||
|
memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
|
STransAction action = {0};
|
||||||
|
action.epSet = pVgEp->epSet;
|
||||||
|
action.pCont = pReq;
|
||||||
|
action.contLen = sizeof(SMqVDeleteReq);
|
||||||
|
action.msgType = TDMT_VND_TMQ_DELETE_SUB;
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
SMDropCgroupReq dropReq = {0};
|
SMDropCgroupReq dropReq = {0};
|
||||||
|
@ -831,6 +854,11 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
|
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
|
||||||
|
|
||||||
|
code = sendDeleteSubToVnode(pSub, pTrans);
|
||||||
|
if (code != 0) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
|
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
|
||||||
mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
|
mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -1117,26 +1145,11 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
if (sendDeleteSubToVnode(pSub, pTrans) != 0) {
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
|
||||||
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
|
|
||||||
pReq->head.vgId = htonl(pVgEp->vgId);
|
|
||||||
pReq->vgId = pVgEp->vgId;
|
|
||||||
pReq->consumerId = -1;
|
|
||||||
memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
|
||||||
STransAction action = {0};
|
|
||||||
action.epSet = pVgEp->epSet;
|
|
||||||
action.pCont = pReq;
|
|
||||||
action.contLen = sizeof(SMqVDeleteReq);
|
|
||||||
action.msgType = TDMT_VND_TMQ_DELETE_SUB;
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
|
||||||
taosMemoryFree(pReq);
|
|
||||||
sdbRelease(pSdb, pSub);
|
sdbRelease(pSdb, pSub);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
|
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
|
||||||
sdbRelease(pSdb, pSub);
|
sdbRelease(pSdb, pSub);
|
||||||
|
|
|
@ -65,7 +65,8 @@ set(
|
||||||
"src/tq/tqSink.c"
|
"src/tq/tqSink.c"
|
||||||
"src/tq/tqCommit.c"
|
"src/tq/tqCommit.c"
|
||||||
"src/tq/tqStreamTask.c"
|
"src/tq/tqStreamTask.c"
|
||||||
"src/tq/tqSnapshot.c"
|
"src/tq/tqHandleSnapshot.c"
|
||||||
|
"src/tq/tqCheckInfoSnapshot.c"
|
||||||
"src/tq/tqOffsetSnapshot.c"
|
"src/tq/tqOffsetSnapshot.c"
|
||||||
"src/tq/tqStreamStateSnap.c"
|
"src/tq/tqStreamStateSnap.c"
|
||||||
"src/tq/tqStreamTaskSnap.c"
|
"src/tq/tqStreamTaskSnap.c"
|
||||||
|
|
|
@ -134,7 +134,7 @@ int32_t tqMetaOpen(STQ* pTq);
|
||||||
int32_t tqMetaClose(STQ* pTq);
|
int32_t tqMetaClose(STQ* pTq);
|
||||||
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
|
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
|
||||||
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
|
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
|
||||||
int32_t tqMetaRestoreHandle(STQ* pTq);
|
//int32_t tqMetaRestoreHandle(STQ* pTq);
|
||||||
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
|
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
|
||||||
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
|
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
|
||||||
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
|
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
|
||||||
|
|
|
@ -69,6 +69,8 @@ typedef struct STqSnapReader STqSnapReader;
|
||||||
typedef struct STqSnapWriter STqSnapWriter;
|
typedef struct STqSnapWriter STqSnapWriter;
|
||||||
typedef struct STqOffsetReader STqOffsetReader;
|
typedef struct STqOffsetReader STqOffsetReader;
|
||||||
typedef struct STqOffsetWriter STqOffsetWriter;
|
typedef struct STqOffsetWriter STqOffsetWriter;
|
||||||
|
typedef struct STqCheckInfoReader STqCheckInfoReader;
|
||||||
|
typedef struct STqCheckInfoWriter STqCheckInfoWriter;
|
||||||
typedef struct SStreamTaskReader SStreamTaskReader;
|
typedef struct SStreamTaskReader SStreamTaskReader;
|
||||||
typedef struct SStreamTaskWriter SStreamTaskWriter;
|
typedef struct SStreamTaskWriter SStreamTaskWriter;
|
||||||
typedef struct SStreamStateReader SStreamStateReader;
|
typedef struct SStreamStateReader SStreamStateReader;
|
||||||
|
@ -308,6 +310,14 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData);
|
||||||
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter);
|
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter);
|
||||||
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback);
|
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback);
|
||||||
int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||||
|
// STqCheckInfoshotReader ==
|
||||||
|
int32_t tqCheckInfoReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoReader** ppReader);
|
||||||
|
int32_t tqCheckInfoReaderClose(STqCheckInfoReader** ppReader);
|
||||||
|
int32_t tqCheckInfoRead(STqCheckInfoReader* pReader, uint8_t** ppData);
|
||||||
|
// STqCheckInfoshotWriter ======================================
|
||||||
|
int32_t tqCheckInfoWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoWriter** ppWriter);
|
||||||
|
int32_t tqCheckInfoWriterClose(STqCheckInfoWriter** ppWriter, int8_t rollback);
|
||||||
|
int32_t tqCheckInfoWrite(STqCheckInfoWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||||
// STqOffsetReader ========================================
|
// STqOffsetReader ========================================
|
||||||
int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader** ppReader);
|
int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader** ppReader);
|
||||||
int32_t tqOffsetReaderClose(STqOffsetReader** ppReader);
|
int32_t tqOffsetReaderClose(STqOffsetReader** ppReader);
|
||||||
|
@ -503,6 +513,7 @@ enum {
|
||||||
SNAP_DATA_STREAM_TASK_CHECKPOINT = 10,
|
SNAP_DATA_STREAM_TASK_CHECKPOINT = 10,
|
||||||
SNAP_DATA_STREAM_STATE = 11,
|
SNAP_DATA_STREAM_STATE = 11,
|
||||||
SNAP_DATA_STREAM_STATE_BACKEND = 12,
|
SNAP_DATA_STREAM_STATE_BACKEND = 12,
|
||||||
|
SNAP_DATA_TQ_CHECKINFO = 13,
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SSnapDataHdr {
|
struct SSnapDataHdr {
|
||||||
|
|
|
@ -697,7 +697,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
tqDestroyTqHandle(&handle);
|
tqDestroyTqHandle(&handle);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
|
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
} else {
|
} else {
|
||||||
while(1){
|
while(1){
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
|
|
|
@ -0,0 +1,196 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "meta.h"
|
||||||
|
#include "tdbInt.h"
|
||||||
|
#include "tq.h"
|
||||||
|
|
||||||
|
// STqCheckInfoReader ========================================
|
||||||
|
struct STqCheckInfoReader {
|
||||||
|
STQ* pTq;
|
||||||
|
int64_t sver;
|
||||||
|
int64_t ever;
|
||||||
|
TBC* pCur;
|
||||||
|
};
|
||||||
|
|
||||||
|
int32_t tqCheckInfoReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoReader** ppReader) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STqCheckInfoReader* pReader = NULL;
|
||||||
|
|
||||||
|
// alloc
|
||||||
|
pReader = (STqCheckInfoReader*)taosMemoryCalloc(1, sizeof(STqCheckInfoReader));
|
||||||
|
if (pReader == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pReader->pTq = pTq;
|
||||||
|
pReader->sver = sver;
|
||||||
|
pReader->ever = ever;
|
||||||
|
|
||||||
|
// impl
|
||||||
|
code = tdbTbcOpen(pTq->pCheckStore, &pReader->pCur, NULL);
|
||||||
|
if (code) {
|
||||||
|
taosMemoryFree(pReader);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tdbTbcMoveToFirst(pReader->pCur);
|
||||||
|
if (code) {
|
||||||
|
taosMemoryFree(pReader);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
tqInfo("vgId:%d, vnode checkinfo tq reader opened", TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
|
*ppReader = pReader;
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tqError("vgId:%d, vnode checkinfo tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
|
*ppReader = NULL;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqCheckInfoReaderClose(STqCheckInfoReader** ppReader) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
tdbTbcClose((*ppReader)->pCur);
|
||||||
|
taosMemoryFree(*ppReader);
|
||||||
|
*ppReader = NULL;
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqCheckInfoRead(STqCheckInfoReader* pReader, uint8_t** ppData) {
|
||||||
|
int32_t code = 0;
|
||||||
|
void* pKey = NULL;
|
||||||
|
void* pVal = NULL;
|
||||||
|
int32_t kLen = 0;
|
||||||
|
int32_t vLen = 0;
|
||||||
|
|
||||||
|
if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
|
||||||
|
if (*ppData == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
||||||
|
pHdr->type = SNAP_DATA_TQ_CHECKINFO;
|
||||||
|
pHdr->size = vLen;
|
||||||
|
memcpy(pHdr->data, pVal, vLen);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
tdbFree(pKey);
|
||||||
|
tdbFree(pVal);
|
||||||
|
|
||||||
|
tqInfo("vgId:%d, vnode check info tq read data, vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tdbFree(pKey);
|
||||||
|
tdbFree(pVal);
|
||||||
|
|
||||||
|
tqError("vgId:%d, vnode check info tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// STqCheckInfoWriter ========================================
|
||||||
|
struct STqCheckInfoWriter {
|
||||||
|
STQ* pTq;
|
||||||
|
int64_t sver;
|
||||||
|
int64_t ever;
|
||||||
|
TXN* txn;
|
||||||
|
};
|
||||||
|
|
||||||
|
int32_t tqCheckInfoWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoWriter** ppWriter) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STqCheckInfoWriter* pWriter;
|
||||||
|
|
||||||
|
// alloc
|
||||||
|
pWriter = (STqCheckInfoWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
||||||
|
if (pWriter == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pWriter->pTq = pTq;
|
||||||
|
pWriter->sver = sver;
|
||||||
|
pWriter->ever = ever;
|
||||||
|
|
||||||
|
if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
|
||||||
|
code = -1;
|
||||||
|
taosMemoryFree(pWriter);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppWriter = pWriter;
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tqError("vgId:%d, tq check info writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
|
*ppWriter = NULL;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqCheckInfoWriterClose(STqCheckInfoWriter** ppWriter, int8_t rollback) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STqCheckInfoWriter* pWriter = *ppWriter;
|
||||||
|
STQ* pTq = pWriter->pTq;
|
||||||
|
|
||||||
|
if (rollback) {
|
||||||
|
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||||
|
} else {
|
||||||
|
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||||
|
if (code) goto _err;
|
||||||
|
code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pWriter);
|
||||||
|
*ppWriter = NULL;
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tqError("vgId:%d, tq check info writer close failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqCheckInfoWrite(STqCheckInfoWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
|
int32_t code = 0;
|
||||||
|
STQ* pTq = pWriter->pTq;
|
||||||
|
STqCheckInfo info = {0};
|
||||||
|
SDecoder decoder;
|
||||||
|
SDecoder* pDecoder = &decoder;
|
||||||
|
|
||||||
|
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||||
|
code = tDecodeSTqCheckInfo(pDecoder, &info);
|
||||||
|
if (code) goto _err;
|
||||||
|
code = taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo));
|
||||||
|
if (code) goto _err;
|
||||||
|
code = tqMetaSaveCheckInfo(pTq, info.topic, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||||
|
if (code) goto _err;
|
||||||
|
tDecoderClear(pDecoder);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tDecoderClear(pDecoder);
|
||||||
|
tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
|
@ -75,31 +75,15 @@ int32_t tqSnapReaderClose(STqSnapReader** ppReader) {
|
||||||
|
|
||||||
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
const void* pKey = NULL;
|
void* pKey = NULL;
|
||||||
const void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int32_t kLen = 0;
|
int32_t kLen = 0;
|
||||||
int32_t vLen = 0;
|
int32_t vLen = 0;
|
||||||
SDecoder decoder;
|
|
||||||
STqHandle handle;
|
|
||||||
|
|
||||||
*ppData = NULL;
|
if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
|
||||||
for (;;) {
|
|
||||||
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
|
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
|
||||||
tDecodeSTqHandle(&decoder, &handle);
|
|
||||||
tDecoderClear(&decoder);
|
|
||||||
|
|
||||||
if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) {
|
|
||||||
tdbTbcMoveToNext(pReader->pCur);
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
tdbTbcMoveToNext(pReader->pCur);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
|
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
|
||||||
if (*ppData == NULL) {
|
if (*ppData == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -111,13 +95,15 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
||||||
pHdr->size = vLen;
|
pHdr->size = vLen;
|
||||||
memcpy(pHdr->data, pVal, vLen);
|
memcpy(pHdr->data, pVal, vLen);
|
||||||
|
|
||||||
tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
|
|
||||||
handle.snapshotVer, handle.subKey, vLen);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
tdbFree(pKey);
|
||||||
|
tdbFree(pVal);
|
||||||
|
tqInfo("vgId:%d, vnode snapshot tq read data, vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
tdbFree(pKey);
|
||||||
|
tdbFree(pVal);
|
||||||
tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
|
tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -173,20 +159,13 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
int vgId = TD_VID(pWriter->pTq->pVnode);
|
|
||||||
|
|
||||||
taosMemoryFree(pWriter);
|
taosMemoryFree(pWriter);
|
||||||
*ppWriter = NULL;
|
*ppWriter = NULL;
|
||||||
|
|
||||||
// restore from metastore
|
|
||||||
if (tqMetaRestoreHandle(pTq) < 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tqError("vgId:%d, tq snapshot writer close failed since %s", vgId, tstrerror(code));
|
tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,7 +179,9 @@ int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||||
code = tDecodeSTqHandle(pDecoder, &handle);
|
code = tDecodeSTqHandle(pDecoder, &handle);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
|
code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
if (code < 0) goto _err;
|
if (code < 0) goto _err;
|
||||||
tDecoderClear(pDecoder);
|
tDecoderClear(pDecoder);
|
||||||
|
|
|
@ -388,34 +388,34 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
|
||||||
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
|
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqMetaRestoreHandle(STQ* pTq) {
|
//int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
int code = 0;
|
// int code = 0;
|
||||||
TBC* pCur = NULL;
|
// TBC* pCur = NULL;
|
||||||
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
|
// if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
|
||||||
return -1;
|
// return -1;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
void* pKey = NULL;
|
// void* pKey = NULL;
|
||||||
int kLen = 0;
|
// int kLen = 0;
|
||||||
void* pVal = NULL;
|
// void* pVal = NULL;
|
||||||
int vLen = 0;
|
// int vLen = 0;
|
||||||
|
//
|
||||||
tdbTbcMoveToFirst(pCur);
|
// tdbTbcMoveToFirst(pCur);
|
||||||
|
//
|
||||||
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
// while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||||
STqHandle handle = {0};
|
// STqHandle handle = {0};
|
||||||
code = restoreHandle(pTq, pVal, vLen, &handle);
|
// code = restoreHandle(pTq, pVal, vLen, &handle);
|
||||||
if (code < 0) {
|
// if (code < 0) {
|
||||||
tqDestroyTqHandle(&handle);
|
// tqDestroyTqHandle(&handle);
|
||||||
break;
|
// break;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
tdbFree(pKey);
|
// tdbFree(pKey);
|
||||||
tdbFree(pVal);
|
// tdbFree(pVal);
|
||||||
tdbTbcClose(pCur);
|
// tdbTbcClose(pCur);
|
||||||
return code;
|
// return code;
|
||||||
}
|
//}
|
||||||
|
|
||||||
int32_t tqMetaGetHandle(STQ* pTq, const char* key) {
|
int32_t tqMetaGetHandle(STQ* pTq, const char* key) {
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
|
|
|
@ -159,6 +159,7 @@ int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nDa
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
taosCloseFile(&pFile);
|
||||||
} else {
|
} else {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,8 @@ struct SVSnapReader {
|
||||||
STqSnapReader *pTqSnapReader;
|
STqSnapReader *pTqSnapReader;
|
||||||
int8_t tqOffsetDone;
|
int8_t tqOffsetDone;
|
||||||
STqOffsetReader *pTqOffsetReader;
|
STqOffsetReader *pTqOffsetReader;
|
||||||
|
int8_t tqCheckInfoDone;
|
||||||
|
STqCheckInfoReader *pTqCheckInfoReader;
|
||||||
// stream
|
// stream
|
||||||
int8_t streamTaskDone;
|
int8_t streamTaskDone;
|
||||||
SStreamTaskReader *pStreamTaskReader;
|
SStreamTaskReader *pStreamTaskReader;
|
||||||
|
@ -81,6 +83,18 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
|
||||||
metaSnapReaderClose(&pReader->pMetaReader);
|
metaSnapReaderClose(&pReader->pMetaReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pReader->pTqSnapReader) {
|
||||||
|
tqSnapReaderClose(&pReader->pTqSnapReader);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pReader->pTqOffsetReader) {
|
||||||
|
tqOffsetReaderClose(&pReader->pTqOffsetReader);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pReader->pTqCheckInfoReader) {
|
||||||
|
tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader);
|
||||||
|
}
|
||||||
|
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -181,6 +195,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TQ ================
|
// TQ ================
|
||||||
|
vInfo("vgId:%d tq transform start", vgId);
|
||||||
if (!pReader->tqHandleDone) {
|
if (!pReader->tqHandleDone) {
|
||||||
if (pReader->pTqSnapReader == NULL) {
|
if (pReader->pTqSnapReader == NULL) {
|
||||||
code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqSnapReader);
|
code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqSnapReader);
|
||||||
|
@ -200,6 +215,25 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (!pReader->tqCheckInfoDone) {
|
||||||
|
if (pReader->pTqCheckInfoReader == NULL) {
|
||||||
|
code = tqCheckInfoReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqCheckInfoReader);
|
||||||
|
if (code < 0) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tqCheckInfoRead(pReader->pTqCheckInfoReader, ppData);
|
||||||
|
if (code) {
|
||||||
|
goto _err;
|
||||||
|
} else {
|
||||||
|
if (*ppData) {
|
||||||
|
goto _exit;
|
||||||
|
} else {
|
||||||
|
pReader->tqCheckInfoDone = 1;
|
||||||
|
code = tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
if (!pReader->tqOffsetDone) {
|
if (!pReader->tqOffsetDone) {
|
||||||
if (pReader->pTqOffsetReader == NULL) {
|
if (pReader->pTqOffsetReader == NULL) {
|
||||||
code = tqOffsetReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqOffsetReader);
|
code = tqOffsetReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqOffsetReader);
|
||||||
|
@ -334,6 +368,7 @@ struct SVSnapWriter {
|
||||||
// tq
|
// tq
|
||||||
STqSnapWriter *pTqSnapWriter;
|
STqSnapWriter *pTqSnapWriter;
|
||||||
STqOffsetWriter *pTqOffsetWriter;
|
STqOffsetWriter *pTqOffsetWriter;
|
||||||
|
STqCheckInfoWriter *pTqCheckInfoWriter;
|
||||||
// stream
|
// stream
|
||||||
SStreamTaskWriter *pStreamTaskWriter;
|
SStreamTaskWriter *pStreamTaskWriter;
|
||||||
SStreamStateWriter *pStreamStateWriter;
|
SStreamStateWriter *pStreamStateWriter;
|
||||||
|
@ -411,6 +446,21 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pWriter->pTqSnapWriter) {
|
||||||
|
code = tqSnapWriterClose(&pWriter->pTqSnapWriter, rollback);
|
||||||
|
if (code) goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pWriter->pTqCheckInfoWriter) {
|
||||||
|
code = tqCheckInfoWriterClose(&pWriter->pTqCheckInfoWriter, rollback);
|
||||||
|
if (code) goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pWriter->pTqOffsetWriter) {
|
||||||
|
code = tqOffsetWriterClose(&pWriter->pTqOffsetWriter, rollback);
|
||||||
|
if (code) goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
if (pWriter->pStreamTaskWriter) {
|
if (pWriter->pStreamTaskWriter) {
|
||||||
code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback);
|
code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
|
@ -519,8 +569,34 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
} break;
|
} break;
|
||||||
case SNAP_DATA_TQ_HANDLE: {
|
case SNAP_DATA_TQ_HANDLE: {
|
||||||
|
// tq handle
|
||||||
|
if (pWriter->pTqSnapWriter == NULL) {
|
||||||
|
code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapWriter);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tqSnapWrite(pWriter->pTqSnapWriter, pData, nData);
|
||||||
|
if (code) goto _err;
|
||||||
|
} break;
|
||||||
|
case SNAP_DATA_TQ_CHECKINFO: {
|
||||||
|
// tq checkinfo
|
||||||
|
if (pWriter->pTqCheckInfoWriter == NULL) {
|
||||||
|
code = tqCheckInfoWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqCheckInfoWriter);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tqCheckInfoWrite(pWriter->pTqCheckInfoWriter, pData, nData);
|
||||||
|
if (code) goto _err;
|
||||||
} break;
|
} break;
|
||||||
case SNAP_DATA_TQ_OFFSET: {
|
case SNAP_DATA_TQ_OFFSET: {
|
||||||
|
// tq offset
|
||||||
|
if (pWriter->pTqOffsetWriter == NULL) {
|
||||||
|
code = tqOffsetWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqOffsetWriter);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tqOffsetSnapWrite(pWriter->pTqOffsetWriter, pData, nData);
|
||||||
|
if (code) goto _err;
|
||||||
} break;
|
} break;
|
||||||
case SNAP_DATA_STREAM_TASK:
|
case SNAP_DATA_STREAM_TASK:
|
||||||
case SNAP_DATA_STREAM_TASK_CHECKPOINT: {
|
case SNAP_DATA_STREAM_TASK_CHECKPOINT: {
|
||||||
|
|
|
@ -420,6 +420,7 @@ void transThreadOnce();
|
||||||
|
|
||||||
void transInit();
|
void transInit();
|
||||||
void transCleanup();
|
void transCleanup();
|
||||||
|
void transPrintEpSet(SEpSet* pEpSet);
|
||||||
|
|
||||||
void transFreeMsg(void* msg);
|
void transFreeMsg(void* msg);
|
||||||
int32_t transCompressMsg(char* msg, int32_t len);
|
int32_t transCompressMsg(char* msg, int32_t len);
|
||||||
|
|
|
@ -2221,7 +2221,9 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
|
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
|
||||||
tDebug("epset not equal, retry new epset");
|
tDebug("epset not equal, retry new epset1");
|
||||||
|
transPrintEpSet(&pCtx->epSet);
|
||||||
|
transPrintEpSet(&epSet);
|
||||||
epsetAssign(&pCtx->epSet, &epSet);
|
epsetAssign(&pCtx->epSet, &epSet);
|
||||||
noDelay = false;
|
noDelay = false;
|
||||||
} else {
|
} else {
|
||||||
|
@ -2246,7 +2248,9 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
|
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
|
||||||
tDebug("epset not equal, retry new epset");
|
tDebug("epset not equal, retry new epset2");
|
||||||
|
transPrintEpSet(&pCtx->epSet);
|
||||||
|
transPrintEpSet(&epSet);
|
||||||
epsetAssign(&pCtx->epSet, &epSet);
|
epsetAssign(&pCtx->epSet, &epSet);
|
||||||
noDelay = false;
|
noDelay = false;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -163,6 +163,8 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -i True
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -i True
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -n 3 -i True
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -n 3 -i True
|
||||||
|
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform.py -N 2 -n 1
|
||||||
|
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeReplicate.py -M 3 -N 3 -n 3
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-19201.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-19201.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3404.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3404.py
|
||||||
|
|
|
@ -45,9 +45,9 @@ class TMQCom:
|
||||||
tdSql.init(conn.cursor())
|
tdSql.init(conn.cursor())
|
||||||
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||||
|
|
||||||
def initConsumerTable(self,cdbName='cdb'):
|
def initConsumerTable(self,cdbName='cdb', replicaVar=1):
|
||||||
tdLog.info("create consume database, and consume info table, and consume result table")
|
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||||
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
tdSql.query("create database if not exists %s vgroups 1 replica %d"%(cdbName,replicaVar))
|
||||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||||
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
|
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
|
||||||
tdSql.query("drop table if exists %s.notifyinfo "%(cdbName))
|
tdSql.query("drop table if exists %s.notifyinfo "%(cdbName))
|
||||||
|
|
|
@ -12,7 +12,7 @@ sys.path.append("./7-tmq")
|
||||||
from tmqCommon import *
|
from tmqCommon import *
|
||||||
|
|
||||||
class TDTestCase:
|
class TDTestCase:
|
||||||
updatecfgDict = {'debugFlag': 135}
|
# updatecfgDict = {'debugFlag': 135}
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 2
|
self.vgroups = 2
|
||||||
|
@ -252,7 +252,6 @@ class TDTestCase:
|
||||||
break
|
break
|
||||||
|
|
||||||
tdLog.info("all consumers status into 'lost'")
|
tdLog.info("all consumers status into 'lost'")
|
||||||
|
|
||||||
# drop consumer groups
|
# drop consumer groups
|
||||||
tdLog.info("drop all consumers")
|
tdLog.info("drop all consumers")
|
||||||
for i in range(len(groupIdList)):
|
for i in range(len(groupIdList)):
|
||||||
|
|
|
@ -0,0 +1,173 @@
|
||||||
|
|
||||||
|
import taos
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import socket
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import math
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
from util.common import *
|
||||||
|
from util.cluster import *
|
||||||
|
sys.path.append("./7-tmq")
|
||||||
|
from tmqCommon import *
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def __init__(self):
|
||||||
|
self.vgroups = 1
|
||||||
|
self.ctbNum = 10
|
||||||
|
self.rowsPerTbl = 10000
|
||||||
|
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
tdSql.init(conn.cursor(), False)
|
||||||
|
|
||||||
|
def prepareTestEnv(self):
|
||||||
|
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||||
|
paraDict = {'dbName': 'dbt',
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 1,
|
||||||
|
'stbName': 'stb',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': 10,
|
||||||
|
'rowsPerTbl': 10000,
|
||||||
|
'batchNum': 10,
|
||||||
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
|
'pollDelay': 20,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1,
|
||||||
|
'snapshot': 0}
|
||||||
|
|
||||||
|
paraDict['vgroups'] = self.vgroups
|
||||||
|
paraDict['ctbNum'] = self.ctbNum
|
||||||
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
|
tmqCom.initConsumerTable()
|
||||||
|
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar)
|
||||||
|
tdLog.info("create stb")
|
||||||
|
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||||
|
tdLog.info("create ctb")
|
||||||
|
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||||
|
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
tdLog.info("insert data")
|
||||||
|
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||||
|
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||||
|
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
|
||||||
|
tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||||
|
# tdDnodes.stop(1)
|
||||||
|
# tdDnodes.start(1)
|
||||||
|
tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||||
|
return
|
||||||
|
|
||||||
|
def tmqCase1(self):
|
||||||
|
tdLog.printNoPrefix("======== test case 1: ")
|
||||||
|
paraDict = {'dbName': 'dbt',
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 1,
|
||||||
|
'stbName': 'stb',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
|
'ctbPrefix': 'ctbn',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': 10,
|
||||||
|
'rowsPerTbl': 10000,
|
||||||
|
'batchNum': 10,
|
||||||
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
|
'pollDelay': 20,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1,
|
||||||
|
'snapshot': 0}
|
||||||
|
|
||||||
|
paraDict['vgroups'] = self.vgroups
|
||||||
|
paraDict['ctbNum'] = self.ctbNum
|
||||||
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
|
tdLog.info("create ctb")
|
||||||
|
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||||
|
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
tdLog.info("insert data")
|
||||||
|
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
|
||||||
|
|
||||||
|
topicNameList = ['topic1']
|
||||||
|
# expectRowsList = []
|
||||||
|
tmqCom.initConsumerTable("cdb", self.replicaVar)
|
||||||
|
|
||||||
|
tdLog.info("create topics from stb with filter")
|
||||||
|
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||||
|
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||||
|
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||||
|
tdLog.info("create topic sql: %s"%sqlString)
|
||||||
|
tdSql.execute(sqlString)
|
||||||
|
# tdSql.query(queryString)
|
||||||
|
# expectRowsList.append(tdSql.getRows())
|
||||||
|
|
||||||
|
# init consume info, and start tmq_sim, then check consume result
|
||||||
|
tdLog.info("insert consume info to consume processor")
|
||||||
|
consumerId = 0
|
||||||
|
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
|
||||||
|
topicList = topicNameList[0]
|
||||||
|
ifcheckdata = 1
|
||||||
|
ifManualCommit = 1
|
||||||
|
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
|
||||||
|
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||||
|
tdLog.info("wait the consume result")
|
||||||
|
|
||||||
|
tmqCom.getStartConsumeNotifyFromTmqsim()
|
||||||
|
tmqCom.getStartCommitNotifyFromTmqsim()
|
||||||
|
|
||||||
|
tdSql.query("select * from information_schema.ins_vnodes")
|
||||||
|
# tdLog.debug(tdSql.queryResult)
|
||||||
|
tdDnodes = cluster.dnodes
|
||||||
|
for result in tdSql.queryResult:
|
||||||
|
if result[2] == 'dbt' and result[3] == 'leader':
|
||||||
|
tdLog.debug("leader is %d"%(result[0] - 1))
|
||||||
|
tdDnodes[result[0] - 1].stoptaosd()
|
||||||
|
break
|
||||||
|
|
||||||
|
pInsertThread.join()
|
||||||
|
expectRows = 1
|
||||||
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
|
if expectrowcnt > resultList[0]:
|
||||||
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt, resultList[0]))
|
||||||
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
# tmqCom.checkFileContent(consumerId, queryString)
|
||||||
|
|
||||||
|
time.sleep(10)
|
||||||
|
for i in range(len(topicNameList)):
|
||||||
|
tdSql.query("drop topic %s"%topicNameList[i])
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
tdSql.prepare()
|
||||||
|
self.prepareTestEnv()
|
||||||
|
self.tmqCase1()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
event = threading.Event()
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -0,0 +1,337 @@
|
||||||
|
|
||||||
|
import taos
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import socket
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import math
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
from util.common import *
|
||||||
|
from util.cluster import *
|
||||||
|
sys.path.append("./7-tmq")
|
||||||
|
from tmqCommon import *
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def __init__(self):
|
||||||
|
self.vgroups = 1
|
||||||
|
self.ctbNum = 10
|
||||||
|
self.rowsPerTbl = 10000
|
||||||
|
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
tdSql.init(conn.cursor(), False)
|
||||||
|
|
||||||
|
def getDataPath(self):
|
||||||
|
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
|
return selfPath + '/../../../sim/dnode%d/data/vnode/vnode%d/wal/*';
|
||||||
|
|
||||||
|
def prepareTestEnv(self):
|
||||||
|
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||||
|
paraDict = {'dbName': 'dbt',
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 1,
|
||||||
|
'stbName': 'stb',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': 10,
|
||||||
|
'rowsPerTbl': 10000,
|
||||||
|
'batchNum': 10,
|
||||||
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
|
'pollDelay': 60,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1,
|
||||||
|
'snapshot': 0}
|
||||||
|
|
||||||
|
paraDict['vgroups'] = self.vgroups
|
||||||
|
paraDict['ctbNum'] = self.ctbNum
|
||||||
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
|
tmqCom.initConsumerTable()
|
||||||
|
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar)
|
||||||
|
tdLog.info("create stb")
|
||||||
|
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||||
|
# tdLog.info("create ctb")
|
||||||
|
# tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||||
|
# ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
# tdLog.info("insert data")
|
||||||
|
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||||
|
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||||
|
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
|
||||||
|
# tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||||
|
# tdDnodes.stop(1)
|
||||||
|
# tdDnodes.start(1)
|
||||||
|
# tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||||
|
return
|
||||||
|
|
||||||
|
def restartAndRemoveWal(self):
|
||||||
|
tdDnodes = cluster.dnodes
|
||||||
|
tdSql.query("select * from information_schema.ins_vnodes")
|
||||||
|
for result in tdSql.queryResult:
|
||||||
|
if result[2] == 'dbt':
|
||||||
|
tdLog.debug("dnode is %d"%(result[0]))
|
||||||
|
dnodeId = result[0]
|
||||||
|
vnodeId = result[1]
|
||||||
|
|
||||||
|
tdDnodes[dnodeId - 1].stoptaosd()
|
||||||
|
time.sleep(1)
|
||||||
|
dataPath = self.getDataPath()
|
||||||
|
dataPath = dataPath%(dnodeId,vnodeId)
|
||||||
|
os.system('rm -rf ' + dataPath)
|
||||||
|
tdLog.debug("dataPath:%s"%dataPath)
|
||||||
|
tdDnodes[dnodeId - 1].starttaosd()
|
||||||
|
time.sleep(1)
|
||||||
|
break
|
||||||
|
tdLog.debug("restart dnode ok")
|
||||||
|
|
||||||
|
def redistributeVgroups(self):
|
||||||
|
dnodesList = []
|
||||||
|
tdSql.query("show dnodes")
|
||||||
|
for result in tdSql.queryResult:
|
||||||
|
dnodesList.append(result[0])
|
||||||
|
|
||||||
|
tdSql.query("select * from information_schema.ins_vnodes")
|
||||||
|
vnodeId = 0
|
||||||
|
for result in tdSql.queryResult:
|
||||||
|
if result[2] == 'dbt':
|
||||||
|
tdLog.debug("dnode is %d"%(result[0]))
|
||||||
|
dnodesList.remove(result[0])
|
||||||
|
vnodeId = result[1]
|
||||||
|
break
|
||||||
|
redistributeSql = "redistribute vgroup %d dnode %d" %(vnodeId, dnodesList[0])
|
||||||
|
tdLog.debug("redistributeSql:%s"%(redistributeSql))
|
||||||
|
tdSql.query(redistributeSql)
|
||||||
|
tdLog.debug("redistributeSql ok")
|
||||||
|
|
||||||
|
def tmqCase1(self):
|
||||||
|
tdLog.printNoPrefix("======== test case 1: ")
|
||||||
|
paraDict = {'dbName': 'dbt',
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 1,
|
||||||
|
'stbName': 'stb',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': 10,
|
||||||
|
'rowsPerTbl': 10000,
|
||||||
|
'batchNum': 10,
|
||||||
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
|
'pollDelay': 60,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1,
|
||||||
|
'snapshot': 0}
|
||||||
|
|
||||||
|
paraDict['vgroups'] = self.vgroups
|
||||||
|
paraDict['ctbNum'] = self.ctbNum
|
||||||
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
|
topicNameList = ['topic1']
|
||||||
|
# expectRowsList = []
|
||||||
|
tmqCom.initConsumerTable()
|
||||||
|
|
||||||
|
tdLog.info("create topics from stb with filter")
|
||||||
|
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||||
|
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
|
||||||
|
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||||
|
tdLog.info("create topic sql: %s"%sqlString)
|
||||||
|
tdSql.execute(sqlString)
|
||||||
|
# tdSql.query(queryString)
|
||||||
|
# expectRowsList.append(tdSql.getRows())
|
||||||
|
|
||||||
|
# init consume info, and start tmq_sim, then check consume result
|
||||||
|
tdLog.info("insert consume info to consume processor")
|
||||||
|
consumerId = 0
|
||||||
|
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
|
||||||
|
topicList = topicNameList[0]
|
||||||
|
ifcheckdata = 1
|
||||||
|
ifManualCommit = 1
|
||||||
|
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
|
||||||
|
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||||
|
tdLog.info("wait the consume result")
|
||||||
|
|
||||||
|
tdLog.info("create ctb1")
|
||||||
|
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||||
|
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
tdLog.info("insert data")
|
||||||
|
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
|
||||||
|
|
||||||
|
tmqCom.getStartConsumeNotifyFromTmqsim()
|
||||||
|
tmqCom.getStartCommitNotifyFromTmqsim()
|
||||||
|
|
||||||
|
#restart dnode & remove wal
|
||||||
|
self.restartAndRemoveWal()
|
||||||
|
|
||||||
|
# redistribute vgroup
|
||||||
|
self.redistributeVgroups();
|
||||||
|
|
||||||
|
tdLog.info("create ctb2")
|
||||||
|
paraDict['ctbPrefix'] = "ctbn"
|
||||||
|
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||||
|
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
tdLog.info("insert data")
|
||||||
|
pInsertThread1 = tmqCom.asyncInsertDataByInterlace(paraDict)
|
||||||
|
pInsertThread.join()
|
||||||
|
pInsertThread1.join()
|
||||||
|
|
||||||
|
expectRows = 1
|
||||||
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
|
if expectrowcnt / 2 >= resultList[0]:
|
||||||
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
# tmqCom.checkFileContent(consumerId, queryString)
|
||||||
|
|
||||||
|
time.sleep(10)
|
||||||
|
for i in range(len(topicNameList)):
|
||||||
|
tdSql.query("drop topic %s"%topicNameList[i])
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||||
|
|
||||||
|
def tmqCase2(self):
|
||||||
|
tdLog.printNoPrefix("======== test case 2: ")
|
||||||
|
paraDict = {'dbName':'dbt'}
|
||||||
|
|
||||||
|
ntbName = "ntb"
|
||||||
|
|
||||||
|
topicNameList = ['topic2']
|
||||||
|
tmqCom.initConsumerTable()
|
||||||
|
|
||||||
|
sqlString = "create table %s.%s(ts timestamp, i nchar(8))" %(paraDict['dbName'], ntbName)
|
||||||
|
tdLog.info("create nomal table sql: %s"%sqlString)
|
||||||
|
tdSql.execute(sqlString)
|
||||||
|
|
||||||
|
tdLog.info("create topics from nomal table")
|
||||||
|
queryString = "select * from %s.%s"%(paraDict['dbName'], ntbName)
|
||||||
|
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||||
|
tdLog.info("create topic sql: %s"%sqlString)
|
||||||
|
tdSql.execute(sqlString)
|
||||||
|
tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||||
|
#restart dnode & remove wal
|
||||||
|
self.restartAndRemoveWal()
|
||||||
|
|
||||||
|
# redistribute vgroup
|
||||||
|
self.redistributeVgroups();
|
||||||
|
|
||||||
|
sqlString = "alter table %s.%s modify column i nchar(16)" %(paraDict['dbName'], ntbName)
|
||||||
|
tdLog.info("alter table sql: %s"%sqlString)
|
||||||
|
tdSql.error(sqlString)
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
for i in range(len(topicNameList)):
|
||||||
|
tdSql.query("drop topic %s"%topicNameList[i])
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 2 end ...... ")
|
||||||
|
|
||||||
|
def tmqCase3(self):
|
||||||
|
tdLog.printNoPrefix("======== test case 3: ")
|
||||||
|
paraDict = {'dbName': 'dbt',
|
||||||
|
'dropFlag': 1,
|
||||||
|
'event': '',
|
||||||
|
'vgroups': 1,
|
||||||
|
'stbName': 'stbn',
|
||||||
|
'colPrefix': 'c',
|
||||||
|
'tagPrefix': 't',
|
||||||
|
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||||
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||||
|
'ctbPrefix': 'ctb',
|
||||||
|
'ctbStartIdx': 0,
|
||||||
|
'ctbNum': 10,
|
||||||
|
'rowsPerTbl': 10000,
|
||||||
|
'batchNum': 10,
|
||||||
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
|
'pollDelay': 2,
|
||||||
|
'showMsg': 1,
|
||||||
|
'showRow': 1,
|
||||||
|
'snapshot': 0}
|
||||||
|
|
||||||
|
paraDict['vgroups'] = self.vgroups
|
||||||
|
paraDict['ctbNum'] = self.ctbNum
|
||||||
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
|
topicNameList = ['topic3']
|
||||||
|
tmqCom.initConsumerTable()
|
||||||
|
|
||||||
|
tdLog.info("create stb")
|
||||||
|
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||||
|
|
||||||
|
tdLog.info("create ctb")
|
||||||
|
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||||
|
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
tdLog.info("insert data")
|
||||||
|
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||||
|
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||||
|
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||||
|
|
||||||
|
tdLog.info("create topics from stb with filter")
|
||||||
|
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||||
|
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||||
|
tdLog.info("create topic sql: %s"%sqlString)
|
||||||
|
tdSql.execute(sqlString)
|
||||||
|
|
||||||
|
# init consume info, and start tmq_sim, then check consume result
|
||||||
|
tdLog.info("insert consume info to consume processor")
|
||||||
|
consumerId = 0
|
||||||
|
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
|
||||||
|
topicList = topicNameList[0]
|
||||||
|
ifcheckdata = 1
|
||||||
|
ifManualCommit = 1
|
||||||
|
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
|
||||||
|
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||||
|
tdLog.info("wait the consume result")
|
||||||
|
|
||||||
|
time.sleep(5)
|
||||||
|
#restart dnode & remove wal
|
||||||
|
self.restartAndRemoveWal()
|
||||||
|
|
||||||
|
# redistribute vgroup
|
||||||
|
self.redistributeVgroups();
|
||||||
|
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||||
|
tdLog.info("wait the consume result")
|
||||||
|
|
||||||
|
time.sleep(10)
|
||||||
|
for i in range(len(topicNameList)):
|
||||||
|
tdSql.query("drop topic %s"%topicNameList[i])
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 3 end ...... ")
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
|
||||||
|
tdSql.prepare()
|
||||||
|
self.prepareTestEnv()
|
||||||
|
self.tmqCase1()
|
||||||
|
# self.tmqCase2()
|
||||||
|
# self.tmqCase3()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
event = threading.Event()
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue