refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-04-17 21:57:09 +08:00
parent ddf98ae0c2
commit ac84a5f82b
8 changed files with 40 additions and 39 deletions

View File

@ -384,8 +384,8 @@ typedef struct SSinkRecorder {
typedef struct STaskExecStatisInfo { typedef struct STaskExecStatisInfo {
int64_t created; int64_t created;
int64_t init; int64_t checkTs;
int64_t start; int64_t readyTs;
int64_t startCheckpointId; int64_t startCheckpointId;
int64_t startCheckpointVer; int64_t startCheckpointVer;
@ -478,7 +478,7 @@ typedef struct STaskStartInfo {
int64_t startTs; int64_t startTs;
int64_t readyTs; int64_t readyTs;
int32_t tasksWillRestart; int32_t tasksWillRestart;
int32_t taskStarting; // restart flag, sentinel to guard the restart procedure. int32_t startAllTasks; // restart flag, sentinel to guard the restart procedure.
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
int64_t elapsedTime; int64_t elapsedTime;

View File

@ -309,7 +309,7 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2*
pRec->numOfSubmit += 1; pRec->numOfSubmit += 1;
if ((pRec->numOfSubmit % 1000) == 0) { if ((pRec->numOfSubmit % 1000) == 0) {
double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0; double el = (taosGetTimestampMs() - pTask->execInfo.readyTs) / 1000.0;
tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
" submit into dst table, %.2fMiB duration:%.2f Sec.", " submit into dst table, %.2fMiB duration:%.2f Sec.",
pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->dataSize), pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->dataSize),

View File

@ -455,8 +455,8 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
} }
static void setParam(SStreamTask* pTask, int64_t* initTs, bool* hasHTask, STaskId* pId) { static void setParam(SStreamTask* pTask, int64_t* startCheckTs, bool* hasHTask, STaskId* pId) {
*initTs = pTask->execInfo.init; *startCheckTs = pTask->execInfo.checkTs;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
*hasHTask = true; *hasHTask = true;
@ -525,6 +525,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
if (pTask == NULL) { if (pTask == NULL) {
streamMetaRLock(pMeta); streamMetaRLock(pMeta);
// let's try to find this task in hashmap
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) { if (ppTask != NULL) {
setParam(*ppTask, &initTs, &hasHistoryTask, &fId); setParam(*ppTask, &initTs, &hasHistoryTask, &fId);
@ -533,7 +534,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
if (hasHistoryTask) { if (hasHistoryTask) {
streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false);
} }
} else { } else { // not exist even in the hash map of meta, forget it
streamMetaRUnLock(pMeta); streamMetaRUnLock(pMeta);
} }
@ -749,7 +750,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
if (pMeta->startInfo.taskStarting == 1) { if (pMeta->startInfo.startAllTasks == 1) {
pMeta->startInfo.restartCount += 1; pMeta->startInfo.restartCount += 1;
tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
pMeta->startInfo.restartCount); pMeta->startInfo.restartCount);
@ -757,7 +758,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pMeta->startInfo.taskStarting = 1; pMeta->startInfo.startAllTasks = 1;
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
terrno = 0; terrno = 0;
@ -873,7 +874,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
bool scanWal = false; bool scanWal = false;
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
if (pStartInfo->taskStarting == 1) { if (pStartInfo->startAllTasks == 1) {
tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId, tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
pMeta->startInfo.restartCount); pMeta->startInfo.restartCount);
} else { // not in starting procedure } else { // not in starting procedure

View File

@ -579,12 +579,12 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId); vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId);
int32_t numOfTasks = tqStreamTasksGetTotalNum(pMeta); int32_t numOfTasks = tqStreamTasksGetTotalNum(pMeta);
if (numOfTasks > 0) { if (numOfTasks > 0) {
if (pMeta->startInfo.taskStarting == 1) { if (pMeta->startInfo.startAllTasks == 1) {
pMeta->startInfo.restartCount += 1; pMeta->startInfo.restartCount += 1;
tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
pMeta->startInfo.restartCount); pMeta->startInfo.restartCount);
} else { } else {
pMeta->startInfo.taskStarting = 1; pMeta->startInfo.startAllTasks = 1;
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false); tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false);

View File

@ -1119,7 +1119,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
.stage = pMeta->stage, .stage = pMeta->stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
.startTime = (*pTask)->execInfo.start, .startTime = (*pTask)->execInfo.readyTs,
.checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId, .checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId,
.checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer, .checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer,
.checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime, .checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime,
@ -1329,7 +1329,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
pStartInfo->readyTs = 0; pStartInfo->readyTs = 0;
// reset the sentinel flag value to be 0 // reset the sentinel flag value to be 0
pStartInfo->taskStarting = 0; pStartInfo->startAllTasks = 0;
} }
void streamMetaRLock(SStreamMeta* pMeta) { void streamMetaRLock(SStreamMeta* pMeta) {
@ -1496,7 +1496,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
streamLaunchFillHistoryTask(pTask); streamLaunchFillHistoryTask(pTask);
} }
streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, true); streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, true);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
continue; continue;
} }
@ -1506,10 +1506,10 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
code = ret; code = ret;
streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, false); streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id; STaskId* pId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->checkTs, pInfo->readyTs, false);
} }
} }
@ -1601,10 +1601,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->init, pInfo->start, false); streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->checkTs, pInfo->readyTs, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id; STaskId* pId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->checkTs, pInfo->readyTs, false);
} }
} }
@ -1641,7 +1641,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
if (pStartInfo->taskStarting != 1) { if (pStartInfo->startAllTasks != 1) {
int64_t el = endTs - startTs; int64_t el = endTs - startTs;
qDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms", qDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms",
pMeta->vgId, taskId, ready, el); pMeta->vgId, taskId, ready, el);

