refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-01-15 11:28:36 +08:00
parent d46b5e6c9a
commit e012bc4bde
8 changed files with 224 additions and 266 deletions

View File

@ -801,7 +801,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
int64_t* oldStage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
bool streamTaskAllUpstreamClosed(SStreamTask* pTask);
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask);
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
@ -826,8 +826,7 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
// common
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamResetParamForScanHistory(SStreamTask* pTask);
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask);
void streamTaskResume(SStreamTask* pTask);
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
@ -835,7 +834,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask);
int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskCloseAllUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);

View File

@ -742,8 +742,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
return code;
}
streamTaskOpenAllUpstreamInput(pTask);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
STaskId taskId = {0};
if (pTask->info.fillHistory) {

View File

@ -76,33 +76,6 @@ int32_t tqStreamOneTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
return 0;
}
int32_t tqUpdateNodeEpsetAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start task:0x%x, code:%s", vgId, taskId, terrstr());
return -1;
}
tqDebug("vgId:%d update s-task:0x%x nodeEpset async", vgId, taskId);
pRunReq->head.vgId = vgId;
pRunReq->streamId = streamId;
pRunReq->taskId = taskId;
pRunReq->reqType = STREAM_EXEC_T_UPDATE_TASK_EPSET;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(cb, STREAM_QUEUE, &msg);
return 0;
}
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
@ -728,10 +701,6 @@ int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks) {
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
streamTaskResetStatus(*pTask);
// if ((*pTask)->info.fillHistory == 1) {
// streamResetParamForScanHistory(*pTask);
// }
}
return 0;
@ -922,7 +891,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){
}
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
streamTaskPause(pTask, pMeta);
streamTaskPause(pMeta, pTask);
SStreamTask* pHistoryTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
@ -939,7 +908,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
streamTaskPause(pHistoryTask, pMeta);
streamTaskPause(pMeta, pHistoryTask);
streamMetaReleaseTask(pMeta, pHistoryTask);
}

View File

