refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-05-08 11:21:24 +08:00
parent 5ee1cba173
commit c43fbf6fee
4 changed files with 23 additions and 43 deletions

View File

@ -59,9 +59,10 @@ extern "C" {
#define STREAM_EXEC_T_RESUME_TASK (-6)
#define STREAM_EXEC_T_ADD_FAILED_TASK (-7)
typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue;
typedef struct SStreamTaskSM SStreamTaskSM;
typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue;
typedef struct SStreamTaskSM SStreamTaskSM;
typedef struct SStreamQueueItem SStreamQueueItem;
#define SSTREAM_TASK_VER 4
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
@ -153,10 +154,6 @@ typedef enum EStreamTaskEvent {
typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
typedef struct {
int8_t type;
} SStreamQueueItem;
typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data);
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
@ -190,13 +187,6 @@ typedef struct {
SSDataBlock* pBlock;
} SStreamRefDataBlock;
typedef struct SStreamQueueNode SStreamQueueNode;
struct SStreamQueueNode {
SStreamQueueItem* item;
SStreamQueueNode* next;
};
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);
@ -437,7 +427,7 @@ struct SStreamTask {
SCheckpointInfo chkInfo;
STaskExec exec;
SDataRange dataRange;
SVersionRange step2Range;
SVersionRange step2Range; // version range used to scan wal, information in dataRange should not modified.
SHistoryTaskInfo hTaskInfo;
STaskId streamTaskId;
STaskExecStatisInfo execInfo;
@ -445,14 +435,11 @@ struct SStreamTask {
TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ
SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend
SArray* pRspMsgList;
SUpstreamInfo upstreamInfo;
STaskCheckInfo taskCheckInfo;
// the followings attributes don't be serialized
SScanhistorySchedInfo schedHistoryInfo;
int32_t numOfWaitingUpstream;
int32_t refCnt;
int32_t transferStateAlignCnt;
struct SStreamMeta* pMeta;

View File

@ -88,6 +88,10 @@ struct SStreamQueue {
int8_t status;
};
struct SStreamQueueItem {
int8_t type;
};
typedef enum {
EXEC_CONTINUE = 0x0,
EXEC_AFTER_IDLE = 0x1,

View File

@ -48,9 +48,9 @@ static int32_t streamTaskSetReady(SStreamTask* pTask) {
SStreamTaskState* p = streamTaskGetStatus(pTask);
if ((p->state == TASK_STATUS__SCAN_HISTORY) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
int32_t numOfUps = taosArrayGetSize(pTask->upstreamInfo.pList);
stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, p->name);
pTask->id.idStr, pTask->info.taskLevel, numOfUps, p->name);
}
ASSERT(pTask->status.downstreamReady == 0);
@ -117,18 +117,16 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
int32_t level = pTask->info.taskLevel;
ETaskStatus status = streamTaskGetStatus(pTask)->state;
ASSERT((pTask->status.downstreamReady == 1) && (status == TASK_STATUS__SCAN_HISTORY));
ASSERT((pTask->status.downstreamReady == 1) && (status == TASK_STATUS__SCAN_HISTORY) &&
(pTask->info.fillHistory == 1));
if (level == TASK_LEVEL__SOURCE) {
return doStartScanHistoryTask(pTask);
} else if (level == TASK_LEVEL__AGG) {
if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask);
}
return streamSetParamForScanHistory(pTask);
} else if (level == TASK_LEVEL__SINK) {
stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
}
return 0;
}
@ -208,27 +206,21 @@ int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
}
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
// set the state to be ready
streamTaskSetReady(pTask);
streamTaskSetRangeStreamCalc(pTask);
SStreamTaskState* p = streamTaskGetStatus(pTask);
ASSERT(p->state == TASK_STATUS__SCAN_HISTORY);
ASSERT((p->state == TASK_STATUS__SCAN_HISTORY) && (pTask->info.fillHistory == 1));
if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", id, p->name);
streamTaskStartScanHistory(pTask);
} else {
stDebug("s-task:%s scan wal data, status:%s", id, p->name);
}
stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", pTask->id.idStr, p->name);
streamTaskStartScanHistory(pTask);
// NOTE: there will be an deadlock if launch fill history here.
// // start the related fill-history task, when current task is ready
// if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
// streamLaunchFillHistoryTask(pTask);
// }
// start the related fill-history task, when current task is ready
// if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
// streamLaunchFillHistoryTask(pTask);
// }
return TSDB_CODE_SUCCESS;
}
@ -515,6 +507,8 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVe
SVersionRange* pRange = &pTask->dataRange.range;
ASSERT(nextProcessVer >= pRange->maxVer);
// maxVer for fill-history task is the version, where the last timestamp is acquired.
// it's also the maximum version to scan data in tsdb.
int64_t walScanStartVer = pRange->maxVer + 1;
if (walScanStartVer > nextProcessVer - 1) {
stDebug(

View File

@ -456,11 +456,6 @@ void tFreeStreamTask(SStreamTask* pTask) {
tSimpleHashCleanup(pTask->pNameMap);
}
if (pTask->pRspMsgList != NULL) {
taosArrayDestroyEx(pTask->pRspMsgList, freeItem);
pTask->pRspMsgList = NULL;
}
pTask->status.pSM = streamDestroyStateMachine(pTask->status.pSM);
streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);