fix(stream): remove fill-history sink task.

This commit is contained in:
Haojun Liao 2023-09-15 17:46:17 +08:00
parent e84eeee6b8
commit 98ef566eb4
6 changed files with 84 additions and 32 deletions

View File

@ -321,8 +321,8 @@ typedef struct {
int64_t init; int64_t init;
int64_t step1Start; int64_t step1Start;
int64_t step2Start; int64_t step2Start;
int64_t execStart; int64_t start;
int32_t taskUpdateCount; int32_t updateCount;
int64_t latestUpdateTs; int64_t latestUpdateTs;
} STaskExecStatisInfo; } STaskExecStatisInfo;
@ -722,7 +722,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
int8_t isSucceed); int8_t isSucceed);
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,

View File

@ -1097,7 +1097,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s fill-history task set status to be dropping", id); tqDebug("s-task:%s fill-history task set status to be dropping", id);
streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); // streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
streamBuildAndSendDropTaskMsg(pTask, pMeta->vgId, &pTask->id);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return -1; return -1;
} }
@ -1347,10 +1348,14 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL
// commit the update // commit the update
taosWLockLatch(&pTq->pStreamMeta->lock); taosWLockLatch(&pTq->pStreamMeta->lock);
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", TD_VID(pTq->pVnode), pReq->taskId, numOfTasks);
if (streamMetaCommit(pTq->pStreamMeta) < 0) { if (streamMetaCommit(pTq->pStreamMeta) < 0) {
// persist to disk // persist to disk
} }
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pTq->pStreamMeta->lock);
return 0; return 0;
} }

View File

@ -274,7 +274,7 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2*
if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) { if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) {
SSinkTaskRecorder* pRec = &pTask->sinkRecorder; SSinkTaskRecorder* pRec = &pTask->sinkRecorder;
double el = (taosGetTimestampMs() - pTask->taskExecInfo.execStart) / 1000.0; double el = (taosGetTimestampMs() - pTask->taskExecInfo.start) / 1000.0;
tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
" submit into dst table, duration:%.2f Sec.", " submit into dst table, duration:%.2f Sec.",
pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, el); pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, el);
@ -755,8 +755,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
if (pTask->taskExecInfo.execStart == 0) { if (pTask->taskExecInfo.start == 0) {
pTask->taskExecInfo.execStart = taosGetTimestampMs(); pTask->taskExecInfo.start = taosGetTimestampMs();
} }
bool onlySubmitData = true; bool onlySubmitData = true;

View File

@ -303,7 +303,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pTask->id.idStr, pTask->streamTaskId.taskId); pTask->id.idStr, pTask->streamTaskId.taskId);
// 1. free it and remove fill-history task from disk meta-store // 1. free it and remove fill-history task from disk meta-store
streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); // streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id);
// 2. save to disk // 2. save to disk
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
@ -365,7 +366,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
// 4. free it and remove fill-history task from disk meta-store // 4. free it and remove fill-history task from disk meta-store
streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); // streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id);
// 5. clear the link between fill-history task and stream task info // 5. clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0; pStreamTask->historyTaskId.taskId = 0;
@ -408,6 +410,8 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask); code = streamDoTransferStateToStreamTask(pTask);
} else { // drop fill-history task
streamBuildAndSendDropTaskMsg(pTask, pTask->pMeta->vgId, &pTask->id);
} }
return code; return code;
@ -503,16 +507,12 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
} }
} else { // non-dispatch task, do task state transfer directly } else { // non-dispatch task, do task state transfer directly
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
if (level != TASK_LEVEL__SINK) { qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
qDebug("s-task:%s non-dispatch task, start to transfer state directly", id); ASSERT(pTask->info.fillHistory == 1);
ASSERT(pTask->info.fillHistory == 1); code = streamTransferStateToStreamTask(pTask);
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
/*int8_t status = */streamTaskSetSchedStatusInActive(pTask); /*int8_t status = */ streamTaskSetSchedStatusInActive(pTask);
}
} else {
qDebug("s-task:%s sink task does not transfer state", id);
} }
} }

View File

