Merge pull request #26960 from taosdata/fix/syntax
fix(stream): check status before start init, do some internal refactor.
This commit is contained in:
commit
2e342104d3
|
@ -647,6 +647,7 @@ const char* streamTaskGetStatusStr(ETaskStatus status);
|
||||||
void streamTaskResetStatus(SStreamTask* pTask);
|
void streamTaskResetStatus(SStreamTask* pTask);
|
||||||
void streamTaskSetStatusReady(SStreamTask* pTask);
|
void streamTaskSetStatusReady(SStreamTask* pTask);
|
||||||
ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask);
|
ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask);
|
||||||
|
const char* streamTaskGetExecType(int32_t type);
|
||||||
|
|
||||||
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
|
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
|
||||||
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
|
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
|
||||||
|
|
|
@ -82,7 +82,7 @@ static void doStartScanWal(void* param, void* tmrId) {
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
tqError("vgId:%d failed sched task to scan wal", vgId);
|
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -355,43 +355,15 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
|
||||||
* All down stream tasks have successfully completed the check point task.
|
static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream,
|
||||||
* Current stream task is allowed to start to do checkpoint things in ASYNC model.
|
int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId,
|
||||||
*/
|
const char* id, int32_t* pNotReady, int32_t* pTransId) {
|
||||||
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId,
|
|
||||||
int32_t downstreamTaskId) {
|
|
||||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG);
|
|
||||||
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
|
|
||||||
|
|
||||||
const char* id = pTask->id.idStr;
|
|
||||||
bool received = false;
|
bool received = false;
|
||||||
int32_t total = streamTaskGetNumOfDownstream(pTask);
|
|
||||||
ASSERT(total > 0);
|
|
||||||
|
|
||||||
// 1. not in checkpoint status now
|
|
||||||
SStreamTaskState pStat = streamTaskGetStatus(pTask);
|
|
||||||
if (pStat.state != TASK_STATUS__CK) {
|
|
||||||
stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat.name, downstreamTaskId);
|
|
||||||
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. expired checkpoint-ready msg, invalid checkpoint-ready msg
|
|
||||||
if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) {
|
|
||||||
stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64
|
|
||||||
") from task:0x%x, expired and discard ",
|
|
||||||
id, pStat.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMutexLock(&pInfo->lock);
|
|
||||||
|
|
||||||
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
|
|
||||||
int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
|
int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
|
STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
streamMutexUnlock(&pInfo->lock);
|
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -403,27 +375,69 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
|
||||||
|
|
||||||
if (received) {
|
if (received) {
|
||||||
stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id,
|
stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id,
|
||||||
downstreamTaskId, (int32_t)(total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), total);
|
downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)),
|
||||||
|
numOfDownstream);
|
||||||
} else {
|
} else {
|
||||||
STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(),
|
STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(),
|
||||||
.downstreamTaskId = downstreamTaskId,
|
.downstreamTaskId = downstreamTaskId,
|
||||||
.checkpointId = pInfo->activeId,
|
.checkpointId = pInfo->activeId,
|
||||||
.transId = pInfo->transId,
|
.transId = pInfo->transId,
|
||||||
.streamId = pTask->id.streamId,
|
.streamId = streamId,
|
||||||
.downstreamNodeId = downstreamNodeId};
|
.downstreamNodeId = downstreamNodeId};
|
||||||
(void)taosArrayPush(pInfo->pCheckpointReadyRecvList, &info);
|
void* p = taosArrayPush(pInfo->pCheckpointReadyRecvList, &info);
|
||||||
|
if (p == NULL) {
|
||||||
|
stError("s-task:%s failed to set checkpoint ready recv msg, code:%s", id, tstrerror(terrno));
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t notReady = total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
|
*pNotReady = numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
|
||||||
int32_t transId = pInfo->transId;
|
*pTransId = pInfo->transId;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* All down stream tasks have successfully completed the check point task.
|
||||||
|
* Current stream task is allowed to start to do checkpoint things in ASYNC model.
|
||||||
|
*/
|
||||||
|
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId,
|
||||||
|
int32_t downstreamTaskId) {
|
||||||
|
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
|
||||||
|
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
int32_t total = streamTaskGetNumOfDownstream(pTask);
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t notReady = 0;
|
||||||
|
int32_t transId = 0;
|
||||||
|
|
||||||
|
ASSERT(total > 0 && (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG));
|
||||||
|
|
||||||
|
// 1. not in checkpoint status now
|
||||||
|
SStreamTaskState pStat = streamTaskGetStatus(pTask);
|
||||||
|
if (pStat.state != TASK_STATUS__CK) {
|
||||||
|
stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat.name, downstreamTaskId);
|
||||||
|
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. expired checkpoint-ready msg, invalid checkpoint-ready msg
|
||||||
|
if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) {
|
||||||
|
stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64
|
||||||
|
") from task:0x%x, expired and discard",
|
||||||
|
id, pStat.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId);
|
||||||
|
return TSDB_CODE_INVALID_MSG;
|
||||||
|
}
|
||||||
|
|
||||||
|
streamMutexLock(&pInfo->lock);
|
||||||
|
code = processCheckpointReadyHelp(pInfo, total, downstreamNodeId, pTask->id.streamId, downstreamTaskId, id, ¬Ready,
|
||||||
|
&transId);
|
||||||
streamMutexUnlock(&pInfo->lock);
|
streamMutexUnlock(&pInfo->lock);
|
||||||
|
|
||||||
if (notReady == 0) {
|
if ((notReady == 0) && (code == 0)) {
|
||||||
stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id);
|
stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id);
|
||||||
(void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
|
(void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) {
|
int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) {
|
||||||
|
@ -1034,8 +1048,7 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
|
||||||
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
|
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
streamMutexUnlock(&pInfo->lock);
|
continue;
|
||||||
return num;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p->recved) {
|
if (p->recved) {
|
||||||
|
|
|
@ -1379,8 +1379,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
|
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
|
||||||
code = ret;
|
code = ret;
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
|
||||||
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
|
@ -1466,12 +1469,14 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
|
||||||
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
SStreamTask* pTask = NULL;
|
||||||
|
bool continueExec = true;
|
||||||
|
|
||||||
stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);
|
stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);
|
||||||
|
|
||||||
SStreamTask* pTask = NULL;
|
|
||||||
code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
|
code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId);
|
stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId);
|
||||||
(void)streamMetaAddFailedTask(pMeta, streamId, taskId);
|
(void)streamMetaAddFailedTask(pMeta, streamId, taskId);
|
||||||
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
||||||
}
|
}
|
||||||
|
@ -1479,10 +1484,28 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
||||||
// fill-history task can only be launched by related stream tasks.
|
// fill-history task can only be launched by related stream tasks.
|
||||||
STaskExecStatisInfo* pInfo = &pTask->execInfo;
|
STaskExecStatisInfo* pInfo = &pTask->execInfo;
|
||||||
if (pTask->info.fillHistory == 1) {
|
if (pTask->info.fillHistory == 1) {
|
||||||
|
stError("s-task:0x%x vgId:%d fill-histroy task, not start here", taskId, vgId);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the start all tasks procedure may happen to start the newly deployed stream task, and results in the
|
||||||
|
// concurrently start this task by two threads.
|
||||||
|
streamMutexLock(&pTask->lock);
|
||||||
|
SStreamTaskState status = streamTaskGetStatus(pTask);
|
||||||
|
if (status.state != TASK_STATUS__UNINIT) {
|
||||||
|
stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name);
|
||||||
|
continueExec = false;
|
||||||
|
} else {
|
||||||
|
continueExec = true;
|
||||||
|
}
|
||||||
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
if (!continueExec) {
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
||||||
|
}
|
||||||
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
|
|
||||||
// avoid initialization and destroy running concurrently.
|
// avoid initialization and destroy running concurrently.
|
||||||
|
@ -1498,13 +1521,19 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// concurrently start task may cause the later started task be failed, and also failed to added into meta result.
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
|
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT);
|
stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT,
|
||||||
|
tstrerror(code));
|
||||||
|
|
||||||
|
// do no added into result hashmap if it is failed due to concurrently starting of this stream task.
|
||||||
|
if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
|
||||||
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return code;
|
return code;
|
||||||
|
@ -1536,11 +1565,12 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
||||||
int64_t endTs, bool ready) {
|
int64_t endTs, bool ready) {
|
||||||
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
||||||
STaskId id = {.streamId = streamId, .taskId = taskId};
|
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||||
|
int32_t vgId = pMeta->vgId;
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (p == NULL) { // task does not exists in current vnode, not record the complete info
|
if (p == NULL) { // task does not exists in current vnode, not record the complete info
|
||||||
stError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId);
|
stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId);
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1555,7 +1585,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
||||||
stDebug(
|
stDebug(
|
||||||
"vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed "
|
"vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed "
|
||||||
"time:%" PRId64 "ms",
|
"time:%" PRId64 "ms",
|
||||||
pMeta->vgId, taskId, ready, el);
|
vgId, taskId, ready, el);
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1565,6 +1595,15 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
||||||
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
|
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
|
||||||
int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
|
int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
|
||||||
if (code) {
|
if (code) {
|
||||||
|
if (code == TSDB_CODE_DUP_KEY) {
|
||||||
|
stError("vgId:%d record start task result failed, s-task:0x%" PRIx64
|
||||||
|
" already exist start results in meta start task result hashmap",
|
||||||
|
vgId, id.taskId);
|
||||||
|
} else {
|
||||||
|
stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId);
|
||||||
|
}
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
|
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
|
||||||
|
@ -1576,20 +1615,20 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
||||||
|
|
||||||
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
|
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
|
||||||
", readyTs:%" PRId64 " total elapsed time:%.2fs",
|
", readyTs:%" PRId64 " total elapsed time:%.2fs",
|
||||||
pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
|
vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
|
||||||
pStartInfo->elapsedTime / 1000.0);
|
pStartInfo->elapsedTime / 1000.0);
|
||||||
|
|
||||||
// print the initialization elapsed time and info
|
// print the initialization elapsed time and info
|
||||||
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
|
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
|
||||||
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
|
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
|
||||||
streamMetaResetStartInfo(pStartInfo, pMeta->vgId);
|
streamMetaResetStartInfo(pStartInfo, vgId);
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
code = pStartInfo->completeFn(pMeta);
|
code = pStartInfo->completeFn(pMeta);
|
||||||
} else {
|
} else {
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId,
|
stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready,
|
||||||
ready, numOfRecv, numOfTotal);
|
numOfRecv, numOfTotal);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -48,14 +48,15 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
|
||||||
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
||||||
if (pRunReq == NULL) {
|
if (pRunReq == NULL) {
|
||||||
stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType,
|
stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType,
|
||||||
terrstr());
|
terrstr(terrno));
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamId != 0) {
|
if (streamId != 0) {
|
||||||
stDebug("vgId:%d create msg to start stream task:0x%x, exec type:%d", vgId, taskId, execType);
|
stDebug("vgId:%d create msg to for task:0x%x, exec type:%d, %s", vgId, taskId, execType,
|
||||||
|
streamTaskGetExecType(execType));
|
||||||
} else {
|
} else {
|
||||||
stDebug("vgId:%d create msg to exec, type:%d", vgId, execType);
|
stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType));
|
||||||
}
|
}
|
||||||
|
|
||||||
pRunReq->head.vgId = vgId;
|
pRunReq->head.vgId = vgId;
|
||||||
|
|
|
@ -1150,3 +1150,24 @@ void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
|
||||||
taosArrayClear(pInfo->pDispatchTriggerList);
|
taosArrayClear(pInfo->pDispatchTriggerList);
|
||||||
taosArrayClear(pInfo->pCheckpointReadyRecvList);
|
taosArrayClear(pInfo->pCheckpointReadyRecvList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const char* streamTaskGetExecType(int32_t type) {
|
||||||
|
switch (type) {
|
||||||
|
case STREAM_EXEC_T_EXTRACT_WAL_DATA:
|
||||||
|
return "scan-wal-file";
|
||||||
|
case STREAM_EXEC_T_START_ALL_TASKS:
|
||||||
|
return "start-all-tasks";
|
||||||
|
case STREAM_EXEC_T_START_ONE_TASK:
|
||||||
|
return "start-one-task";
|
||||||
|
case STREAM_EXEC_T_RESTART_ALL_TASKS:
|
||||||
|
return "restart-all-tasks";
|
||||||
|
case STREAM_EXEC_T_STOP_ALL_TASKS:
|
||||||
|
return "stop-all-tasks";
|
||||||
|
case STREAM_EXEC_T_RESUME_TASK:
|
||||||
|
return "resume-task-from-idle";
|
||||||
|
case STREAM_EXEC_T_ADD_FAILED_TASK:
|
||||||
|
return "record-start-failed-task";
|
||||||
|
default:
|
||||||
|
return "invalid-exec-type";
|
||||||
|
}
|
||||||
|
}
|
|
@ -322,12 +322,11 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
|
||||||
|
|
||||||
if (pTrans->attachEvent.event != 0) {
|
if (pTrans->attachEvent.event != 0) {
|
||||||
code = attachWaitedEvent(pTask, &pTrans->attachEvent);
|
code = attachWaitedEvent(pTask, &pTrans->attachEvent);
|
||||||
|
streamMutexUnlock(&pTask->lock);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMutexUnlock(&pTask->lock);
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
// wait for the task to be here
|
// wait for the task to be here
|
||||||
streamMutexLock(&pTask->lock);
|
streamMutexLock(&pTask->lock);
|
||||||
|
@ -406,7 +405,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
||||||
EStreamTaskEvent evt = pSM->pActiveTrans->event;
|
EStreamTaskEvent evt = pSM->pActiveTrans->event;
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
|
stDebug("s-task:%s status:%s handling event:%s by another thread, wait for 100ms and check if completed",
|
||||||
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
|
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
} else {
|
} else {
|
||||||
|
@ -419,6 +418,13 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSM->pActiveTrans != NULL) {
|
if (pSM->pActiveTrans != NULL) {
|
||||||
|
// not allowed concurrently initialization
|
||||||
|
if (event == TASK_EVENT_INIT && pSM->pActiveTrans->event == TASK_EVENT_INIT) {
|
||||||
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
stError("s-task:%s already in handling init procedure, handle this init event failed", pTask->id.idStr);
|
||||||
|
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||||
|
}
|
||||||
|
|
||||||
// currently in some state transfer procedure, not auto invoke transfer, abort it
|
// currently in some state transfer procedure, not auto invoke transfer, abort it
|
||||||
stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
|
stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
|
||||||
pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name,
|
pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name,
|
||||||
|
@ -557,6 +563,11 @@ ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
const char* streamTaskGetStatusStr(ETaskStatus status) {
|
const char* streamTaskGetStatusStr(ETaskStatus status) {
|
||||||
|
int32_t index = status;
|
||||||
|
if (index < 0 || index > tListLen(StreamTaskStatusList)) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
return StreamTaskStatusList[status].name;
|
return StreamTaskStatusList[status].name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue