refactor: do some internal refactor.
This commit is contained in:
parent
a3ac8b647c
commit
d48be5c0ce
|
@ -1415,7 +1415,10 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
|
||||||
|
|
||||||
tqDebug("data submit enqueue stream task:%d, ver: %" PRId64, pTask->taskId, submit.ver);
|
tqDebug("data submit enqueue stream task:%d, ver: %" PRId64, pTask->taskId, submit.ver);
|
||||||
if (succ) {
|
if (succ) {
|
||||||
if (tAppendDataForStream(pTask, (SStreamQueueItem*)pSubmit) < 0) {
|
int32_t code = tAppendDataForStream(pTask, (SStreamQueueItem*)pSubmit);
|
||||||
|
if (code < 0) {
|
||||||
|
// let's handle the back pressure
|
||||||
|
|
||||||
tqError("stream task:%d failed to put into queue for, too many", pTask->taskId);
|
tqError("stream task:%d failed to put into queue for, too many", pTask->taskId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -310,6 +310,7 @@ int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
// TODO: back pressure
|
// TODO: back pressure
|
||||||
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
|
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue