refactor(stream): do some internal refactoring.

This commit is contained in:
Haojun Liao 2023-10-18 15:34:53 +08:00
parent 2904cfd4d5
commit 54ec679b58
12 changed files with 465 additions and 254 deletions

View File

@ -44,11 +44,19 @@ extern "C" {
#define NODE_ROLE_LEADER 0x2
#define NODE_ROLE_FOLLOWER 0x3
#define HAS_RELATED_FILLHISTORY_TASK(_t) ((_t)->hTaskInfo.id.taskId != 0)
/*
 * Clear the link from a task to its related fill-history task by zeroing the
 * recorded task id and stream id. Afterwards HAS_RELATED_FILLHISTORY_TASK(_t)
 * evaluates to false, since it tests hTaskInfo.id.taskId != 0.
 *
 * _t: pointer expression to an object that has an hTaskInfo.id member.
 *
 * There is deliberately no semicolon after `while (0)`: the caller supplies it,
 * so the macro expands to exactly one statement and stays safe inside an
 * unbraced if/else.
 */
#define CLEAR_RELATED_FILLHISTORY_TASK(_t) \
  do {                                     \
    (_t)->hTaskInfo.id.taskId = 0;         \
    (_t)->hTaskInfo.id.streamId = 0;       \
  } while (0)
typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue;
typedef struct SStreamTaskSM SStreamTaskSM;
#define SSTREAM_TASK_VER 2
enum {
STREAM_STATUS__NORMAL = 0,
STREAM_STATUS__STOP,
@ -119,6 +127,21 @@ enum {
STREAM_META_OK_TO_STOP = 2,
};
// Events that can be passed to streamTaskHandleEvent() for a stream task's
// state machine. Values are explicit and run contiguously from 0x1 to 0xC.
typedef enum EStreamTaskEvent {
  TASK_EVENT_INIT               = 0x1,
  TASK_EVENT_INIT_SCAN_HISTORY  = 0x2,
  TASK_EVENT_SCANHIST_COMPLETED = 0x3,
  TASK_EVENT_START              = 0x4,
  TASK_EVENT_STOP               = 0x5,
  TASK_EVENT_GEN_CHECKPOINT     = 0x6,
  TASK_EVENT_PAUSE              = 0x7,
  TASK_EVENT_RESUME             = 0x8,
  TASK_EVENT_HALT               = 0x9,
  TASK_EVENT_TRANS_STATE        = 0xA,
  TASK_EVENT_SCAN_TSDB          = 0xB,
  TASK_EVENT_SCAN_WAL           = 0xC,
} EStreamTaskEvent;
// Stream queue item record that carries only a one-byte `type` field.
// NOTE(review): presumably `type` is a STREAM_INPUT__* tag identifying the kind of queued item — confirm.
typedef struct {
int8_t type;
} SStreamQueueItem;
@ -351,6 +374,7 @@ typedef struct SHistoryTaskInfo {
int32_t tickCount;
int32_t retryTimes;
int32_t waitInterval;
int64_t haltVer; // offset in wal when halt the stream task
} SHistoryTaskInfo;
typedef struct STaskOutputInfo {
@ -692,20 +716,28 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
ETaskStatus streamTaskGetStatus(SStreamTask* pTask, char** pStr);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
// recover and fill history
void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t streamTaskStartScanHistory(SStreamTask* pTask);
int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
//int32_t streamTaskStartScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
bool streamTaskAllUpstreamClosed(SStreamTask* pTask);
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM);
void streamTaskRestoreStatus(SStreamTask* pTask);
int32_t streamTaskStop(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,

View File

@ -92,11 +92,13 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
}
}
qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
char* p = NULL;
streamTaskGetStatus(pTask, &p);
qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->info.triggerParam);
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, pTask->info.triggerParam);
return 0;
}
@ -174,10 +176,15 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
taosWUnLockLatch(&pSnode->pMeta->lock);
qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
streamTaskCheckDownstream(pTask);
char* p = NULL;
streamTaskGetStatus(pTask, &p);
qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE,
pTask->id.idStr, p, numOfTasks);
ASSERT(0);
// streamTaskCheckDownstream(pTask);
return 0;
}
@ -352,10 +359,10 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage);
streamMetaReleaseTask(pSnode->pMeta, pTask);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
char* p = NULL;
streamTaskGetStatus(pTask, &p);
qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
pTask->id.idStr, p, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else {
rsp.status = TASK_DOWNSTREAM_NOT_READY;
qDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",

View File

@ -817,28 +817,29 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
}
// sink
if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
pTask->outputInfo.smaSink.vnode = pTq->pVnode;
pTask->outputInfo.smaSink.smaSink = smaHandleRes;
} else if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
pTask->outputInfo.tbSink.vnode = pTq->pVnode;
pTask->outputInfo.tbSink.tbSinkFunc = tqSinkDataIntoDstTable;
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
if (pOutputInfo->type == TASK_OUTPUT__SMA) {
pOutputInfo->smaSink.vnode = pTq->pVnode;
pOutputInfo->smaSink.smaSink = smaHandleRes;
} else if (pOutputInfo->type == TASK_OUTPUT__TABLE) {
pOutputInfo->tbSink.vnode = pTq->pVnode;
pOutputInfo->tbSink.tbSinkFunc = tqSinkDataIntoDstTable;
int32_t ver1 = 1;
SMetaInfo info = {0};
code = metaGetInfo(pTq->pVnode->pMeta, pTask->outputInfo.tbSink.stbUid, &info, NULL);
code = metaGetInfo(pTq->pVnode->pMeta, pOutputInfo->tbSink.stbUid, &info, NULL);
if (code == TSDB_CODE_SUCCESS) {
ver1 = info.skmVer;
}
SSchemaWrapper* pschemaWrapper = pTask->outputInfo.tbSink.pSchemaWrapper;
pTask->outputInfo.tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
if (pTask->outputInfo.tbSink.pTSchema == NULL) {
SSchemaWrapper* pschemaWrapper = pOutputInfo->tbSink.pSchemaWrapper;
pOutputInfo->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
if (pOutputInfo->tbSink.pTSchema == NULL) {
return -1;
}
pTask->outputInfo.tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
tSimpleHashSetFreeFp(pTask->outputInfo.tbSink.pTblInfo, freePtr);
pOutputInfo->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTblInfo, freePtr);
}
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@ -863,20 +864,23 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}
char* p = NULL;
streamTaskGetStatus(pTask, &p);
if (pTask->info.fillHistory) {
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam);
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
(int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam);
} else {
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam);
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam);
}
return 0;
@ -918,9 +922,10 @@ int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage);
streamMetaReleaseTask(pMeta, pTask);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
char* p = NULL;
streamTaskGetStatus(pTask, &p);
tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, pStatus, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else {
rsp.status = TASK_DOWNSTREAM_NOT_READY;
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
@ -1023,11 +1028,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId);
bool restored = pTq->pVnode->restored;
if (p != NULL && restored) {
p->execInfo.init = taosGetTimestampMs();
tqDebug("s-task:%s set the init ts:%"PRId64, p->id.idStr, p->execInfo.init);
streamTaskCheckDownstream(p);
if (p != NULL && restored && p->info.fillHistory == 0) {
EStreamTaskEvent event = (p->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCAN_HISTORY;
streamTaskHandleEvent(p->status.pSM, event);
} else if (!restored) {
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);
}
@ -1061,7 +1064,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// do recovery step1
const char* id = pTask->id.idStr;
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
char* pStatus = NULL;
streamTaskGetStatus(pTask, &pStatus);
// avoid multi-thread exec
while(1) {
@ -1115,7 +1119,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
int8_t status = streamTaskSetSchedStatusInActive(pTask);
int8_t status = streamTaskSetSchedStatusInactive(pTask);
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
@ -1124,12 +1128,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
// the following procedure should be executed, no matter status is stop/pause or not
tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el);
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, el);
if (pTask->info.fillHistory) {
SVersionRange* pRange = NULL;
SStreamTask* pStreamTask = NULL;
bool done = false;
// 1. get the related stream task
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
@ -1148,10 +1151,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
#if 0
// 2. it cannot be paused, when the stream task in TASK_STATUS__SCAN_HISTORY status. Let's wait for the
// stream task get ready for scan history data
while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
while (streamTaskGetStatus(pStreamTask, NULL) == TASK_STATUS__SCAN_HISTORY) {
tqDebug(
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
@ -1209,21 +1212,26 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
taosThreadMutexUnlock(&pStreamTask->lock);
break;
}
#endif
streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
// if it's an source task, extract the last version in wal.
pRange = &pTask->dataRange.range;
done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
pTask->execInfo.step2Start = taosGetTimestampMs();
if (done) {
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
streamTaskPutTranstateIntoInputQ(pTask);
streamTaskRestoreStatus(pTask);
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
qDebug("s-task:%s prev status is %s, update the kept status to be:%s when after step 2", id,
streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus));
}
// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
// pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
// qDebug("s-task:%s prev status is %s, update the kept status to be:%s when after step 2", id,
// streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus));
// }
streamExecTask(pTask); // exec directly
} else {
@ -1243,35 +1251,29 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
/*int8_t status = */streamTaskSetSchedStatusInActive(pTask);
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
#if 0
// the fill-history task starts to process data in wal, let's set it status to be normal now
if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) {
streamSetStatusNormal(pTask);
}
#endif
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_COMPLETED);
tqScanWalAsync(pTq, false);
}
streamMetaReleaseTask(pMeta, pStreamTask);
} else {
STimeWindow* pWindow = &pTask->dataRange.window;
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
if (pTask->hTaskInfo.id.taskId == 0) {
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
tqDebug(
"s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time "
"window:%" PRId64 " - %" PRId64,
id, pWindow->skey, pWindow->ekey);
qStreamInfoResetTimewindowFilter(pTask->exec.pExecutor);
} else {
// when related fill-history task exists, update the fill-history time window only when the
// state transfer is completed.
tqDebug(
"s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start "
"ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
}
// Not update the fill-history time window until the state transfer is completed if the related fill-history task
// exists.
tqDebug(
"s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start "
"ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
code = streamTaskScanHistoryDataComplete(pTask);
}
@ -1360,17 +1362,17 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
}
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId);
if (pTask != NULL) {
// even in halt status, the data in inputQ must be processed
int8_t st = pTask->status.taskStatus;
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
char* p = NULL;
ETaskStatus st = streamTaskGetStatus(pTask, &p);
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.nextProcessVer);
streamExecTask(pTask);
} else {
int8_t status = streamTaskSetSchedStatusInActive(pTask);
int8_t status = streamTaskSetSchedStatusInactive(pTask);
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
pTask->id.idStr, streamGetTaskStatusStr(st), status);
pTask->id.idStr, p, status);
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);

