fix(stream): set correct size for results generated by scan history stream tasks.

This commit is contained in:
Haojun Liao 2023-09-19 16:49:04 +08:00
parent 309630eb11
commit 1decaaee1e
2 changed files with 21 additions and 21 deletions

View File

@ -65,6 +65,10 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT
pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK;
pStreamBlocks->blocks = pRes;
if (pItem == NULL) {
return pStreamBlocks;
}
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
pStreamBlocks->sourceVer = pSubmit->ver;

View File

@ -16,9 +16,10 @@
#include "streamInt.h"
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define STREAM_RESULT_DUMP_THRESHOLD 100
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define STREAM_RESULT_DUMP_THRESHOLD 100
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1)
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
@ -75,7 +76,6 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) { // back pressure and record position
//code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY
destroyStreamDataBlock(pStreamBlocks);
return code;
}
@ -166,7 +166,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
pTask->info.selfChildId, numOfBlocks, SIZE_IN_MB(size));
// current output should be dispatched to down stream nodes
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD) {
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
ASSERT(numOfBlocks == taosArrayGetSize(pRes));
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
if (code != TSDB_CODE_SUCCESS) {
@ -192,6 +192,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
int32_t streamScanHistoryData(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
int32_t size = 0;
int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor;
bool finished = false;
@ -244,29 +245,24 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block);
if ((++numOfBlocks) >= outputBatchSize) {
qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, outputBatchSize);
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
if ((++numOfBlocks) >= outputBatchSize || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
qDebug("s-task:%s scan exec numOfBlocks:%d, output num-limit:%d, size-limit:%d reached", pTask->id.idStr, numOfBlocks,
outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD);
break;
}
}
if (taosArrayGetSize(pRes) > 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
code = doOutputResultBlockImpl(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes);
code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamDataBlock(pStreamBlocks);
return code;
}
size = 0;
} else {
taosArrayDestroy(pRes);
}