fill history pause&resume
This commit is contained in:
parent
41c193945f
commit
870d75bf82
|
@ -779,9 +779,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
SStreamTask* pSateTask = pTask;
|
SStreamTask* pSateTask = pTask;
|
||||||
// if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
// pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t));
|
pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t));
|
||||||
// }
|
}
|
||||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
|
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
|
||||||
if (pTask->pState == NULL) {
|
if (pTask->pState == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -798,9 +798,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
|
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
|
||||||
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
||||||
SStreamTask* pSateTask = pTask;
|
SStreamTask* pSateTask = pTask;
|
||||||
// if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
// pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t));
|
pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t));
|
||||||
// }
|
}
|
||||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
|
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
|
||||||
if (pTask->pState == NULL) {
|
if (pTask->pState == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1134,9 +1134,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
||||||
tqDebug("s-task:%s set status to be dropping", pTask->id.idStr);
|
tqDebug("s-task:%s set status to be dropping", pTask->id.idStr);
|
||||||
// transfer the ownership of executor state
|
// transfer the ownership of executor state
|
||||||
// todo(liuyao)
|
streamTaskReleaseState(pTask);
|
||||||
// streamTaskReleaseState(pTask);
|
streamTaskReloadState(pStreamTask);
|
||||||
// streamTaskReloadState(pStreamTask);
|
|
||||||
streamMetaSaveTask(pMeta, pTask);
|
streamMetaSaveTask(pMeta, pTask);
|
||||||
streamMetaSaveTask(pMeta, pStreamTask);
|
streamMetaSaveTask(pMeta, pStreamTask);
|
||||||
|
|
||||||
|
@ -1183,10 +1182,9 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// transfer the ownership of executor state
|
// transfer the ownership of executor state
|
||||||
// todo(liuyao)
|
streamTaskReleaseState(pTask);
|
||||||
// streamTaskReleaseState(pTask);
|
SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId);
|
||||||
// SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId);
|
streamTaskReloadState(pStreamTask);
|
||||||
// streamTaskReloadState(pStreamTask);
|
|
||||||
|
|
||||||
ASSERT(pTask->streamTaskId.taskId != 0);
|
ASSERT(pTask->streamTaskId.taskId != 0);
|
||||||
pTask->status.transferState = true; // persistent data?
|
pTask->status.transferState = true; // persistent data?
|
||||||
|
|
|
@ -196,7 +196,21 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
||||||
} else {
|
} else {
|
||||||
qSetStreamOpOpen(exec);
|
qSetStreamOpOpen(exec);
|
||||||
if (streamTaskShouldPause(&pTask->status)) {
|
if (streamTaskShouldPause(&pTask->status)) {
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
||||||
|
if (qRes == NULL) {
|
||||||
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||||
|
qRes->blocks = pRes;
|
||||||
|
code = streamTaskOutputResultBlock(pTask, qRes);
|
||||||
|
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
|
||||||
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
|
taosFreeQitem(qRes);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue