fix(stream): do some internal refactor.
This commit is contained in:
parent
a1263b8b04
commit
3b814f2478
|
@ -20,13 +20,13 @@ static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
||||||
// this function should be executed by stream threads.
|
// this function should be executed by stream threads.
|
||||||
// extract submit block from WAL, and add them into the input queue for the sources tasks.
|
// extract submit block from WAL, and add them into the input queue for the sources tasks.
|
||||||
int32_t tqStreamTasksScanWal(STQ* pTq) {
|
int32_t tqStreamTasksScanWal(STQ* pTq) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t scan = pMeta->walScanCounter;
|
int32_t scan = pMeta->walScanCounter;
|
||||||
tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan);
|
tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, scan);
|
||||||
|
|
||||||
// check all restore tasks
|
// check all restore tasks
|
||||||
bool shouldIdle = true;
|
bool shouldIdle = true;
|
||||||
|
|
|
@ -212,9 +212,10 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
|
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
|
||||||
qDebug("vgId:%d s-task:%s receive dispatch req from taskId:%d", pReq->upstreamNodeId, pTask->id.idStr,
|
qDebug("s-task:%s receive dispatch req from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId,
|
||||||
pReq->upstreamTaskId);
|
pReq->upstreamNodeId);
|
||||||
|
|
||||||
|
// todo add the input queue buffer limitation
|
||||||
streamTaskEnqueueBlocks(pTask, pReq, pRsp);
|
streamTaskEnqueueBlocks(pTask, pReq, pRsp);
|
||||||
tDeleteStreamDispatchReq(pReq);
|
tDeleteStreamDispatchReq(pReq);
|
||||||
|
|
||||||
|
@ -222,10 +223,6 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
if (streamTryExec(pTask) < 0) {
|
if (streamTryExec(pTask) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/
|
|
||||||
/*streamDispatch(pTask);*/
|
|
||||||
/*}*/
|
|
||||||
} else {
|
} else {
|
||||||
streamSchedExec(pTask);
|
streamSchedExec(pTask);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue