diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index ed25783e9f..b614b813d1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -422,6 +422,8 @@ typedef struct { STsdb *pTsdb; // [input] SBlockIdx *pBlockIdxExp; // [input] STSchema *pTSchema; // [input] + tb_uid_t suid; + tb_uid_t uid; int32_t nFileSet; int32_t iFileSet; SArray *aDFileSet; @@ -593,6 +595,9 @@ typedef struct SFSNextRowIter { SFSNEXTROWSTATES state; // [input] STsdb *pTsdb; // [input] SBlockIdx *pBlockIdxExp; // [input] + STSchema *pTSchema; // [input] + tb_uid_t suid; + tb_uid_t uid; int32_t nFileSet; int32_t iFileSet; SArray *aDFileSet; @@ -685,6 +690,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetBlock); /* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); */ + tBlockDataReset(state->pBlockData); + code = tBlockDataInit(state->pBlockData, state->suid, state->uid, state->pTSchema); + if (code) goto _err; + code = tsdbReadDataBlock(state->pDataFReader, &block, state->pBlockData); if (code) goto _err; @@ -958,16 +967,21 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->idx = (SBlockIdx){.suid = suid, .uid = uid}; - pIter->fsLastState.state = (SFSLASTNEXTROWSTATES) SFSNEXTROW_FS; + pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS; pIter->fsLastState.pTsdb = pTsdb; pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet; pIter->fsLastState.pBlockIdxExp = &pIter->idx; pIter->fsLastState.pTSchema = pTSchema; + pIter->fsLastState.suid = suid; + pIter->fsLastState.uid = uid; pIter->fsState.state = SFSNEXTROW_FS; pIter->fsState.pTsdb = pTsdb; pIter->fsState.aDFileSet = pIter->pReadSnap->fs.aDFileSet; pIter->fsState.pBlockIdxExp = &pIter->idx; + pIter->fsState.pTSchema = pTSchema; + pIter->fsState.suid = suid; + pIter->fsState.uid = uid; pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL}; pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL}; diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 1b3d75f33b..ce841ed83c 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -60,8 +60,7 @@ typedef enum { #define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000 #define SCH_MAX_TASK_TIMEOUT_USEC 60000000 #define SCH_DEFAULT_MAX_RETRY_NUM 6 - -#define SCH_ASYNC_LAUNCH_TASK 0 +#define SCH_MIN_AYSNC_EXEC_NUM 3 typedef struct SSchDebug { bool lockEnable; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 612b908d41..d16d15c119 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -871,14 +871,14 @@ _return: taosMemoryFree(param); -#if SCH_ASYNC_LAUNCH_TASK - if (code) { - code = schProcessOnTaskFailure(pJob, pTask, code); + if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) { + if (code) { + code = schProcessOnTaskFailure(pJob, pTask, code); + } + if (code) { + code = schHandleJobFailure(pJob, code); + } } - if (code) { - code = schHandleJobFailure(pJob, code); - } -#endif SCH_RET(code); } @@ -893,12 +893,12 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { param->pJob = pJob; param->pTask = pTask; -#if SCH_ASYNC_LAUNCH_TASK - taosAsyncExec(schLaunchTaskImpl, param, NULL); -#else - SCH_ERR_RET(schLaunchTaskImpl(param)); -#endif - + if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) { + taosAsyncExec(schLaunchTaskImpl, param, NULL); + } else { + SCH_ERR_RET(schLaunchTaskImpl(param)); + } + return TSDB_CODE_SUCCESS; }