Merge pull request #27688 from taosdata/fix/3_liaohj

fix(stream): handle continuous retrieve during checkpoint procedure.
This commit is contained in:
Haojun Liao 2024-09-06 09:11:08 +08:00 committed by GitHub
commit 1e57724533
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 115 additions and 92 deletions

View File

@ -705,7 +705,7 @@ int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeChec
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId);
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId);
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal);
void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
SRpcHandleInfo* pInfo, int32_t code);
@ -810,6 +810,7 @@ int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRp
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask);
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq);
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes);
void streamTaskSetCheckpointFailed(SStreamTask* pTask);
// stream task state machine, and event handling
int32_t streamCreateStateMachine(SStreamTask* pTask);

View File

@ -251,7 +251,7 @@ void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
int32_t code = mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
if (code) {
mError("failed to kill trans:%d", pTrans->id);
mError("failed to kill transId:%d, code:%s", pTrans->id, tstrerror(code));
}
} else {
mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);

View File

@ -1129,7 +1129,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
if (pTask == NULL) {
if (pTask == NULL || code != 0) {
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64
" transId:%d it may have been destroyed",
vgId, req.taskId, req.checkpointId, req.transId);

View File

@ -410,7 +410,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tDecoderClear(&decoder);
if (code) {
tqError("vgId:%d failed to decode retrieve msg, quit handling it", pMeta->vgId);
tqError("vgId:%d failed to decode retrieve msg, discard it", pMeta->vgId);
return code;
}
@ -420,9 +420,16 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
req.dstTaskId);
tCleanupStreamRetrieveReq(&req);
return -1;
return code;
}
// enqueue
tqDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d),QID:0x%" PRIx64, pTask->id.idStr,
pTask->pMeta->vgId, pTask->info.taskLevel, req.srcTaskId, req.srcNodeId, req.reqId);
// if task is in ck status, set current ck failed
streamTaskSetCheckpointFailed(pTask);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamProcessRetrieveReq(pTask, &req);
} else {
@ -431,14 +438,19 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
code = streamTaskBroadcastRetrieveReq(pTask, &req);
}
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamTaskSendRetrieveRsp(&req, &rsp);
if (code != TSDB_CODE_SUCCESS) { // return error not send rsp manually
tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId,
req.srcTaskId, tstrerror(code));
} else { // send rsp manually only on success.
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamTaskSendRetrieveRsp(&req, &rsp);
}
streamMetaReleaseTask(pMeta, pTask);
tCleanupStreamRetrieveReq(&req);
// always return success, to disable the auto rsp
return TSDB_CODE_SUCCESS;
return code;
}
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {

View File

@ -601,7 +601,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
(void)taosThreadOnce(&initPoolOnce, initRefPool);
qDebug("start to create task, TID:0x%" PRIx64 "QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
if (code != TSDB_CODE_SUCCESS || NULL == *pTask) {

View File

@ -288,7 +288,7 @@ void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst) {
memcpy(p, "TID:0x", offset);
offset += tintToHex(taskId, &p[offset]);
memcpy(&p[offset], "QID:0x", 7);
memcpy(&p[offset], " QID:0x", 7);
offset += 7;
offset += tintToHex(queryId, &p[offset]);

View File

@ -65,12 +65,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
", prev:%" PRId64,
id, upstreamTaskId, vgId, stage, pInfo->stage);
// record the checkpoint failure id and sent to mnode
streamMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask).state;
if (status == TASK_STATUS__CK) {
streamTaskSetFailedCheckpointId(pTask);
}
streamMutexUnlock(&pTask->lock);
streamTaskSetCheckpointFailed(pTask);
}
if (pInfo->stage != stage) {

View File

@ -673,6 +673,15 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
}
}
void streamTaskSetCheckpointFailed(SStreamTask* pTask) {
streamMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask).state;
if (status == TASK_STATUS__CK) {
streamTaskSetFailedCheckpointId(pTask);
}
streamMutexUnlock(&pTask->lock);
}
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
int32_t code = 0;
int32_t cap = strlen(path) + 64;
@ -1111,26 +1120,20 @@ void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_
// record the dispatch checkpoint trigger info in the list
// memory insufficient may cause the stream computing stopped
void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int64_t now = taosGetTimestampMs();
int64_t now = taosGetTimestampMs();
streamMutexLock(&pInfo->lock);
// outputQ should be empty here
if (streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) > 0) {
stFatal("s-task:%s items are still in outputQ, failed to init trigger dispatch info", pTask->id.idStr);
return;
}
pInfo->dispatchTrigger = true;
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
if (px == NULL) {
// pause the stream task, if memory not enough
if (px == NULL) { // pause the stream task, if memory not enough
streamMutexUnlock(&pInfo->lock);
return terrno;
}
} else {
for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
@ -1141,13 +1144,15 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
if (px == NULL) {
// pause the stream task, if memory not enough
if (px == NULL) { // pause the stream task, if memory not enough
streamMutexUnlock(&pInfo->lock);
return terrno;
}
}
}
streamMutexUnlock(&pInfo->lock);
return 0;
}
int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) {

View File

@ -726,8 +726,11 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
}
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue);
const char* id = pTask->id.idStr;
int32_t code = 0;
SStreamDataBlock* pBlock = NULL;
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue);
if (numOfElems > 0) {
double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputq.queue->pQueue));
int32_t numOfUnAccessed = streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue);
@ -755,35 +758,49 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputq.status);
}
SStreamDataBlock* pBlock = NULL;
streamQueueNextItem(pTask->outputq.queue, (SStreamQueueItem**)&pBlock);
if (pBlock == NULL) {
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
stDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", id, pTask->outputq.status);
return 0;
}
while (1) {
streamQueueNextItem(pTask->outputq.queue, (SStreamQueueItem**)&pBlock);
if (pBlock == NULL) {
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
stDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", id, pTask->outputq.status);
return 0;
}
int32_t type = pBlock->type;
if (!(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
type == STREAM_INPUT__TRANS_STATE)) {
stError("s-task:%s invalid dispatch block type:%d", id, type);
return TSDB_CODE_INTERNAL_ERROR;
}
int32_t type = pBlock->type;
if (!(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
type == STREAM_INPUT__TRANS_STATE)) {
stError("s-task:%s invalid dispatch block type:%d", id, type);
return TSDB_CODE_INTERNAL_ERROR;
}
pTask->execInfo.dispatch += 1;
pTask->execInfo.dispatch += 1;
streamMutexLock(&pTask->msgInfo.lock);
initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch);
streamMutexUnlock(&pTask->msgInfo.lock);
streamMutexLock(&pTask->msgInfo.lock);
initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch);
streamMutexUnlock(&pTask->msgInfo.lock);
int32_t code = doBuildDispatchMsg(pTask, pBlock);
if (code == 0) {
destroyStreamDataBlock(pBlock);
} else { // todo handle build dispatch msg failed
}
code = doBuildDispatchMsg(pTask, pBlock);
if (code == 0) {
destroyStreamDataBlock(pBlock);
} else { // todo handle build dispatch msg failed
}
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
streamTaskInitTriggerDispatchInfo(pTask);
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
// outputQ should be empty here, otherwise, set the checkpoint failed due to the retrieve req happens
if (streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) > 0) {
stError("s-task:%s items are still in outputQ due to downstream retrieve, failed to init trigger dispatch",
pTask->id.idStr);
streamTaskSetCheckpointFailed(pTask);
clearBufferedDispatchMsg(pTask);
continue;
}
code = streamTaskInitTriggerDispatchInfo(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle error
}
}
break;
}
code = sendDispatchMsg(pTask, pTask->msgInfo.pData);

