other: merge 3.0

This commit is contained in:
Haojun Liao 2024-09-13 19:02:56 +08:00
parent 0f7b2ea467
commit 83d531284b
2 changed files with 46 additions and 26 deletions

View File

@ -673,7 +673,7 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt);
int32_t streamExecTask(SStreamTask* pTask);
void streamResumeTask(SStreamTask* pTask);
int32_t streamResumeTask(SStreamTask* pTask);
int32_t streamTrySchedExec(SStreamTask* pTask);
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType);
void streamTaskResumeInFuture(SStreamTask* pTask);

View File

@ -24,7 +24,7 @@
#define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
static void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks);
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks);
bool streamTaskShouldStop(const SStreamTask* pTask) {
SStreamTaskState pState = streamTaskGetStatus(pTask);
@ -95,7 +95,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
return code;
}
void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor;
int32_t size = 0;
@ -112,7 +112,7 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
if (streamTaskShouldStop(pTask) || (pRes == NULL)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return;
return code;
}
SSDataBlock* output = NULL;
@ -122,8 +122,13 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
resetTaskInfo(pExecutor);
}
stError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, tstrerror(code));
continue;
if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) {
stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code));
return code;
} else {
qResetTaskCode(pExecutor);
continue;
}
}
if (output == NULL) {
@ -194,7 +199,7 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
// todo: here we need to keep retrying to put it into the output buffer
if (code != TSDB_CODE_SUCCESS) {
return;
return code;
}
pRes = NULL;
@ -208,6 +213,8 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
} else {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
}
return code;
}
// todo: continuously try to create result blocks
@ -627,7 +634,7 @@ static void doRecordThroughput(STaskExecStatisInfo* pInfo, int64_t totalBlocks,
}
}
static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) {
static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) {
const char* id = pTask->id.idStr;
int32_t blockSize = 0;
int64_t st = taosGetTimestampMs();
@ -635,23 +642,28 @@ static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, i
int64_t ver = pInfo->processedVer;
int64_t totalSize = 0;
int32_t totalBlocks = 0;
int32_t code = 0;
stDebug("s-task:%s start to process batch blocks, num:%d, type:%s", id, num, streamQueueItemGetTypeStr(pBlock->type));
int32_t code = doSetStreamInputBlock(pTask, pBlock, &ver, id);
code = doSetStreamInputBlock(pTask, pBlock, &ver, id);
if (code) {
stError("s-task:%s failed to set input block, not exec for these blocks", id);
return;
return code;
}
code = streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks);
if (code) {
return code;
}
streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks);
doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr);
// update the currentVer if processing the submit blocks.
if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) {
stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id,
pInfo->checkpointVer, pInfo->nextProcessVer, ver);
return;
return code;
}
if (ver != pInfo->processedVer) {
@ -660,6 +672,8 @@ static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, i
id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
pInfo->processedVer = ver;
}
return code;
}
void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
@ -712,6 +726,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB
*/
static int32_t doStreamExecTask(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
int32_t code = 0;
// merge multiple input data if possible in the input queue.
stDebug("s-task:%s start to extract data block from inputQ", id);
@ -784,9 +799,9 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
if (type == STREAM_INPUT__DATA_BLOCK) {
pTask->execInfo.sink.dataSize += blockSize;
stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize));
int32_t code = doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
code = doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
if (code != TSDB_CODE_SUCCESS) {
// todo handle error.
return code;
}
double el = (taosGetTimestampMs() - st) / 1000.0;
@ -801,17 +816,19 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
}
if (type != STREAM_INPUT__CHECKPOINT) {
doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks);
streamFreeQitem(pInput);
if (code) {
return code;
}
} else { // todo other thread may change the status
// do nothing after syncing the executor state to the storage backend, until the vnode-level checkpoint is completed.
streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState.state == TASK_STATUS__CK) {
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name);
int32_t code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue
} else { // todo refactor
int32_t code = 0;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
} else {
@ -827,7 +844,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
streamMutexUnlock(&pTask->lock);
streamFreeQitem(pInput);
return 0;
return code;
}
}
}
@ -858,21 +875,21 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
}
}
void streamResumeTask(SStreamTask* pTask) {
int32_t streamResumeTask(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
int32_t code = 0;
if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) {
stError("s-task:%s invalid sched status:%d, not resume task", id, pTask->status.schedStatus);
return;
stError("s-task:%s invalid sched status:%d, not resume task", pTask->id.idStr, pTask->status.schedStatus);
return code;
}
while (1) {
code = doStreamExecTask(pTask);
if (code) {
stError("s-task:%s failed to exec stream task, code:%s", id, tstrerror(code));
return code;
}
// check if continue
streamMutexLock(&pTask->lock);
@ -888,7 +905,7 @@ void streamResumeTask(SStreamTask* pTask) {
stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p,
pTask->status.schedStatus, pTask->status.lastExecTs);
return;
return code;
} else {
// check if this task needs to be idle for a while
if (pTask->status.schedIdleTime > 0) {
@ -896,28 +913,31 @@ void streamResumeTask(SStreamTask* pTask) {
streamMutexUnlock(&pTask->lock);
setLastExecTs(pTask, taosGetTimestampMs());
return;
return code;
}
}
streamMutexUnlock(&pTask->lock);
}
return code;
}
int32_t streamExecTask(SStreamTask* pTask) {
// this function may be executed by multiple threads, so a status check is required.
const char* id = pTask->id.idStr;
int32_t code = 0;
int8_t schedStatus = streamTaskSetSchedStatusActive(pTask);
if (schedStatus == TASK_SCHED_STATUS__WAITING) {
streamResumeTask(pTask);
code = streamResumeTask(pTask);
} else {
char* p = streamTaskGetStatus(pTask).name;
stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, p,
pTask->status.schedStatus);
}
return 0;
return code;
}
int32_t streamTaskReleaseState(SStreamTask* pTask) {