Merge branch 'fix/newCheckpoint' into enh/triggerCheckPoint2

This commit is contained in:
yihaoDeng 2023-07-15 04:27:39 +00:00
commit 7b4185dc56
3 changed files with 84 additions and 78 deletions

View File

@ -143,7 +143,26 @@ int32_t streamSchedExec(SStreamTask* pTask) {
return 0; return 0;
} }
static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) {
*pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
if (*pBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId);
SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead));
pDispatchRsp->inputStatus = status;
pDispatchRsp->streamId = htobe64(pReq->streamId);
pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
return TSDB_CODE_SUCCESS;
}
static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) {
int8_t status = 0; int8_t status = 0;
SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
@ -158,23 +177,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp
status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
} }
// rsp by input status return status;
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(buf, sizeof(SMsgHead));
pDispatchRsp->inputStatus = status;
pDispatchRsp->streamId = htobe64(pReq->streamId);
pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
return (status == TASK_INPUT_STATUS__NORMAL) ? 0 : -1;
} }
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
@ -239,22 +242,36 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
int32_t status = 0;
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
if (!pInfo->dataAllowed) {
qWarn("s-task:%s data from task:0x%x is denied", pTask->id.idStr, pReq->upstreamTaskId);
status = TASK_INPUT_STATUS__BLOCKED;
} else {
// Current task has received the checkpoint req from the upstream task, from which the message should all be blocked // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId);
}
status = streamTaskAppendInputBlocks(pTask, pReq);
}
{
// do send response with the input status
int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
if (code != TSDB_CODE_SUCCESS) {
// todo handle failure
return code;
}
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
} }
streamTaskAppendInputBlocks(pTask, pReq, pRsp);
tDeleteStreamDispatchReq(pReq); tDeleteStreamDispatchReq(pReq);
if (exec) {
if (streamTryExec(pTask) < 0) {
return -1;
}
} else {
streamSchedExec(pTask); streamSchedExec(pTask);
}
return 0; return 0;
} }
@ -262,10 +279,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
if (streamTryExec(pTask) < 0) { if (streamTryExec(pTask) < 0) {
return -1; return -1;
} }
/*if (pTask->dispatchType == TASK_OUTPUT__FIXED_DISPATCH || pTask->dispatchType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/
/*streamDispatchStreamBlock(pTask);*/
/*}*/
return 0; return 0;
} }
@ -380,7 +393,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
} }
} }
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
for(int32_t i = 0; i < num; ++i) { for(int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);

View File

@ -136,15 +136,31 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
pTask->checkpointingId = pReq->checkpointId; pTask->checkpointingId = pReq->checkpointId;
pTask->checkpointNotReadyTasks = 1; pTask->checkpointNotReadyTasks = 1;
// 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into
// 2. put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task already. // inputQ, to make sure all blocks with less version have been handled by this task already.
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
} }
static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
pBlock->srcTaskId = pTask->id.taskId;
pBlock->srcVgId = pTask->pMeta->vgId;
int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock);
if (code == 0) {
streamDispatchStreamBlock(pTask);
}
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0); SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
int64_t checkpointId = pDataBlock->info.version; int64_t checkpointId = pDataBlock->info.version;
const char* id = pTask->id.idStr;
int32_t code = TSDB_CODE_SUCCESS;
// set the task status // set the task status
pTask->checkpointingId = checkpointId; pTask->checkpointingId = checkpointId;
pTask->status.taskStatus = TASK_STATUS__CK; pTask->status.taskStatus = TASK_STATUS__CK;
@ -153,20 +169,9 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
int32_t taskLevel = pTask->info.taskLevel; int32_t taskLevel = pTask->info.taskLevel;
if (taskLevel == TASK_LEVEL__SOURCE) { if (taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
pBlock->srcTaskId = pTask->id.taskId; qDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", id, pTask->info.selfChildId);
pBlock->srcVgId = pTask->pMeta->vgId; continueDispatchCheckpointBlock(pBlock, pTask);
} else { // only one task exists, no need to dispatch downstream info
qDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", pTask->id.idStr,
pTask->info.selfChildId);
int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock);
if (code != 0) { // todo failed to add it into the output queue, free it.
return code;
}
streamFreeQitem((SStreamQueueItem*)pBlock);
streamDispatchStreamBlock(pTask);
} else { // only one task exists
streamProcessCheckpointReadyMsg(pTask); streamProcessCheckpointReadyMsg(pTask);
} }
} else if (taskLevel == TASK_LEVEL__SINK) { } else if (taskLevel == TASK_LEVEL__SINK) {
@ -176,7 +181,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
// update the child Id for downstream tasks // update the child Id for downstream tasks
streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", pTask->id.idStr); qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", id);
} else { } else {
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0); ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
@ -188,14 +193,13 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
if (notReady > 0) { if (notReady > 0) {
qDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d", qDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d",
pTask->id.idStr, pTask->info.selfChildId, notReady, num); id, pTask->info.selfChildId, notReady, num);
return 0; return code;
} }
qDebug( qDebug(
"s-task:%s receive one checkpoint block, all %d upstream sent checkpoint msgs, dispatch checkpoint msg to " "s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg downstream",
"downstream", id, num);
pTask->id.idStr, num);
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
// can start local checkpoint procedure // can start local checkpoint procedure
@ -204,23 +208,10 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
// if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY // if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY
// put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// already. And then, dispatch check point msg to all downstream tasks // already. And then, dispatch check point msg to all downstream tasks
code = continueDispatchCheckpointBlock(pBlock, pTask);
}
{
pBlock->srcTaskId = pTask->id.taskId;
pBlock->srcVgId = pTask->pMeta->vgId;
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock);
if (code != 0) { // todo failed to add it into the output queue, free it.
return code; return code;
}
streamFreeQitem((SStreamQueueItem*)pBlock);
streamDispatchStreamBlock(pTask);
}
}
return 0;
} }
/** /**
@ -259,10 +250,12 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) {
int8_t prev = p->status.taskStatus; int8_t prev = p->status.taskStatus;
p->status.taskStatus = TASK_STATUS__NORMAL; p->status.taskStatus = TASK_STATUS__NORMAL;
// save the task
streamMetaSaveTask(pMeta, p); streamMetaSaveTask(pMeta, p);
qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64 streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks
" currentVer:%" PRId64 ", status to be normal, prev:%s", qDebug("vgId:%d s-task:%s level:%d commit task status after checkpoint completed, checkpointId:%" PRId64
pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer, ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer,
streamGetTaskStatusStr(prev)); streamGetTaskStatusStr(prev));
} }

View File

@ -386,7 +386,7 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit, qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
ASSERT((*pVer) < pSubmit->submit.ver); ASSERT((*pVer) <= pSubmit->submit.ver);
(*pVer) = pSubmit->submit.ver; (*pVer) = pSubmit->submit.ver;
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
@ -405,7 +405,7 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_
qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, numOfBlocks, qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, numOfBlocks,
pMerged->ver); pMerged->ver);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
ASSERT((*pVer) < pMerged->ver); ASSERT((*pVer) <= pMerged->ver);
(*pVer) = pMerged->ver; (*pVer) = pMerged->ver;
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
@ -485,7 +485,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.checkpointVer); ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.checkpointVer);
if (ver != pTask->chkInfo.checkpointVer) { if (ver != pTask->chkInfo.checkpointVer) {
qDebug("s-task:%s update checkpoint ver from %" PRId64 " to %" PRId64, pTask->id.idStr, ver, qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64, pTask->id.idStr, ver,
pTask->chkInfo.checkpointVer); pTask->chkInfo.checkpointVer);
} }