Merge remote-tracking branch 'origin/3.0' into feat/TD-27337
This commit is contained in:
commit
4700ebafd1
|
@ -393,7 +393,7 @@ pipeline {
|
||||||
agent{label " Mac_catalina "}
|
agent{label " Mac_catalina "}
|
||||||
steps {
|
steps {
|
||||||
catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') {
|
catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') {
|
||||||
timeout(time: 20, unit: 'MINUTES'){
|
timeout(time: 30, unit: 'MINUTES'){
|
||||||
pre_test()
|
pre_test()
|
||||||
pre_test_build_mac()
|
pre_test_build_mac()
|
||||||
}
|
}
|
||||||
|
|
|
@ -402,7 +402,8 @@ typedef struct SHistoryTaskInfo {
|
||||||
int32_t tickCount;
|
int32_t tickCount;
|
||||||
int32_t retryTimes;
|
int32_t retryTimes;
|
||||||
int32_t waitInterval;
|
int32_t waitInterval;
|
||||||
int64_t haltVer; // offset in wal when halt the stream task
|
int64_t haltVer; // offset in wal when halt the stream task
|
||||||
|
bool operatorOpen; // false by default
|
||||||
} SHistoryTaskInfo;
|
} SHistoryTaskInfo;
|
||||||
|
|
||||||
typedef struct STaskOutputInfo {
|
typedef struct STaskOutputInfo {
|
||||||
|
@ -852,7 +853,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta);
|
||||||
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
|
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
|
||||||
void streamMetaStartHb(SStreamMeta* pMeta);
|
void streamMetaStartHb(SStreamMeta* pMeta);
|
||||||
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ);
|
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
||||||
|
int64_t endTs, bool ready);
|
||||||
void streamMetaRLock(SStreamMeta* pMeta);
|
void streamMetaRLock(SStreamMeta* pMeta);
|
||||||
void streamMetaRUnLock(SStreamMeta* pMeta);
|
void streamMetaRUnLock(SStreamMeta* pMeta);
|
||||||
void streamMetaWLock(SStreamMeta* pMeta);
|
void streamMetaWLock(SStreamMeta* pMeta);
|
||||||
|
|
|
@ -305,7 +305,7 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512
|
#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512
|
||||||
#define TSDB_SYNC_NEGOTIATION_WIN 512
|
#define TSDB_SYNC_NEGOTIATION_WIN 512
|
||||||
|
|
||||||
#define TSDB_SYNC_SNAP_BUFFER_SIZE 2048
|
#define TSDB_SYNC_SNAP_BUFFER_SIZE 1024
|
||||||
|
|
||||||
#define TSDB_TBNAME_COLUMN_INDEX (-1)
|
#define TSDB_TBNAME_COLUMN_INDEX (-1)
|
||||||
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
|
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
|
||||||
|
|
|
@ -291,12 +291,17 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
pObj = mndAcquireSnode(pMnode, createReq.dnodeId);
|
// pObj = mndAcquireSnode(pMnode, createReq.dnodeId);
|
||||||
if (pObj != NULL) {
|
// if (pObj != NULL) {
|
||||||
|
// terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST;
|
||||||
|
// goto _OVER;
|
||||||
|
// } else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) {
|
||||||
|
// goto _OVER;
|
||||||
|
// }
|
||||||
|
|
||||||
|
if (sdbGetSize(pMnode->pSdb, SDB_SNODE) >= 1){
|
||||||
terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST;
|
terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) {
|
|
||||||
goto _OVER;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
|
pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
|
||||||
|
@ -314,7 +319,7 @@ _OVER:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndReleaseSnode(pMnode, pObj);
|
// mndReleaseSnode(pMnode, pObj);
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
tFreeSMCreateQnodeReq(&createReq);
|
tFreeSMCreateQnodeReq(&createReq);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -363,35 +363,6 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64_t watermark, char **pStr) {
|
|
||||||
if (NULL == ast) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
SNode * pAst = NULL;
|
|
||||||
int32_t code = nodesStringToNode(ast, &pAst);
|
|
||||||
|
|
||||||
SQueryPlan *pPlan = NULL;
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
SPlanContext cxt = {
|
|
||||||
.pAstRoot = pAst,
|
|
||||||
.topicQuery = false,
|
|
||||||
.streamQuery = true,
|
|
||||||
.triggerType = (triggerType == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType,
|
|
||||||
.watermark = watermark,
|
|
||||||
};
|
|
||||||
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = nodesNodeToString((SNode *)pPlan, false, pStr, NULL);
|
|
||||||
}
|
|
||||||
nodesDestroyNode(pAst);
|
|
||||||
nodesDestroyNode((SNode *)pPlan);
|
|
||||||
terrno = code;
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
|
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
|
||||||
SNode * pAst = NULL;
|
SNode * pAst = NULL;
|
||||||
SQueryPlan *pPlan = NULL;
|
SQueryPlan *pPlan = NULL;
|
||||||
|
@ -733,11 +704,20 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask
|
||||||
pReq->streamId = pTask->id.streamId;
|
pReq->streamId = pTask->id.streamId;
|
||||||
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
SEpSet epset = {0};
|
SEpSet epset = {0};
|
||||||
if (pTask->info.nodeId == SNODE_HANDLE) {
|
if(pTask->info.nodeId == SNODE_HANDLE){
|
||||||
SSnodeObj *pObj = mndAcquireSnode(pMnode, pTask->info.nodeId);
|
SSnodeObj *pObj = NULL;
|
||||||
addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port);
|
void *pIter = NULL;
|
||||||
} else {
|
while (1) {
|
||||||
|
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
|
||||||
|
if (pIter == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port);
|
||||||
|
sdbRelease(pMnode->pSdb, pObj);
|
||||||
|
}
|
||||||
|
}else{
|
||||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
||||||
epset = mndGetVgroupEpset(pMnode, pVgObj);
|
epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||||
mndReleaseVgroup(pMnode, pVgObj);
|
mndReleaseVgroup(pMnode, pVgObj);
|
||||||
|
|
|
@ -161,6 +161,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj
|
||||||
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
|
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
|
||||||
const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
|
const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
|
||||||
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
|
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
|
||||||
|
if(pRebVg->oldConsumerId == -1) return 0; //drop stream, no consumer, while split vnode,all consumerId is -1
|
||||||
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -337,7 +337,7 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
STqHandle* pHandle = *(STqHandle**)pIter;
|
STqHandle* pHandle = *(STqHandle**)pIter;
|
||||||
tqInfo("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
|
tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
|
||||||
|
|
||||||
if (ASSERT(pHandle->msg != NULL)) {
|
if (ASSERT(pHandle->msg != NULL)) {
|
||||||
tqError("pHandle->msg should not be null");
|
tqError("pHandle->msg should not be null");
|
||||||
|
|
|
@ -72,7 +72,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
|
||||||
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
|
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
|
||||||
pHandle->msg->contLen = pMsg->contLen;
|
pHandle->msg->contLen = pMsg->contLen;
|
||||||
int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
|
int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
|
||||||
tqInfo("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
|
tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
|
||||||
pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
|
pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -475,6 +475,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
||||||
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
|
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
|
||||||
|
|
||||||
if (!isLeader) {
|
if (!isLeader) {
|
||||||
|
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false);
|
||||||
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
|
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
|
||||||
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
|
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
|
||||||
return code;
|
return code;
|
||||||
|
@ -482,6 +483,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
|
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false);
|
||||||
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
|
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
|
||||||
rsp.streamId, rsp.upstreamTaskId, vgId);
|
rsp.streamId, rsp.upstreamTaskId, vgId);
|
||||||
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
|
@ -670,7 +672,8 @@ int32_t startStreamTasks(SStreamMeta* pMeta) {
|
||||||
streamLaunchFillHistoryTask(pTask);
|
streamLaunchFillHistoryTask(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
|
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
|
||||||
|
pTask->execInfo.start, true);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -278,6 +278,15 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int64_t tBlockDataSize(SBlockData* pBlockData) {
|
||||||
|
int64_t nData = 0;
|
||||||
|
for (int32_t iCol = 0; iCol < pBlockData->nColData; iCol++) {
|
||||||
|
SColData* pColData = tBlockDataGetColDataByIdx(pBlockData, iCol);
|
||||||
|
nData += pColData->nData;
|
||||||
|
}
|
||||||
|
return nData;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) {
|
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -320,8 +329,11 @@ static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** dat
|
||||||
code = tsdbIterMergerNext(reader->dataIterMerger);
|
code = tsdbIterMergerNext(reader->dataIterMerger);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
if (reader->blockData->nRow >= 81920) {
|
if (!(reader->blockData->nRow % 16)) {
|
||||||
break;
|
int64_t nData = tBlockDataSize(reader->blockData);
|
||||||
|
if (nData >= 1 * 1024 * 1024) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -256,7 +256,10 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
|
||||||
bool finished = false;
|
bool finished = false;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
qSetStreamOpOpen(exec);
|
if (!pTask->hTaskInfo.operatorOpen) {
|
||||||
|
qSetStreamOpOpen(exec);
|
||||||
|
pTask->hTaskInfo.operatorOpen = true;
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (streamTaskShouldPause(pTask)) {
|
if (streamTaskShouldPause(pTask)) {
|
||||||
|
|
|
@ -67,7 +67,8 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
|
||||||
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
|
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
|
||||||
pTask->id.idStr, numOfDowns, el, p);
|
pTask->id.idStr, numOfDowns, el, p);
|
||||||
|
|
||||||
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
|
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
|
||||||
|
pTask->execInfo.start, true);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -469,14 +470,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
|
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false);
|
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
|
||||||
|
taosGetTimestampMs(), false);
|
||||||
|
|
||||||
// automatically set the related fill-history task to be failed.
|
// automatically set the related fill-history task to be failed.
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
STaskId* pId = &pTask->hTaskInfo.id;
|
STaskId* pId = &pTask->hTaskInfo.id;
|
||||||
|
|
||||||
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId);
|
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId);
|
||||||
streamMetaUpdateTaskDownstreamStatus(pHTask, pHTask->execInfo.init, taosGetTimestampMs(), false);
|
streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init,
|
||||||
|
taosGetTimestampMs(), false);
|
||||||
streamMetaReleaseTask(pTask->pMeta, pHTask);
|
streamMetaReleaseTask(pTask->pMeta, pHTask);
|
||||||
}
|
}
|
||||||
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
|
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
|
||||||
|
@ -1066,15 +1069,13 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool ready) {
|
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
int64_t endTs, bool ready) {
|
||||||
|
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
||||||
|
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
|
||||||
STaskId id = streamTaskExtractKey(pTask);
|
|
||||||
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
|
||||||
|
|
||||||
SHashObj* pDst = ready? pStartInfo->pReadyTaskSet:pStartInfo->pFailedTaskSet;
|
|
||||||
|
|
||||||
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
|
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
|
||||||
taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
|
taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
|
||||||
|
@ -1086,15 +1087,14 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs
|
||||||
pStartInfo->readyTs = taosGetTimestampMs();
|
pStartInfo->readyTs = taosGetTimestampMs();
|
||||||
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
|
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
|
||||||
|
|
||||||
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64
|
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x startTs:%" PRId64
|
||||||
", readyTs:%" PRId64 " total elapsed time:%.2fs",
|
", readyTs:%" PRId64 " total elapsed time:%.2fs",
|
||||||
pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs,
|
pMeta->vgId, numOfTotal, taskId, pStartInfo->startTs, pStartInfo->readyTs,
|
||||||
pStartInfo->elapsedTime / 1000.0);
|
pStartInfo->elapsedTime / 1000.0);
|
||||||
|
|
||||||
// print the initialization elapsed time and info
|
// print the initialization elapsed time and info
|
||||||
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
|
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
|
||||||
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
|
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
|
||||||
|
|
||||||
streamMetaResetStartInfo(pStartInfo);
|
streamMetaResetStartInfo(pStartInfo);
|
||||||
} else {
|
} else {
|
||||||
stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal);
|
stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal);
|
||||||
|
|
|
@ -327,15 +327,19 @@ int32_t walEndSnapshot(SWal *pWal) {
|
||||||
|
|
||||||
// iterate files, until the searched result
|
// iterate files, until the searched result
|
||||||
// delete according to file size or close time
|
// delete according to file size or close time
|
||||||
|
SWalFileInfo *pUntil = NULL;
|
||||||
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
|
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
|
||||||
if ((pWal->cfg.retentionSize > 0 && newTotSize > pWal->cfg.retentionSize) ||
|
if ((pWal->cfg.retentionSize > 0 && newTotSize > pWal->cfg.retentionSize) ||
|
||||||
(pWal->cfg.retentionPeriod == 0 ||
|
(pWal->cfg.retentionPeriod == 0 ||
|
||||||
pWal->cfg.retentionPeriod > 0 && iter->closeTs >= 0 && iter->closeTs + pWal->cfg.retentionPeriod < ts)) {
|
pWal->cfg.retentionPeriod > 0 && iter->closeTs >= 0 && iter->closeTs + pWal->cfg.retentionPeriod < ts)) {
|
||||||
deleteCnt++;
|
|
||||||
newTotSize -= iter->fileSize;
|
newTotSize -= iter->fileSize;
|
||||||
taosArrayPush(pWal->toDeleteFiles, iter);
|
pUntil = iter;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter <= pUntil; iter++) {
|
||||||
|
deleteCnt++;
|
||||||
|
taosArrayPush(pWal->toDeleteFiles, iter);
|
||||||
|
}
|
||||||
|
|
||||||
// make new array, remove files
|
// make new array, remove files
|
||||||
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
|
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
|
||||||
|
|
|
@ -247,7 +247,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_ALREADY_EXIST, "Mnode already exists"
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_NOT_EXIST, "Mnode not there")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_NOT_EXIST, "Mnode not there")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_QNODE_ALREADY_EXIST, "Qnode already exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_QNODE_ALREADY_EXIST, "Qnode already exists")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_QNODE_NOT_EXIST, "Qnode not there")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_QNODE_NOT_EXIST, "Qnode not there")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Snode already exists")
|
//TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Snode already exists")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Snode can only be created 1")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_NOT_EXIST, "Snode not there")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_NOT_EXIST, "Snode not there")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_FEW_MNODES, "The replica of mnode cannot less than 1")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_FEW_MNODES, "The replica of mnode cannot less than 1")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_MNODES, "The replica of mnode cannot exceed 3")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_MNODES, "The replica of mnode cannot exceed 3")
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
,,y,unit-test,bash test.sh
|
,,y,unit-test,bash test.sh
|
||||||
|
|
||||||
#system test
|
#system test
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/stream_basic.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/scalar_function.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/scalar_function.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_interval.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_interval.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_session.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_session.py
|
||||||
|
|
|
@ -113,11 +113,7 @@ sql_error drop snode on dnode 2
|
||||||
|
|
||||||
print =============== create drop snodes
|
print =============== create drop snodes
|
||||||
sql create snode on dnode 1
|
sql create snode on dnode 1
|
||||||
sql create snode on dnode 2
|
sql_error create snode on dnode 2
|
||||||
sql show snodes
|
|
||||||
if $rows != 2 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
print =============== restart
|
print =============== restart
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
@ -127,7 +123,7 @@ system sh/exec.sh -n dnode2 -s start
|
||||||
|
|
||||||
sleep 2000
|
sleep 2000
|
||||||
sql show snodes
|
sql show snodes
|
||||||
if $rows != 2 then
|
if $rows != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -49,7 +49,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -118,7 +118,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb1',
|
'ctbPrefix': 'ctb1',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
|
|
@ -23,7 +23,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -49,7 +49,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -118,7 +118,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb1',
|
'ctbPrefix': 'ctb1',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -189,7 +189,7 @@ class TDTestCase:
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
if expectrowcnt / 2 >= resultList[0]:
|
if expectrowcnt / 2 > resultList[0]:
|
||||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -51,7 +51,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -120,7 +120,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb1',
|
'ctbPrefix': 'ctb1',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 120,
|
'pollDelay': 120,
|
||||||
|
@ -189,7 +189,7 @@ class TDTestCase:
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
if expectrowcnt / 2 >= resultList[0]:
|
if expectrowcnt / 2 > resultList[0]:
|
||||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -51,7 +51,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -120,7 +120,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb1',
|
'ctbPrefix': 'ctb1',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 120,
|
'pollDelay': 120,
|
||||||
|
@ -189,7 +189,7 @@ class TDTestCase:
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
if expectrowcnt / 2 >= resultList[0]:
|
if expectrowcnt / 2 > resultList[0]:
|
||||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -53,7 +53,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -122,7 +122,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb1',
|
'ctbPrefix': 'ctb1',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 120,
|
'pollDelay': 120,
|
||||||
|
@ -192,7 +192,7 @@ class TDTestCase:
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
if expectrowcnt / 2 >= resultList[0]:
|
if expectrowcnt / 2 > resultList[0]:
|
||||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -51,7 +51,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -120,7 +120,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb1',
|
'ctbPrefix': 'ctb1',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -190,7 +190,7 @@ class TDTestCase:
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
if expectrowcnt / 2 >= resultList[0]:
|
if expectrowcnt / 2 > resultList[0]:
|
||||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -46,7 +46,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 30,
|
'pollDelay': 30,
|
||||||
|
@ -138,7 +138,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 10,
|
'pollDelay': 10,
|
||||||
|
@ -217,7 +217,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 10,
|
'pollDelay': 10,
|
||||||
|
|
|
@ -20,7 +20,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -46,7 +46,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -137,7 +137,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb1',
|
'ctbPrefix': 'ctb1',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -207,7 +207,7 @@ class TDTestCase:
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
if expectrowcnt / 2 >= resultList[0]:
|
if expectrowcnt / 2 > resultList[0]:
|
||||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -46,7 +46,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -137,7 +137,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -203,7 +203,7 @@ class TDTestCase:
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
if expectrowcnt / 2 >= resultList[0]:
|
if expectrowcnt / 2 > resultList[0]:
|
||||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,110 @@
|
||||||
|
###################################################################
|
||||||
|
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# This file is proprietary and confidential to TAOS Technologies.
|
||||||
|
# No part of this file may be reproduced, stored, transmitted,
|
||||||
|
# disclosed or used in any form or by any means other than as
|
||||||
|
# expressly provided by the written permission from Jianhui Tao
|
||||||
|
#
|
||||||
|
###################################################################
|
||||||
|
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.common import *
|
||||||
|
from util.sqlset import *
|
||||||
|
from util.autogen import *
|
||||||
|
|
||||||
|
import random
|
||||||
|
import time
|
||||||
|
import traceback
|
||||||
|
import os
|
||||||
|
from os import path
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
# init
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor(), True)
|
||||||
|
|
||||||
|
# autoGen
|
||||||
|
self.autoGen = AutoGen()
|
||||||
|
|
||||||
|
def waitTranslation(self, waitSeconds):
|
||||||
|
# wait end
|
||||||
|
for i in range(waitSeconds):
|
||||||
|
sql ="show transactions;"
|
||||||
|
rows = tdSql.query(sql)
|
||||||
|
if rows == 0:
|
||||||
|
return True
|
||||||
|
tdLog.info(f"i={i} wait for translation finish ...")
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def getPath(self, tool="taosBenchmark"):
|
||||||
|
if (platform.system().lower() == 'windows'):
|
||||||
|
tool = tool + ".exe"
|
||||||
|
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
|
if ("community" in selfPath):
|
||||||
|
projPath = selfPath[:selfPath.find("community")]
|
||||||
|
else:
|
||||||
|
projPath = selfPath[:selfPath.find("tests")]
|
||||||
|
|
||||||
|
paths = []
|
||||||
|
for root, dirs, files in os.walk(projPath):
|
||||||
|
if ((tool) in files):
|
||||||
|
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||||
|
if ("packaging" not in rootRealPath):
|
||||||
|
paths.append(os.path.join(root, tool))
|
||||||
|
break
|
||||||
|
if (len(paths) == 0):
|
||||||
|
tdLog.exit("taosBenchmark not found!")
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
tdLog.info("taosBenchmark found in %s" % paths[0])
|
||||||
|
return paths[0]
|
||||||
|
|
||||||
|
def taosBenchmark(self, param):
|
||||||
|
binPath = self.getPath()
|
||||||
|
cmd = f"{binPath} {param}"
|
||||||
|
tdLog.info(cmd)
|
||||||
|
os.system(cmd)
|
||||||
|
|
||||||
|
# run
|
||||||
|
def run(self):
|
||||||
|
# gen data
|
||||||
|
random.seed(int(time.time()))
|
||||||
|
self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y")
|
||||||
|
# create stream
|
||||||
|
tdSql.execute("use db")
|
||||||
|
tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
|
||||||
|
sql = "select count(*) from sta"
|
||||||
|
# loop wait max 60s to check count is ok
|
||||||
|
tdLog.info("loop wait result ...")
|
||||||
|
tdSql.checkDataLoop(0, 0, 99999, sql, loopCount=120, waitTime=0.5)
|
||||||
|
|
||||||
|
# check all data is correct
|
||||||
|
sql = "select * from sta where cnt != 20;"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
|
# check ts interval is correct
|
||||||
|
sql = "select * from ( select diff(_wstart) as tsdif from sta ) where tsdif != 10;"
|
||||||
|
tdSql.query(sql)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
|
# stop
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
Loading…
Reference in New Issue