fix(stream): fix error in generating checkpoint for state_window.

This commit is contained in:
Haojun Liao 2023-07-18 09:04:28 +08:00
parent 1b8ec19509
commit 2f24f776fb
6 changed files with 54 additions and 25 deletions

View File

@ -25,6 +25,7 @@
#define SINK_NODE_LEVEL (0)
extern bool tsDeployOnSnode;
static int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream);
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId,
SVgObj* pVgroup, int32_t fillHistory);
static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);
@ -267,6 +268,11 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
return terrno;
}
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setEpToDownstreamTask(pTask, pSinkTask);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -245,6 +245,8 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t status = 0;
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
ASSERT(pInfo != NULL);
if (!pInfo->dataAllowed) {
qWarn("s-task:%s data from task:0x%x is denied", pTask->id.idStr, pReq->upstreamTaskId);
status = TASK_INPUT_STATUS__BLOCKED;
@ -402,5 +404,6 @@ SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t
}
}
qError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId);
return NULL;
}

View File

@ -135,7 +135,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
pTask->status.taskStatus = TASK_STATUS__CK;
pTask->checkpointingId = pReq->checkpointId;
pTask->checkpointNotReadyTasks = 1;
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
// 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into
// inputQ, to make sure all blocks with less version have been handled by this task already.
@ -149,9 +149,10 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream
int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock);
if (code == 0) {
streamDispatchStreamBlock(pTask);
} else {
streamFreeQitem((SStreamQueueItem*)pBlock);
}
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
@ -176,16 +177,18 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
continueDispatchCheckpointBlock(pBlock, pTask);
} else { // only one task exists, no need to dispatch downstream info
streamProcessCheckpointReadyMsg(pTask);
streamFreeQitem((SStreamQueueItem*)pBlock);
}
} else if (taskLevel == TASK_LEVEL__SINK) {
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
// todo: sink node needs alignment??
ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
pTask->status.taskStatus = TASK_STATUS__CK_READY;
/* ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
pTask->status.taskStatus = TASK_STATUS__CK_READY;
// update the child Id for downstream tasks
streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", id);
} else {
// update the child Id for downstream tasks
streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", id);
streamFreeQitem((SStreamQueueItem*)pBlock);
} else {*/
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
// update the child Id for downstream tasks
@ -197,24 +200,33 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
if (notReady > 0) {
qDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d",
id, pTask->info.selfChildId, notReady, num);
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
qDebug(
"s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg downstream",
id, num);
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
// can start local checkpoint procedure
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
// if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY
// put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// already. And then, dispatch check point msg to all downstream tasks
code = continueDispatchCheckpointBlock(pBlock, pTask);
if (taskLevel == TASK_LEVEL__SINK) {
pTask->status.taskStatus = TASK_STATUS__CK_READY;
qDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream",
id, num);
streamFreeQitem((SStreamQueueItem*)pBlock);
} else {
qDebug(
"s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg "
"downstream", id, num);
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
// can start local checkpoint procedure
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
// if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY
// put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// already. And then, dispatch check point msg to all downstream tasks
code = continueDispatchCheckpointBlock(pBlock, pTask);
}
}
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}

View File

@ -194,6 +194,11 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
}
}
static void freeItems(void* param) {
SSDataBlock* pBlock = param;
taosArrayDestroy(pBlock->pDataBlock);
}
void streamFreeQitem(SStreamQueueItem* data) {
int8_t type = data->type;
if (type == STREAM_INPUT__GET_RES) {
@ -227,5 +232,9 @@ void streamFreeQitem(SStreamQueueItem* data) {
SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
blockDataDestroy(pRefBlock->pBlock);
taosFreeQitem(pRefBlock);
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
SStreamDataBlock* pBlock = (SStreamDataBlock*) data;
taosArrayDestroyEx(pBlock->blocks, freeItems);
taosFreeQitem(pBlock);
}
}

View File

@ -287,7 +287,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
if (!streamTaskShouldStop(&(*ppTask)->status)) {
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
qDebug("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
qTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
return *ppTask;
}
}
@ -299,7 +299,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
if (ref > 0) {
qDebug("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
qTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
} else if (ref == 0) {
ASSERT(streamTaskShouldStop(&pTask->status));
tFreeStreamTask(pTask);

View File

@ -259,10 +259,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
return 0;
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
return 1;
} else {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
int32_t type = pTask->outputType;
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__TABLE) {
return 1;
} else {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;