View File

@ -99,12 +99,8 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) {
continue;
}
pTask->execInfo.init = taosGetTimestampMs();
tqDebug("s-task:%s start check downstream tasks, set the init ts:%"PRId64, pTask->id.idStr, pTask->execInfo.init);
streamSetStatusNormal(pTask);
streamTaskCheckDownstream(pTask);
EStreamTaskEvent event = (pTask->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCAN_HISTORY;
streamTaskHandleEvent(pTask->status.pSM, event);
streamMetaReleaseTask(pMeta, pTask);
}
@ -113,8 +109,8 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) {
}
int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
@ -328,9 +324,11 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) {
}
// not in ready state, do not handle the data from wal
int32_t status = pTask->status.taskStatus;
if (status != TASK_STATUS__NORMAL) {
tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
// int32_t status = pTask->status.taskStatus;
char* p = NULL;
int32_t status = streamTaskGetStatus(pTask, &p);
if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__NORMAL) {
tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, p);
return false;
}
@ -449,9 +447,10 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
taosThreadMutexLock(&pTask->lock);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus);
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status != TASK_STATUS__NORMAL) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, p);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;

View File

@ -28,34 +28,26 @@ typedef struct SStreamTaskState {
char* name;
} SStreamTaskState;
typedef enum EStreamTaskEvent {
TASK_EVENT_INIT = 0x1,
TASK_EVENT_START = 0x2,
TASK_EVENT_STOP = 0x3,
TASK_EVENT_GEN_CHECKPOINT = 0x4,
TASK_EVENT_PAUSE = 0x5,
TASK_EVENT_RESUME = 0x6,
TASK_EVENT_HALT = 0x7,
TASK_EVENT_TRANS_STATE = 0x8,
TASK_EVENT_SCAN_TSDB = 0x9,
TASK_EVENT_SCAN_WAL = 0x10,
} EStreamTaskEvent;
typedef int32_t (*__state_trans_fn)(SStreamTask*);
typedef int32_t (*__state_trans_succ_fn)(SStreamTask*);
// One state-transition record; SStreamTaskSM keeps these in pTransList (SArray<STaskStateTrans>).
// NOTE(review): the per-field notes below are inferred from names and types only — confirm against the
// state machine implementation.
typedef struct STaskStateTrans {
// presumably: invoke the success callback automatically once the action returns — TODO confirm
bool autoInvokeEndFn;
// presumably the source state this transition applies to
SStreamTaskState state;
// event associated with this transition
EStreamTaskEvent event;
// presumably the state entered when the transition is taken
SStreamTaskState next;
// callbacks taking the task and returning int32_t; exact invocation order is not visible here
__state_trans_fn preAction;
__state_trans_fn pAction;
__state_trans_succ_fn pSuccAction;
} STaskStateTrans;
struct SStreamTaskSM {
SStreamTaskState current;
SArray* pTransList; // SArray<STaskStateTrans>
int64_t stateTs;
SStreamTask* pTask;
SArray* pTransList; // SArray<STaskStateTrans>
STaskStateTrans* pActiveTrans;
int64_t startTs;
SStreamTaskState current;
SStreamTaskState prev;
};
typedef struct SStreamEventInfo {

View File

@ -127,7 +127,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
/*int8_t status = */streamTaskSetSchedStatusInActive(pTask);
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
stError("failed to create msg to aunch s-task:%s, reason out of memory", pTask->id.idStr);
return -1;
}

