Merge pull request #28983 from taosdata/fix/liaohj
refactor: display the time window for force_window_close as the progress.
This commit is contained in:
commit
0829d2219c
|
@ -280,7 +280,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
|
||||||
int32_t tsTransPullupInterval = 2;
|
int32_t tsTransPullupInterval = 2;
|
||||||
int32_t tsCompactPullupInterval = 10;
|
int32_t tsCompactPullupInterval = 10;
|
||||||
int32_t tsMqRebalanceInterval = 2;
|
int32_t tsMqRebalanceInterval = 2;
|
||||||
int32_t tsStreamCheckpointInterval = 60;
|
int32_t tsStreamCheckpointInterval = 300;
|
||||||
float tsSinkDataRate = 2.0;
|
float tsSinkDataRate = 2.0;
|
||||||
int32_t tsStreamNodeCheckInterval = 20;
|
int32_t tsStreamNodeCheckInterval = 20;
|
||||||
int32_t tsMaxConcurrentCheckpoint = 1;
|
int32_t tsMaxConcurrentCheckpoint = 1;
|
||||||
|
|
|
@ -178,7 +178,7 @@ int64_t mndClearChkptReportInfo(SHashObj* pHash, int64_t streamId);
|
||||||
int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId);
|
int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId);
|
||||||
|
|
||||||
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows);
|
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows);
|
||||||
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows);
|
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t nRows, int32_t p);
|
||||||
|
|
||||||
int32_t mndProcessResetStatusReq(SRpcMsg *pReq);
|
int32_t mndProcessResetStatusReq(SRpcMsg *pReq);
|
||||||
|
|
||||||
|
|
|
@ -362,18 +362,32 @@ static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillh
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void addNewTaskList(SStreamObj* pStream) {
|
static int32_t addNewTaskList(SStreamObj* pStream) {
|
||||||
SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
|
SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
|
||||||
|
if (pTaskList == NULL) {
|
||||||
|
mError("failed init task list, code:%s", tstrerror(terrno));
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
if (taosArrayPush(pStream->tasks, &pTaskList) == NULL) {
|
if (taosArrayPush(pStream->tasks, &pTaskList) == NULL) {
|
||||||
mError("failed to put into array");
|
mError("failed to put into array, code:%s", tstrerror(terrno));
|
||||||
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStream->conf.fillHistory) {
|
if (pStream->conf.fillHistory) {
|
||||||
pTaskList = taosArrayInit(0, POINTER_BYTES);
|
pTaskList = taosArrayInit(0, POINTER_BYTES);
|
||||||
|
if (pTaskList == NULL) {
|
||||||
|
mError("failed init task list, code:%s", tstrerror(terrno));
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
if (taosArrayPush(pStream->pHTasksList, &pTaskList) == NULL) {
|
if (taosArrayPush(pStream->pHTasksList, &pTaskList) == NULL) {
|
||||||
mError("failed to put into array");
|
mError("failed to put into array, code:%s", tstrerror(terrno));
|
||||||
|
return terrno;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the history task id
|
// set the history task id
|
||||||
|
@ -404,8 +418,7 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("doAddSourceTask taskId:%s, %p vgId:%d, isFillHistory:%d", pTask->id.idStr, pTask, pVgroup->vgId,
|
mDebug("doAddSourceTask taskId:%s, %p vgId:%d, historyTask:%d", pTask->id.idStr, pTask, pVgroup->vgId, isHistoryTask);
|
||||||
isHistoryTask);
|
|
||||||
|
|
||||||
if (pStream->conf.fillHistory) {
|
if (pStream->conf.fillHistory) {
|
||||||
haltInitialTaskStatus(pTask, plan, isHistoryTask);
|
haltInitialTaskStatus(pTask, plan, isHistoryTask);
|
||||||
|
@ -455,13 +468,14 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) {
|
||||||
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
|
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
|
||||||
int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
|
int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
int32_t code = 0;
|
|
||||||
SSdb* pSdb = pMnode->pSdb;
|
SSdb* pSdb = pMnode->pSdb;
|
||||||
|
int32_t code = addNewTaskList(pStream);
|
||||||
addNewTaskList(pStream);
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SVgObj* pVgroup;
|
SVgObj* pVgroup = NULL;
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||||
if (pIter == NULL) {
|
if (pIter == NULL) {
|
||||||
break;
|
break;
|
||||||
|
@ -571,8 +585,10 @@ END:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
|
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
|
||||||
int32_t code = 0;
|
int32_t code = addNewTaskList(pStream);
|
||||||
addNewTaskList(pStream);
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
if (pStream->fixedSinkVgId == 0) {
|
if (pStream->fixedSinkVgId == 0) {
|
||||||
code = doAddShuffleSinkTask(pMnode, pStream, pEpset);
|
code = doAddShuffleSinkTask(pMnode, pStream, pEpset);
|
||||||
|
@ -677,8 +693,13 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
||||||
|
|
||||||
mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel,
|
mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel,
|
||||||
externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
|
externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
|
||||||
|
|
||||||
pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
|
pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
|
||||||
pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
|
pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
|
||||||
|
if (pStream->tasks == NULL || pStream->pHTasksList == NULL) {
|
||||||
|
mError("failed to create stream obj, code:%s", tstrerror(terrno));
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
|
if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
|
||||||
// add extra sink
|
// add extra sink
|
||||||
|
@ -718,6 +739,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
||||||
if (terrno != 0) code = terrno;
|
if (terrno != 0) code = terrno;
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
do {
|
do {
|
||||||
SArray** list = taosArrayGetLast(pStream->tasks);
|
SArray** list = taosArrayGetLast(pStream->tasks);
|
||||||
float size = (float)taosArrayGetSize(*list);
|
float size = (float)taosArrayGetSize(*list);
|
||||||
|
@ -725,7 +747,10 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
||||||
if (cnt <= 1) break;
|
if (cnt <= 1) break;
|
||||||
|
|
||||||
mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
|
mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
|
||||||
addNewTaskList(pStream);
|
code = addNewTaskList(pStream);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
for (int j = 0; j < cnt; j++) {
|
for (int j = 0; j < cnt; j++) {
|
||||||
code = addAggTask(pStream, pMnode, plan, pEpset, false);
|
code = addAggTask(pStream, pMnode, plan, pEpset, false);
|
||||||
|
@ -751,7 +776,12 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
||||||
mDebug("doScheduleStream add final agg");
|
mDebug("doScheduleStream add final agg");
|
||||||
SArray** list = taosArrayGetLast(pStream->tasks);
|
SArray** list = taosArrayGetLast(pStream->tasks);
|
||||||
size_t size = taosArrayGetSize(*list);
|
size_t size = taosArrayGetSize(*list);
|
||||||
addNewTaskList(pStream);
|
|
||||||
|
code = addNewTaskList(pStream);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
code = addAggTask(pStream, pMnode, plan, pEpset, true);
|
code = addAggTask(pStream, pMnode, plan, pEpset, true);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
|
|
|
@ -21,7 +21,6 @@
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndStb.h"
|
#include "mndStb.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndVgroup.h"
|
|
||||||
#include "osMemory.h"
|
#include "osMemory.h"
|
||||||
#include "parser.h"
|
#include "parser.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
@ -1610,6 +1609,13 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t precision = TSDB_TIME_PRECISION_MILLI;
|
||||||
|
SDbObj *pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
|
||||||
|
if (pSourceDb != NULL) {
|
||||||
|
precision = pSourceDb->cfg.precision;
|
||||||
|
mndReleaseDb(pMnode, pSourceDb);
|
||||||
|
}
|
||||||
|
|
||||||
// add row for each task
|
// add row for each task
|
||||||
SStreamTaskIter *pIter = NULL;
|
SStreamTaskIter *pIter = NULL;
|
||||||
code = createStreamTaskIter(pStream, &pIter);
|
code = createStreamTaskIter(pStream, &pIter);
|
||||||
|
@ -1628,7 +1634,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
|
code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows, precision);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1043,7 +1043,7 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows) {
|
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows, int32_t precision) {
|
||||||
SColumnInfoData *pColInfo = NULL;
|
SColumnInfoData *pColInfo = NULL;
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -1103,14 +1103,11 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo
|
||||||
// level
|
// level
|
||||||
char level[20 + VARSTR_HEADER_SIZE] = {0};
|
char level[20 + VARSTR_HEADER_SIZE] = {0};
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
memcpy(varDataVal(level), "source", 6);
|
STR_WITH_SIZE_TO_VARSTR(level, "source", 6);
|
||||||
varDataSetLen(level, 6);
|
|
||||||
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
||||||
memcpy(varDataVal(level), "agg", 3);
|
STR_WITH_SIZE_TO_VARSTR(level, "agg", 3);
|
||||||
varDataSetLen(level, 3);
|
|
||||||
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||||
memcpy(varDataVal(level), "sink", 4);
|
STR_WITH_SIZE_TO_VARSTR(level, "sink", 4);
|
||||||
varDataSetLen(level, 4);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
@ -1234,10 +1231,17 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||||
const char *sinkStr = "%.2f MiB";
|
const char *sinkStr = "%.2f MiB";
|
||||||
snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize);
|
snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize);
|
||||||
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // offset info
|
||||||
// offset info
|
if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
|
||||||
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
|
int32_t ret = taosFormatUtcTime(buf, tListLen(buf), pe->processedVer, precision);
|
||||||
snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
|
if (ret != 0) {
|
||||||
|
mError("failed to format processed timewindow, skey:%" PRId64, pe->processedVer);
|
||||||
|
memset(buf, 0, tListLen(buf));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
|
||||||
|
snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
memset(buf, 0, tListLen(buf));
|
memset(buf, 0, tListLen(buf));
|
||||||
}
|
}
|
||||||
|
|
|
@ -972,7 +972,7 @@ static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray**
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray* pNotSendList) {
|
static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg
|
SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg
|
||||||
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||||
|
@ -984,7 +984,7 @@ static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SA
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doFindNotSendUpstream(pTask, pList, &pNotSendList);
|
code = doFindNotSendUpstream(pTask, pList, ppNotSendList);
|
||||||
if (code) {
|
if (code) {
|
||||||
streamCleanBeforeQuitTmr(pTmrInfo, param);
|
streamCleanBeforeQuitTmr(pTmrInfo, param);
|
||||||
stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr", id, tstrerror(code));
|
stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr", id, tstrerror(code));
|
||||||
|
@ -992,7 +992,7 @@ static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SA
|
||||||
}
|
}
|
||||||
|
|
||||||
// do send retrieve checkpoint trigger msg to upstream
|
// do send retrieve checkpoint trigger msg to upstream
|
||||||
code = doSendRetrieveTriggerMsg(pTask, pNotSendList);
|
code = doSendRetrieveTriggerMsg(pTask, *ppNotSendList);
|
||||||
if (code) {
|
if (code) {
|
||||||
stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code));
|
stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code));
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -1064,7 +1064,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMutexLock(&pActiveInfo->lock);
|
streamMutexLock(&pActiveInfo->lock);
|
||||||
code = chkptTriggerRecvMonitorHelper(pTask, param, pNotSendList);
|
code = chkptTriggerRecvMonitorHelper(pTask, param, &pNotSendList);
|
||||||
streamMutexUnlock(&pActiveInfo->lock);
|
streamMutexUnlock(&pActiveInfo->lock);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -75,6 +75,25 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void setProcessProgress(SStreamTask* pTask, STaskStatusEntry* pEntry) {
|
||||||
|
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
|
||||||
|
pEntry->processedVer = pTask->status.latestForceWindow.skey;
|
||||||
|
} else {
|
||||||
|
if (pTask->exec.pWalReader != NULL) {
|
||||||
|
pEntry->processedVer = walReaderGetCurrentVer(pTask->exec.pWalReader) - 1;
|
||||||
|
if (pEntry->processedVer < 0) {
|
||||||
|
pEntry->processedVer = pTask->chkInfo.processedVer;
|
||||||
|
}
|
||||||
|
|
||||||
|
walReaderValidVersionRange(pTask->exec.pWalReader, &pEntry->verRange.minVer, &pEntry->verRange.maxVer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* pEpset) {
|
static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* pEpset) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
|
@ -209,16 +228,9 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
if (pTask->exec.pWalReader != NULL) {
|
setProcessProgress(pTask, &entry);
|
||||||
entry.processedVer = walReaderGetCurrentVer(pTask->exec.pWalReader) - 1;
|
|
||||||
if (entry.processedVer < 0) {
|
|
||||||
entry.processedVer = pTask->chkInfo.processedVer;
|
|
||||||
}
|
|
||||||
|
|
||||||
walReaderValidVersionRange(pTask->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer);
|
|
||||||
}
|
|
||||||
|
|
||||||
addUpdateNodeIntoHbMsg(pTask, pMsg);
|
addUpdateNodeIntoHbMsg(pTask, pMsg);
|
||||||
|
|
||||||
p = taosArrayPush(pMsg->pTaskStatus, &entry);
|
p = taosArrayPush(pMsg->pTaskStatus, &entry);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
stError("failed to add taskInfo:0x%x in hbMsg, vgId:%d", pTask->id.taskId, pMeta->vgId);
|
stError("failed to add taskInfo:0x%x in hbMsg, vgId:%d", pTask->id.taskId, pMeta->vgId);
|
||||||
|
|
|
@ -39,7 +39,7 @@ class TDSimClient:
|
||||||
"rpcDebugFlag": "135",
|
"rpcDebugFlag": "135",
|
||||||
"tmrDebugFlag": "131",
|
"tmrDebugFlag": "131",
|
||||||
"dDebugFlag":"131",
|
"dDebugFlag":"131",
|
||||||
"cDebugFlag": "131",
|
"cDebugFlag": "135",
|
||||||
"uDebugFlag": "131",
|
"uDebugFlag": "131",
|
||||||
"jniDebugFlag": "131",
|
"jniDebugFlag": "131",
|
||||||
"qDebugFlag": "135",
|
"qDebugFlag": "135",
|
||||||
|
@ -136,7 +136,7 @@ class TDDnode:
|
||||||
"dDebugFlag": "131",
|
"dDebugFlag": "131",
|
||||||
"vDebugFlag": "131",
|
"vDebugFlag": "131",
|
||||||
"tqDebugFlag": "135",
|
"tqDebugFlag": "135",
|
||||||
"cDebugFlag": "131",
|
"cDebugFlag": "135",
|
||||||
"stDebugFlag": "135",
|
"stDebugFlag": "135",
|
||||||
"smaDebugFlag": "131",
|
"smaDebugFlag": "131",
|
||||||
"jniDebugFlag": "131",
|
"jniDebugFlag": "131",
|
||||||
|
|
|
@ -21,6 +21,7 @@ from util.cluster import *
|
||||||
import threading
|
import threading
|
||||||
# should be used by -N option
|
# should be used by -N option
|
||||||
class TDTestCase:
|
class TDTestCase:
|
||||||
|
updatecfgDict = {'debugFlag': 135, 'asynclog': 0, 'checkpointinterval':60}
|
||||||
|
|
||||||
#updatecfgDict = {'checkpointInterval': 60 ,}
|
#updatecfgDict = {'checkpointInterval': 60 ,}
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
@ -70,7 +71,7 @@ class TDTestCase:
|
||||||
while(True):
|
while(True):
|
||||||
if(self.check_vnodestate()):
|
if(self.check_vnodestate()):
|
||||||
break
|
break
|
||||||
sql = 'select task_id, node_id, checkpoint_id, checkpoint_ver from information_schema.ins_stream_tasks where `level` = "source" or `level` = "agg" and node_type == "vnode"'
|
sql = 'select task_id, node_id, checkpoint_id, checkpoint_ver from information_schema.ins_stream_tasks where `level` = "source" or `level` = "agg" and node_type = "vnode"'
|
||||||
for task_id, vnode, checkpoint_id, checkpoint_ver in tdSql.getResult(sql):
|
for task_id, vnode, checkpoint_id, checkpoint_ver in tdSql.getResult(sql):
|
||||||
dirpath = f"{cluster.dnodes[self.vnode_dict[vnode]-1].dataDir}/vnode/vnode{vnode}/"
|
dirpath = f"{cluster.dnodes[self.vnode_dict[vnode]-1].dataDir}/vnode/vnode{vnode}/"
|
||||||
info_path = self.find_checkpoint_info_file(dirpath, checkpoint_id, task_id)
|
info_path = self.find_checkpoint_info_file(dirpath, checkpoint_id, task_id)
|
||||||
|
|
Loading…
Reference in New Issue