refactor: do some internal refactor.
This commit is contained in:
parent
88e77d6bc6
commit
385e1a8b0d
|
@ -541,6 +541,65 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
//static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
|
//static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
|
||||||
static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; }
|
static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; }
|
||||||
|
|
||||||
|
static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
|
int32_t blockSize = 0;
|
||||||
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
|
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger");
|
||||||
|
|
||||||
|
int64_t ver = pTask->chkInfo.processedVer;
|
||||||
|
doSetStreamInputBlock(pTask, pCheckpointBlock, &ver, id);
|
||||||
|
|
||||||
|
int64_t totalSize = 0;
|
||||||
|
int32_t totalBlocks = 0;
|
||||||
|
streamTaskExecImpl(pTask, pCheckpointBlock, &totalSize, &totalBlocks);
|
||||||
|
|
||||||
|
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
|
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
|
||||||
|
SIZE_IN_MiB(totalSize), totalBlocks);
|
||||||
|
|
||||||
|
pTask->execInfo.outputDataBlocks += totalBlocks;
|
||||||
|
pTask->execInfo.outputDataSize += totalSize;
|
||||||
|
if (fabs(el - 0.0) <= DBL_EPSILON) {
|
||||||
|
pTask->execInfo.procsThroughput = 0;
|
||||||
|
pTask->execInfo.outputThroughput = 0;
|
||||||
|
} else {
|
||||||
|
pTask->execInfo.outputThroughput = (totalSize / el);
|
||||||
|
pTask->execInfo.procsThroughput = (blockSize / el);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
|
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
|
||||||
|
if (dropRelHTask) {
|
||||||
|
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
|
||||||
|
|
||||||
|
STaskId* pHTaskId = &pTask->hTaskInfo.id;
|
||||||
|
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId);
|
||||||
|
if (pHTask != NULL) {
|
||||||
|
// 2. transfer the ownership of executor state
|
||||||
|
streamTaskReleaseState(pHTask);
|
||||||
|
streamTaskReloadState(pTask);
|
||||||
|
stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
|
||||||
|
streamTaskGetStatus(pHTask)->name);
|
||||||
|
|
||||||
|
streamMetaReleaseTask(pTask->pMeta, pHTask);
|
||||||
|
} else {
|
||||||
|
stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
|
||||||
|
(int32_t)pHTaskId->taskId);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
stDebug("s-task:%s no transfer-state needed", id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
|
||||||
|
doStreamTaskExecImpl(pTask, pCheckpointBlock);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
|
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
|
||||||
* appropriate batch of blocks should be handled in 5 to 10 sec.
|
* appropriate batch of blocks should be handled in 5 to 10 sec.
|
||||||
|
@ -628,29 +687,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (type == STREAM_INPUT__CHECKPOINT) {
|
|
||||||
// // transfer the state from fill-history to related stream task before generating the checkpoint.
|
|
||||||
// bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
|
|
||||||
// if (dropRelHTask) {
|
|
||||||
// ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
|
|
||||||
//
|
|
||||||
// STaskId* pHTaskId = &pTask->hTaskInfo.id;
|
|
||||||
// SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId);
|
|
||||||
// if (pHTask != NULL) {
|
|
||||||
// // 2. transfer the ownership of executor state
|
|
||||||
// streamTaskReleaseState(pHTask);
|
|
||||||
// streamTaskReloadState(pTask);
|
|
||||||
// stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
|
|
||||||
// streamTaskGetStatus(pHTask)->name);
|
|
||||||
//
|
|
||||||
// streamMetaReleaseTask(pTask->pMeta, pHTask);
|
|
||||||
// } else {
|
|
||||||
// stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
|
|
||||||
// (int32_t)pHTaskId->taskId);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (type != STREAM_INPUT__CHECKPOINT) {
|
if (type != STREAM_INPUT__CHECKPOINT) {
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks,
|
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks,
|
||||||
|
|
Loading…
Reference in New Issue