View File

@ -662,8 +662,10 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
int32_t numOfVgs = taosArrayGetSize(vgInfo);
pTask->notReadyTasks = numOfVgs;
char* p = NULL;
streamTaskGetStatus(pTask, &p);
stDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
numOfVgs, p);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.downstreamTaskId = pVgInfo->taskId;
@ -775,8 +777,9 @@ int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHist
initRpcMsg(&msg, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead));
tmsgSendReq(pEpSet, &msg);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
stDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus,
char* p = NULL;
streamTaskGetStatus(pTask, &p);
stDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, p,
pReq->downstreamTaskId, vgId);
return 0;
}

View File

@ -318,12 +318,13 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// todo. the dropping status should be append to the status after the halt completed.
// It must be halted for a source stream task, since when the related scan-history-data task start scan the history
// for the step 2.
int8_t status = pStreamTask->status.taskStatus;
int8_t status = streamTaskGetStatus(pStreamTask, NULL);//pStreamTask->status.taskStatus;
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING);
} else {
ASSERT(status == TASK_STATUS__NORMAL);
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
// pStreamTask->status.taskStatus = TASK_STATUS__HALT;
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
}
@ -337,9 +338,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// update the scan data range for source task.
stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
", status:%s, sched-status:%d",
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
", status:normal, sched-status:%d",
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
pTimeWindow->ekey, pStreamTask->status.schedStatus);
} else {
stDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
}
@ -362,8 +363,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 5. clear the link between fill-history task and stream task info
pStreamTask->hTaskInfo.id.taskId = 0;
pStreamTask->hTaskInfo.id.streamId = 0;
CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);
// 6. save to disk
taosWLockLatch(&pMeta->lock);
@ -505,7 +505,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) {
/*int8_t status = */ streamTaskSetSchedStatusInActive(pTask);
/*int8_t status = */ streamTaskSetSchedStatusInactive(pTask);
}
}
@ -592,8 +592,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// todo other thread may change the status
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
if (type == STREAM_INPUT__CHECKPOINT) {
stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus));
char* p = NULL;
streamTaskGetStatus(pTask, &p);
stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p);
streamTaskBuildCheckpoint(pTask);
return 0;
}
@ -628,15 +629,18 @@ int32_t streamExecTask(SStreamTask* pTask) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s exec completed, status:%s, sched-status:%d", id,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
char* p = NULL;
streamTaskGetStatus(pTask, &p);
stDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, p, pTask->status.schedStatus);
return 0;
}
taosThreadMutexUnlock(&pTask->lock);
}
} else {
stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
char* p = NULL;
streamTaskGetStatus(pTask, &p);
stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, p,
pTask->status.schedStatus);
}
return 0;

View File

