refactor: do some internal refactor.
This commit is contained in:
parent
11c9c7d936
commit
4970040459
|
@ -324,12 +324,13 @@ typedef struct SStreamStatus {
|
||||||
int8_t taskStatus;
|
int8_t taskStatus;
|
||||||
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
|
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
|
||||||
int8_t schedStatus;
|
int8_t schedStatus;
|
||||||
int32_t schedIdleTime; // idle time before invoke again
|
|
||||||
int64_t lastExecTs; // last exec time stamp
|
|
||||||
int8_t statusBackup;
|
int8_t statusBackup;
|
||||||
bool appendTranstateBlock; // has append the transfer state data block already
|
int32_t schedIdleTime; // idle time before invoke again
|
||||||
int32_t timerActive; // timer is active
|
int32_t timerActive; // timer is active
|
||||||
|
int64_t lastExecTs; // last exec time stamp
|
||||||
int32_t inScanHistorySentinel;
|
int32_t inScanHistorySentinel;
|
||||||
|
bool appendTranstateBlock; // has append the transfer state data block already
|
||||||
|
bool supplementaryWalscan; // complete the supplementary wal scan or not
|
||||||
} SStreamStatus;
|
} SStreamStatus;
|
||||||
|
|
||||||
typedef struct SDataRange {
|
typedef struct SDataRange {
|
||||||
|
|
|
@ -835,8 +835,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
||||||
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
||||||
|
|
||||||
// checkpoint ver is the kept version, handled data should be the next version.
|
// checkpoint ver is the kept version, handled data should be the next version.
|
||||||
if (pTask->chkInfo.checkpointId != 0) {
|
if (pChkInfo->checkpointId != 0) {
|
||||||
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
|
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
|
||||||
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
|
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
|
||||||
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
||||||
}
|
}
|
||||||
|
|
|
@ -390,6 +390,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
|
|
||||||
pTimeWindow->skey = INT64_MIN;
|
pTimeWindow->skey = INT64_MIN;
|
||||||
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
|
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
|
||||||
|
stDebug("s-task:%s after exceed the threshold:%" PRId64 " and then update the window filter",
|
||||||
|
pStreamTask->id.idStr, pStreamTask->dataRange.range.maxVer);
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
|
stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
@ -398,9 +400,11 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
streamTaskReleaseState(pTask);
|
streamTaskReleaseState(pTask);
|
||||||
streamTaskReloadState(pStreamTask);
|
streamTaskReloadState(pStreamTask);
|
||||||
|
|
||||||
|
// 3. scan wal file from the beginning till the end version of fill-history task.
|
||||||
|
streamTaskSupplementaryScan(pStreamTask);
|
||||||
|
|
||||||
// 3. send msg to mnode to launch a checkpoint to keep the state for current stream
|
// 3. send msg to mnode to launch a checkpoint to keep the state for current stream
|
||||||
streamTaskSendCheckpointReq(pStreamTask);
|
// streamTaskSendCheckpointReq(pStreamTask);
|
||||||
// streamTaskResume(pStreamTask);
|
|
||||||
|
|
||||||
// 4. assign the status to the value that will be kept in disk
|
// 4. assign the status to the value that will be kept in disk
|
||||||
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state;
|
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state;
|
||||||
|
@ -777,6 +781,8 @@ int32_t streamResumeTask(SStreamTask* pTask) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
/*int32_t code = */ doStreamExecTask(pTask);
|
/*int32_t code = */ doStreamExecTask(pTask);
|
||||||
|
|
||||||
|
// check if continue
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
||||||
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
|
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
|
||||||
|
|
|
@ -1322,7 +1322,7 @@ void streamMetaRUnLock(SStreamMeta* pMeta) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code);
|
stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code);
|
||||||
} else {
|
} else {
|
||||||
stDebug("vgId:%d meta-runlock completed", pMeta->vgId);
|
// stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -385,7 +385,6 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
|
||||||
|
|
||||||
void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
|
void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
|
||||||
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
|
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
|
||||||
|
|
||||||
streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
|
streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
|
||||||
|
|
||||||
int64_t initTs = pTask->execInfo.init;
|
int64_t initTs = pTask->execInfo.init;
|
||||||
|
@ -989,4 +988,3 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
|
||||||
streamSetParamForStreamScannerStep2(pTask, &verRange, &win);
|
streamSetParamForStreamScannerStep2(pTask, &verRange, &win);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -670,7 +670,6 @@ void streamStateFreeCur(SStreamStateCur* pCur) {
|
||||||
if (!pCur) {
|
if (!pCur) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
qDebug("streamStateFreeCur");
|
|
||||||
streamStateResetCur(pCur);
|
streamStateResetCur(pCur);
|
||||||
taosMemoryFree(pCur);
|
taosMemoryFree(pCur);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue