diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0e6b85bd2b..8dca3b2179 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -85,26 +85,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { } } -int32_t tqInitialize(STQ* pTq) { - if (tqMetaOpen(pTq) < 0) { - return -1; - } - - pTq->pOffsetStore = tqOffsetOpen(pTq); - if (pTq->pOffsetStore == NULL) { - return -1; - } - - int32_t vgId = TD_VID(pTq->pVnode); - pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback); - if (pTq->pStreamMeta == NULL) { - return -1; - } - - /*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta); - return 0; -} - void tqClose(STQ* pTq) { qDebug("start to close tq"); if (pTq == NULL) { @@ -137,6 +117,26 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq); } +int32_t tqInitialize(STQ* pTq) { + if (tqMetaOpen(pTq) < 0) { + return -1; + } + + pTq->pOffsetStore = tqOffsetOpen(pTq); + if (pTq->pOffsetStore == NULL) { + return -1; + } + + int32_t vgId = TD_VID(pTq->pVnode); + pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback); + if (pTq->pStreamMeta == NULL) { + return -1; + } + + /*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta); + return 0; +} + void tqNotifyClose(STQ* pTq) { if (pTq == NULL) { return; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 924b0a8207..61ecbe8e5c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -27,6 +27,8 @@ typedef struct SMStreamCheckpointReadyRspMsg { SMsgHead head; } SMStreamCheckpointReadyRspMsg; +static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); + static STaskId replaceStreamTaskId(SStreamTask* pTask) { ASSERT(pTask->info.fillHistory); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; @@ -490,16 +492,6 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); } -static void setParam(SStreamTask* pTask, int64_t* startCheckTs, bool* hasHTask, STaskId* pId) { - *startCheckTs = pTask->execInfo.checkTs; - - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - *hasHTask = true; - pId->streamId = pTask->hTaskInfo.id.streamId; - pId->taskId = pTask->hTaskInfo.id.taskId; - } -} - int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); @@ -1053,10 +1045,9 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { return taosArrayGetSize(pMeta->pTaskList); } -static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { +int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; - return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 0ee31197dc..3a4e3d81fb 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -157,9 +157,7 @@ typedef enum ECHECKPOINT_BACKUP_TYPE { ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); -int32_t streamTaskBackupCheckpoint(char* id, char* path); -int32_t downloadCheckpoint(char* id, char* path); -int32_t deleteCheckpoint(char* id); +int32_t streamTaskDownloadCheckpointData(char* id, char* path); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 662d02a48f..da1096e7de 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -333,7 +333,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c taosRemoveDir(defaultPath); } - code = downloadCheckpoint(key, chkpPath); + code = streamTaskDownloadCheckpointData(key, chkpPath); if (code != 0) { return code; } @@ -342,7 +342,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c return code; } int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { - int32_t code = downloadCheckpoint(key, chkpPath); + int32_t code = streamTaskDownloadCheckpointData(key, chkpPath); if (code != 0) { return code; } @@ -2110,8 +2110,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 return code; } int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { - STaskDbWrapper* pDb = arg; - ECHECKPOINT_BACKUP_TYPE utype = type; + STaskDbWrapper* pDb = arg; + ECHECKPOINT_BACKUP_TYPE utype = type; if (utype == DATA_UPLOAD_RSYNC) { return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 3428fc36e1..5c7aaec623 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -27,7 +27,9 @@ typedef struct { } SAsyncUploadArg; static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); -static int32_t deleteCheckpointFile(char* id, char* name); +static int32_t deleteCheckpointFile(const char* id, const char* name); +static int32_t streamTaskBackupCheckpoint(char* id, char* path); +static int32_t deleteCheckpoint(char* id); int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -644,9 +646,9 @@ int32_t downloadCheckpointDataByName(const char* id, const char* fname, const ch return 0; } -int32_t downloadCheckpoint(char* id, char* path) { +int32_t streamTaskDownloadCheckpointData(char* id, char* path) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { - stError("downloadCheckpoint parameters invalid"); + stError("streamTaskDownloadCheckpointData parameters invalid"); return -1; } @@ -672,7 +674,7 @@ int32_t deleteCheckpoint(char* id) { return 0; } -int32_t deleteCheckpointFile(char* id, char* name) { +int32_t deleteCheckpointFile(const char* id, const char* name) { char object[128] = {0}; snprintf(object, sizeof(object), "%s/%s", id, name); char* tmp = object; diff --git a/source/libs/stream/test/checkpointTest.cpp b/source/libs/stream/test/checkpointTest.cpp index 0caad479e5..34e80fc08b 100644 --- a/source/libs/stream/test/checkpointTest.cpp +++ b/source/libs/stream/test/checkpointTest.cpp @@ -59,7 +59,7 @@ TEST(testCase, checkpointUpload_Test) { TEST(testCase, checkpointDownload_Test) { char* id = "2013892036"; - // downloadCheckpoint(id, "/root/offset/download/"); + // streamTaskDownloadCheckpointData(id, "/root/offset/download/"); } TEST(testCase, checkpointDelete_Test) {