fix(stream): transfer state before do checkpoint, to avoid the retrieve and deadlock by using waiting .

This commit is contained in:
Haojun Liao 2024-03-14 16:13:41 +08:00
parent 190b02dd1a
commit cc0b32b1f8
6 changed files with 70 additions and 37 deletions

View File

@ -1205,8 +1205,15 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamProcessCheckpointSourceReq(pTask, &req);
taosThreadMutexUnlock(&pTask->lock);
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d", pTask->id.idStr,
if (req.mndTrigger) {
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ", pTask->id.idStr,
vgId, pTask->info.taskLevel, req.checkpointId, req.transId);
} else {
const char* pPrevStatus = streamTaskGetStatusStr(streamTaskGetPrevStatus(pTask));
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64
", transId:%d after transfer-state, prev status:%s",
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus);
}
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -130,7 +130,7 @@ int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
const char* streamQueueItemGetTypeStr(int32_t type);
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
int32_t streamTransferStatePrepare(SStreamTask* pTask);
SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);

View File

@ -300,6 +300,8 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
taosThreadMutexLock(&p->lock);
SStreamTaskState* pStatus = streamTaskGetStatus(p);
ETaskStatus prevStatus = pStatus->state;
if (pStatus->state == TASK_STATUS__CK) {
ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId &&
pCKInfo->checkpointVer <= pCKInfo->processedVer);
@ -325,8 +327,9 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
}
stDebug("vgId:%d s-task:%s level:%d open upstream inputQ, save status after checkpoint, checkpointId:%" PRId64
", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: normal, prev:%s",
vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, pStatus->name);
", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: ready, prev:%s",
vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer,
streamTaskGetStatusStr(prevStatus));
// save the task if not sink task
if (p->info.taskLevel <= TASK_LEVEL__SINK) {
@ -437,9 +440,11 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
if (type == UPLOAD_DISABLE) {
return 0;
}
if (pTask == NULL || pTask->pBackend == NULL) {
return 0;
}
SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg));
arg->type = type;
arg->taskId = taosStrdup(taskId);
@ -448,16 +453,19 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL);
}
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t startTs = pTask->chkInfo.startTs;
int64_t ckId = pTask->chkInfo.checkpointingId;
const char* id = pTask->id.idStr;
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
SStreamMeta* pMeta = pTask->pMeta;
// sink task do not need to save the status, and generated the checkpoint
// sink task does not need to save the status, and generated the checkpoint
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);
code = streamBackendDoCheckpoint(pTask->pBackend, ckId);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno));
@ -500,10 +508,11 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
SStreamTaskId hTaskId = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
stDebug("s-task:%s fill-history finish checkpoint done, drop related fill-history task:0x%x", id, hTaskId.taskId);
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &hTaskId, 1);
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &hTaskId, 1);
} else {
stWarn("s-task:%s related fill-history task:0x%x is erased", id, (int32_t)pTask->hTaskInfo.id.taskId);
}
taosThreadMutexUnlock(&pTask->lock);
}

View File

@ -1086,10 +1086,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId);
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to prepare transfer state", id, msgId);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
code = streamTransferStatePrepare(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
}

View File

@ -21,7 +21,7 @@
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data
#define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
bool streamTaskShouldStop(const SStreamTask* pTask) {
SStreamTaskState* pState = streamTaskGetStatus(pTask);
@ -316,7 +316,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
}
}
int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
const char* id = pTask->id.idStr;
@ -340,9 +340,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
} else {
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.;
stDebug(
"s-task:%s fill-history task end, scan wal elapsed time:%.2fSec,update related stream task:%s info, transfer "
"exec state",
id, el, pStreamTask->id.idStr);
"s-task:%s fill-history task end, status:%s, scan wal elapsed time:%.2fSec, update related stream task:%s "
"info, prepare transfer exec state",
id, streamTaskGetStatus(pTask)->name, el, pStreamTask->id.idStr);
}
ETaskStatus status = streamTaskGetStatus(pStreamTask)->state;
@ -365,9 +365,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
}
}
// wait for the stream task to handle all in the inputQ, and to be idle
waitForTaskIdle(pTask, pStreamTask);
// In case of sink tasks, no need to halt them.
// In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
// start the task state transfer procedure.
@ -393,17 +390,14 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
}
// 2. transfer the ownership of executor state
streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask);
// 3. send msg to mnode to launch a checkpoint to keep the state for current stream
// NOTE: transfer the ownership of executor state before handle the checkpoint block during stream exec
// 2. send msg to mnode to launch a checkpoint to keep the state for current stream
streamTaskSendCheckpointReq(pStreamTask);
// 4. assign the status to the value that will be kept in disk
// 3. assign the status to the value that will be kept in disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state;
// 5. open the inputQ for all upstream tasks
// 4. open the inputQ for all upstream tasks
streamTaskOpenAllUpstreamInput(pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask);
@ -416,7 +410,7 @@ static int32_t haltCallback(SStreamTask* pTask, void* param) {
return TSDB_CODE_SUCCESS;
}
int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t streamTransferStatePrepare(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
SStreamMeta* pMeta = pTask->pMeta;
@ -424,7 +418,7 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask);
code = streamTransferStateDoPrepare(pTask);
} else {
// no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
@ -540,7 +534,7 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, level);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
code = streamTransferStatePrepare(pTask);
if (code != TSDB_CODE_SUCCESS) {
/*int8_t status = */ streamTaskSetSchedStatusInactive(pTask);
}
@ -621,10 +615,31 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
}
}
int64_t st = taosGetTimestampMs();
if (type == STREAM_INPUT__CHECKPOINT) {
// transfer the state from fill-history to related stream task before generating the checkpoint.
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
if (dropRelHTask) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
const SStreamQueueItem* pItem = pInput;
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type);
STaskId* pHTaskId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId);
if (pHTask != NULL) {
// 2. transfer the ownership of executor state
streamTaskReleaseState(pHTask);
streamTaskReloadState(pTask);
stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
streamTaskGetStatus(pHTask)->name);
streamMetaReleaseTask(pTask->pMeta, pHTask);
} else {
stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
(int32_t)pHTaskId->taskId);
}
}
}
int64_t st = taosGetTimestampMs();
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks, streamQueueItemGetTypeStr(type));
int64_t ver = pTask->chkInfo.processedVer;
doSetStreamInputBlock(pTask, pInput, &ver, id);

View File

@ -791,8 +791,10 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool
CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask));
if (resetRelHalt) {
stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s",
sTaskId.taskId, streamTaskGetStatusStr((*ppStreamTask)->status.taskStatus),
streamTaskGetStatus(*ppStreamTask)->name);
(*ppStreamTask)->status.taskStatus = TASK_STATUS__READY;
stDebug("s-task:0x%" PRIx64 " set the status to be ready", sTaskId.taskId);
}
streamMetaSaveTask(pMeta, *ppStreamTask);