@ -518,8 +518,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
}
taosWUnLockLatch(&pMeta->lock);
stDebug("s-task:0x%x set task status:%s and start to unregister it", taskId,
streamGetTaskStatusStr(TASK_STATUS__DROPPING));
stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId);
while (1) {
taosRLockLatch(&pMeta->lock);

View File

@ -17,6 +17,7 @@
#include "trpc.h"
#include "ttimer.h"
#include "wal.h"
#include "streamsm.h"
typedef struct SLaunchHTaskInfo {
SStreamMeta* pMeta;
@ -34,16 +35,17 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
static void tryLaunchHistoryTask(void* param, void* tmrId);
static int32_t updateTaskReadyInMeta(SStreamTask* pTask);
static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t vgId = pMeta->vgId;
int32_t streamTaskSetReady(SStreamTask* pTask) {
char* p = NULL;
int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask);
ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
if (status == TASK_STATUS__SCAN_HISTORY && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream,
streamGetTaskStatusStr(pTask->status.taskStatus));
pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, p);
}
ASSERT(pTask->status.downstreamReady == 0);
@ -52,34 +54,10 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
pTask->execInfo.start = taosGetTimestampMs();
int64_t el = (pTask->execInfo.start - pTask->execInfo.init);
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus));
pTask->id.idStr, numOfDowns, el, p);
taosWLockLatch(&pMeta->lock);
STaskId id = streamTaskExtractKey(pTask);
taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
pStartInfo->readyTs = pTask->execInfo.start;
if (pStartInfo->startTs != 0) {
pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs;
} else {
pStartInfo->elapsedTime = 0;
}
streamMetaResetStartInfo(pStartInfo);
stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, startTs:%" PRId64
", readyTs:%" PRId64 " total elapsed time:%.2fs",
vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0);
}
taosWUnLockLatch(&pMeta->lock);
updateTaskReadyInMeta(pTask);
return TSDB_CODE_SUCCESS;
}
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
@ -114,28 +92,19 @@ static int32_t doStartScanHistoryTask(SStreamTask* pTask) {
}
int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
ASSERT(pTask->status.downstreamReady == 1);
ASSERT(pTask->status.downstreamReady == 1 && streamTaskGetStatus(pTask, NULL) == TASK_STATUS__SCAN_HISTORY);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
return doStartScanHistoryTask(pTask);
} else {
ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus,
walReaderGetCurrentVer(pTask->exec.pWalReader));
streamTaskEnablePause(pTask);
}
return doStartScanHistoryTask(pTask);
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask);
streamTaskEnablePause(pTask);
}
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
}
stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
}
return 0;
}
@ -152,6 +121,8 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
.stage = pTask->pMeta->stage,
};
ASSERT(pTask->status.downstreamReady == 0);
// serialize streamProcessScanHistoryFinishRsp
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
req.reqId = tGenIdPI64();
@ -187,11 +158,7 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
}
} else {
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
streamTaskSetReady(pTask, 0);
streamTaskSetRangeStreamCalc(pTask);
streamTaskStartScanHistory(pTask);
streamLaunchFillHistoryTask(pTask);
streamTaskOnHandleEventSuccess(pTask->status.pSM);
}
return 0;
@ -288,8 +255,57 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
}
}
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
streamTaskSetReady(pTask, numOfReqs);
/*
 * Ready-handler for a task whose status is expected to be TASK_STATUS__NORMAL.
 *
 * Steps, in order:
 *   1. streamTaskSetReady() and streamTaskSetRangeStreamCalc() are invoked on the task.
 *   2. The current status is read and asserted to be TASK_STATUS__NORMAL.
 *   3. If the task is itself a fill-history task (info.fillHistory == 1), its
 *      taskStatus field is overwritten with TASK_STATUS__DROPPING, and it is
 *      asserted that it has no related fill-history task id.
 *   4. Pause is enabled via streamTaskEnablePause().
 *
 * The status name printed by the final debug log is the one captured in step 2,
 * i.e. before any override performed in step 3.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t onNormalTaskReady(SStreamTask* pTask) {
  const char* idStr = pTask->id.idStr;

  streamTaskSetReady(pTask);
  streamTaskSetRangeStreamCalc(pTask);

  char*       statusName = NULL;
  ETaskStatus curStatus = streamTaskGetStatus(pTask, &statusName);
  ASSERT(curStatus == TASK_STATUS__NORMAL);

  // todo refactor: remove this later
  if (pTask->info.fillHistory == 1) {
    stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", idStr);
    pTask->status.taskStatus = TASK_STATUS__DROPPING;
    ASSERT(pTask->hTaskInfo.id.taskId == 0);
  }

  stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64,
          idStr, statusName, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader));

  streamTaskEnablePause(pTask);
  return TSDB_CODE_SUCCESS;
}
/*
 * Ready-handler for a task whose status is expected to be TASK_STATUS__SCAN_HISTORY.
 *
 * Steps, in order:
 *   1. streamTaskSetReady() and streamTaskSetRangeStreamCalc() are invoked on the task.
 *   2. The current status is read and asserted to be TASK_STATUS__SCAN_HISTORY.
 *   3. streamTaskStartScanHistory() is called.
 *   4. If a related fill-history task id is recorded (hTaskInfo.id.taskId != 0),
 *      streamLaunchFillHistoryTask() is called as well.
 *
 * Return values of the callees are not inspected; always returns TSDB_CODE_SUCCESS.
 */
