fix(stream): once a source task's status is no longer normal, no more data should be put into its inputQ.

This commit is contained in:
Haojun Liao 2023-07-18 15:18:57 +08:00
parent a4676fc74b
commit 901b7d8ecc
7 changed files with 32 additions and 22 deletions

View File

@ -310,6 +310,7 @@ struct SStreamTask {
SStreamId streamTaskId;
SArray* pUpstreamInfoList; // SArray<SStreamChildEpInfo*>, // children info
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
TdThreadMutex lock; // protects setting the task status and putting data into inputQ
// output
union {
@ -566,6 +567,7 @@ int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask);
int32_t streamTaskGetInputQItems(const SStreamTask* pTask);
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
@ -615,9 +617,7 @@ int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask);
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);
#ifdef __cplusplus
}

View File

@ -76,6 +76,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pSnode->msgCb;
pTask->pMeta = pSnode->pMeta;
taosThreadMutexInit(&pTask->lock, NULL);
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {

View File

@ -928,16 +928,20 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
" child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam);
taosThreadMutexInit(&pTask->lock, NULL);
if (pTask->chkInfo.checkpointId != 0) {
// checkpoint ver is the kept version, handled data should be the next version.
pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1;
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer);
}
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
" child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam);
return 0;
}

View File

@ -268,17 +268,25 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
int32_t numOfItemsInQ = taosQueueItemSize(pTask->inputQueue->queue);
int32_t numOfItems = streamTaskGetInputQItems(pTask);
// append the data for the stream
SStreamQueueItem* pItem = NULL;
code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr);
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
taosThreadMutexLock(&pTask->lock);
if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus);
taosThreadMutexUnlock(&pTask->lock);
continue;
}
if (pItem != NULL) {
noDataInWal = false;
code = tAppendDataToInputQueue(pTask, pItem);
@ -292,7 +300,9 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
}
}
if ((code == TSDB_CODE_SUCCESS) || (numOfItemsInQ > 0)) {
taosThreadMutexUnlock(&pTask->lock);
if ((code == TSDB_CODE_SUCCESS) || (numOfItems > 0)) {
code = streamSchedExec(pTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask);

View File

@ -133,13 +133,18 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
// 1. set the task status to prepare for the checkpoint; no data is allowed to be put into inputQ from now on.
taosThreadMutexLock(&pTask->lock);
pTask->status.taskStatus = TASK_STATUS__CK;
pTask->checkpointingId = pReq->checkpointId;
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
// 2. dispatch the checkpoint msg to the downstream tasks directly and do nothing else. Put the checkpoint block into
// inputQ to make sure all blocks with lower versions have already been handled by this task.
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
taosThreadMutexUnlock(&pTask->lock);
return code;
}
static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
@ -180,15 +185,6 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
streamFreeQitem((SStreamQueueItem*)pBlock);
}
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
// todo: sink node needs alignment??
/* ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
pTask->status.taskStatus = TASK_STATUS__CK_READY;
// update the child Id for downstream tasks
streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", id);
streamFreeQitem((SStreamQueueItem*)pBlock);
} else {*/
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
// update the child Id for downstream tasks
@ -204,8 +200,6 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
return code;
}
if (taskLevel == TASK_LEVEL__SINK) {
pTask->status.taskStatus = TASK_STATUS__CK_READY;
qDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream",

View File

@ -295,7 +295,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
}
#endif
static int32_t getNumOfItemsInputQ(const SStreamTask* pTask) {
int32_t streamTaskGetInputQItems(const SStreamTask* pTask) {
int32_t numOfItems1 = taosQueueItemSize(pTask->inputQueue->queue);
int32_t numOfItems2 = taosQallItemSize(pTask->inputQueue->qall);

View File

@ -244,6 +244,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
}
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
taosThreadMutexDestroy(&pTask->lock);
if (pTask->id.idStr != NULL) {
taosMemoryFree((void*)pTask->id.idStr);