refactor: do some internal refactor.
This commit is contained in:
parent
a19e63fd23
commit
3710ea4aca
|
@ -1112,8 +1112,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
// wait for the stream task get ready for scan history data
|
||||
while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
|
||||
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
||||
tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr,
|
||||
pStreamTask->info.taskLevel);
|
||||
tqDebug("s-task:%s level:%d related stream task:%s not ready for halt, wait for it continue and recheck in 100ms",
|
||||
pTask->id.idStr, pStreamTask->id.idStr, pStreamTask->info.taskLevel);
|
||||
taosMsleep(100);
|
||||
}
|
||||
|
||||
|
@ -1181,11 +1181,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
streamTryExec(pTask);
|
||||
|
||||
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
||||
tqDebug("s-task:%s set status to be dropping", pId);
|
||||
|
||||
// transfer the ownership of executor state
|
||||
streamTaskReleaseState(pTask);
|
||||
streamTaskReloadState(pStreamTask);
|
||||
tqDebug("s-task:%s scan-history-task set status to be dropping", pId);
|
||||
|
||||
streamMetaSaveTask(pMeta, pTask);
|
||||
streamMetaSaveTask(pMeta, pStreamTask);
|
||||
|
@ -1236,12 +1232,14 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
|
|||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
|
||||
if (pTask == NULL) {
|
||||
tqError("failed to find task:0x%x", req.taskId);
|
||||
tqError("failed to find task:0x%x, it may have been dropped already", req.taskId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// transfer the ownership of executor state
|
||||
streamTaskReleaseState(pTask);
|
||||
tqDebug("s-task:%s receive state transfer req", pTask->id.idStr);
|
||||
|
||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId);
|
||||
streamTaskReloadState(pStreamTask);
|
||||
|
||||
|
|
|
@ -351,7 +351,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
|||
|
||||
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
|
||||
qDebug("s-task:%s scan history task end, update stream task:%s info and launch it", pTask->id.idStr, pStreamTask->id.idStr);
|
||||
qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr);
|
||||
|
||||
// todo handle stream task is dropped here
|
||||
|
||||
|
@ -390,7 +390,12 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
pTimeWindow->skey = INT64_MIN;
|
||||
qResetStreamInfoTimeWindow(pStreamTask->exec.pExecutor);
|
||||
|
||||
// transfer the ownership of executor state
|
||||
streamTaskReleaseState(pTask);
|
||||
streamTaskReloadState(pStreamTask);
|
||||
|
||||
streamSetStatusNormal(pStreamTask);
|
||||
|
||||
streamSchedExec(pStreamTask);
|
||||
streamMetaReleaseTask(pTask->pMeta, pStreamTask);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -584,6 +589,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
int32_t streamTaskReleaseState(SStreamTask* pTask) {
|
||||
qDebug("s-task:%s release exec state", pTask->id.idStr);
|
||||
void* pExecutor = pTask->exec.pExecutor;
|
||||
if (pExecutor != NULL) {
|
||||
int32_t code = qStreamOperatorReleaseState(pExecutor);
|
||||
|
@ -594,6 +600,7 @@ int32_t streamTaskReleaseState(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
int32_t streamTaskReloadState(SStreamTask* pTask) {
|
||||
qDebug("s-task:%s reload exec state", pTask->id.idStr);
|
||||
void* pExecutor = pTask->exec.pExecutor;
|
||||
if (pExecutor != NULL) {
|
||||
int32_t code = qStreamOperatorReloadState(pExecutor);
|
||||
|
|
Loading…
Reference in New Issue