refactor: do some internal refactor.
This commit is contained in:
parent
ffee5ebc61
commit
92feb89a09
|
@ -137,8 +137,6 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// append the data for the stream
|
// append the data for the stream
|
||||||
// tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
|
||||||
|
|
||||||
SStreamQueueItem* pItem = NULL;
|
SStreamQueueItem* pItem = NULL;
|
||||||
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr);
|
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr);
|
||||||
if (code != TSDB_CODE_SUCCESS) { // failed, continue
|
if (code != TSDB_CODE_SUCCESS) { // failed, continue
|
||||||
|
|
|
@ -16,8 +16,10 @@
|
||||||
#include "streamInc.h"
|
#include "streamInc.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
|
||||||
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480
|
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480
|
||||||
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (100)
|
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (50)
|
||||||
|
#define ONE_MB_F (1048576.0)
|
||||||
|
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q)/ONE_MB_F)
|
||||||
|
|
||||||
int32_t streamInit() {
|
int32_t streamInit() {
|
||||||
int8_t old;
|
int8_t old;
|
||||||
|
@ -281,27 +283,25 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tInputQueueIsFull(const SStreamTask* pTask) {
|
bool tInputQueueIsFull(const SStreamTask* pTask) {
|
||||||
return taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY;
|
bool isFull = taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY;
|
||||||
|
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
|
||||||
|
return (isFull || size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
int8_t type = pItem->type;
|
int8_t type = pItem->type;
|
||||||
|
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1;
|
||||||
|
double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
|
||||||
|
|
||||||
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
SStreamDataSubmit2* pSubmitBlock = (SStreamDataSubmit2*)pItem;
|
SStreamDataSubmit2* pSubmitBlock = (SStreamDataSubmit2*)pItem;
|
||||||
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1;
|
|
||||||
qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr,
|
|
||||||
pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen,
|
|
||||||
pSubmitBlock->submit.ver, total);
|
|
||||||
|
|
||||||
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||||
pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, numOfBlocks, size);
|
pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, total, size);
|
||||||
|
|
||||||
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
|
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
|
||||||
(numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
|
|
||||||
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr,
|
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr,
|
||||||
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE,
|
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE,
|
||||||
numOfBlocks, size);
|
total, size);
|
||||||
streamDataSubmitDestroy(pSubmitBlock);
|
streamDataSubmitDestroy(pSubmitBlock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -310,10 +310,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
|
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
|
||||||
double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0;
|
|
||||||
|
|
||||||
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
|
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
|
||||||
(numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
|
|
||||||
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
||||||
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks,
|
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks,
|
||||||
size);
|
size);
|
||||||
|
|
Loading…
Reference in New Issue