View File

@ -98,14 +98,13 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor;
*totalBlocks = 0;
*totalSize = 0;
int32_t size = 0;
int32_t numOfBlocks = 0;
SArray* pRes = NULL;
*totalBlocks = 0;
*totalSize = 0;
while (1) {
if (pRes == NULL) {
pRes = taosArrayInit(4, sizeof(SSDataBlock));
@ -131,7 +130,8 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SSDataBlock block = {0};
const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;
int32_t num = taosArrayGetSize(pRetrieveBlock->blocks);
int32_t num = taosArrayGetSize(pRetrieveBlock->blocks);
if (num != 1) {
stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
continue;
@ -596,12 +596,32 @@ void streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock)
// static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; }
static void doRecordThroughput(STaskExecStatisInfo* pInfo, int64_t totalBlocks, int64_t totalSize, int64_t blockSize,
double st, const char* id) {
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%" PRId64, id,
el, SIZE_IN_MiB(totalSize), totalBlocks);
pInfo->outputDataBlocks += totalBlocks;
pInfo->outputDataSize += totalSize;
if (fabs(el - 0.0) <= DBL_EPSILON) {
pInfo->procsThroughput = 0;
pInfo->outputThroughput = 0;
} else {
pInfo->outputThroughput = (totalSize / el);
pInfo->procsThroughput = (blockSize / el);
}
}
static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) {
const char* id = pTask->id.idStr;
int32_t blockSize = 0;
int64_t st = taosGetTimestampMs();
SCheckpointInfo* pInfo = &pTask->chkInfo;
int64_t ver = pInfo->processedVer;
int64_t totalSize = 0;
int32_t totalBlocks = 0;
stDebug("s-task:%s start to process batch blocks, num:%d, type:%s", id, num, streamQueueItemGetTypeStr(pBlock->type));
@ -611,23 +631,8 @@ static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, i
return;
}
int64_t totalSize = 0;
int32_t totalBlocks = 0;
streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks);
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
SIZE_IN_MiB(totalSize), totalBlocks);
pTask->execInfo.outputDataBlocks += totalBlocks;
pTask->execInfo.outputDataSize += totalSize;
if (fabs(el - 0.0) <= DBL_EPSILON) {
pTask->execInfo.procsThroughput = 0;
pTask->execInfo.outputThroughput = 0;
} else {
pTask->execInfo.outputThroughput = (totalSize / el);
pTask->execInfo.procsThroughput = (blockSize / el);
}
doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr);
// update the currentVer if processing the submit blocks.
if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) {

View File

@ -1254,16 +1254,7 @@ int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
continue;
}
streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState.state == TASK_STATUS__CK) {
streamTaskSetFailedCheckpointId(pTask);
} else {
stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState.name);
}
streamMutexUnlock(&pTask->lock);
streamTaskSetCheckpointFailed(pTask);
streamMetaReleaseTask(pMeta, pTask);
}

View File

@ -287,7 +287,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
streamDataSubmitDestroy(px);
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t msgLen = px->submit.msgLen;
@ -312,7 +312,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
streamFreeQitem(pItem);
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = taosWriteQitem(pQueue, pItem);

View File

@ -1098,15 +1098,12 @@ static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq*
return terrno = code;
}
// enqueue
stDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d),QID:0x%" PRIx64, pTask->id.idStr,
pTask->pMeta->vgId, pTask->info.taskLevel, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId);
pData->type = STREAM_INPUT__DATA_RETRIEVE;
pData->srcVgId = 0;
code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to convert retrieve-data to block, code:%s", pTask->id.idStr, tstrerror(code));
taosFreeQitem(pData);
return code;
}