From 83dde6a212f243c4be416737ddd4efd536826866 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 12 Jun 2023 14:33:08 +0000 Subject: [PATCH] add trigger checkpoint --- source/dnode/mnode/impl/src/mndMain.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 156 ++++++++++++++++-- source/dnode/vnode/src/tq/tq.c | 4 +- source/libs/stream/src/streamBackendRocksdb.c | 14 +- 4 files changed, 153 insertions(+), 23 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index da4095d7ff..f78675f103 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -265,7 +265,7 @@ static void *mndThreadFp(void *param) { } if (sec % tsStreamCheckpointTickInterval == 0) { - mndStreamCheckpointTick(pMnode, sec); + // mndStreamCheckpointTick(pMnode, sec); } if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 75dedd49fe..5e3d4916a4 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -85,6 +85,10 @@ int32_t mndInitStream(SMnode *pMnode) { return sdbSetTable(pMnode->pSdb, table); } +static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, const SStreamTask *pTask, + SMStreamDoCheckpointMsg *pMsg); + +static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId); void mndCleanupStream(SMnode *pMnode) {} @@ -795,37 +799,113 @@ _OVER: return code; } +static int32_t mndCreateCheckpoint(SMnode *pMnode, int32_t vgId, SList *pStreamList) { + void *buf = NULL; + int32_t tlen = 0; + int32_t checkpointId = tGenIdPI64(); + + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + SArray *stream = taosArrayInit(64, sizeof(void *)); + + SListIter iter = {0}; + tdListInitIter(pStreamList, &iter, TD_LIST_FORWARD); + SListNode *pNode = NULL; + while ((pNode = tdListNext(&iter)) != NULL) { + char streamName[TSDB_STREAM_FNAME_LEN] = {0}; + tdListNodeGetData(pStreamList, pNode, streamName); + SStreamObj *pStream = mndAcquireStream(pMnode, streamName); + taosArrayPush(stream, &pStream); + } + + if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, vgId, checkpointId) < 0) { + mndReleaseVgroup(pMnode, pVgObj); + for (int i = 0; i < taosArrayGetSize(stream); i++) { + SStreamObj *p = taosArrayGetP(stream, i); + mndReleaseStream(pMnode, p); + } + taosArrayDestroy(stream); + return -1; + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgObj); + action.pCont = buf; + action.contLen = tlen; + action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE; + } + mndReleaseVgroup(pMnode, pVgObj); + + for (int i = 0; i < taosArrayGetSize(stream); i++) { + SStreamObj *p = taosArrayGetP(stream, i); + mndReleaseStream(pMnode, p); + } + taosArrayDestroy(stream); + return 0; +} static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SStreamObj *pStream = NULL; + // listEleSize(); + // iterate all stream obj + SHashObj *vgIds = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; - // incr tick - int64_t currentTick = atomic_add_fetch_64(&pStream->currentTick, 1); - // if >= checkpointFreq, build msg TDMT_MND_STREAM_BEGIN_CHECKPOINT, put into write q - if (currentTick >= pStream->checkpointFreq) { - atomic_store_64(&pStream->currentTick, 0); - SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); - pMsg->streamId = pStream->uid; - pMsg->checkpointId = tGenIdPI64(); - memcpy(pMsg->streamName, pStream->name, TSDB_STREAM_FNAME_LEN); - - SRpcMsg rpcMsg = { - .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, - .pCont = pMsg, - .contLen = sizeof(SMStreamDoCheckpointMsg), - }; - - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + taosRLockLatch(&pStream->lock); + for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + SStreamTask *pTask = taosArrayGetP(pLevel, 0); + if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + int32_t sz = taosArrayGetSize(pLevel); + SList *list = taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId)); + if (list == NULL) { + SList tlist; + tdListInit(&tlist, TSDB_STREAM_FNAME_LEN); + taosHashPut(vgIds, &pTask->nodeId, sizeof(pTask->nodeId), &tlist, sizeof(tlist)); + list = taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId)); + } + tdListAppend(list, (void *)pStream->name); + } } + taosRUnLockLatch(&pStream->lock); + + // if (pIter == NULL) break; + // // incr tick + // int64_t currentTick = atomic_add_fetch_64(&pStream->currentTick, 1); + // // if >= checkpointFreq, build msg TDMT_MND_STREAM_BEGIN_CHECKPOINT, put into write q + // // if (currentTick >= pStream->checkpointFreq) { + // atomic_store_64(&pStream->currentTick, 0); + // SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); + + // pMsg->streamId = pStream->uid; + // pMsg->checkpointId = tGenIdPI64(); + // memcpy(pMsg->streamName, pStream->name, TSDB_STREAM_FNAME_LEN); + + // SRpcMsg rpcMsg = { + // .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, + // .pCont = pMsg, + // .contLen = sizeof(SMStreamDoCheckpointMsg), + // }; + + // tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } + void *vgIter = taosHashIterate(vgIds, NULL); + size_t klen = 0; + int64_t checkpointId = tGenIdPI64(); + while (vgIter) { + int32_t *key = (int32_t *)taosHashGetKey(vgIter, &klen); + SList *val = (SList *)vgIter; + + mndCreateCheckpoint(pMnode, *key, val); + vgIter = taosHashIterate(vgIds, vgIter); + } + taosHashCleanup(vgIds); + return 0; } @@ -871,6 +951,47 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con return 0; } +static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId) { + SStreamCheckpointSourceReq req = {0}; + req.checkpointId = checkpointId; + req.nodeId = nodeId; + req.expireTime = -1; + req.streamId = 0; // pTask->id.streamId; + req.taskId = 0; // pTask->id.taskId; + + int32_t code; + int32_t blen; + + tEncodeSize(tEncodeSStreamCheckpointSourceReq, &req, blen, code); + if (code < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + int32_t tlen = sizeof(SMsgHead) + blen; + + void *buf = taosMemoryMalloc(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + tEncodeSStreamCheckpointSourceReq(&encoder, &req); + + SMsgHead *pMsgHead = (SMsgHead *)buf; + pMsgHead->contLen = htonl(tlen); + pMsgHead->vgId = htonl(nodeId); + + tEncoderClear(&encoder); + + *pBuf = buf; + *pLen = tlen; + + return 0; +} static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; @@ -918,6 +1039,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { void *buf; int32_t tlen; if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask, pMsg) < 0) { + mndReleaseVgroup(pMnode, pVgObj); taosRUnLockLatch(&pStream->lock); mndReleaseStream(pMnode, pStream); mndTransDrop(pTrans); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index eda1f0c8bd..5e06daeb14 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1513,7 +1513,7 @@ int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* pMsg, int int32_t len = msgLen - sizeof(SMsgHead); streamDoCheckpoint(pMeta); - taosWLockLatch(&pMeta->lock); - taosWUnLockLatch(&pMeta->lock); + // taosWLockLatch(&pMeta->lock); + // taosWUnLockLatch(&pMeta->lock); return 0; } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 7a50754e6b..82a2aff3f0 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -195,6 +195,7 @@ int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) { int64_t st = taosGetTimestampMs(); int32_t code = -1; SBackendHandle* pHandle = taosAcquireRef(streamBackendId, backendRid); + static int checkpointSuffix = 1; if (pHandle == NULL) { return -1; } @@ -204,13 +205,18 @@ int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) { char* err = NULL; rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(pHandle->db, &err); if (cp == NULL || err != NULL) { - taosMemoryFree(err); qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, path, err); + taosMemoryFreeClear(err); + code = -1; + goto _ERROR; } - rocksdb_checkpoint_create(cp, path, 64 << 20, &err); + char buf[256] = {0}; + sprintf(buf, "%s/checkpoint_%d", path, checkpointSuffix); + + rocksdb_checkpoint_create(cp, buf, 64 << 20, &err); if (err != NULL) { qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, path, err); - taosMemoryFree(err); + taosMemoryFreeClear(err); } else { code = 0; qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, path, @@ -218,6 +224,8 @@ int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) { } rocksdb_checkpoint_object_destroy(cp); } + checkpointSuffix += 1; +_ERROR: taosReleaseRef(streamBackendId, backendRid); return code; }