refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-05-20 12:20:04 +08:00
parent 6ded3284da
commit 5cbad1da3e
1 changed files with 47 additions and 26 deletions

View File

@ -18,6 +18,7 @@
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 128
#define MIN_STREAM_EXEC_BATCH_NUM 16
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000
static int32_t updateCheckPointInfo (SStreamTask* pTask);
static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes);
@ -32,6 +33,39 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) {
return (status == TASK_STATUS__PAUSE);
}
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes,
int32_t size, int64_t* totalSize, int32_t* totalBlocks) {
int32_t code = updateCheckPointInfo(pTask);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t numOfBlocks = taosArrayGetSize(pRes);
if (numOfBlocks > 0) {
SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes);
if (pStreamBlocks == NULL) {
return -1;
}
qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0);
code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(pStreamBlocks);
return -1;
}
} else {
taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
}
*totalSize += size;
*totalBlocks += numOfBlocks;
ASSERT(taosArrayGetSize(pRes) == 0);
return TSDB_CODE_SUCCESS;
}
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor;
@ -98,38 +132,25 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
pTask->selfChildId, numOfBlocks, size / 1048576.0);
// current output should be dispatched to down stream nodes
if (numOfBlocks > 1000) {
code = updateCheckPointInfo(pTask);
if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
ASSERT(numOfBlocks == taosArrayGetSize(pRes));
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
ASSERT(numOfBlocks == taosArrayGetSize(pRes));
if (numOfBlocks > 0) {
SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes);
if (pStreamBlocks == NULL) {
return -1;
}
qDebug("s-task:%s output exec stream data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0);
code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(pStreamBlocks);
return -1;
}
} else {
taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
}
size = 0;
numOfBlocks = 0;
ASSERT(taosArrayGetSize(pRes) == 0);
}
}
*totalSize += size;
*totalBlocks += numOfBlocks;
size = 0;
numOfBlocks = 0;
if (numOfBlocks > 0) {
ASSERT(numOfBlocks == taosArrayGetSize(pRes));
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
ASSERT(taosArrayGetSize(pRes) == 0);
}