int32_t onScanhistoryTaskReady(SStreamTask* pTask) {
  const char* idStr = pTask->id.idStr;

  // set the state to be ready
  streamTaskSetReady(pTask);
  streamTaskSetRangeStreamCalc(pTask);

  char*       statusName = NULL;
  ETaskStatus curStatus = streamTaskGetStatus(pTask, &statusName);
  ASSERT(curStatus == TASK_STATUS__SCAN_HISTORY);

  stDebug("s-task:%s enter into scan-history data stage, status:%s", idStr, statusName);
  streamTaskStartScanHistory(pTask);

  // start the related fill-history task, when current task is ready
  const bool hasRelatedHistoryTask = (pTask->hTaskInfo.id.taskId != 0);
  if (hasRelatedHistoryTask) {
    streamLaunchFillHistoryTask(pTask);
  }

  return TSDB_CODE_SUCCESS;
}
// todo: refactor this function.
static void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
streamTaskOnHandleEventSuccess(pTask->status.pSM);
#if 0
const char* id = pTask->id.idStr;
int8_t status = pTask->status.taskStatus;
@ -314,6 +330,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
streamTaskEnablePause(pTask);
}
}
#endif
}
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
@ -349,7 +366,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL;
doProcessDownstreamReadyRsp(pTask, numOfReqs);
doProcessDownstreamReadyRsp(pTask);
} else {
int32_t total = taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
@ -361,7 +378,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
return -1;
}
doProcessDownstreamReadyRsp(pTask, 1);
doProcessDownstreamReadyRsp(pTask);
}
} else { // not ready, wait for 100ms and retry
if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
@ -438,18 +455,6 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) {
}
}
/*
 * Move the task status back to TASK_STATUS__UNINIT.
 *
 * Returns 0 when the status was updated, -1 when the task is in dropping state
 * (the status is left untouched in that case).
 */