@ -120,7 +120,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
void streamTaskSetCheckpointFailedId(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskExtractKey(const SStreamTask* pTask);
STaskId streamTaskGetTaskId(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,

View File

@ -243,18 +243,18 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
// blocked. Note that there is no race condition here.
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
streamTaskCloseAllUpstreamInput(pTask, pReq->upstreamTaskId);
stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId);
} else if (pReq->type == STREAM_INPUT__TRANS_STATE) {
atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
streamTaskCloseAllUpstreamInput(pTask, pReq->upstreamTaskId);
// disable the related stream task here to avoid it to receive the newly arrived data after the transfer-state
STaskId* pRelTaskId = &pTask->streamTaskId;
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pRelTaskId->streamId, pRelTaskId->taskId);
if (pStreamTask != NULL) {
atomic_add_fetch_32(&pStreamTask->upstreamInfo.numOfClosed, 1);
streamTaskCloseUpstreamInput(pStreamTask, pReq->upstreamRelTaskId);
streamTaskCloseAllUpstreamInput(pStreamTask, pReq->upstreamRelTaskId);
streamMetaReleaseTask(pMeta, pStreamTask);
}
@ -300,28 +300,6 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
if (num == 0) {
return;
}
for (int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
pInfo->dataAllowed = true;
}
pTask->upstreamInfo.numOfClosed = 0;
stDebug("s-task:%s opening up inputQ from upstream tasks", pTask->id.idStr);
}
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if (pInfo != NULL) {
pInfo->dataAllowed = false;
}
}
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
for (int32_t i = 0; i < num; ++i) {

View File

@ -47,6 +47,12 @@ struct SMetaHbInfo {
int64_t hbStart;
};
typedef struct STaskInitTs {
int64_t start;
int64_t end;
bool success;
} STaskInitTs;
SMetaRefMgt gMetaRefMgt;
void metaRefMgtInit();
@ -581,7 +587,7 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) {
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
*pAdded = false;
STaskId id = streamTaskExtractKey(pTask);
STaskId id = streamTaskGetTaskId(pTask);
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p != NULL) {
return 0;
@ -871,7 +877,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
int32_t taskId = pTask->id.taskId;
tFreeStreamTask(pTask);
STaskId id = streamTaskExtractKey(pTask);
STaskId id = streamTaskGetTaskId(pTask);
taosArrayPush(pRecycleList, &id);
int32_t total = taosArrayGetSize(pRecycleList);
@ -1407,37 +1413,6 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
}
}
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
streamMetaRLock(pMeta);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);
if (num == 0) {
streamMetaRUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}
// send hb msg to mnode before closing all tasks.
SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta);
int32_t numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
continue;
}
streamTaskStop(pTask);
streamMetaReleaseTask(pMeta, pTask);
}
taosArrayDestroy(pTaskList);
streamMetaRUnLock(pMeta);
return 0;
}
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pMeta->vgId;
@ -1512,6 +1487,37 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
return code;
}
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
streamMetaRLock(pMeta);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);
if (num == 0) {
streamMetaRUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}
// send hb msg to mnode before closing all tasks.
SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta);
int32_t numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
continue;
}
streamTaskStop(pTask);
streamMetaReleaseTask(pMeta, pTask);
}
taosArrayDestroy(pTaskList);
streamMetaRUnLock(pMeta);
return 0;
}
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t vgId = pMeta->vgId;
stInfo("vgId:%d start to task:0x%x by checking downstream status", vgId, taskId);
@ -1547,4 +1553,74 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
streamMetaReleaseTask(pMeta, pTask);
return ret;
}
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
int32_t vgId = pMeta->vgId;
void* pIter = NULL;
size_t keyLen = 0;
stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet),
succ ? "success" : "failed");
while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
STaskInitTs* pInfo = pIter;
void* key = taosHashGetKey(pIter, &keyLen);
SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId));
if (pTask1 == NULL) {
stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed");
} else {
stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr,
(*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed");
}
}
}
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
STaskId id = {.streamId = streamId, .taskId = taskId};
streamMetaWLock(pMeta);
if (pStartInfo->taskStarting != 1) {
int64_t el = endTs - startTs;
qDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms",
pMeta->vgId, taskId, ready, el);
streamMetaWUnLock(pMeta);
return 0;
}
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);
if (numOfRecv == numOfTotal) {
pStartInfo->readyTs = taosGetTimestampMs();
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
", readyTs:%" PRId64 " total elapsed time:%.2fs",
pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0);
// print the initialization elapsed time and info
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
streamMetaResetStartInfo(pStartInfo);
streamMetaWUnLock(pMeta);
pStartInfo->completeFn(pMeta);
} else {
streamMetaWUnLock(pMeta);
stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId,
ready, numOfRecv, numOfTotal);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -35,12 +35,6 @@ typedef struct STaskRecheckInfo {
void* checkTimer;
} STaskRecheckInfo;
typedef struct STaskInitTs {
int64_t start;
int64_t end;
bool success;
} STaskInitTs;
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
@ -546,15 +540,6 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
}
int32_t streamResetParamForScanHistory(SStreamTask* pTask) {
stDebug("s-task:%s reset operator option for scan-history data", pTask->id.idStr);
if (pTask->exec.pExecutor != NULL) {
return qResetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
} else {
return TSDB_CODE_SUCCESS;
}
}
int32_t streamRestoreParam(SStreamTask* pTask) {
stDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr);
return qRestoreStreamOperatorOption(pTask->exec.pExecutor);
@ -1134,97 +1119,3 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
}
}
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE);
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
// in case of fill-history task, stop the tsdb file scan operation.
if (pTask->info.fillHistory == 1) {
void* pExecutor = pTask->exec.pExecutor;
qKillTask(pExecutor, TSDB_CODE_SUCCESS);
}
stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
}
void streamTaskResume(SStreamTask* pTask) {
SStreamTaskState prevState = *streamTaskGetStatus(pTask);
SStreamMeta* pMeta = pTask->pMeta;
if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) {
streamTaskRestoreStatus(pTask);
char* pNew = streamTaskGetStatus(pTask)->name;
if (prevState.state == TASK_STATUS__PAUSE) {
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
} else {
stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, prevState.name);
}
} else {
stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name);
}
}
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
int32_t vgId = pMeta->vgId;
void* pIter = NULL;
size_t keyLen = 0;
stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet),
succ ? "success" : "failed");
while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
STaskInitTs* pInfo = pIter;
void* key = taosHashGetKey(pIter, &keyLen);
SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId));
if (pTask1 == NULL) {
stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed");
} else {
stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr,
(*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed");
}
}
}
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
STaskId id = {.streamId = streamId, .taskId = taskId};
streamMetaWLock(pMeta);
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);
if (numOfRecv == numOfTotal) {
pStartInfo->readyTs = taosGetTimestampMs();
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
", readyTs:%" PRId64 " total elapsed time:%.2fs",
pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0);
// print the initialization elapsed time and info
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
streamMetaResetStartInfo(pStartInfo);
streamMetaWUnLock(pMeta);
pStartInfo->completeFn(pMeta);
} else {
streamMetaWUnLock(pMeta);
stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId,
ready, numOfRecv, numOfTotal);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -30,6 +30,55 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
return 0;
}
static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
char buf[512] = {0};
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
epsetAssign(&pTask->info.epSet, pEpSet);
EPSET_TO_STR(pEpSet, buf)
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
}
// check for the dispath info and the upstream task info
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
} else if (level == TASK_LEVEL__AGG) {
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
} else { // TASK_LEVEL__SINK
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
}
return 0;
}
static void freeItem(void* p) {
SStreamContinueExecInfo* pInfo = p;
rpcFreeCont(pInfo->msg.pCont);
}
static void freeUpstreamItem(void* p) {
SStreamChildEpInfo** pInfo = p;
taosMemoryFree(*pInfo);
}
static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pEpInfo->childId = pTask->info.selfChildId;
pEpInfo->epSet = pTask->info.epSet;
pEpInfo->nodeId = pTask->info.nodeId;
pEpInfo->taskId = pTask->id.taskId;
pEpInfo->stage = -1;
return pEpInfo;
}
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory) {
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
@ -291,16 +340,6 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
return 0;
}
static void freeItem(void* p) {
SStreamContinueExecInfo* pInfo = p;
rpcFreeCont(pInfo->msg.pCont);
}
static void freeUpstreamItem(void* p) {
SStreamChildEpInfo** pInfo = p;
taosMemoryFree(*pInfo);
}
void tFreeStreamTask(SStreamTask* pTask) {
char* p = NULL;
int32_t taskId = pTask->id.taskId;
@ -475,14 +514,6 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
}
taosThreadMutexInit(&pTask->lock, &attr);
// if (pTask->info.fillHistory == 1) {
// //
// } else {
// }
// if (streamTaskSetDb(pMeta, pTask) != 0) {
// return -1;
// }
streamTaskOpenAllUpstreamInput(pTask);
pTask->outputInfo.pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
@ -509,22 +540,6 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
}
}
static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pEpInfo->childId = pTask->info.selfChildId;
pEpInfo->epSet = pTask->info.epSet;
pEpInfo->nodeId = pTask->info.nodeId;
pEpInfo->taskId = pTask->id.taskId;
pEpInfo->stage = -1;
return pEpInfo;
}
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
if (pEpInfo == NULL) {
@ -622,29 +637,6 @@ int32_t streamTaskStop(SStreamTask* pTask) {
return 0;
}
int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
char buf[512] = {0};
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
epsetAssign(&pTask->info.epSet, pEpSet);
EPSET_TO_STR(pEpSet, buf)
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
}
// check for the dispath info and the upstream task info
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
} else if (level == TASK_LEVEL__AGG) {
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
} else { // TASK_LEVEL__SINK
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
}
return 0;
}
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
STaskExecStatisInfo* p = &pTask->execInfo;
@ -677,7 +669,29 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
}
bool streamTaskAllUpstreamClosed(SStreamTask* pTask) {
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
if (num == 0) {
return;
}
for (int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
pInfo->dataAllowed = true;
}
pTask->upstreamInfo.numOfClosed = 0;
stDebug("s-task:%s opening up inputQ for %d upstream tasks", pTask->id.idStr, num);
}
void streamTaskCloseAllUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if (pInfo != NULL) {
pInfo->dataAllowed = false;
}
}
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList);
}
@ -760,7 +774,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
return code;
}
STaskId streamTaskExtractKey(const SStreamTask* pTask) {
STaskId streamTaskGetTaskId(const SStreamTask* pTask) {
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
return id;
}
@ -801,3 +815,36 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
pDst->chkpointTransId = pSrc->chkpointTransId;
}
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE);
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
// in case of fill-history task, stop the tsdb file scan operation.
if (pTask->info.fillHistory == 1) {
void* pExecutor = pTask->exec.pExecutor;
qKillTask(pExecutor, TSDB_CODE_SUCCESS);
}
stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
}
void streamTaskResume(SStreamTask* pTask) {
SStreamTaskState prevState = *streamTaskGetStatus(pTask);
SStreamMeta* pMeta = pTask->pMeta;
if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) {
streamTaskRestoreStatus(pTask);
char* pNew = streamTaskGetStatus(pTask)->name;
if (prevState.state == TASK_STATUS__PAUSE) {
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
} else {
stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, prevState.name);
}
} else {
stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name);
}
}