refactor(stream): do some internal refactor.

This commit is contained in:
Haojun Liao 2023-11-08 15:20:47 +08:00
parent 17bd6badec
commit 99dbbe61d7
5 changed files with 44 additions and 78 deletions

View File

@ -294,6 +294,9 @@ typedef struct SCheckpointInfo {
int64_t checkpointVer; // latest checkpointId version
int64_t nextProcessVer; // current offset in WAL, not serialize it
int64_t failedId; // record the latest failed checkpoint id
int64_t checkpointingId;
int32_t downstreamAlignNum;
int32_t checkpointNotReadyTasks;
int64_t msgVer;
} SCheckpointInfo;
@ -427,9 +430,6 @@ struct SStreamTask {
int64_t checkReqId;
SArray* checkReqIds; // shuffle
int32_t refCnt;
int64_t checkpointingId;
int32_t checkpointAlignCnt;
int32_t checkpointNotReadyTasks;
int32_t transferStateAlignCnt;
struct SStreamMeta* pMeta;
SSHashObj* pNameMap;
@ -477,7 +477,6 @@ typedef struct SStreamMeta {
SHashObj* pUpdateTaskSet;
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
int32_t numOfPausedTasks;
int32_t chkptNotReadyTasks;
int64_t rid;
int64_t chkpId;

View File

@ -1705,7 +1705,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
// Downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
if (pTask->status.downstreamReady != 1) {
pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
pTask->checkpointingId = req.checkpointId;
pTask->chkInfo.checkpointingId = req.checkpointId;
tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
", set it failure",
@ -1744,10 +1744,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
// set the initial value for generating check point
// set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed
if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
}
total = pMeta->numOfStreamTasks;
streamMetaWUnLock(pMeta);

View File

@ -112,7 +112,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, SStreamTask* p, int64_t checkpointId);
int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId);
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);

View File