View File

@ -56,8 +56,8 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
pTask->status.downstreamReady = 1; pTask->status.downstreamReady = 1;
pTask->execInfo.start = taosGetTimestampMs(); pTask->execInfo.readyTs = taosGetTimestampMs();
int64_t el = (pTask->execInfo.start - pTask->execInfo.init); int64_t el = (pTask->execInfo.readyTs - pTask->execInfo.checkTs);
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
pTask->id.idStr, numOfDowns, el, p->name); pTask->id.idStr, numOfDowns, el, p->name);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -391,9 +391,9 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL); streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
int64_t initTs = pTask->execInfo.init; int64_t checkTs = pTask->execInfo.checkTs;
int64_t startTs = pTask->execInfo.start; int64_t readyTs = pTask->execInfo.readyTs;
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true); streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
if (pTask->status.taskStatus == TASK_STATUS__HALT) { if (pTask->status.taskStatus == TASK_STATUS__HALT) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0)); ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
@ -498,7 +498,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
} }
int32_t startTs = pTask->execInfo.init; int32_t startTs = pTask->execInfo.checkTs;
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false); streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false);
@ -603,13 +603,13 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
SDataRange* pRange = &pHTask->dataRange; SDataRange* pRange = &pHTask->dataRange;
// the query version range should be limited to the already processed data // the query version range should be limited to the already processed data
pHTask->execInfo.init = taosGetTimestampMs(); pHTask->execInfo.checkTs = taosGetTimestampMs();
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64 stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
" verRange:%" PRId64 " - %" PRId64 ", init:%" PRId64, " verRange:%" PRId64 " - %" PRId64 ", init:%" PRId64,
pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
pRange->range.maxVer, pHTask->execInfo.init); pRange->range.maxVer, pHTask->execInfo.checkTs);
} else { } else {
stDebug("s-task:%s no fill-history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr); stDebug("s-task:%s no fill-history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
} }
@ -767,8 +767,7 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, pTask->id.streamId, pTask->id.taskId, hStreamId, hTaskId); SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, pTask->id.streamId, pTask->id.taskId, hStreamId, hTaskId);
if (pInfo == NULL) { if (pInfo == NULL) {
stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr); stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false);
return terrno; return terrno;
} }
@ -785,7 +784,7 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, ref); stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, ref);
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
return terrno; return terrno;
} }
@ -816,7 +815,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId, stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
pStatus->name); pStatus->name);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
return -1; // todo set the correct error code return -1; // todo set the correct error code
} }
@ -831,11 +830,11 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId); SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId);
if (pHisTask == NULL) { if (pHisTask == NULL) {
stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr); stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
} else { } else {
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, true); streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
} else { // exist, but not ready, continue check downstream task status } else { // exist, but not ready, continue check downstream task status
checkFillhistoryTaskStatus(pTask, pHisTask); checkFillhistoryTaskStatus(pTask, pHisTask);
} }

View File

@ -365,8 +365,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64 stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64 ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
" nextProcessVer:%" PRId64 ", checkpointCount:%d", " nextProcessVer:%" PRId64 ", checkpointCount:%d",
taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs, taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount,
pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint); pStatis->latestUpdateTs, pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer,
pStatis->checkpoint);
// remove the ref by timer // remove the ref by timer
while (pTask->status.timerActive > 0) { while (pTask->status.timerActive > 0) {

View File

@ -80,10 +80,10 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv
} }
int32_t streamTaskInitStatus(SStreamTask* pTask) { int32_t streamTaskInitStatus(SStreamTask* pTask) {
pTask->execInfo.init = taosGetTimestampMs(); pTask->execInfo.checkTs = taosGetTimestampMs();
stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr, stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr,
pTask->execInfo.init); pTask->execInfo.checkTs);
streamTaskCheckDownstream(pTask); streamTaskCheckDownstream(pTask);
return 0; return 0;
} }