fix(stream): record the task start info and do some internal refactoring.

Haojun Liao 2023-09-25 09:59:05 +08:00
parent c987f61300
commit eb4078b7f3
6 changed files with 29 additions and 40 deletions
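
In outline, the new bookkeeping works like this: tqStartStreamTasks() records a launch
timestamp in pMeta->startInfo.ts, each task bumps startInfo.readyTasks under the meta
lock as it passes streamTaskSetReady(), and the task that brings the counter up to the
total computes the elapsed time and resets the counters for the next restart. Below is
a minimal, stand-alone sketch of that pattern; it is not TDengine code, and the struct,
nowMs(), markTaskReady(), and the demo loop are illustrative (only the field names
mirror STaskStartInfo):

#include <stdint.h>
#include <stdio.h>
#include <time.h>

/* Stand-alone model of the start-info bookkeeping; in the patch the fields
 * live in SStreamMeta.startInfo and updates are guarded by pMeta->lock. */
typedef struct TaskStartInfo {
  int64_t ts;                      // ms timestamp taken once, when the launch begins
  int32_t startedAfterNodeUpdate;  // set when a nodeEp update triggered the restart
  int32_t readyTasks;              // tasks that have completed their ready check
  int64_t elapsedTime;             // ms from launch until the last task became ready
} TaskStartInfo;

// POSIX wall-clock milliseconds, standing in for taosGetTimestampMs().
static int64_t nowMs(void) {
  struct timespec t;
  clock_gettime(CLOCK_REALTIME, &t);
  return (int64_t)t.tv_sec * 1000 + t.tv_nsec / 1000000;
}

// Called once per task as it becomes ready; returns 1 when all tasks are ready.
static int markTaskReady(TaskStartInfo* pInfo, int32_t numOfTotal) {
  if (++pInfo->readyTasks < numOfTotal) {
    return 0;
  }
  pInfo->elapsedTime = nowMs() - pInfo->ts;
  pInfo->readyTasks = 0;  // reset for the next restart
  pInfo->startedAfterNodeUpdate = 0;
  return 1;
}

int main(void) {
  TaskStartInfo info = {0};
  info.ts = nowMs();  // taken once, before the tasks are launched

  int32_t numOfTotal = 3;
  for (int32_t i = 0; i < numOfTotal; ++i) {
    if (markTaskReady(&info, numOfTotal)) {
      printf("all %d task(s) started, elapsed time:%.2f sec\n", numOfTotal,
             info.elapsedTime / 1000.0);
    }
  }
  return 0;
}

In the patch both the increment and the reset happen while holding pMeta->lock, since
streamTaskSetReady() can run concurrently for different tasks, so the summary line is
logged exactly once, by whichever task becomes ready last.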

@@ -391,6 +391,13 @@ struct SStreamTask {
   char reserve[256];
 };
 
+typedef struct STaskStartInfo {
+  int64_t ts;
+  int32_t startedAfterNodeUpdate;
+  int32_t readyTasks;
+  int32_t elapsedTime;
+} STaskStartInfo;
+
 // meta
 typedef struct SStreamMeta {
   char* path;
@@ -405,7 +412,7 @@ typedef struct SStreamMeta {
   int32_t vgId;
   int64_t stage;
   bool leader;
-  int8_t taskStartedByNodeUpdate;
+  STaskStartInfo startInfo;
   SRWLatch lock;
   int32_t walScanCounter;
   void* streamBackend;

@@ -1816,7 +1816,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
   int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
   int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskSet);
   if (updateTasks < numOfTasks) {
-    pMeta->taskStartedByNodeUpdate = 1;
+    pMeta->startInfo.startedAfterNodeUpdate = 1;
     tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
             updateTasks, (numOfTasks - updateTasks));
     taosWUnLockLatch(&pMeta->lock);
@@ -1825,7 +1825,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
   if (!pTq->pVnode->restored) {
     tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
-    pMeta->taskStartedByNodeUpdate = 0;
+    pMeta->startInfo.startedAfterNodeUpdate = 0;
     taosWUnLockLatch(&pMeta->lock);
   } else {
     tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId);

@@ -231,11 +231,12 @@ int32_t tqStartStreamTasks(STQ* pTq) {
   int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
   tqDebug("vgId:%d start all %d stream task(s)", vgId, numOfTasks);
 
   if (numOfTasks == 0) {
     return TSDB_CODE_SUCCESS;
   }
 
+  pMeta->startInfo.ts = taosGetTimestampMs();
   for (int32_t i = 0; i < numOfTasks; ++i) {
     SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);

@@ -552,7 +552,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
   pVnode->restored = true;
 
   taosWLockLatch(&pVnode->pTq->pStreamMeta->lock);
-  if (pVnode->pTq->pStreamMeta->taskStartedByNodeUpdate) {
+  if (pVnode->pTq->pStreamMeta->startInfo.startedAfterNodeUpdate) {
     vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
     taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
     return;

@@ -154,41 +154,6 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput
   int32_t taskLevel = pTask->info.taskLevel;
   *numOfBlocks = 0;
 
-  // if (pTask->info.taskLevel == TASK_LEVEL__SINK) {  // extract block from inputQ, one-by-one
-  //   while (1) {
-  //     if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
-  //       stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
-  //       return TSDB_CODE_SUCCESS;
-  //     }
-  //
-  //     STokenBucket* pBucket = pTask->pTokenBucket;
-  //     // if (!streamTaskHasAvailableToken(pBucket)) {  // no available token in th bucket, ignore this execution
-  //     //   stInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", pTask->id.idStr,
-  //     //          pBucket->capacity, pBucket->rate);
-  //     //   return TSDB_CODE_SUCCESS;
-  //     // }
-  //
-  //     SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
-  //     if (qItem == NULL) {
-  //       if (++retryTimes < MAX_RETRY_TIMES) {
-  //         taosMsleep(10);
-  //         continue;
-  //       }
-  //
-  //       return TSDB_CODE_SUCCESS;
-  //     }
-  //
-  //     stDebug("s-task:%s sink task handle block, type:%s", id, streamQueueItemGetTypeStr(qItem->type));
-  //     pTask->sinkRecorder.bytes += streamQueueItemGetSize(qItem);
-  //
-  //     *numOfBlocks = 1;
-  //     *pInput = qItem;
-  //     return TSDB_CODE_SUCCESS;
-  //   }
-  // }
-
   while (1) {
     if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
       stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);

@@ -34,6 +34,9 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
 static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
 
 static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
   SStreamMeta* pMeta = pTask->pMeta;
+  int32_t vgId = pMeta->vgId;
+
   if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY && pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
     pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
     stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
@@ -48,6 +51,19 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
   int64_t el = (pTask->execInfo.start - pTask->execInfo.init);
   stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
           pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus));
+
+  taosWLockLatch(&pMeta->lock);
+  pMeta->startInfo.readyTasks += 1;
+  int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
+  if (pMeta->startInfo.readyTasks == numOfTotal) {
+    // reset value for next time start
+    pMeta->startInfo.readyTasks = 0;
+    pMeta->startInfo.startedAfterNodeUpdate = 0;
+    pMeta->startInfo.elapsedTime = pTask->execInfo.start - pMeta->startInfo.ts;
+    stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, total elapsed time:%.2f sec",
+            vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pMeta->startInfo.elapsedTime / 1000.0);
+  }
+  taosWUnLockLatch(&pMeta->lock);
 }
 
 int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {