From 629502e12eee6365d4da272096727670c53d33e3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 17:00:51 +0800 Subject: [PATCH 1/5] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 40 +++++++++---------- source/dnode/vnode/src/tqCommon/tqCommon.c | 15 ++----- source/libs/stream/inc/streamInt.h | 4 +- source/libs/stream/src/streamBackendRocksdb.c | 8 ++-- source/libs/stream/src/streamCheckpoint.c | 10 +++-- source/libs/stream/test/checkpointTest.cpp | 2 +- 6 files changed, 35 insertions(+), 44 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0e6b85bd2b..8dca3b2179 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -85,26 +85,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { } } -int32_t tqInitialize(STQ* pTq) { - if (tqMetaOpen(pTq) < 0) { - return -1; - } - - pTq->pOffsetStore = tqOffsetOpen(pTq); - if (pTq->pOffsetStore == NULL) { - return -1; - } - - int32_t vgId = TD_VID(pTq->pVnode); - pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback); - if (pTq->pStreamMeta == NULL) { - return -1; - } - - /*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta); - return 0; -} - void tqClose(STQ* pTq) { qDebug("start to close tq"); if (pTq == NULL) { @@ -137,6 +117,26 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq); } +int32_t tqInitialize(STQ* pTq) { + if (tqMetaOpen(pTq) < 0) { + return -1; + } + + pTq->pOffsetStore = tqOffsetOpen(pTq); + if (pTq->pOffsetStore == NULL) { + return -1; + } + + int32_t vgId = TD_VID(pTq->pVnode); + pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback); + if (pTq->pStreamMeta == NULL) { + return -1; + } + + /*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta); + return 0; +} + void tqNotifyClose(STQ* pTq) { if (pTq == NULL) { return; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 924b0a8207..61ecbe8e5c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -27,6 +27,8 @@ typedef struct SMStreamCheckpointReadyRspMsg { SMsgHead head; } SMStreamCheckpointReadyRspMsg; +static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); + static STaskId replaceStreamTaskId(SStreamTask* pTask) { ASSERT(pTask->info.fillHistory); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; @@ -490,16 +492,6 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); } -static void setParam(SStreamTask* pTask, int64_t* startCheckTs, bool* hasHTask, STaskId* pId) { - *startCheckTs = pTask->execInfo.checkTs; - - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - *hasHTask = true; - pId->streamId = pTask->hTaskInfo.id.streamId; - pId->taskId = pTask->hTaskInfo.id.taskId; - } -} - int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); @@ -1053,10 +1045,9 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { return taosArrayGetSize(pMeta->pTaskList); } -static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { +int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; - return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 0ee31197dc..3a4e3d81fb 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -157,9 +157,7 @@ typedef enum ECHECKPOINT_BACKUP_TYPE { ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); -int32_t streamTaskBackupCheckpoint(char* id, char* path); -int32_t downloadCheckpoint(char* id, char* path); -int32_t deleteCheckpoint(char* id); +int32_t streamTaskDownloadCheckpointData(char* id, char* path); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 662d02a48f..da1096e7de 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -333,7 +333,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c taosRemoveDir(defaultPath); } - code = downloadCheckpoint(key, chkpPath); + code = streamTaskDownloadCheckpointData(key, chkpPath); if (code != 0) { return code; } @@ -342,7 +342,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c return code; } int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { - int32_t code = downloadCheckpoint(key, chkpPath); + int32_t code = streamTaskDownloadCheckpointData(key, chkpPath); if (code != 0) { return code; } @@ -2110,8 +2110,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 return code; } int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { - STaskDbWrapper* pDb = arg; - ECHECKPOINT_BACKUP_TYPE utype = type; + STaskDbWrapper* pDb = arg; + ECHECKPOINT_BACKUP_TYPE utype = type; if (utype == DATA_UPLOAD_RSYNC) { return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 3428fc36e1..5c7aaec623 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -27,7 +27,9 @@ typedef struct { } SAsyncUploadArg; static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); -static int32_t deleteCheckpointFile(char* id, char* name); +static int32_t deleteCheckpointFile(const char* id, const char* name); +static int32_t streamTaskBackupCheckpoint(char* id, char* path); +static int32_t deleteCheckpoint(char* id); int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -644,9 +646,9 @@ int32_t downloadCheckpointDataByName(const char* id, const char* fname, const ch return 0; } -int32_t downloadCheckpoint(char* id, char* path) { +int32_t streamTaskDownloadCheckpointData(char* id, char* path) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { - stError("downloadCheckpoint parameters invalid"); + stError("streamTaskDownloadCheckpointData parameters invalid"); return -1; } @@ -672,7 +674,7 @@ int32_t deleteCheckpoint(char* id) { return 0; } -int32_t deleteCheckpointFile(char* id, char* name) { +int32_t deleteCheckpointFile(const char* id, const char* name) { char object[128] = {0}; snprintf(object, sizeof(object), "%s/%s", id, name); char* tmp = object; diff --git a/source/libs/stream/test/checkpointTest.cpp b/source/libs/stream/test/checkpointTest.cpp index 0caad479e5..34e80fc08b 100644 --- a/source/libs/stream/test/checkpointTest.cpp +++ b/source/libs/stream/test/checkpointTest.cpp @@ -59,7 +59,7 @@ TEST(testCase, checkpointUpload_Test) { TEST(testCase, checkpointDownload_Test) { char* id = "2013892036"; - // downloadCheckpoint(id, "/root/offset/download/"); + // streamTaskDownloadCheckpointData(id, "/root/offset/download/"); } TEST(testCase, checkpointDelete_Test) { From ed610d293f1c127e906e95c89fa6e40504dfd542 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 17:09:37 +0800 Subject: [PATCH 2/5] fix(stream): close task if it's in checkdown stream procedure. --- source/libs/stream/src/streamMeta.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index edc1a148a9..362c97df7c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1263,7 +1263,8 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->status.timerActive >= 1) { - stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart", pTask->id.idStr, pMeta->vgId); + stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId); + streamTaskStop(pTask); inTimer = true; } } From 732e52d64d816feb7fc91103e19df72645324a06 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 19:11:58 +0800 Subject: [PATCH 3/5] refactor: do some internal refactor, and add some logs. --- include/libs/stream/tstream.h | 2 +- source/dnode/snode/src/snode.c | 4 +- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 3 +- source/libs/stream/src/streamMeta.c | 43 +++++++++---------- 5 files changed, 26 insertions(+), 28 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e3487c49d1..14aae0b96a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -896,7 +896,7 @@ void streamMetaWUnLock(SStreamMeta* pMeta); void streamMetaResetStartInfo(STaskStartInfo* pMeta); SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta); void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader); -int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); +void streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index b717504e1e..87f0681780 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -93,9 +93,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { goto FAIL; } - if (streamMetaLoadAllTasks(pSnode->pMeta) < 0) { - goto FAIL; - } + streamMetaLoadAllTasks(pSnode->pMeta); stopRsync(); startRsync(); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index baef3d9bd6..3a3b103e3f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -92,7 +92,7 @@ int32_t tqInitialize(STQ* pTq) { return -1; } - /*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta); + streamMetaLoadAllTasks(pTq->pStreamMeta); if (tqMetaTransform(pTq) < 0) { return -1; diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 926a8c62a7..290266d94a 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -179,5 +179,6 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) } int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { - return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta); + streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta); + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 362c97df7c..aca0a38d48 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -117,31 +117,22 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) { return 0; } -typedef struct { - int64_t chkpId; - char* path; - char* taskId; - - SArray* pChkpSave; - SArray* pChkpInUse; - int8_t chkpCap; - void* backend; - -} StreamMetaTaskState; - int32_t streamMetaOpenTdb(SStreamMeta* pMeta) { if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL) < 0) { + stError("vgId:%d open file:%s failed, stream meta open failed", pMeta->vgId, pMeta->path); return -1; - // goto _err; } if (tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { + stError("vgId:%d, open task.db failed, stream meta open failed", pMeta->vgId); return -1; } if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) { + stError("vgId:%d, open checkpoint.db failed, stream meta open failed", pMeta->vgId); return -1; } + return 0; } @@ -231,17 +222,18 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { if (compatible == STREAM_STATA_COMPATIBLE) { return 0; } else if (compatible == STREAM_STATA_NEED_CONVERT) { - stInfo("stream state need covert backend format"); + stInfo("vgId:%d stream state need covert backend format", pMeta->vgId); return streamMetaCvtDbFormat(pMeta); } else if (compatible == STREAM_STATA_NO_COMPATIBLE) { stError( - "stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " + "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " "manually", - tsDataDir); + pMeta->vgId, tsDataDir); return -1; } + return 0; } @@ -324,33 +316,40 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF if (streamMetaMayCvtDbFormat(pMeta) < 0) { goto _err; } + if (streamMetaBegin(pMeta) < 0) { + stError("vgId:%d begin trans for stream meta failed", pMeta->vgId); goto _err; } _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK); if (pMeta->pTasksMap == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK); if (pMeta->updateInfo.pTasks == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK); if (pMeta->startInfo.pReadyTaskSet == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK); if (pMeta->startInfo.pFailedTaskSet == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo)); if (pMeta->pHbInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -824,7 +823,8 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { return chkpId; } -int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { +// not allowed to return error code +void streamMetaLoadAllTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; void* pKey = NULL; int32_t kLen = 0; @@ -833,7 +833,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { SDecoder decoder; if (pMeta == NULL) { - return TSDB_CODE_SUCCESS; + return; } SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId)); @@ -844,7 +844,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (code != TSDB_CODE_SUCCESS) { stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno)); taosArrayDestroy(pRecycleList); - return TSDB_CODE_SUCCESS; + return; } tdbTbcMoveToFirst(pCur); @@ -937,7 +937,6 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); taosArrayDestroy(pRecycleList); - return TSDB_CODE_SUCCESS; } int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { @@ -1644,7 +1643,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 if (pStartInfo->startAllTasks != 1) { int64_t el = endTs - startTs; - qDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms", + stDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms", pMeta->vgId, taskId, ready, el); streamMetaWUnLock(pMeta); return 0; @@ -1652,7 +1651,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { // task does not exists in current vnode, not record the complete info - qError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId); + stError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId); streamMetaWUnLock(pMeta); return 0; } From 05a204dd6c51717c78a0640816bf66528c42b93f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 19:30:56 +0800 Subject: [PATCH 4/5] refactor: do some internal refactor. --- source/libs/stream/src/streamMeta.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index aca0a38d48..a091a866a0 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -314,6 +314,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } if (streamMetaMayCvtDbFormat(pMeta) < 0) { + stError("vgId:%d convert sub info format failed, open stream meta failed", pMeta->vgId); goto _err; } @@ -372,8 +373,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->numOfPausedTasks = 0; pMeta->numOfStreamTasks = 0; - stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, - stage); + + stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage); pMeta->rid = taosAddRef(streamMetaId, pMeta); From cdc7b03ac697e13622a2ede04b89646543576c5b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 22:44:32 +0800 Subject: [PATCH 5/5] fix(stream): fix syntax error. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 61ecbe8e5c..e404f1e7b9 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -764,13 +764,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { int64_t el = taosGetTimestampMs() - st; tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.); - code = streamMetaLoadAllTasks(pMeta); - if (code != TSDB_CODE_SUCCESS) { - tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); - streamMetaWUnLock(pMeta); - code = terrno; - return code; - } + streamMetaLoadAllTasks(pMeta); { STaskStartInfo* pStartInfo = &pMeta->startInfo;