@ -47,6 +47,8 @@ struct SMetaHbInfo {
tmr_h hbTmr; tmr_h hbTmr;
int32_t stopFlag; int32_t stopFlag;
int32_t tickCounter; int32_t tickCounter;
int32_t hbCount;
int64_t hbStart;
}; };
SMetaRefMgt gMetaRefMgt; SMetaRefMgt gMetaRefMgt;
@ -333,6 +335,7 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->pTaskBackendUnique); taosHashCleanup(pMeta->pTaskBackendUnique);
taosHashCleanup(pMeta->pUpdateTaskSet); taosHashCleanup(pMeta->pUpdateTaskSet);
taosMemoryFree(pMeta->pHbInfo);
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
taosThreadMutexDestroy(&pMeta->backendMutex); taosThreadMutexDestroy(&pMeta->backendMutex);
@ -785,7 +788,6 @@ static bool enoughTimeDuration(SMetaHbInfo* pInfo) {
void metaHbToMnode(void* param, void* tmrId) { void metaHbToMnode(void* param, void* tmrId) {
int64_t rid = *(int64_t*)param; int64_t rid = *(int64_t*)param;
SStreamHbMsg hbMsg = {0};
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) { if (pMeta == NULL) {
return; return;
@ -803,31 +805,37 @@ void metaHbToMnode(void* param, void* tmrId) {
if (!pMeta->leader) { if (!pMeta->leader) {
qInfo("vgId:%d follower not send hb to mnode", pMeta->vgId); qInfo("vgId:%d follower not send hb to mnode", pMeta->vgId);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
pMeta->pHbInfo->hbStart = 0;
return; return;
} }
// set the hb start time
if (pMeta->pHbInfo->hbStart == 0) {
pMeta->pHbInfo->hbStart = taosGetTimestampMs();
}
if (!enoughTimeDuration(pMeta->pHbInfo)) { if (!enoughTimeDuration(pMeta->pHbInfo)) {
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
return; return;
} }
qInfo("vgId:%d start hb", pMeta->vgId); qDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, pMeta->leader);
SStreamHbMsg hbMsg = {0};
taosRLockLatch(&pMeta->lock); taosRLockLatch(&pMeta->lock);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
SEpSet epset = {0}; SEpSet epset = {0};
bool hasValEpset = false; bool hasValEpset = false;
hbMsg.vgId = pMeta->vgId; hbMsg.vgId = pMeta->vgId;
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
int64_t keys[2] = {pId->streamId, pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
int64_t keys[2] = {pId->streamId, pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys));
if ((*pTask)->info.fillHistory == 1) { if ((*pTask)->info.fillHistory == 1) {
continue; continue;
} }
@ -879,10 +887,13 @@ void metaHbToMnode(void* param, void* tmrId) {
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
msg.info.noResp = 1; msg.info.noResp = 1;
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId); pMeta->pHbInfo->hbCount += 1;
qDebug("vgId:%d, build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
pMeta->pHbInfo->hbCount);
tmsgSendReq(&epset, &msg); tmsgSendReq(&epset, &msg);
} else { } else {
qError("vgId:%d no mnd epset", pMeta->vgId); qDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
} }
taosArrayDestroy(hbMsg.pTaskStatus); taosArrayDestroy(hbMsg.pTaskStatus);
@ -915,7 +926,9 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
void streamMetaNotifyClose(SStreamMeta* pMeta) { void streamMetaNotifyClose(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
qDebug("vgId:%d notify all stream tasks that the vnode is closing", vgId); qDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId,
pMeta->leader, pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
void* pIter = NULL; void* pIter = NULL;

View File

@ -277,7 +277,20 @@ static void freeUpstreamItem(void* p) {
void tFreeStreamTask(SStreamTask* pTask) { void tFreeStreamTask(SStreamTask* pTask) {
int32_t taskId = pTask->id.taskId; int32_t taskId = pTask->id.taskId;
qDebug("free s-task:0x%x, %p, state:%p", taskId, pTask, pTask->pState); STaskExecStatisInfo* pStatis = &pTask->taskExecInfo;
qDebug("start to free s-task:0x%x, %p, state:%p, status:%s", taskId, pTask, pTask->pState,
streamGetTaskStatusStr(pTask->status.taskStatus));
qDebug("s-task:0x%x exec info: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
" nextProcessVer:%" PRId64,
taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs,
pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, pTask->chkInfo.nextProcessVer);
if (pStatis->created == 0 || pStatis->init == 0 || pStatis->start == 0) {
int32_t k = 1;
}
// remove the ref by timer // remove the ref by timer
while (pTask->status.timerActive > 0) { while (pTask->status.timerActive > 0) {
@ -396,7 +409,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
TdThreadMutexAttr attr = {0}; TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr); int code = taosThreadMutexAttrInit(&attr);
if (code != 0) { if (code != 0) {
qError("s-task:%s init mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code)); qError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
return code; return code;
} }
@ -524,9 +537,8 @@ int32_t streamTaskStop(SStreamTask* pTask) {
taosMsleep(100); taosMsleep(100);
} }
pTask->taskExecInfo.init = 0;
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms, and reset init ts", pMeta->vgId, pTask->id.idStr, el); qDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pMeta->vgId, pTask->id.idStr, el);
return 0; return 0;
} }
@ -556,9 +568,9 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
STaskExecStatisInfo* p = &pTask->taskExecInfo; STaskExecStatisInfo* p = &pTask->taskExecInfo;
qDebug("s-task:%s update task nodeEp epset, update count:%d, prevTs:%"PRId64, pTask->id.idStr, qDebug("s-task:%s update task nodeEp epset, update count:%d, prevTs:%"PRId64, pTask->id.idStr,
p->taskUpdateCount + 1, p->latestUpdateTs); p->updateCount + 1, p->latestUpdateTs);
p->taskUpdateCount += 1; p->updateCount += 1;
p->latestUpdateTs = taosGetTimestampMs(); p->latestUpdateTs = taosGetTimestampMs();
for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
@ -615,3 +627,25 @@ int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) {
return status; return status;
} }
/**
 * Build a drop-task request for the task identified by pTaskId and put it on
 * the write queue through pTask's message callback.
 *
 * @param pTask   task whose pMsgCb is used to enqueue the request
 * @param vgId    vnode group id written into the request header (also used in logs)
 * @param pTaskId stream id / task id of the task to be dropped
 * @return TSDB_CODE_SUCCESS when the request has been queued; -1 with terrno set
 *         to TSDB_CODE_OUT_OF_MEMORY when the request buffer cannot be allocated;
 *         otherwise the error code returned by tmsgPutToQueue().
 */
int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId) {
  SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pReq->head.vgId = vgId;
  pReq->taskId = pTaskId->taskId;
  pReq->streamId = pTaskId->streamId;

  // NOTE(review): pReq is not released here on either path; presumably the queue layer owns
  // msg.pCont once tmsgPutToQueue() is called (including on failure) -- confirm against its contract.
  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
  int32_t code = tmsgPutToQueue(pTask->pMsgCb, WRITE_QUEUE, &msg);
  if (code != TSDB_CODE_SUCCESS) {
    qError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
    return code;
  }

  // the object being dropped is a stream task, keep the wording aligned with the error log above
  qDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
  return code;
}