refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-06-05 16:19:37 +08:00
parent fcc706c45c
commit e8549ce511
2 changed files with 8 additions and 7 deletions

View File

@ -16,8 +16,8 @@
#include "streamInc.h" #include "streamInc.h"
#include "ttimer.h" #include "ttimer.h"
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (50) #define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (50)
#define ONE_MB_F (1048576.0) #define ONE_MB_F (1048576.0)
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) #define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
@ -36,6 +36,7 @@ int32_t streamInit() {
} }
atomic_store_8(&streamEnv.inited, 1); atomic_store_8(&streamEnv.inited, 1);
} }
return 0; return 0;
} }
@ -284,9 +285,9 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
} }
bool tInputQueueIsFull(const SStreamTask* pTask) { bool tInputQueueIsFull(const SStreamTask* pTask) {
bool isFull = taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY; bool isFull = taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUE_CAPACITY;
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
return (isFull || size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE);
} }
int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
@ -298,7 +299,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) {
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total,
size); size);
streamDataSubmitDestroy(px); streamDataSubmitDestroy(px);
taosFreeQitem(pItem); taosFreeQitem(pItem);
@ -318,7 +319,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total,
size); size);
destroyStreamDataBlock((SStreamDataBlock*) pItem); destroyStreamDataBlock((SStreamDataBlock*) pItem);
return -1; return -1;

View File

@ -53,7 +53,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
return 0; return 0;
} }
// checkstatus // check status
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t ver) { int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t ver) {
qDebug("s-task:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, ver); qDebug("s-task:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, ver);