// Patch summary (review notes; text ahead of the first "diff --git" header is not part of any hunk).
//
// What the patch does, per file:
//   tstream.h    - STaskExecStatisInfo: renames execStart -> start and taskUpdateCount -> updateCount;
//                  declares streamBuildAndSendDropTaskMsg() (replacing the blank separator line).
//   tq.c         - tqProcessTaskScanHistory: the direct streamMetaUnregisterTask() call is commented out and
//                  replaced by streamBuildAndSendDropTaskMsg().
//                  tqProcessTaskDropReq: logs the remaining task count while holding the meta write lock and
//                  gains an explicit "return 0".
//   tqSink.c     - follows the execStart -> start rename (elapsed-time log and first-sink timestamp).
//   streamExec.c - streamDoTransferStateToStreamTask: both streamMetaUnregisterTask() calls are commented out
//                  and replaced by streamBuildAndSendDropTaskMsg().
//                  streamTransferStateToStreamTask: levels other than AGG/SOURCE now send a drop message for
//                  the task itself.
//                  streamProcessTranstateBlock: the TASK_LEVEL__SINK special case is removed, so every
//                  non-dispatch task calls streamTransferStateToStreamTask().
//   streamMeta.c - SMetaHbInfo gains hbCount/hbStart. metaHbToMnode stamps hbStart when it is 0 on a leader,
//                  zeroes it on a follower, bumps hbCount right before tmsgSendReq(), and declares hbMsg only
//                  after the enoughTimeDuration() check. streamMetaCloseImpl frees pMeta->pHbInfo.
//                  streamMetaNotifyClose logs leader/hbStart/hbCount.
//   streamTask.c - tFreeStreamTask logs task status and the exec statistics; streamTaskStop no longer resets
//                  taskExecInfo.init; streamTaskUpdateEpsetInfo follows the updateCount rename.
//                  New streamBuildAndSendDropTaskMsg(pTask, vgId, pTaskId): allocates an SVDropStreamTaskReq
//                  with rpcMallocCont(), fills head.vgId/taskId/streamId, wraps it in an SRpcMsg of type
//                  TDMT_STREAM_TASK_DROP and hands it to tmsgPutToQueue(pTask->pMsgCb, WRITE_QUEUE, ...).
//                  Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the allocation fails, otherwise the
//                  code returned by tmsgPutToQueue().
//
// NOTE(review) - items to resolve before merging:
//   1. metaHbToMnode, follower branch: "pMeta->pHbInfo->hbStart = 0" runs AFTER taosReleaseRef(streamMetaId, rid),
//      i.e. pMeta is dereferenced once the reference taken by taosAcquireRef() has been dropped. Presumably the
//      reset belongs before the release - confirm whether the release can free pMeta.
//   2. tFreeStreamTask: the "if (created == 0 || init == 0 || start == 0) { int32_t k = 1; }" block has no
//      effect; it looks like a leftover debugger anchor.
//   3. Log text: streamBuildAndSendDropTaskMsg prints "drop table:0x%x" for a task id; streamTaskInit's message
//      became "initElapsed mutex attr failed" (looks like an over-eager rename of "init"); streamMetaNotifyClose
//      prints "startHb%" PRId64 with no separator before the value.
//   4. The return value of streamBuildAndSendDropTaskMsg() is ignored at all four call sites.
//   5. streamDoTransferStateToStreamTask passes pStreamTask as the first argument while the other call sites
//      pass pTask; only ->pMsgCb is read from that argument inside the new function.
//   6. The "free it and remove fill-history task from disk meta-store" step comments now sit above code that
//      only enqueues a drop message, and the old calls are left behind as commented-out lines.
//   7. On tmsgPutToQueue() failure the new function does not free pReq - TODO confirm that tmsgPutToQueue()
//      owns and releases pCont in that case.
//   8. metaHbToMnode dereferences the taosHashGet() result ("(*pTask)->info.fillHistory") without a NULL check;
//      the patch only re-aligns those lines.
//   9. In this copy the patch's line breaks are collapsed, so hunk bodies and headers run together.
//
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1fd2f2bc13..c41834bd82 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -321,8 +321,8 @@ typedef struct { int64_t init; int64_t step1Start; int64_t step2Start; - int64_t execStart; - int32_t taskUpdateCount; + int64_t start; + int32_t updateCount; int64_t latestUpdateTs; } STaskExecStatisInfo; @@ -722,7 +722,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask); - +int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId); int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, int8_t isSucceed); int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6430fee6a7..fc5300b8c7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1097,7 +1097,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s fill-history task set status to be dropping", id); - streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); +// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); + streamBuildAndSendDropTaskMsg(pTask, pMeta->vgId, &pTask->id); streamMetaReleaseTask(pMeta, pTask); return -1; } @@ -1347,10 +1348,14 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL // commit the update taosWLockLatch(&pTq->pStreamMeta->lock); + int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); + tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", TD_VID(pTq->pVnode), pReq->taskId, numOfTasks); + if (streamMetaCommit(pTq->pStreamMeta) < 0) { // persist to disk } 
taosWUnLockLatch(&pTq->pStreamMeta->lock); + return 0; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 106a4cc9b0..e0bae18545 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -274,7 +274,7 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) { SSinkTaskRecorder* pRec = &pTask->sinkRecorder; - double el = (taosGetTimestampMs() - pTask->taskExecInfo.execStart) / 1000.0; + double el = (taosGetTimestampMs() - pTask->taskExecInfo.start) / 1000.0; tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 " submit into dst table, duration:%.2f Sec.", pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, el); @@ -755,8 +755,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { int32_t code = TSDB_CODE_SUCCESS; const char* id = pTask->id.idStr; - if (pTask->taskExecInfo.execStart == 0) { - pTask->taskExecInfo.execStart = taosGetTimestampMs(); + if (pTask->taskExecInfo.start == 0) { + pTask->taskExecInfo.start = taosGetTimestampMs(); } bool onlySubmitData = true; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3a34d941dd..3b3dca7f5f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -303,7 +303,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pTask->id.idStr, pTask->streamTaskId.taskId); // 1. free it and remove fill-history task from disk meta-store - streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); +// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); + streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id); // 2. 
save to disk taosWLockLatch(&pMeta->lock); @@ -365,7 +366,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); // 4. free it and remove fill-history task from disk meta-store - streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); +// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); + streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id); // 5. clear the link between fill-history task and stream task info pStreamTask->historyTaskId.taskId = 0; @@ -408,6 +410,8 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. code = streamDoTransferStateToStreamTask(pTask); + } else { // drop fill-history task + streamBuildAndSendDropTaskMsg(pTask, pTask->pMeta->vgId, &pTask->id); } return code; @@ -503,16 +507,12 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock } } else { // non-dispatch task, do task state transfer directly streamFreeQitem((SStreamQueueItem*)pBlock); - if (level != TASK_LEVEL__SINK) { - qDebug("s-task:%s non-dispatch task, start to transfer state directly", id); - ASSERT(pTask->info.fillHistory == 1); - code = streamTransferStateToStreamTask(pTask); + qDebug("s-task:%s non-dispatch task, start to transfer state directly", id); + ASSERT(pTask->info.fillHistory == 1); + code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { - /*int8_t status = */streamTaskSetSchedStatusInActive(pTask); - } - } else { - qDebug("s-task:%s sink task does not transfer state", id); + if (code != TSDB_CODE_SUCCESS) { + /*int8_t status = */ streamTaskSetSchedStatusInActive(pTask); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8dd93d45fc..6786f36c8e 100644 --- 
a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -47,6 +47,8 @@ struct SMetaHbInfo { tmr_h hbTmr; int32_t stopFlag; int32_t tickCounter; + int32_t hbCount; + int64_t hbStart; }; SMetaRefMgt gMetaRefMgt; @@ -333,6 +335,7 @@ void streamMetaCloseImpl(void* arg) { taosHashCleanup(pMeta->pTaskBackendUnique); taosHashCleanup(pMeta->pUpdateTaskSet); + taosMemoryFree(pMeta->pHbInfo); taosMemoryFree(pMeta->path); taosThreadMutexDestroy(&pMeta->backendMutex); @@ -785,7 +788,6 @@ static bool enoughTimeDuration(SMetaHbInfo* pInfo) { void metaHbToMnode(void* param, void* tmrId) { int64_t rid = *(int64_t*)param; - SStreamHbMsg hbMsg = {0}; SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); if (pMeta == NULL) { return; @@ -803,31 +805,37 @@ void metaHbToMnode(void* param, void* tmrId) { if (!pMeta->leader) { qInfo("vgId:%d follower not send hb to mnode", pMeta->vgId); taosReleaseRef(streamMetaId, rid); + pMeta->pHbInfo->hbStart = 0; return; } + // set the hb start time + if (pMeta->pHbInfo->hbStart == 0) { + pMeta->pHbInfo->hbStart = taosGetTimestampMs(); + } + if (!enoughTimeDuration(pMeta->pHbInfo)) { taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); return; } - qInfo("vgId:%d start hb", pMeta->vgId); + qDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, pMeta->leader); + SStreamHbMsg hbMsg = {0}; taosRLockLatch(&pMeta->lock); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); SEpSet epset = {0}; bool hasValEpset = false; - hbMsg.vgId = pMeta->vgId; hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - int64_t keys[2] = {pId->streamId, pId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, keys, sizeof(keys)); + int64_t keys[2] = {pId->streamId, pId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, 
keys, sizeof(keys)); if ((*pTask)->info.fillHistory == 1) { continue; } @@ -879,10 +887,13 @@ void metaHbToMnode(void* param, void* tmrId) { initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); msg.info.noResp = 1; - qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId); + pMeta->pHbInfo->hbCount += 1; + + qDebug("vgId:%d, build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks, + pMeta->pHbInfo->hbCount); tmsgSendReq(&epset, &msg); } else { - qError("vgId:%d no mnd epset", pMeta->vgId); + qDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); } taosArrayDestroy(hbMsg.pTaskStatus); @@ -915,7 +926,9 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) { void streamMetaNotifyClose(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; - qDebug("vgId:%d notify all stream tasks that the vnode is closing", vgId); + qDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId, + pMeta->leader, pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); + taosWLockLatch(&pMeta->lock); void* pIter = NULL; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 01318d89cd..23ace63d18 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -277,7 +277,20 @@ static void freeUpstreamItem(void* p) { void tFreeStreamTask(SStreamTask* pTask) { int32_t taskId = pTask->id.taskId; - qDebug("free s-task:0x%x, %p, state:%p", taskId, pTask, pTask->pState); + STaskExecStatisInfo* pStatis = &pTask->taskExecInfo; + + qDebug("start to free s-task:0x%x, %p, state:%p, status:%s", taskId, pTask, pTask->pState, + streamGetTaskStatusStr(pTask->status.taskStatus)); + + qDebug("s-task:0x%x exec info: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64 + ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64 + " nextProcessVer:%" PRId64, + taskId, pStatis->created, pStatis->init, 
pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs, + pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, pTask->chkInfo.nextProcessVer); + + if (pStatis->created == 0 || pStatis->init == 0 || pStatis->start == 0) { + int32_t k = 1; + } // remove the ref by timer while (pTask->status.timerActive > 0) { @@ -396,7 +409,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i TdThreadMutexAttr attr = {0}; int code = taosThreadMutexAttrInit(&attr); if (code != 0) { - qError("s-task:%s init mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code)); + qError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code)); return code; } @@ -524,9 +537,8 @@ int32_t streamTaskStop(SStreamTask* pTask) { taosMsleep(100); } - pTask->taskExecInfo.init = 0; int64_t el = taosGetTimestampMs() - st; - qDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms, and reset init ts", pMeta->vgId, pTask->id.idStr, el); + qDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pMeta->vgId, pTask->id.idStr, el); return 0; } @@ -556,9 +568,9 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { STaskExecStatisInfo* p = &pTask->taskExecInfo; qDebug("s-task:%s update task nodeEp epset, update count:%d, prevTs:%"PRId64, pTask->id.idStr, - p->taskUpdateCount + 1, p->latestUpdateTs); + p->updateCount + 1, p->latestUpdateTs); - p->taskUpdateCount += 1; + p->updateCount += 1; p->latestUpdateTs = taosGetTimestampMs(); for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { @@ -615,3 +627,25 @@ int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) { return status; } + +int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId) { + SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + 
pReq->head.vgId = vgId; + pReq->taskId = pTaskId->taskId; + pReq->streamId = pTaskId->streamId; + + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)}; + int32_t code = tmsgPutToQueue(pTask->pMsgCb, WRITE_QUEUE, &msg); + if (code != TSDB_CODE_SUCCESS) { + qError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code)); + return code; + } + + qDebug("vgId:%d build and send drop table:0x%x msg", vgId, pTaskId->taskId); + return code; +}