refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-04-23 19:49:14 +08:00
parent 82cde46614
commit a37667968f
1 changed files with 10 additions and 22 deletions

View File

@ -21,6 +21,7 @@
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data
#define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms
#define MIN_INVOKE_INTERVAL 50 // 50ms
#define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
@ -244,6 +245,10 @@ static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t*
}
}
static SScanhistoryDataInfo buildScanhistoryExecRet(EScanHistoryCode code, int32_t idleTime) {
return (SScanhistoryDataInfo){code, idleTime};
}
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
@ -260,7 +265,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
if (streamTaskShouldPause(pTask)) {
stDebug("s-task:%s paused from the scan-history task", id);
// quit from step1, not continue to handle the step2
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
}
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
@ -275,7 +280,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
if(streamTaskShouldStop(pTask)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
}
// dispatch the generated results
@ -285,38 +290,21 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
// downstream task input queue is full, try in 5sec
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) {
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000};
return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL);
}
if (finished) {
return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};
return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0);
}
if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) {
stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id,
pTask->info.fillHistory, el / 1000.0);
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, 100);
}
}
}
// wait for the stream task to be idle
static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
const char* id = pTask->id.idStr;
int64_t st = taosGetTimestampMs();
while (!streamTaskIsIdle(pStreamTask)) {
stDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", id, pTask->info.taskLevel,
pStreamTask->id.idStr);
taosMsleep(100);
}
double el = (taosGetTimestampMs() - st) / 1000.0;
if (el > 0) {
stDebug("s-task:%s wait for stream task:%s for %.2fs to be idle", id, pStreamTask->id.idStr, el);
}
}
int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
const char* id = pTask->id.idStr;