int32_t streamSetStatusUnint(SStreamTask* pTask) {
  int32_t prevStatus = atomic_load_8(&pTask->status.taskStatus);

  if (prevStatus != TASK_STATUS__DROPPING) {
    stDebug("s-task:%s set task status to be uninit, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(prevStatus));
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__UNINIT);
    return 0;
  }

  // a dropping task must never be revived
  stError("s-task:%s cannot be set uninit, since in dropping state", pTask->id.idStr);
  return -1;
}
// source
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);
@ -515,9 +520,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
int32_t taskLevel = pTask->info.taskLevel;
ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);
if (pTask->status.taskStatus != TASK_STATUS__SCAN_HISTORY) {
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status != TASK_STATUS__SCAN_HISTORY) {
stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly",
pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->upstreamTaskId);
pTask->id.idStr, p, pReq->upstreamTaskId);
void* pBuf = NULL;
int32_t len = 0;
@ -571,12 +578,12 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
}
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);
ASSERT(/*pTask->status.taskStatus*/ streamTaskGetStatus(pTask, NULL) == TASK_STATUS__SCAN_HISTORY);
SStreamMeta* pMeta = pTask->pMeta;
// execute in the scan history complete call back msg, ready to process data from inputQ
streamSetStatusNormal(pTask);
streamTaskSetSchedStatusInActive(pTask);
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_COMPLETED);
streamTaskSetSchedStatusInactive(pTask);
taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pTask);
@ -604,15 +611,15 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
" ver range:%" PRId64 " - %" PRId64", init:%"PRId64,
" verRange:%" PRId64 " - %" PRId64", init:%"PRId64,
pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey,
pRange->range.minVer, pRange->range.maxVer, pHTask->execInfo.init);
} else {
stDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
stDebug("s-task:%s no fill-history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
}
// check if downstream tasks have been ready
doCheckDownstreamStatus(pHTask);
streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCAN_HISTORY);
}
static void tryLaunchHistoryTask(void* param, void* tmrId) {
@ -625,11 +632,11 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
ASSERT((*ppTask)->status.timerActive >= 1);
if (streamTaskShouldStop(&(*ppTask)->status)) {
const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
char* p = NULL;
streamTaskGetStatus((*ppTask), &p);
int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1);
stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d",
(*ppTask)->id.idStr, pStatus, (*ppTask)->hTaskInfo.retryTimes, ref);
(*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref);
taosMemoryFree(pInfo);
taosWUnLockLatch(&pMeta->lock);
@ -666,8 +673,10 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
// abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId);
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
const char* p = streamGetTaskStatusStr(pTask->status.taskStatus);
int32_t hTaskId = pHTaskInfo->id.taskId;
char* p = NULL;
int32_t hTaskId = pHTaskInfo->id.taskId;
streamTaskGetStatus(pTask, &p);
stDebug(
"s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
@ -713,11 +722,8 @@ SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, in
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
if (hTaskId == 0) {
return TSDB_CODE_SUCCESS;
}
ASSERT(pTask->status.downstreamReady == 1);
ASSERT((hTaskId != 0) && (pTask->status.downstreamReady == 1));
stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
pTask->hTaskInfo.id.streamId, hTaskId);
@ -931,12 +937,12 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
// only the downstream tasks are ready, set the task to be ready to work.
void streamTaskCheckDownstream(SStreamTask* pTask) {
if (pTask->info.fillHistory) {
stDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
return;
}
// if (pTask->info.fillHistory) {
// ASSERT(0);
// stDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
// return;
// }
ASSERT(pTask->status.downstreamReady == 0);
doCheckDownstreamStatus(pTask);
}
@ -1047,13 +1053,50 @@ void streamTaskEnablePause(SStreamTask* pTask) {
/*
 * Leave the HALT status and go back to the status recorded by the state machine
 * before the halt (see streamTaskRestoreStatus).
 *
 * If the task is not currently halted, nothing is restored: swapping the
 * current/previous states in that situation would corrupt the state machine, so
 * the request is logged as an error and ignored.
 */
void streamTaskResumeFromHalt(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;
  char*       p = NULL;

  // the state machine is the single source of truth for the task status
  ETaskStatus status = streamTaskGetStatus(pTask, &p);
  if (status != TASK_STATUS__HALT) {
    stError("s-task:%s not in halt status, status:%s", id, p);
    return;
  }

  streamTaskRestoreStatus(pTask);

  // re-read the status so that the log reflects the restored state
  streamTaskGetStatus(pTask, &p);
  stDebug("s-task:%s resume from halt, current status:%s", id, p);
}
/*
 * Record in the meta that this task is ready.
 *
 * The task key is added to pMeta->startInfo.pReadyTaskSet while holding the meta
 * write latch. When the number of ready tasks equals the number of tasks in the
 * meta, the ready timestamp and the elapsed start time are computed, logged, and
 * the start info is then reset.
 *
 * This function only does bookkeeping in the meta; it does not modify the status
 * of the task itself.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t updateTaskReadyInMeta(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;

  taosWLockLatch(&pMeta->lock);

  STaskId id = streamTaskExtractKey(pTask);
  taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);

  int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
  if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) {
    STaskStartInfo* pStartInfo = &pMeta->startInfo;
    pStartInfo->readyTs = pTask->execInfo.start;

    // startTs == 0 means that no start timestamp has been recorded, nothing to measure
    if (pStartInfo->startTs != 0) {
      pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs;
    } else {
      pStartInfo->elapsedTime = 0;
    }

    // Log before the reset, so that the printed values are the ones computed above.
    // NOTE(review): streamMetaResetStartInfo presumably clears some of these fields -- confirm.
    stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, startTs:%" PRId64
            ", readyTs:%" PRId64 " total elapsed time:%.2fs",
            pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs,
            pStartInfo->elapsedTime / 1000.0);

    streamMetaResetStartInfo(pStartInfo);
  }

  taosWUnLockLatch(&pMeta->lock);
  return TSDB_CODE_SUCCESS;
}

View File

@ -304,11 +304,11 @@ static void freeUpstreamItem(void* p) {
void tFreeStreamTask(SStreamTask* pTask) {
int32_t taskId = pTask->id.taskId;
char* p = NULL;
streamTaskGetStatus(pTask, &p);
STaskExecStatisInfo* pStatis = &pTask->execInfo;
stDebug("start to free s-task:0x%x, %p, state:%p, status:%s", taskId, pTask, pTask->pState,
streamGetTaskStatusStr(pTask->status.taskStatus));
stDebug("start to free s-task:0x%x, %p, state:%p, status:%s", taskId, pTask, pTask->pState, p);
stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
@ -417,6 +417,13 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
return TSDB_CODE_OUT_OF_MEMORY;
}
pTask->status.pSM = streamCreateStateMachine(pTask);
if (pTask->status.pSM == NULL) {
stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
tstrerror(terrno));
return terrno;
}
pTask->execInfo.created = taosGetTimestampMs();
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
@ -463,7 +470,9 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
return 0;
} else {
int32_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__TABLE) {
if (type == TASK_OUTPUT__TABLE) {
return 0;
} else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
return 1;
} else {
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
@ -677,7 +686,7 @@ int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
return status;
}
int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) {
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
taosThreadMutexLock(&pTask->lock);
int8_t status = pTask->status.schedStatus;
ASSERT(status == TASK_SCHED_STATUS__WAITING || status == TASK_SCHED_STATUS__ACTIVE ||

View File

@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "streamInt.h"
#include "tmisce.h"
#include "tstream.h"
@ -32,19 +31,42 @@ SStreamTaskState StreamTaskStatusList[8] = {
{.state = TASK_STATUS__CK, .name = "checkpoint"},
};
static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn);
static int32_t initStateTransferTable(SStreamTaskSM* pSM);
SStreamEventInfo StreamTaskEventList[8] = {
{}, // dummy event, place holder
{.event = TASK_EVENT_INIT, .name = "initialize"},
{.event = TASK_EVENT_INIT_SCAN_HISTORY, .name = "scan-history-initialize"},
{.event = TASK_EVENT_SCANHIST_COMPLETED, .name = "scan-history-completed"},
};
static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return 0; }
static int32_t streamTaskStartCheckDownstream(SStreamTask* pTask) {
stDebug("s-task:%s start to check downstream tasks", pTask->id.idStr);
static int32_t initStateTransferTable(SStreamTaskSM* pSM);
static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
__state_trans_fn preFn, __state_trans_fn fn, __state_trans_succ_fn succFn,
bool autoInvoke);
static int32_t streamTaskInitStatus(SStreamTask* pTask);
static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask);
static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; }
/*
 * Action of the initialization transitions (TASK_EVENT_INIT and
 * TASK_EVENT_INIT_SCAN_HISTORY): records the init timestamp of the task and
 * starts the check of the downstream tasks.
 *
 * Always returns 0.
 */
int32_t streamTaskInitStatus(SStreamTask* pTask) {
  STaskExecStatisInfo* pExecInfo = &pTask->execInfo;

  pExecInfo->init = taosGetTimestampMs();
  stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr,
          pExecInfo->init);

  streamTaskCheckDownstream(pTask);
  return 0;
}
/*
 * Action of the SCAN_HISTORY -> NORMAL transition (TASK_EVENT_SCANHIST_COMPLETED).
 * The result of streamSetStatusNormal is not checked; TSDB_CODE_SUCCESS is always returned.
 */
