diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 5961821445..793019f8ad 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -745,9 +745,9 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 53afcc0e3f..1cb4332737 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -133,6 +133,7 @@ void mndCleanupStream(SMnode *pMnode) { taosArrayDestroy(execNodeList.pTaskList); taosHashCleanup(execNodeList.pTaskMap); taosThreadMutexDestroy(&execNodeList.lock); + mDebug("mnd stream cleanup"); } SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { @@ -2279,12 +2280,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { doExtractTasksFromStream(pMnode); } - for (int32_t i = 0; i < req.numOfTasks; ++i) { - STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); - int64_t k[2] = {p->streamId, p->taskId}; - int32_t index = *(int32_t *)taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); + for(int32_t i = 0; i < req.numOfTasks; ++i) { + STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i); + int64_t k[2] = {p->streamId, p->taskId}; + int32_t index = *(int32_t*) taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); - STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); + STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); pStatusEntry->status = p->status; if (p->status != TASK_STATUS__NORMAL) { mDebug("received s-task:0x%x no in ready stat:%s", p->taskId, streamGetTaskStatusStr(p->status)); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bade1d6a93..f4e6c8c919 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1890,6 +1890,9 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { taosWLockLatch(&pMeta->lock); streamSetStatusNormal(pTask); streamMetaSaveTask(pMeta, pTask); + if (streamMetaCommit(pMeta) < 0) { + // persist to disk + } taosWUnLockLatch(&pMeta->lock); } streamTaskStop(pTask); @@ -1905,25 +1908,27 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // all tasks are closed, now let's restart the stream meta if (pMeta->closedTask == numOfCount) { tqDebug("vgId:%d all tasks are updated, commit the update nodeInfo", vgId); - if (streamMetaCommit(pMeta) < 0) { +// if (streamMetaCommit(pMeta) < 0) { // persist to disk - } +// } restartTasks = true; pMeta->closedTask = 0; // reset value + } else { + tqDebug("vgId:%d closed tasks:%d, not closed:%d", vgId, pMeta->closedTask, (numOfCount - pMeta->closedTask)); } taosWUnLockLatch(&pMeta->lock); _end: tDecoderClear(&decoder); - tmsgSendRsp(&rsp); +// tmsgSendRsp(&rsp); if (restartTasks) { tqDebug("vgId:%d all tasks are stopped, restart them", vgId); taosWLockLatch(&pMeta->lock); terrno = 0; - int32_t code = streamMetaReopen(pTq->pStreamMeta, 0); + int32_t code = streamMetaReopen(pMeta, 0); if (code != 0) { tqError("vgId:%d failed to reopen stream meta", vgId); taosWUnLockLatch(&pMeta->lock); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 2e175de0b8..99197c56ef 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -523,6 +523,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg case TDMT_VND_DROP_INDEX: vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp); break; + case TDMT_VND_STREAM_CHECK_POINT_SOURCE: + tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg); + break; + case TDMT_VND_STREAM_TASK_UPDATE: + tqProcessTaskUpdateReq(pVnode->pTq, pMsg); + break; case TDMT_VND_COMPACT: vnodeProcessCompactVnodeReq(pVnode, ver, pReq, len, pRsp); goto _exit; @@ -678,10 +684,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP: return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_CHECK_POINT_SOURCE: - return tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_TASK_UPDATE: - return tqProcessTaskUpdateReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY: return tqProcessStreamTaskCheckpointReadyMsg(pVnode->pTq, pMsg); default: diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 94c859c669..90619cee44 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -790,14 +790,13 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t* for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { if (wrapper->pHandle[i]) { rocksdb_column_family_handle_t* p = wrapper->pHandle[i]; - size_t len = 0; - char* name = rocksdb_column_family_handle_get_name(p, &len); - // char buf[64] = {0}; - // memcpy(buf, name, len); - // qError("column name: name: %s, len: %d", buf, (int)len); - // taosMemoryFree(name); - taosArrayPush(pHandle, &p); + // size_t len = 0; + // char* name = rocksdb_column_family_handle_get_name(p, &len); + // char buf[64] = {0}; + // memcpy(buf, name, len); + // qError("column name: name: %s, len: %d", buf, (int)len); + // taosMemoryFree(name); } } taosThreadRwlockUnlock(&wrapper->rwLock); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 2e4ddf703c..355e0103d6 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -815,6 +815,8 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, SStreamChkptReadyInfo info = {.taskId = pInfo->taskId, .epset = pInfo->epSet}; initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead)); + info.msg.info.noResp = 1; // refactor later. + qDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 ":0x%x (vgId:%d) idx:%d", pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.downstreamNodeId, index); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 1284ce31b1..e4c133ac1e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -56,6 +56,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { goto _err; } + if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { goto _err; } @@ -64,6 +65,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } + if (streamMetaBegin(pMeta) < 0) { + goto _err; + } + _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK); if (pMeta->pTasks == NULL) { @@ -77,10 +82,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - if (streamMetaBegin(pMeta) < 0) { - goto _err; - } - pMeta->walScanCounter = 0; pMeta->vgId = vgId; pMeta->ahandle = ahandle; @@ -618,22 +619,27 @@ void metaHbToMnode(void* param, void* tmrId) { SEpSet epset = {0}; - hbMsg.numOfTasks = numOfTasks; hbMsg.vgId = pMeta->vgId; hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); + for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamId* pId = taosArrayGet(pMeta->pTaskList, i); + SStreamId* pId = taosArrayGet(pMeta->pTaskList, i); + int64_t keys[2] = {pId->streamId, pId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + + if ((*pTask)->info.fillHistory == 1) { + continue; + } - int64_t keys[2] = {pId->streamId, pId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus}; - taosArrayPush(hbMsg.pTaskStatus, &entry); + if (i == 0) { epsetAssign(&epset, &(*pTask)->info.mnodeEpset); } } + hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus); taosRUnLockLatch(&pMeta->lock); int32_t code = 0; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 50cf74bcf0..3a4050d005 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -348,6 +348,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->refCnt = 1; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; + pTask->status.timerActive = 0; pTask->inputQueue = streamQueueOpen(512 << 10); pTask->outputInfo.queue = streamQueueOpen(512 << 10); @@ -533,7 +534,7 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { if (pTask->info.nodeId == nodeId) { // execution task should be moved away epsetAssign(&pTask->info.epSet, pEpSet); EPSET_TO_STR(pEpSet, buf) - qDebug("s-task:0x%x (vgId:%d) epset is updated %s", pTask->id.taskId, nodeId, buf); + qDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf); } // check for the dispath info and the upstream task info