fix(stream): set correct size for results generated by scan history stream tasks.

This commit is contained in:
Haojun Liao 2023-09-19 16:49:04 +08:00
parent 053643be19
commit ce85945583
2 changed files with 21 additions and 21 deletions

View File

@ -65,6 +65,10 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT
pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK; pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK;
pStreamBlocks->blocks = pRes; pStreamBlocks->blocks = pRes;
if (pItem == NULL) {
return pStreamBlocks;
}
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem; SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
pStreamBlocks->sourceVer = pSubmit->ver; pStreamBlocks->sourceVer = pSubmit->ver;

View File

@ -19,6 +19,7 @@
#define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4 #define MIN_STREAM_EXEC_BATCH_NUM 4
#define STREAM_RESULT_DUMP_THRESHOLD 100 #define STREAM_RESULT_DUMP_THRESHOLD 100
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1)
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
@ -75,7 +76,6 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks); int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) { // back pressure and record position if (code != TSDB_CODE_SUCCESS) { // back pressure and record position
//code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY
destroyStreamDataBlock(pStreamBlocks); destroyStreamDataBlock(pStreamBlocks);
return code; return code;
} }
@ -166,7 +166,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
pTask->info.selfChildId, numOfBlocks, SIZE_IN_MB(size)); pTask->info.selfChildId, numOfBlocks, SIZE_IN_MB(size));
// current output should be dispatched to down stream nodes // current output should be dispatched to down stream nodes
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD) { if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
ASSERT(numOfBlocks == taosArrayGetSize(pRes)); ASSERT(numOfBlocks == taosArrayGetSize(pRes));
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -192,6 +192,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
int32_t streamScanHistoryData(SStreamTask* pTask) { int32_t streamScanHistoryData(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
int32_t size = 0;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor; void* exec = pTask->exec.pExecutor;
bool finished = false; bool finished = false;
@ -244,29 +245,24 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
block.info.childId = pTask->info.selfChildId; block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block); taosArrayPush(pRes, &block);
if ((++numOfBlocks) >= outputBatchSize) { size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, outputBatchSize);
if ((++numOfBlocks) >= outputBatchSize || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
qDebug("s-task:%s scan exec numOfBlocks:%d, output num-limit:%d, size-limit:%d reached", pTask->id.idStr, numOfBlocks,
outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD);
break; break;
} }
} }
if (taosArrayGetSize(pRes) > 0) { if (taosArrayGetSize(pRes) > 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes);
if (qRes == NULL) { code = doOutputResultBlockImpl(pTask, pStreamBlocks);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_OUT_OF_MEMORY; destroyStreamDataBlock(pStreamBlocks);
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
code = doOutputResultBlockImpl(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
return code; return code;
} }
size = 0;
} else { } else {
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
} }