int32_t streamTaskSetReadyForWal(SStreamTask* pTask) {
  const char* idStr = pTask->id.idStr;
  stDebug("s-task:%s ready for extract data from wal", idStr);

  // todo remove it
  (void)streamSetStatusNormal(pTask);

  return TSDB_CODE_SUCCESS;
}
// Action of the NORMAL -> PAUSE transition (TASK_EVENT_PAUSE): only logs the request, always returns 0.
static int32_t streamTaskDoPause(SStreamTask* pTask) {
  const char* idStr = pTask->id.idStr;

  stDebug("s-task:%s start to pause tasks", idStr);
  return 0;
}
static int32_t streamTaskDoResume(SStreamTask* pTask) {
stDebug("s-task:%s start to resume tasks", pTask->id.idStr);
return 0;
@ -54,12 +76,32 @@ static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) {
return 0;
}
/*
 * Pre-action of the SCAN_HISTORY -> HALT and CK -> HALT transitions: blocks the
 * caller, polling every 100ms, until the task status becomes TASK_STATUS__NORMAL.
 *
 * NOTE(review): there is no timeout and no stop/drop check in this loop -- confirm
 * that the task is guaranteed to reach the normal status.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t streamTaskWaitBeforeHalt(SStreamTask* pTask) {
  char* statusName = NULL;

  for (;;) {
    if (streamTaskGetStatus(pTask, &statusName) == TASK_STATUS__NORMAL) {
      break;
    }

    stDebug("related stream task:%s(status:%s) not ready for halt, wait for 100ms and retry", pTask->id.idStr,
            statusName);
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}
/*
 * Success action of the "* -> HALT" transitions: remembers in hTaskInfo.haltVer
 * the current version reported by the wal reader of the task. When the reader
 * reports -1, the version right after the max version of the data range of the
 * task is used instead.
 *
 * The task must have a related fill-history task. Always returns TSDB_CODE_SUCCESS.
 */
int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
  ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));

  SHistoryTaskInfo* pHInfo = &pTask->hTaskInfo;

  pHInfo->haltVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
  if (pHInfo->haltVer != -1) {
    return TSDB_CODE_SUCCESS;
  }

  // no version available from the reader, fall back to the end of the data range
  pHInfo->haltVer = pTask->dataRange.range.maxVer + 1;
  return TSDB_CODE_SUCCESS;
}
// todo optimize the perf of find the trans objs by using hash table
static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent* pEvent) {
static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent event) {
int32_t numOfTrans = taosArrayGetSize(pState->pTransList);
for(int32_t i = 0; i < numOfTrans; ++i) {
STaskStateTrans* pTrans = taosArrayGet(pState->pTransList, i);
if (pTrans->state.state == pState->current.state && pTrans->event == *pEvent) {
if (pTrans->state.state == pState->current.state && pTrans->event == event) {
return pTrans;
}
}
@ -68,6 +110,20 @@ static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, con
return NULL;
}
/*
 * Swap the current and the previous state of the task state machine, i.e. go back
 * to the state the task was in before the last transition, and restart the state
 * timestamp. Must not be invoked while a transition is in progress.
 */
void streamTaskRestoreStatus(SStreamTask* pTask) {
  SStreamTaskSM* pSM = pTask->status.pSM;

  taosThreadMutexLock(&pTask->lock);
  ASSERT(pSM->pActiveTrans == NULL);

  // keep private copies of both states: they are used for logging after the lock is
  // released, when the state machine may already have been modified by another thread
  SStreamTaskState from = pSM->current;
  SStreamTaskState to = pSM->prev;

  pSM->current = to;
  pSM->prev = from;
  pSM->startTs = taosGetTimestampMs();
  taosThreadMutexUnlock(&pTask->lock);

  stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, from.name, to.name);
}
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
SStreamTaskSM* pSM = taosMemoryCalloc(1, sizeof(SStreamTaskSM));
if (pSM == NULL) {
@ -79,7 +135,7 @@ SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
// set the initial state for the state-machine of stream task
pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT];
pSM->stateTs = taosGetTimestampMs();
pSM->startTs = taosGetTimestampMs();
int32_t code = initStateTransferTable(pSM);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pSM);
@ -88,36 +144,71 @@ SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
return pSM;
}
int32_t taskSMHandleEvent(SStreamTaskSM* pSM, const EStreamTaskEvent* pEvent) {
STaskStateTrans* pTrans = streamTaskFindTransform(pSM, pEvent);
ASSERT(pSM->pActiveTrans == NULL);
stDebug("start to handle event:%d, state:%s", *pEvent, pSM->current.name);
pSM->pActiveTrans = pTrans;
pSM->stateTs = taosGetTimestampMs();
return pTrans->pAction(pSM->pTask);
}
int32_t taskSMOnHandleEventSuccess(SStreamTaskSM* pSM) {
STaskStateTrans* pTrans = pSM->pActiveTrans;
EStreamTaskEvent* pEvent = &pTrans->event;
int64_t el = (taosGetTimestampMs() - pSM->stateTs);
stDebug("handle event:%d completed, elapsd time:%" PRId64 "ms new state:%s from %s", *pEvent, el, pTrans->next.name,
/*
 * Feed an event into the state machine of a stream task.
 *
 * The transition registered for (current state, event) is looked up, then:
 *   1. its pre-action is executed, before the transition is marked as active;
 *   2. the transition is recorded as the active one (under the task lock);
 *   3. its action is executed;
 *   4. when the transition is flagged as auto-invoke, the transition is completed
 *      immediately through streamTaskOnHandleEventSuccess; otherwise the completion
 *      is left to whoever invokes streamTaskOnHandleEventSuccess later.
 *
 * The current state is NOT modified here; it is only updated when the transition
 * is completed, so that the previous state is recorded correctly.
 *
 * Returns -1 when no transition is registered for the event in the current state,
 * the error code of the pre-action when it fails, otherwise the code of the action.
 */
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
  SStreamTask* pTask = pSM->pTask;

  STaskStateTrans* pTrans = streamTaskFindTransform(pSM, event);
  if (pTrans == NULL) {
    // print the raw event value, it may not be a valid index of the event name table
    stError("s-task:%s no state transition for event:%d in state:%s, event is ignored", pTask->id.idStr,
            (int32_t)event, pSM->current.name);
    return -1;
  }

  stDebug("s-task:%s start to handle event:%s, state:%s", pTask->id.idStr, StreamTaskEventList[event].name,
          pSM->current.name);

  int32_t code = pTrans->preAction(pTask);
  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s pre-action of event:%s failed, code:0x%x, state:%s unchanged", pTask->id.idStr,
            StreamTaskEventList[event].name, code, pSM->current.name);
    return code;
  }

  taosThreadMutexLock(&pTask->lock);
  ASSERT(pSM->pActiveTrans == NULL);
  pSM->pActiveTrans = pTrans;
  pSM->startTs = taosGetTimestampMs();
  taosThreadMutexUnlock(&pTask->lock);

  code = pTrans->pAction(pTask);
  // todo handle error code;

  if (pTrans->autoInvokeEndFn) {
    streamTaskOnHandleEventSuccess(pSM);
  }

  return code;
}
/*
 * Complete the active transition of the state machine: the current state becomes
 * the previous one, the target state of the transition becomes the current one,
 * and the success action of the transition is invoked. All of this is done while
 * holding the task lock.
 *
 * Returns -1 when there is no active transition to complete (the state machine is
 * left untouched), TSDB_CODE_SUCCESS otherwise.
 */
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
  SStreamTask* pTask = pSM->pTask;

  // do update the task status
  taosThreadMutexLock(&pTask->lock);

  // the active transition is set under the same lock, read it only after the lock is acquired
  STaskStateTrans* pTrans = pSM->pActiveTrans;
  if (pTrans == NULL) {
    SStreamTaskState s = pSM->current;
    taosThreadMutexUnlock(&pTask->lock);

    stError("s-task:%s no active state transition to complete, state:%s", pTask->id.idStr, s.name);
    return -1;
  }

  SStreamTaskState prev = pSM->current;
  int64_t          startTs = pSM->startTs;

  pSM->prev = prev;
  pSM->current = pTrans->next;
  pSM->pActiveTrans = NULL;

  // on success callback, add into lock if necessary, or maybe we should add an option for this?
  pTrans->pSuccAction(pTask);
  taosThreadMutexUnlock(&pTask->lock);

  // only values captured under the lock (or the immutable transition object) are logged
  int64_t el = (taosGetTimestampMs() - startTs);
  stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
          StreamTaskEventList[pTrans->event].name, el, prev.name, pTrans->next.name);
  return TSDB_CODE_SUCCESS;
}
/*
 * Get the current status of a stream task from its state machine.
 *
 * pStr: optional output; when not NULL, it receives the name of the current status.
 *
 * The state is copied without taking the task lock.
 */
