refactor: do some internal refactor.
This commit is contained in:
parent
f2848de12a
commit
38164435f9
|
@ -192,11 +192,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
||||||
int32_t streamScanHistoryData(SStreamTask* pTask) {
|
int32_t streamScanHistoryData(SStreamTask* pTask) {
|
||||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||||
|
|
||||||
int32_t size = 0;
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
void* exec = pTask->exec.pExecutor;
|
void* exec = pTask->exec.pExecutor;
|
||||||
bool finished = false;
|
bool finished = false;
|
||||||
int32_t outputBatchSize = 100;
|
|
||||||
|
|
||||||
qSetStreamOpOpen(exec);
|
qSetStreamOpOpen(exec);
|
||||||
|
|
||||||
|
@ -213,6 +211,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t size = 0;
|
||||||
int32_t numOfBlocks = 0;
|
int32_t numOfBlocks = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
if (streamTaskShouldStop(&pTask->status)) {
|
if (streamTaskShouldStop(&pTask->status)) {
|
||||||
|
@ -247,9 +246,10 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
|
||||||
|
|
||||||
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
|
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
|
||||||
|
|
||||||
if ((++numOfBlocks) >= outputBatchSize || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
|
if ((++numOfBlocks) >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
|
||||||
stDebug("s-task:%s scan exec numOfBlocks:%d, size:%d output num-limit:%d, size-limit:%d reached",
|
stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached",
|
||||||
pTask->id.idStr, numOfBlocks, size, outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD);
|
pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(size), STREAM_RESULT_DUMP_THRESHOLD,
|
||||||
|
SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -260,8 +260,6 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
size = 0;
|
|
||||||
} else {
|
} else {
|
||||||
taosArrayDestroy(pRes);
|
taosArrayDestroy(pRes);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue