fix(stream): adjust nodeUpdate check interval.

This commit is contained in:
Haojun Liao 2023-09-25 10:40:30 +08:00
parent f4f55fcbce
commit 4fe1382cd9
3 changed files with 9 additions and 8 deletions

View File

@ -242,7 +242,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
int32_t tsStreamCheckpointTickInterval = 10;
int32_t tsStreamNodeCheckInterval = 10;
int32_t tsStreamNodeCheckInterval = 30;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushIntervalSec = 10;
int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups

View File

@ -2418,7 +2418,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
for(int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, j);
if (pNodeEntry->nodeId == pEntry->nodeId) {
mInfo("vgId:%d stage updated, from %d to %d, nodeUpdate should be trigger by s-task:0x%" PRIx64,
mInfo("vgId:%d stage updated, from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64,
pEntry->nodeId, pEntry->stage, p->stage, pEntry->id.taskId);
pNodeEntry->stageUpdated = true;

View File

@ -255,20 +255,21 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
const char* id = pTask->id.idStr;
if (stage == -1) {
stDebug("s-task:%s receive check msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", id,
upstreamTaskId, stage);
stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", id,
upstreamTaskId, vgId, stage);
return 0;
}
if (pInfo->stage == -1) {
pInfo->stage = stage;
stDebug("s-task:%s receive check msg from upstream task:0x%x first time, init stage value:%" PRId64, id,
upstreamTaskId, stage);
stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d) first time, init stage value:%" PRId64, id,
upstreamTaskId, vgId, stage);
}
if (pInfo->stage < stage) {
stError("s-task:%s receive msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64,
id, upstreamTaskId, vgId, stage, pInfo->stage);
stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
", prev:%" PRId64,
id, upstreamTaskId, vgId, stage, pInfo->stage);
}
if (pTask->status.downstreamReady != 1) {