@ -94,12 +94,12 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
static int32_t streamAlignCheckpoint(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num);
int64_t old = atomic_val_compare_exchange_32(&pTask->chkInfo.downstreamAlignNum, 0, num);
if (old == 0) {
stDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num);
}
return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1);
return atomic_sub_fetch_32(&pTask->chkInfo.downstreamAlignNum, 1);
}
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) {
@ -117,7 +117,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
}
pBlock->info.type = STREAM_CHECKPOINT;
pBlock->info.version = pTask->checkpointingId;
pBlock->info.version = pTask->chkInfo.checkpointingId;
pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId;
@ -140,8 +140,8 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
pTask->checkpointingId = pReq->checkpointId;
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
pTask->chkInfo.checkpointingId = pReq->checkpointId;
pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
pTask->chkInfo.startTs = taosGetTimestampMs();
pTask->execInfo.checkpoint += 1;
@ -173,7 +173,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
// set task status
if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) {
pTask->checkpointingId = checkpointId;
pTask->chkInfo.checkpointingId = checkpointId;
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
@ -181,17 +181,6 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
}
}
{ // todo: remove this when the pipeline checkpoint generating is used.
SStreamMeta* pMeta = pTask->pMeta;
streamMetaWLock(pMeta);
if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
}
streamMetaWUnLock(pMeta);
}
// todo fix race condition: set the status and append checkpoint block
int32_t taskLevel = pTask->info.taskLevel;
if (taskLevel == TASK_LEVEL__SOURCE) {
@ -200,7 +189,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
continueDispatchCheckpointBlock(pBlock, pTask);
} else { // only one task exists, no need to dispatch downstream info
atomic_add_fetch_32(&pTask->checkpointNotReadyTasks, 1);
atomic_add_fetch_32(&pTask->chkInfo.checkpointNotReadyTasks, 1);
streamProcessCheckpointReadyMsg(pTask);
streamFreeQitem((SStreamQueueItem*)pBlock);
}
@ -235,7 +224,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
// can start local checkpoint procedure
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
// Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// already. And then, dispatch check point msg to all downstream tasks
@ -254,7 +243,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG);
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
int32_t notReady = atomic_sub_fetch_32(&pTask->checkpointNotReadyTasks, 1);
int32_t notReady = atomic_sub_fetch_32(&pTask->chkInfo.checkpointNotReadyTasks, 1);
ASSERT(notReady >= 0);
if (notReady == 0) {
@ -270,35 +259,27 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
}
void streamTaskClearCheckInfo(SStreamTask* pTask) {
pTask->checkpointingId = 0; // clear the checkpoint id
pTask->chkInfo.checkpointingId = 0; // clear the checkpoint id
pTask->chkInfo.failedId = 0;
pTask->chkInfo.startTs = 0; // clear the recorded start time
pTask->checkpointNotReadyTasks = 0;
pTask->checkpointAlignCnt = 0;
pTask->chkInfo.downstreamAlignNum = 0;
pTask->chkInfo.checkpointNotReadyTasks = 0;
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
}
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, SStreamTask* p, int64_t checkpointId) {
int32_t vgId = pMeta->vgId;
int32_t code = 0;
int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
SStreamMeta* pMeta = p->pMeta;
int32_t vgId = pMeta->vgId;
int32_t code = 0;
streamMetaWLock(pMeta);
// for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
// STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
// SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
// if (ppTask == NULL) {
// continue;
// }
// SStreamTask* p = *ppTask;
if (p->info.fillHistory == 1) {
// continue;
return code;
}
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
streamMetaWLock(pMeta);
ASSERT(p->chkInfo.checkpointId < p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId);
p->chkInfo.checkpointId = p->checkpointingId;
p->chkInfo.checkpointId = p->chkInfo.checkpointingId;
streamTaskClearCheckInfo(p);
char* str = NULL;
@ -316,8 +297,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, SStreamTask* p, int64_t chec
stDebug(
"vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, "
"checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer,
str);
vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, str);
code = streamMetaCommit(pMeta);
if (code < 0) {
@ -332,28 +312,21 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, SStreamTask* p, int64_t chec
}
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t code = 0;
int32_t code = TSDB_CODE_SUCCESS;
// check for all tasks, and do generate the vnode-wide checkpoint data.
SStreamMeta* pMeta = pTask->pMeta;
// int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
// ASSERT(remain >= 0);
int64_t checkpointStartTs = pTask->chkInfo.startTs;
double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0;
// if (remain == 0) { // all tasks are ready
stDebug("s-task:%s all downstreams are ready, ready for do checkpoint", pTask->id.idStr);
streamBackendDoCheckpoint(pTask->pBackend, pTask->checkpointingId);
streamSaveAllTaskStatus(pMeta, pTask, pTask->checkpointingId);
stInfo(
"vgId:%d vnode wide checkpoint completed, save all tasks status, last:%s, level:%d elapsed time:%.2f Sec "
"checkpointId:%" PRId64,
pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, pTask->checkpointingId);
// } else {
// stInfo(
// "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, level:%d elapsed time:%.2f Sec "
// "not ready:%d/%d",
// pMeta->vgId, pTask->id.idStr, pTask->info.taskLevel, el, remain, pMeta->numOfStreamTasks);
// }
// sink task do not need to save the status, and generated the checkpoint
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
stDebug("s-task:%s level:%d start gen checkpoint", pTask->id.idStr, pTask->info.taskLevel);
streamBackendDoCheckpoint(pTask->pBackend, pTask->chkInfo.checkpointingId);
streamSaveTaskCheckpointInfo(pTask, pTask->chkInfo.checkpointingId);
}
double el = (taosGetTimestampMs() - checkpointStartTs) / 1000.0;
stInfo("s-task:%s vgId:%d checkpointId:%" PRId64 " save all tasks status, level:%d elapsed time:%.2f Sec ",
pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.checkpointingId, pTask->info.taskLevel, el);
// send check point response to upstream task
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@ -364,17 +337,16 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
if (code != TSDB_CODE_SUCCESS) {
// record the failure checkpoint id
pTask->chkInfo.failedId = pTask->checkpointingId;
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
// todo: let's retry send rsp to upstream/mnode
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr,
pTask->checkpointingId, tstrerror(code));
pTask->chkInfo.checkpointingId, tstrerror(code));
}
return code;
}
//static int64_t kBlockSize = 64 * 1024;
//static int sendCheckpointToS3(char* id, SArray* fileList){
// code = s3PutObjectFromFile2(from->fname, object_name);

View File

@ -230,7 +230,7 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char *key) {
SStreamTask* pTask = arg;
int64_t chkpId = pTask->checkpointingId;
int64_t chkpId = pTask->chkInfo.checkpointingId;
taosThreadMutexLock(&pMeta->backendMutex);
void** ppBackend = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
@ -442,7 +442,6 @@ void streamMetaClear(SStreamMeta* pMeta) {
taosArrayClear(pMeta->chkpInUse);
pMeta->numOfStreamTasks = 0;
pMeta->numOfPausedTasks = 0;
pMeta->chkptNotReadyTasks = 0;
streamMetaResetStartInfo(&pMeta->startInfo);
}
@ -1078,9 +1077,9 @@ void metaHbToMnode(void* param, void* tmrId) {
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
}
if ((*pTask)->checkpointingId != 0) {
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->checkpointingId);
entry.activeCheckpointId = (*pTask)->checkpointingId;
if ((*pTask)->chkInfo.checkpointingId != 0) {
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId);
entry.activeCheckpointId = (*pTask)->chkInfo.checkpointingId;
}
if ((*pTask)->exec.pWalReader != NULL) {