refactor: display the time window for force_window_close.

This commit is contained in:
Haojun Liao 2024-11-29 13:47:20 +08:00
parent 6580e7751f
commit 9a9a1828c1
5 changed files with 43 additions and 26 deletions

View File

@ -178,7 +178,7 @@ int64_t mndClearChkptReportInfo(SHashObj* pHash, int64_t streamId);
int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId); int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId);
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows); int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows);
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows); int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t nRows, int32_t p);
int32_t mndProcessResetStatusReq(SRpcMsg *pReq); int32_t mndProcessResetStatusReq(SRpcMsg *pReq);

View File

@ -404,8 +404,7 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre
return code; return code;
} }
mDebug("doAddSourceTask taskId:%s, %p vgId:%d, isFillHistory:%d", pTask->id.idStr, pTask, pVgroup->vgId, mDebug("doAddSourceTask taskId:%s, %p vgId:%d, historyTask:%d", pTask->id.idStr, pTask, pVgroup->vgId, isHistoryTask);
isHistoryTask);
if (pStream->conf.fillHistory) { if (pStream->conf.fillHistory) {
haltInitialTaskStatus(pTask, plan, isHistoryTask); haltInitialTaskStatus(pTask, plan, isHistoryTask);
@ -461,7 +460,7 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream
addNewTaskList(pStream); addNewTaskList(pStream);
while (1) { while (1) {
SVgObj* pVgroup; SVgObj* pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) { if (pIter == NULL) {
break; break;

View File

@ -21,7 +21,6 @@
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndVgroup.h"
#include "osMemory.h" #include "osMemory.h"
#include "parser.h" #include "parser.h"
#include "taoserror.h" #include "taoserror.h"
@ -1610,6 +1609,13 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
} }
} }
int32_t precision = TSDB_TIME_PRECISION_MILLI;
SDbObj *pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
if (pSourceDb != NULL) {
precision = pSourceDb->cfg.precision;
mndReleaseDb(pMnode, pSourceDb);
}
// add row for each task // add row for each task
SStreamTaskIter *pIter = NULL; SStreamTaskIter *pIter = NULL;
code = createStreamTaskIter(pStream, &pIter); code = createStreamTaskIter(pStream, &pIter);
@ -1628,7 +1634,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
break; break;
} }
code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows); code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows, precision);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
numOfRows++; numOfRows++;
} }

View File

@ -1043,7 +1043,7 @@ _end:
return code; return code;
} }
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows) { int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows, int32_t precision) {
SColumnInfoData *pColInfo = NULL; SColumnInfoData *pColInfo = NULL;
int32_t cols = 0; int32_t cols = 0;
int32_t code = 0; int32_t code = 0;
@ -1103,14 +1103,11 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo
// level // level
char level[20 + VARSTR_HEADER_SIZE] = {0}; char level[20 + VARSTR_HEADER_SIZE] = {0};
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
memcpy(varDataVal(level), "source", 6); STR_WITH_SIZE_TO_VARSTR(level, "source", 6);
varDataSetLen(level, 6);
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
memcpy(varDataVal(level), "agg", 3); STR_WITH_SIZE_TO_VARSTR(level, "agg", 3);
varDataSetLen(level, 3);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
memcpy(varDataVal(level), "sink", 4); STR_WITH_SIZE_TO_VARSTR(level, "sink", 4);
varDataSetLen(level, 4);
} }
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -1234,10 +1231,13 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
const char *sinkStr = "%.2f MiB"; const char *sinkStr = "%.2f MiB";
snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize); snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize);
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // offset info
// offset info if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; taosFormatUtcTime(buf, tListLen(buf), pe->processedVer, precision);
snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); } else {
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
}
} else { } else {
memset(buf, 0, tListLen(buf)); memset(buf, 0, tListLen(buf));
} }

View File

@ -75,6 +75,25 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
} }
static void setProcessProgress(SStreamTask* pTask, STaskStatusEntry* pEntry) {
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
return;
}
if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
pEntry->processedVer = pTask->status.latestForceWindow.skey;
} else {
if (pTask->exec.pWalReader != NULL) {
pEntry->processedVer = walReaderGetCurrentVer(pTask->exec.pWalReader) - 1;
if (pEntry->processedVer < 0) {
pEntry->processedVer = pTask->chkInfo.processedVer;
}
walReaderValidVersionRange(pTask->exec.pWalReader, &pEntry->verRange.minVer, &pEntry->verRange.maxVer);
}
}
}
static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* pEpset) { static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* pEpset) {
int32_t code = 0; int32_t code = 0;
int32_t tlen = 0; int32_t tlen = 0;
@ -209,16 +228,9 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
} }
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
if (pTask->exec.pWalReader != NULL) { setProcessProgress(pTask, &entry);
entry.processedVer = walReaderGetCurrentVer(pTask->exec.pWalReader) - 1;
if (entry.processedVer < 0) {
entry.processedVer = pTask->chkInfo.processedVer;
}
walReaderValidVersionRange(pTask->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer);
}
addUpdateNodeIntoHbMsg(pTask, pMsg); addUpdateNodeIntoHbMsg(pTask, pMsg);
p = taosArrayPush(pMsg->pTaskStatus, &entry); p = taosArrayPush(pMsg->pTaskStatus, &entry);
if (p == NULL) { if (p == NULL) {
stError("failed to add taskInfo:0x%x in hbMsg, vgId:%d", pTask->id.taskId, pMeta->vgId); stError("failed to add taskInfo:0x%x in hbMsg, vgId:%d", pTask->id.taskId, pMeta->vgId);