ETaskStatus streamTaskGetStatus(SStreamTask* pTask, char** pStr) {
  // copy one obj in case of multi-thread environment
  SStreamTaskState snapshot = pTask->status.pSM->current;

  if (pStr == NULL) {
    return snapshot.state;
  }

  *pStr = snapshot.name;
  return snapshot.state;
}
/*
 * Build one entry of the state transfer table.
 *
 * current/next: source and target status of the transition
 * event:        the event that triggers the transition
 * preFn:        invoked before the transition is marked as active, may be NULL
 * fn:           the action of the transition, may be NULL
 * succFn:       invoked when the transition is completed, may be NULL
 * autoInvoke:   complete the transition right after the action has been executed
 *
 * Every callback that is NULL is replaced by dummyFn, so the callbacks of the
 * returned entry can always be invoked without a check.
 */
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
                                     __state_trans_fn preFn, __state_trans_fn fn, __state_trans_succ_fn succFn,
                                     bool autoInvoke) {
  STaskStateTrans trans = {
      .state = StreamTaskStatusList[current],
      .next = StreamTaskStatusList[next],
      .event = event,
      .preAction = (preFn != NULL) ? preFn : dummyFn,
      .pAction = (fn != NULL) ? fn : dummyFn,
      .pSuccAction = (succFn != NULL) ? succFn : dummyFn,
      .autoInvokeEndFn = autoInvoke,
  };

  return trans;
}
@ -129,20 +220,50 @@ int32_t initStateTransferTable(SStreamTaskSM* pSM) {
}
}
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__NORMAL, TASK_EVENT_INIT, streamTaskStartCheckDownstream);
// initialization event handle
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__NORMAL, TASK_EVENT_INIT, NULL,
streamTaskInitStatus, onNormalTaskReady, false);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, streamTaskDoPause);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCAN_HISTORY, NULL,
streamTaskInitStatus, onScanhistoryTaskReady, false);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__NORMAL, TASK_EVENT_RESUME, streamTaskDoResume);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__NORMAL, TASK_EVENT_SCANHIST_COMPLETED, NULL,
streamTaskSetReadyForWal, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, streamTaskDoCheckpoint);
// pause & resume related event handle
trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, streamTaskDoPause, NULL,
true);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__NORMAL, TASK_EVENT_PAUSE, NULL);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__NORMAL, TASK_EVENT_RESUME, NULL, streamTaskDoResume,
NULL, true);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL,
streamTaskDoCheckpoint, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__NORMAL, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
taosArrayPush(pSM->pTransList, &trans);
// halt stream task, from other task status
trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, NULL,
streamTaskKeepCurrentVerInWal, true);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, streamTaskWaitBeforeHalt,
NULL, streamTaskKeepCurrentVerInWal, true);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, streamTaskWaitBeforeHalt, NULL,
streamTaskKeepCurrentVerInWal, true);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, NULL,
streamTaskKeepCurrentVerInWal, true);
taosArrayPush(pSM->pTransList, &trans);
return 0;
}