refactor: do some internal refactoring.
commit 4ea737571d
parent 7f265e181d
@@ -209,7 +209,7 @@ typedef struct {
   int32_t taskId;
   int32_t nodeId;
   SEpSet epSet;
-} STaskDispatcherFixedEp;
+} STaskDispatcherFixed;
 
 typedef struct {
   char stbFullName[TSDB_TABLE_FNAME_LEN];
@@ -298,7 +298,7 @@ typedef struct SDispatchMsgInfo {
   int8_t dispatchMsgType;
   int16_t msgType;      // dispatch msg type
   int32_t retryCount;   // retry send data count
-  int64_t startTs;      // output blocking timestamp
+  int64_t startTs;      // dispatch start time, record total elapsed time for dispatch
   SArray* pRetryList;   // current dispatch successfully completed node of downstream
 } SDispatchMsgInfo;
 
@@ -318,24 +318,27 @@ typedef struct STaskSchedInfo {
   void* pTimer;
 } STaskSchedInfo;
 
-typedef struct SSinkTaskRecorder {
+typedef struct SSinkRecorder {
   int64_t numOfSubmit;
   int64_t numOfBlocks;
   int64_t numOfRows;
   int64_t bytes;
-} SSinkTaskRecorder;
+} SSinkRecorder;
 
-typedef struct {
-  int64_t created;
-  int64_t init;
-  int64_t step1Start;
-  int64_t step2Start;
-  int64_t start;
-  int32_t updateCount;
-  int32_t dispatchCount;
-  int64_t latestUpdateTs;
+typedef struct STaskExecStatisInfo {
+  int64_t created;
+  int64_t init;
+  int64_t step1Start;
+  int64_t step2Start;
+  int64_t start;
+  int32_t updateCount;
+  int32_t dispatch;
+  int64_t latestUpdateTs;
+  int32_t checkpoint;
+  SSinkRecorder sink;
 } STaskExecStatisInfo;
 
+typedef struct STaskTimer STaskTimer;
 typedef struct STokenBucket STokenBucket;
 typedef struct SMetaHbInfo SMetaHbInfo;
 
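Note: with this hunk the standalone sink recorder is no longer a separate task-level field; the counters now live in STaskExecStatisInfo.sink, so one struct carries a task's whole execution summary. A minimal standalone sketch of the new layout (stub types for illustration only, not the real TDengine headers):

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

typedef struct SSinkRecorder {  /* mirrors the renamed struct above */
  int64_t numOfSubmit, numOfBlocks, numOfRows, bytes;
} SSinkRecorder;

typedef struct STaskExecStatisInfo {
  int64_t       created, init, start;
  int32_t       dispatch, checkpoint;
  SSinkRecorder sink;  /* sink stats are now nested here */
} STaskExecStatisInfo;

int main(void) {
  STaskExecStatisInfo execInfo = {0};
  execInfo.sink.numOfRows += 128;  /* was: pTask->sinkRecorder.numOfRows */
  execInfo.dispatch += 1;          /* was: pTask->taskExecInfo.dispatchCount */
  printf("rows:%" PRId64 " dispatch:%d\n", execInfo.sink.numOfRows, execInfo.dispatch);
  return 0;
}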
@@ -353,23 +356,22 @@ struct SStreamTask {
   SDataRange dataRange;
   STaskId historyTaskId;
   STaskId streamTaskId;
-  STaskExecStatisInfo taskExecInfo;
+  STaskExecStatisInfo execInfo;
   SArray* pReadyMsgList;  // SArray<SStreamChkptReadyInfo*>
   TdThreadMutex lock;     // secure the operation of set task status and puting data into inputQ
   SArray* pUpstreamInfoList;
 
   // output
   union {
-    STaskDispatcherFixedEp fixedEpDispatcher;
+    STaskDispatcherFixed fixedDispatcher;
     STaskDispatcherShuffle shuffleDispatcher;
     STaskSinkTb tbSink;
    STaskSinkSma smaSink;
     STaskSinkFetch fetchSink;
   };
-  SSinkTaskRecorder sinkRecorder;
-  STokenBucket* pTokenBucket;
 
-  void* launchTaskTimer;
+  STokenBucket* pTokenBucket;
+  STaskTimer* pTimer;
   SMsgCb* pMsgCb;        // msg handle
   SStreamState* pState;  // state backend
   SArray* pRspMsgList;
@@ -1003,8 +1003,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
 
   bool restored = pTq->pVnode->restored;
   if (p != NULL && restored) {
-    p->taskExecInfo.init = taosGetTimestampMs();
-    tqDebug("s-task:%s set the init ts:%"PRId64, p->id.idStr, p->taskExecInfo.init);
+    p->execInfo.init = taosGetTimestampMs();
+    tqDebug("s-task:%s set the init ts:%"PRId64, p->id.idStr, p->execInfo.init);
 
     streamTaskCheckDownstream(p);
   } else if (!restored) {
@@ -1042,14 +1042,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
   const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
   tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus);
 
-  if (pTask->taskExecInfo.step1Start == 0) {
+  if (pTask->execInfo.step1Start == 0) {
     ASSERT(pTask->status.pauseAllowed == false);
-    pTask->taskExecInfo.step1Start = taosGetTimestampMs();
+    pTask->execInfo.step1Start = taosGetTimestampMs();
     if (pTask->info.fillHistory == 1) {
       streamTaskEnablePause(pTask);
     }
   } else {
-    tqDebug("s-task:%s resume from paused, start ts:%" PRId64, pTask->id.idStr, pTask->taskExecInfo.step1Start);
+    tqDebug("s-task:%s resume from paused, start ts:%" PRId64, pTask->id.idStr, pTask->execInfo.step1Start);
   }
 
   // we have to continue retrying to successfully execute the scan history task.
@@ -1069,7 +1069,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
 
   streamScanHistoryData(pTask);
   if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
-    double el = (taosGetTimestampMs() - pTask->taskExecInfo.step1Start) / 1000.0;
+    double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
     int8_t status = streamTaskSetSchedStatusInActive(pTask);
     tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status);
     streamMetaReleaseTask(pMeta, pTask);
@@ -1077,7 +1077,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
   }
 
   // the following procedure should be executed, no matter status is stop/pause or not
-  double el = (taosGetTimestampMs() - pTask->taskExecInfo.step1Start) / 1000.0;
+  double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
   tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el);
 
   if (pTask->info.fillHistory) {
@@ -1158,7 +1158,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
     done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
 
     if (done) {
-      pTask->taskExecInfo.step2Start = taosGetTimestampMs();
+      pTask->execInfo.step2Start = taosGetTimestampMs();
       qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
       streamTaskPutTranstateIntoInputQ(pTask);
       streamTryExec(pTask);  // exec directly
@@ -1170,7 +1170,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
               pStreamTask->id.idStr);
       ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
 
-      pTask->taskExecInfo.step2Start = taosGetTimestampMs();
+      pTask->execInfo.step2Start = taosGetTimestampMs();
       streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
 
       int64_t dstVer = pTask->dataRange.range.minVer;
@@ -270,11 +270,11 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2*
     tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
   }
 
-  SSinkTaskRecorder* pRec = &pTask->sinkRecorder;
+  SSinkRecorder* pRec = &pTask->execInfo.sink;
 
   pRec->numOfSubmit += 1;
   if ((pRec->numOfSubmit % 5000) == 0) {
-    double el = (taosGetTimestampMs() - pTask->taskExecInfo.start) / 1000.0;
+    double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0;
     tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
            " submit into dst table, %.2fMiB duration:%.2f Sec.",
            pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->bytes), el);
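Note: the hunk above keeps per-submit accounting cheap and only emits a progress line once every 5000 submits. A self-contained sketch of that modulo-throttled logging pattern (nowMs() is a hypothetical stand-in for taosGetTimestampMs()):

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>

static int64_t nowMs(void) { return (int64_t)time(NULL) * 1000; }  /* stand-in */

typedef struct { int64_t numOfSubmit, numOfRows, startTs; } SinkStats;

static void recordSubmit(SinkStats* s, int64_t rows) {
  s->numOfRows += rows;
  if ((++s->numOfSubmit % 5000) == 0) {  /* one summary line per 5000 submits */
    double el = (nowMs() - s->startTs) / 1000.0;
    printf("wrote %" PRId64 " rows in %" PRId64 " submits, %.2f sec\n",
           s->numOfRows, s->numOfSubmit, el);
  }
}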
@@ -755,8 +755,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
   int32_t code = TSDB_CODE_SUCCESS;
   const char* id = pTask->id.idStr;
 
-  if (pTask->taskExecInfo.start == 0) {
-    pTask->taskExecInfo.start = taosGetTimestampMs();
+  if (pTask->execInfo.start == 0) {
+    pTask->execInfo.start = taosGetTimestampMs();
   }
 
   bool onlySubmitData = true;
@@ -785,7 +785,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
     } else if (pDataBlock->info.type == STREAM_CHECKPOINT) {
       continue;
     } else {
-      pTask->sinkRecorder.numOfBlocks += 1;
+      pTask->execInfo.sink.numOfBlocks += 1;
 
       SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
       if (submitReq.aSubmitTbData == NULL) {
@@ -833,7 +833,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
       }
 
       hasSubmit = true;
-      pTask->sinkRecorder.numOfBlocks += 1;
+      pTask->execInfo.sink.numOfBlocks += 1;
       uint64_t groupId = pDataBlock->info.id.groupId;
 
       SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version};
@@ -867,7 +867,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
         }
       }
 
-      pTask->sinkRecorder.numOfRows += pDataBlock->info.rows;
+      pTask->execInfo.sink.numOfRows += pDataBlock->info.rows;
     }
 
     taosHashCleanup(pTableIndexMap);
@@ -96,8 +96,8 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) {
       continue;
     }
 
-    pTask->taskExecInfo.init = taosGetTimestampMs();
-    tqDebug("s-task:%s start check downstream tasks, set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init);
+    pTask->execInfo.init = taosGetTimestampMs();
+    tqDebug("s-task:%s start check downstream tasks, set the init ts:%"PRId64, pTask->id.idStr, pTask->execInfo.init);
 
     streamSetStatusNormal(pTask);
     streamTaskCheckDownstream(pTask);
@@ -306,7 +306,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
            ", not scan wal anymore, add transfer-state block into inputQ",
            id, ver, maxVer);
 
-    double el = (taosGetTimestampMs() - pTask->taskExecInfo.step2Start) / 1000.0;
+    double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0;
     qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
     /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
     /*int32_t code = */streamSchedExec(pTask);
@@ -26,6 +26,8 @@
 extern "C" {
 #endif
 
+#define CHECK_DOWNSTREAM_INTERVAL 100
+
 // clang-format off
 #define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
 #define stError(...) do { if (stDebugFlag & DEBUG_ERROR) { taosPrintLog("STM ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
@@ -53,11 +55,17 @@ struct STokenBucket {
   int32_t rate;  // number of token per second
 };
 
+struct STaskTimer {
+  void* hTaskLaunchTimer;
+  void* dispatchTimer;
+  void* checkTimer;
+};
+
 extern SStreamGlobalEnv streamEnv;
 extern int32_t streamBackendId;
 extern int32_t streamBackendCfWrapperId;
 
-void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration);
+void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration);
 int32_t streamDispatchStreamBlock(SStreamTask* pTask);
 void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
 int32_t getNumOfDispatchBranch(SStreamTask* pTask);
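Note: the three timers a stream task can own (fill-history launch, dispatch retry, downstream recheck) are grouped in this new heap-allocated STaskTimer instead of sharing the old launchTaskTimer handle. A minimal teardown sketch, assuming a taosTmrStop-like API (tmrStop is a stub here):

#include <stdlib.h>

typedef void* tmr_h;
static void tmrStop(tmr_h h) { (void)h; }  /* stub for taosTmrStop */

struct STaskTimer {
  void* hTaskLaunchTimer;  /* re-launch the related fill-history task */
  void* dispatchTimer;     /* retry a failed dispatch */
  void* checkTimer;        /* recheck downstream readiness */
};

static void destroyTaskTimer(struct STaskTimer* pTmr) {
  if (pTmr == NULL) return;
  if (pTmr->hTaskLaunchTimer != NULL) tmrStop(pTmr->hTaskLaunchTimer);
  if (pTmr->dispatchTimer != NULL) tmrStop(pTmr->dispatchTimer);
  if (pTmr->checkTimer != NULL) tmrStop(pTmr->checkTimer);
  free(pTmr);
}

This mirrors the cleanup added to tFreeStreamTask further down in this commit.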
@@ -75,7 +83,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
 
 int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId);
 int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
-int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
+int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
 
 int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);
 int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
@@ -141,6 +141,8 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
   pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
   pTask->chkInfo.startTs = taosGetTimestampMs();
 
+  pTask->execInfo.checkpoint += 1;
+
   // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into
   // inputQ, to make sure all blocks with less version have been handled by this task already.
   int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
@@ -200,6 +202,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
   ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
   if (pTask->chkInfo.startTs == 0) {
     pTask->chkInfo.startTs = taosGetTimestampMs();
+    pTask->execInfo.checkpoint += 1;
   }
 
   // update the child Id for downstream tasks
@@ -321,13 +324,15 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
       stDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
       streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
       streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
-      stInfo("vgId:%d vnode wide checkpoint completed, save all tasks status, elapsed time:%.2f Sec checkpointId:%" PRId64, pMeta->vgId,
-             el, pTask->checkpointingId);
+      stInfo(
+          "vgId:%d vnode wide checkpoint completed, save all tasks status, last:%s, level:%d elapsed time:%.2f Sec "
+          "checkpointId:%" PRId64,
+          pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, pTask->checkpointingId);
     } else {
       stInfo(
-          "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, elapsed time:%.2f Sec not "
-          "ready:%d/%d",
-          pMeta->vgId, pTask->id.idStr, el, remain, pMeta->numOfStreamTasks);
+          "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, level:%d elapsed time:%.2f Sec "
+          "not ready:%d/%d",
+          pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, remain, pMeta->numOfStreamTasks);
     }
 
     // send check point response to upstream task
@@ -114,7 +114,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
   pReq->streamId = pTask->id.streamId;
   pReq->srcVgId = vgId;
   pReq->stage = pTask->pMeta->stage;
-  pReq->msgId = pTask->taskExecInfo.dispatchCount;
+  pReq->msgId = pTask->execInfo.dispatch;
   pReq->upstreamTaskId = pTask->id.taskId;
   pReq->upstreamChildId = pTask->info.selfChildId;
   pReq->upstreamNodeId = pTask->info.nodeId;
@@ -245,7 +245,7 @@ CLEAR:
   return code;
 }
 
-int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
+int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
   void*   buf = NULL;
   int32_t code = -1;
   SRpcMsg msg = {0};
@@ -305,7 +305,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
   if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
     SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq));
 
-    int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
+    int32_t downstreamTaskId = pTask->fixedDispatcher.taskId;
     code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
     if (code != TSDB_CODE_SUCCESS) {
       return code;
@@ -375,19 +375,19 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
     pTask->msgInfo.pData = pReqs;
   }
 
-  stDebug("s-task:%s build dispatch msg success, msgId:%d", pTask->id.idStr, pTask->taskExecInfo.dispatchCount);
+  stDebug("s-task:%s build dispatch msg success, msgId:%d", pTask->id.idStr, pTask->execInfo.dispatch);
   return code;
 }
 
 static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) {
   int32_t code = 0;
-  int32_t msgId = pTask->taskExecInfo.dispatchCount;
+  int32_t msgId = pTask->execInfo.dispatch;
   const char* id = pTask->id.idStr;
 
   if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
-    int32_t vgId = pTask->fixedEpDispatcher.nodeId;
-    SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
-    int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
+    int32_t vgId = pTask->fixedDispatcher.nodeId;
+    SEpSet* pEpSet = &pTask->fixedDispatcher.epSet;
+    int32_t downstreamTaskId = pTask->fixedDispatcher.taskId;
 
     stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id,
             pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId);
@@ -422,7 +422,7 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
 static void doRetryDispatchData(void* param, void* tmrId) {
   SStreamTask* pTask = param;
   const char* id = pTask->id.idStr;
-  int32_t msgId = pTask->taskExecInfo.dispatchCount;
+  int32_t msgId = pTask->execInfo.dispatch;
 
   if (streamTaskShouldStop(&pTask->status)) {
     int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
@@ -443,7 +443,6 @@ static void doRetryDispatchData(void* param, void* tmrId) {
     SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
     int32_t numOfVgroups = taosArrayGetSize(vgInfo);
 
-
     int32_t numOfFailed = taosArrayGetSize(pList);
     stDebug("s-task:%s (child taskId:%d) re-try shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
             id, pTask->info.selfChildId, numOfFailed, msgId);
@@ -467,9 +466,9 @@ static void doRetryDispatchData(void* param, void* tmrId) {
 
       stDebug("s-task:%s complete re-try shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfFailed, msgId);
     } else {
-      int32_t vgId = pTask->fixedEpDispatcher.nodeId;
-      SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
-      int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
+      int32_t vgId = pTask->fixedDispatcher.nodeId;
+      SEpSet* pEpSet = &pTask->fixedDispatcher.epSet;
+      int32_t downstreamTaskId = pTask->fixedDispatcher.taskId;
 
       stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id,
               pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId);
@@ -483,9 +482,9 @@ static void doRetryDispatchData(void* param, void* tmrId) {
     //      stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
     //      atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
     if (streamTaskShouldPause(&pTask->status)) {
-      streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS * 10);
+      streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS * 10);
     } else {
-      streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
+      streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
     }
   } else {
     int32_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
@@ -497,15 +496,17 @@ static void doRetryDispatchData(void* param, void* tmrId) {
   }
 }
 
-void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
+void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) {
+  STaskTimer* pTmr = pTask->pTimer;
   pTask->msgInfo.retryCount++;
-  stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr, waitDuration,
-         pTask->taskExecInfo.dispatchCount, pTask->msgInfo.retryCount);
+
+  stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr,
+         waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount);
 
-  if (pTask->launchTaskTimer != NULL) {
-    taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
+  if (pTmr->dispatchTimer != NULL) {
+    taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTmr->dispatchTimer);
   } else {
-    pTask->launchTaskTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer);
+    pTmr->dispatchTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer);
   }
 }
 
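Note: streamRetryDispatchData now arms the dedicated dispatchTimer with the usual reset-or-start idiom, instead of borrowing pTask->launchTaskTimer, which the fill-history launch path also used; giving each purpose its own handle removes that collision. A generic sketch of the idiom (tmr_h, tmrStart and tmrReset are illustrative stand-ins for the taosTmr* API):

#include <stdint.h>

typedef void* tmr_h;
typedef void (*tmr_fn)(void* param, void* tmrId);

extern tmr_h tmrStart(tmr_fn fp, int64_t ms, void* param, void* pool);
extern void  tmrReset(tmr_fn fp, int64_t ms, void* param, void* pool, tmr_h* pHandle);

/* Arm a single-purpose timer: reuse the handle if it already exists, create it
 * otherwise, so at most one pending callback exists per purpose. */
static void armTimer(tmr_h* pHandle, tmr_fn fp, int64_t ms, void* param, void* pool) {
  if (*pHandle != NULL) {
    tmrReset(fp, ms, param, pool, pHandle);
  } else {
    *pHandle = tmrStart(fp, ms, param, pool);
  }
}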
@@ -608,7 +609,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
          pBlock->type == STREAM_INPUT__TRANS_STATE);
 
   int32_t retryCount = 0;
-  pTask->taskExecInfo.dispatchCount += 1;
+  pTask->execInfo.dispatch += 1;
   pTask->msgInfo.startTs = taosGetTimestampMs();
 
   int32_t code = doBuildDispatchMsg(pTask, pBlock);
@@ -624,7 +625,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
     }
 
     stDebug("s-task:%s failed to dispatch msg:%d to downstream, code:%s, output status:%d, retry cnt:%d", id,
-            pTask->taskExecInfo.dispatchCount, tstrerror(terrno), pTask->outputInfo.status, retryCount);
+            pTask->execInfo.dispatch, tstrerror(terrno), pTask->outputInfo.status, retryCount);
 
     // todo deal with only partially success dispatch case
     atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
@@ -636,11 +637,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
 
       if (++retryCount > MAX_CONTINUE_RETRY_COUNT) {  // add to timer to retry
         int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
-
         stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
                 pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
 
-        streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
+        streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
         break;
       }
     }
@@ -659,9 +659,9 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
 
   // serialize
   if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
-    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
+    req.downstreamTaskId = pTask->fixedDispatcher.taskId;
     pTask->notReadyTasks = 1;
-    doDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
+    doDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedDispatcher.nodeId, &pTask->fixedDispatcher.epSet);
   } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
     SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
     int32_t numOfVgs = taosArrayGetSize(vgInfo);
@@ -1061,7 +1061,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
 int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
   const char* id = pTask->id.idStr;
   int32_t vgId = pTask->pMeta->vgId;
-  int32_t msgId = pTask->taskExecInfo.dispatchCount;
+  int32_t msgId = pTask->execInfo.dispatch;
 
   if ((!pTask->pMeta->leader) || (pTask->status.downstreamReady != 1)) {
     stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId);
@@ -1165,7 +1165,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
       stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d",
               pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
 
-      streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
+      streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
     } else {  // this message has been sent successfully, let's try next one.
       pTask->msgInfo.retryCount = 0;
       handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
@@ -202,7 +202,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
 
   while (!finished) {
     if (streamTaskShouldPause(&pTask->status)) {
-      double el = (taosGetTimestampMs() - pTask->taskExecInfo.step1Start) / 1000.0;
+      double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
       stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
       break;
     }
@@ -556,7 +556,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
     // here only handle the data block sink operation
     if (type == STREAM_INPUT__DATA_BLOCK) {
       int32_t blockSize = streamQueueItemGetSize(pInput);
-      pTask->sinkRecorder.bytes += blockSize;
+      pTask->execInfo.sink.bytes += blockSize;
 
       stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize));
       doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
@@ -13,16 +13,20 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  */
 
 #include <tstream.h>
 #include "streamInt.h"
 #include "trpc.h"
 #include "ttimer.h"
 #include "wal.h"
 
-typedef struct SStreamTaskRetryInfo {
+typedef struct SLaunchHTaskInfo {
   SStreamMeta* pMeta;
   STaskId id;
-} SStreamTaskRetryInfo;
+} SLaunchHTaskInfo;
+
+typedef struct STaskRecheckInfo {
+  SStreamTask* pTask;
+  SStreamTaskCheckReq req;
+} STaskRecheckInfo;
 
 static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
 static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
@@ -39,9 +43,10 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
   ASSERT(pTask->status.downstreamReady == 0);
   pTask->status.downstreamReady = 1;
 
-  int64_t el = (taosGetTimestampMs() - pTask->taskExecInfo.init);
-  stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%"PRId64"ms, task status:%s",
-          pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus));
+  pTask->execInfo.start = taosGetTimestampMs();
+  int64_t el = (pTask->execInfo.start - pTask->execInfo.init);
+  stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
+          pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus));
 }
 
 int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
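Note: execInfo.start is now stamped here, once every downstream task has acknowledged readiness, so the init-phase cost is the difference of two recorded timestamps (created at streamTaskInit, init when the downstream check begins, start when it completes) rather than a fresh clock read. A small illustration of that timeline (stub type, not the real headers):

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

typedef struct { int64_t created, init, start; } ExecTimeline;  /* subset of STaskExecStatisInfo */

static void printSetupCost(const ExecTimeline* t) {
  printf("downstream check: %" PRId64 " ms, total setup: %" PRId64 " ms\n",
         t->start - t->init, t->start - t->created);
}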
@@ -126,8 +131,8 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
   // serialize streamProcessScanHistoryFinishRsp
   if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
     req.reqId = tGenIdPI64();
-    req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
-    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
+    req.downstreamNodeId = pTask->fixedDispatcher.nodeId;
+    req.downstreamTaskId = pTask->fixedDispatcher.taskId;
     pTask->checkReqId = req.reqId;
 
     stDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
@@ -135,7 +140,7 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
             pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer,
             pWindow->skey, pWindow->ekey, req.stage, req.reqId);
 
-    streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
+    streamSendCheckMsg(pTask, &req, pTask->fixedDispatcher.nodeId, &pTask->fixedDispatcher.epSet);
   } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
     SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
 
@@ -154,7 +159,7 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
       req.downstreamTaskId = pVgInfo->taskId;
       stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, stage:%" PRId64,
               pTask->id.idStr, pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i, req.stage);
-      streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
+      streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
     }
   } else {
     stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
@@ -168,8 +173,15 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
   return 0;
 }
 
-int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
-  SStreamTaskCheckReq req = {
+static STaskRecheckInfo* createRecheckInfo(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
+  STaskRecheckInfo* pInfo = taosMemoryCalloc(1, sizeof(STaskRecheckInfo));
+  if (pInfo == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    return NULL;
+  }
+
+  pInfo->pTask = pTask;
+  pInfo->req = (SStreamTaskCheckReq){
       .reqId = pRsp->reqId,
       .streamId = pRsp->streamId,
       .upstreamTaskId = pRsp->upstreamTaskId,
@@ -180,25 +192,41 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p
       .stage = pTask->pMeta->stage,
   };
 
+  return pInfo;
+}
+
+static void destroyRecheckInfo(STaskRecheckInfo* pInfo) {
+  if (pInfo != NULL) {
+    taosMemoryFree(pInfo);
+  }
+}
+
+static void recheckDownstreamTasks(void* param, void* tmrId) {
+  STaskRecheckInfo* pInfo = param;
+  SStreamTask* pTask = pInfo->pTask;
+
+  SStreamTaskCheckReq* pReq = &pInfo->req;
+
   if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
     stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr,
-            pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, req.stage);
-    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
+            pTask->info.nodeId, pReq->downstreamTaskId, pReq->downstreamNodeId, pReq->stage);
+    streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pTask->fixedDispatcher.epSet);
   } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
     SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
 
     int32_t numOfVgs = taosArrayGetSize(vgInfo);
     for (int32_t i = 0; i < numOfVgs; i++) {
       SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
-      if (pVgInfo->taskId == req.downstreamTaskId) {
+      if (pVgInfo->taskId == pReq->downstreamTaskId) {
         stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr,
-                pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, req.stage);
-        streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet);
+                pTask->info.nodeId, pReq->downstreamTaskId, pReq->downstreamNodeId, pReq->stage);
+        streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pVgInfo->epSet);
       }
     }
   }
 
-  return 0;
+  destroyRecheckInfo(pInfo);
+  atomic_sub_fetch_8(&pTask->status.timerActive, 1);
 }
 
 int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage) {
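Note: recheckDownstreamTasks runs from a timer now, so it receives a heap-allocated STaskRecheckInfo that snapshots the check request; the callback frees it and drops the timerActive reference when it finishes. A sketch of that timer-param ownership pattern (names here are illustrative stand-ins):

#include <stdlib.h>

typedef struct { int downstreamTaskId; } RecheckInfo;  /* stand-in snapshot */

static RecheckInfo* createInfo(int downstreamTaskId) {
  RecheckInfo* p = malloc(sizeof(*p));
  if (p != NULL) p->downstreamTaskId = downstreamTaskId;
  return p;  /* caller arms a timer with p as the callback param */
}

static void onRecheckTimer(void* param, void* tmrId) {
  (void)tmrId;
  RecheckInfo* p = param;
  /* ... resend the check request built from the snapshot in *p ... */
  free(p);  /* the callback owns its param and must release it */
  /* the real code also does atomic_sub_fetch_8(&pTask->status.timerActive, 1) */
}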
@@ -265,6 +293,11 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
   ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
   const char* id = pTask->id.idStr;
 
+  if (streamTaskShouldStop(&pTask->status)) {
+    stDebug("s-task:%s should stop, do not do check downstream again", id);
+    return TSDB_CODE_SUCCESS;
+  }
+
   if (pRsp->status == TASK_DOWNSTREAM_READY) {
     if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
       bool found = false;
@@ -293,7 +326,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
     } else {
       int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
       stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
-          pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
+              pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
     }
   } else {
     ASSERT(pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH);
@@ -305,18 +338,28 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
     }
   } else {  // not ready, wait for 100ms and retry
     if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
-      stError("s-task:%s downstream taskId:0x%x (vgId:%d) vnode-transfer/leader-change detected, roll-back needed not send check again",
-              id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
+      stError(
+          "s-task:%s downstream taskId:0x%x (vgId:%d) vnode-transfer/leader-change detected, not send check again, "
+          "roll-back needed",
+          id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
     } else if (pRsp->status == TASK_SELF_NEW_STAGE) {
       stError(
-          "s-task:%s vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, roll-back needed "
-          "and not send check again",
-          id, pRsp->oldStage, (int32_t) pTask->pMeta->stage);
+          "s-task:%s vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, not send check "
+          "again, roll-back needed",
+          id, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
     } else {
-      stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, wait for 100ms and retry", id,
+      stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms", id,
               pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage);
-      taosMsleep(100);
-      streamRecheckDownstream(pTask, pRsp);
+
+      STaskTimer* pTmr = pTask->pTimer;
+      STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
+
+      atomic_add_fetch_8(&pTask->status.timerActive, 1);
+      if (pTmr->checkTimer != NULL) {
+        taosTmrReset(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer, &pTmr->checkTimer);
+      } else {
+        pTmr->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer);
+      }
     }
   }
 
@@ -547,8 +590,8 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
 }
 
 static void tryLaunchHistoryTask(void* param, void* tmrId) {
-  SStreamTaskRetryInfo* pInfo = param;
-  SStreamMeta*          pMeta = pInfo->pMeta;
+  SLaunchHTaskInfo* pInfo = param;
+  SStreamMeta*      pMeta = pInfo->pMeta;
 
   stDebug("s-task:0x%x in timer to launch related history task", (int32_t) pInfo->id.taskId);
 
@@ -582,7 +625,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
               "destroyed, or should stop",
               pTask->id.idStr, pMeta->vgId, pStatus, (int32_t) pTask->historyTaskId.taskId);
 
-      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
+      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->pTimer->hTaskLaunchTimer);
       streamMetaReleaseTask(pMeta, pTask);
       return;
     }
@@ -621,14 +664,14 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
     stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
            pMeta->vgId, hTaskId);
 
-    SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
+    SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo));
     pInfo->id.taskId = pTask->id.taskId;
     pInfo->id.streamId = pTask->id.streamId;
     pInfo->pMeta = pTask->pMeta;
 
-    if (pTask->launchTaskTimer == NULL) {
-      pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer);
-      if (pTask->launchTaskTimer == NULL) {
+    if (pTask->pTimer->hTaskLaunchTimer == NULL) {
+      pTask->pTimer->hTaskLaunchTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer);
+      if (pTask->pTimer->hTaskLaunchTimer == NULL) {
         // todo failed to create timer
         taosMemoryFree(pInfo);
       } else {
@@ -639,7 +682,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
     } else {  // timer exists
       ASSERT(pTask->status.timerActive == 1);
       stDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
-      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
+      taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->pTimer->hTaskLaunchTimer);
     }
 
     // try again in 100ms
@@ -129,9 +129,9 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
   } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
     if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
   } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
-    if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1;
-    if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
-    if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
+    if (tEncodeI32(pEncoder, pTask->fixedDispatcher.taskId) < 0) return -1;
+    if (tEncodeI32(pEncoder, pTask->fixedDispatcher.nodeId) < 0) return -1;
+    if (tEncodeSEpSet(pEncoder, &pTask->fixedDispatcher.epSet) < 0) return -1;
   } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
     if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
     if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
@@ -211,9 +211,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
   } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
     if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
   } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
-    if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1;
-    if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
-    if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
+    if (tDecodeI32(pDecoder, &pTask->fixedDispatcher.taskId) < 0) return -1;
+    if (tDecodeI32(pDecoder, &pTask->fixedDispatcher.nodeId) < 0) return -1;
+    if (tDecodeSEpSet(pDecoder, &pTask->fixedDispatcher.epSet) < 0) return -1;
   } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
     if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
     if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
@@ -289,20 +289,17 @@ static void freeUpstreamItem(void* p) {
 void tFreeStreamTask(SStreamTask* pTask) {
   int32_t taskId = pTask->id.taskId;
 
-  STaskExecStatisInfo* pStatis = &pTask->taskExecInfo;
+  STaskExecStatisInfo* pStatis = &pTask->execInfo;
 
   stDebug("start to free s-task:0x%x, %p, state:%p, status:%s", taskId, pTask, pTask->pState,
           streamGetTaskStatusStr(pTask->status.taskStatus));
 
-  stDebug("s-task:0x%x exec info: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
+  stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
           ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
-          " nextProcessVer:%" PRId64,
+          " nextProcessVer:%" PRId64 ", checkpointCount:%d",
           taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs,
-          pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, pTask->chkInfo.nextProcessVer);
-
-  if (pStatis->created == 0 || pStatis->init == 0 || pStatis->start == 0) {
-    int32_t k = 1;
-  }
+          pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, pTask->chkInfo.nextProcessVer,
+          pStatis->checkpoint);
 
   // remove the ref by timer
   while (pTask->status.timerActive > 0) {
@@ -315,9 +312,22 @@ void tFreeStreamTask(SStreamTask* pTask) {
     pTask->schedInfo.pTimer = NULL;
   }
 
-  if (pTask->launchTaskTimer != NULL) {
-    taosTmrStop(pTask->launchTaskTimer);
-    pTask->launchTaskTimer = NULL;
+  if (pTask->pTimer != NULL) {
+    if (pTask->pTimer->hTaskLaunchTimer != NULL) {
+      taosTmrStop(pTask->pTimer->hTaskLaunchTimer);
+      pTask->pTimer->hTaskLaunchTimer = NULL;
+    }
+
+    if (pTask->pTimer->dispatchTimer != NULL) {
+      taosTmrStop(pTask->pTimer->dispatchTimer);
+      pTask->pTimer->dispatchTimer = NULL;
+    }
+
+    if (pTask->pTimer->checkTimer != NULL) {
+      taosTmrStop(pTask->pTimer->checkTimer);
+      pTask->pTimer->checkTimer = NULL;
+    }
+    taosMemoryFreeClear(pTask->pTimer);
   }
 
   int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
@@ -402,7 +412,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
     return TSDB_CODE_OUT_OF_MEMORY;
   }
 
-  pTask->taskExecInfo.created = taosGetTimestampMs();
+  pTask->execInfo.created = taosGetTimestampMs();
   pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
   pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
   pTask->pMeta = pMeta;
@@ -419,6 +429,12 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
     return TSDB_CODE_OUT_OF_MEMORY;
   }
 
+  pTask->pTimer = taosMemoryCalloc(1, sizeof(STaskTimer));
+  if (pTask->pTimer == NULL) {
+    stError("s-task:%s failed to prepare the timer, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
+    return TSDB_CODE_OUT_OF_MEMORY;
+  }
+
   streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50);
 
   TdThreadMutexAttr attr = {0};
@@ -501,7 +517,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
 }
 
 void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
-  STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher;
+  STaskDispatcherFixed* pDispatcher = &pTask->fixedDispatcher;
   pDispatcher->taskId = pDownstreamTask->id.taskId;
   pDispatcher->nodeId = pDownstreamTask->info.nodeId;
   pDispatcher->epSet = pDownstreamTask->info.epSet;
@@ -530,7 +546,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
       }
     }
   } else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
-    STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher;
+    STaskDispatcherFixed* pDispatcher = &pTask->fixedDispatcher;
     if (pDispatcher->nodeId == nodeId) {
       epsetAssign(&pDispatcher->epSet, pEpSet);
       stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId,
@@ -598,7 +614,7 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
 }
 
 int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
-  STaskExecStatisInfo* p = &pTask->taskExecInfo;
+  STaskExecStatisInfo* p = &pTask->execInfo;
 
   int32_t numOfNodes = taosArrayGetSize(pNodeList);
   int64_t prevTs = p->latestUpdateTs;