refactor(stream): do some internal refactor.

This commit is contained in:
Haojun Liao 2025-01-26 21:50:51 +08:00
parent 67853b4a54
commit 24c6fe727f
1 changed files with 28 additions and 24 deletions

View File

@ -119,37 +119,41 @@ void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem
if (flag == STREAM_QUEUE__CHKPTFAILED) {
*pItem = pQueue->qChkptItem;
} else {
pQueue->qChkptItem = NULL;
taosReadQitem(pQueue->pChkptQueue, (void**) &pQueue->qChkptItem);
if (pQueue->qChkptItem != NULL) {
stDebug("read data from checkpoint queue, status:%d", status);
ASSERT(status != TASK_STATUS__CK && pQueue->qItem == NULL);
return;
}
*pItem = pQueue->qChkptItem;
return;
}
if (flag == STREAM_QUEUE__FAILED) {
*pItem = pQueue->qItem;
ASSERT(status != TASK_STATUS__CK && pQueue->qChkptItem == NULL);
return;
}
// if in checkpoint status, not read data from ordinary input q.
if (status == TASK_STATUS__CK) {
stDebug("in checkpoint status, not ready data in normal queue");
return;
}
pQueue->qChkptItem = NULL;
taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem);
if (pQueue->qChkptItem != NULL) {
stDebug("read data from checkpoint queue, status:%d", status);
*pItem = pQueue->qChkptItem;
return;
}
// if in checkpoint status, not read data from ordinary input q.
if (status == TASK_STATUS__CK) {
stDebug("in checkpoint status, not ready data in normal queue");
return;
}
// let's try the ordinary input q
if (flag == STREAM_QUEUE__FAILED) {
*pItem = streamQueueCurItem(pQueue);
} else {
pQueue->qItem = NULL;
(void) taosGetQitem(pQueue->qall, &pQueue->qItem);
pQueue->qItem = NULL;
(void)taosGetQitem(pQueue->qall, &pQueue->qItem);
if (pQueue->qItem == NULL) {
(void) taosReadAllQitems(pQueue->pQueue, pQueue->qall);
(void) taosGetQitem(pQueue->qall, &pQueue->qItem);
}
*pItem = streamQueueCurItem(pQueue);
if (pQueue->qItem == NULL) {
(void)taosReadAllQitems(pQueue->pQueue, pQueue->qall);
(void)taosGetQitem(pQueue->qall, &pQueue->qItem);
}
*pItem = streamQueueCurItem(pQueue);
}
void streamQueueProcessSuccess(SStreamQueue* queue) {