fix(stream): fix errors in scan-history, introduced by refactor

This commit is contained in:
Haojun Liao 2023-11-07 16:19:09 +08:00
parent c9a1cc50fb
commit 1301b92844
3 changed files with 100 additions and 72 deletions

View File

@ -802,7 +802,7 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
// source level // source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask); SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
// agg level // agg level

View File

@ -1152,7 +1152,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
} else { } else {
if (pTask->execInfo.step2Start == 0) { if (pTask->execInfo.step2Start == 0) {
tqDebug("s-task:%s resume from paused, original step1 startTs:%" PRId64, id, pTask->execInfo.step1Start); tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs",
id, pTask->execInfo.step1Start, pTask->execInfo.step1El);
} else { } else {
tqDebug("s-task:%s already in step2, no need to scan-history data, step2 starTs:%"PRId64, id, pTask->execInfo.step2Start); tqDebug("s-task:%s already in step2, no need to scan-history data, step2 starTs:%"PRId64, id, pTask->execInfo.step2Start);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
@ -1172,10 +1173,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask); int64_t st = taosGetTimestampMs();
SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask, st);
// todo update the step1 exec elapsed time double el = (taosGetTimestampMs() - st) / 1000.0;
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; pTask->execInfo.step1El += el;
if (retInfo.ret == TASK_SCANHISTORY_QUIT || retInfo.ret == TASK_SCANHISTORY_REXEC) { if (retInfo.ret == TASK_SCANHISTORY_QUIT || retInfo.ret == TASK_SCANHISTORY_REXEC) {
int8_t status = streamTaskSetSchedStatusInactive(pTask); int8_t status = streamTaskSetSchedStatusInactive(pTask);
@ -1188,9 +1190,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
ETaskStatus s = streamTaskGetStatus(pTask, &p); ETaskStatus s = streamTaskGetStatus(pTask, &p);
if (s == TASK_STATUS__PAUSE) { if (s == TASK_STATUS__PAUSE) {
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status); tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr,
el, pTask->execInfo.step1El, status);
} else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) { } else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) {
tqDebug("s-task:%s status:%p not continue scan-history data", pTask->id.idStr, p); tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", pTask->id.idStr, p,
pTask->execInfo.step1El);
} }
} }
@ -1199,7 +1203,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
// the following procedure should be executed, no matter status is stop/pause or not // the following procedure should be executed, no matter status is stop/pause or not
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, el); tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
SStreamTask* pStreamTask = NULL; SStreamTask* pStreamTask = NULL;
@ -1508,6 +1512,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
streamSchedExec(pTask); streamSchedExec(pTask);
} }
} else if (status == TASK_STATUS__UNINIT) { } else if (status == TASK_STATUS__UNINIT) {
// TODO: should a fill-history task in the uninit status also be initialized here?
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
EStreamTaskEvent event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; EStreamTaskEvent event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
streamTaskHandleEvent(pTask->status.pSM, event); streamTaskHandleEvent(pTask->status.pSM, event);

View File

@ -187,95 +187,118 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return code; return code;
} }
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask) { static int32_t handleResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); int32_t code = TSDB_CODE_SUCCESS;
if (taosArrayGetSize(pRes) > 0) {
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes);
code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) {
stDebug("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code));
}
} else {
taosArrayDestroy(pRes);
}
return code;
}
static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* pSize, bool* pFinish) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor;
int32_t numOfBlocks = 0;
while (1) {
if (streamTaskShouldStop(pTask)) {
break;
}
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
stDebug("s-task:%s level:%d inputQ is blocked, retry in 5s", pTask->id.idStr, pTask->info.taskLevel);
break;
}
SSDataBlock* output = NULL;
uint64_t ts = 0;
code = qExecTask(exec, &output, &ts);
if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {
stError("s-task:%s scan-history data error occurred code:%s, continue scan-history", pTask->id.idStr,
tstrerror(code));
continue;
}
// results generated before the fill-history task was paused should be dispatched to the sink node
if (output == NULL) {
(*pFinish) = qStreamScanhistoryFinished(exec);
break;
}
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block);
(*pSize) += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
numOfBlocks += 1;
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || (*pSize) >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached",
pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(*pSize), STREAM_RESULT_DUMP_THRESHOLD,
SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD));
break;
}
}
}
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
void* exec = pTask->exec.pExecutor; void* exec = pTask->exec.pExecutor;
bool finished = false; bool finished = false;
int64_t st = taosGetTimestampMs();
qSetStreamOpOpen(exec); qSetStreamOpOpen(exec);
while (!finished) { while (1) {
if (streamTaskShouldPause(pTask)) { if (streamTaskShouldPause(pTask)) {
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; stDebug("s-task:%s paused from the scan-history task", pTask->id.idStr);
stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); // quit from step1, not continue to handle the step2
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; // quit from step1, not continue to handle the step2 return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
} }
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) { if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr, tstrerror(terrno)); stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr,
tstrerror(terrno));
continue; continue;
} }
int32_t size = 0; int32_t size = 0;
int32_t numOfBlocks = 0; streamScanHistoryDataImpl(pTask, pRes, &size, &finished);
while (1) {
if (streamTaskShouldStop(pTask)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
}
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { if(streamTaskShouldStop(pTask)) {
stDebug("s-task:%s level:%d inputQ is blocked, retry later", pTask->id.idStr, pTask->info.taskLevel); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000};
}
SSDataBlock* output = NULL;
uint64_t ts = 0;
code = qExecTask(exec, &output, &ts);
if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {
stError("s-task:%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code));
continue;
}
// the generated results before fill-history task been paused, should be dispatched to sink node
if (output == NULL) {
finished = qStreamScanhistoryFinished(exec);
break;
}
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block);
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
numOfBlocks += 1;
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached",
pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(size), STREAM_RESULT_DUMP_THRESHOLD,
SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD));
break;
}
} }
if (taosArrayGetSize(pRes) > 0) { // dispatch the generated results
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes); int32_t code = handleResultBlocks(pTask, pRes, size);
code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
stDebug("s-task:%s dump fill-history results failed, code:%s, retry in 100ms", pTask->id.idStr, tstrerror(code));
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
}
} else {
taosArrayDestroy(pRes);
}
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
// downstream task input queue is full, try in 5sec
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000};
}
if (finished) {
return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};
}
if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { if (el >= STREAM_SCAN_HISTORY_TIMESLICE) {
stDebug("s-task:%s fill-history:%d level:%d timeslice for scan-history exhausted", pTask->id.idStr, stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms",
pTask->info.fillHistory, pTask->info.taskLevel); pTask->id.idStr, pTask->info.fillHistory, el / 1000.0);
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
} }
} }
return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};;
} }
// wait for the stream task to be idle // wait for the stream task to be idle
@ -285,7 +308,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
while (!streamTaskIsIdle(pStreamTask)) { while (!streamTaskIsIdle(pStreamTask)) {
stDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", id, pTask->info.taskLevel, stDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", id, pTask->info.taskLevel,
pStreamTask->id.idStr); pStreamTask->id.idStr);
taosMsleep(100); taosMsleep(100);
} }