refactor: do some internal refactor.
This commit is contained in:
parent
cb5a0563e6
commit
629502e12e
|
@ -85,26 +85,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t tqInitialize(STQ* pTq) {
|
||||
if (tqMetaOpen(pTq) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTq->pOffsetStore = tqOffsetOpen(pTq);
|
||||
if (pTq->pOffsetStore == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback);
|
||||
if (pTq->pStreamMeta == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tqClose(STQ* pTq) {
|
||||
qDebug("start to close tq");
|
||||
if (pTq == NULL) {
|
||||
|
@ -137,6 +117,26 @@ void tqClose(STQ* pTq) {
|
|||
taosMemoryFree(pTq);
|
||||
}
|
||||
|
||||
int32_t tqInitialize(STQ* pTq) {
|
||||
if (tqMetaOpen(pTq) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTq->pOffsetStore = tqOffsetOpen(pTq);
|
||||
if (pTq->pOffsetStore == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback);
|
||||
if (pTq->pStreamMeta == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tqNotifyClose(STQ* pTq) {
|
||||
if (pTq == NULL) {
|
||||
return;
|
||||
|
|
|
@ -27,6 +27,8 @@ typedef struct SMStreamCheckpointReadyRspMsg {
|
|||
SMsgHead head;
|
||||
} SMStreamCheckpointReadyRspMsg;
|
||||
|
||||
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
|
||||
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
|
||||
ASSERT(pTask->info.fillHistory);
|
||||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||
|
@ -490,16 +492,6 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
|
||||
}
|
||||
|
||||
static void setParam(SStreamTask* pTask, int64_t* startCheckTs, bool* hasHTask, STaskId* pId) {
|
||||
*startCheckTs = pTask->execInfo.checkTs;
|
||||
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
*hasHTask = true;
|
||||
pId->streamId = pTask->hTaskInfo.id.streamId;
|
||||
pId->taskId = pTask->hTaskInfo.id.taskId;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
|
||||
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
@ -1053,10 +1045,9 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
|
|||
|
||||
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { return taosArrayGetSize(pMeta->pTaskList); }
|
||||
|
||||
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -157,9 +157,7 @@ typedef enum ECHECKPOINT_BACKUP_TYPE {
|
|||
|
||||
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
|
||||
|
||||
int32_t streamTaskBackupCheckpoint(char* id, char* path);
|
||||
int32_t downloadCheckpoint(char* id, char* path);
|
||||
int32_t deleteCheckpoint(char* id);
|
||||
int32_t streamTaskDownloadCheckpointData(char* id, char* path);
|
||||
|
||||
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
|
||||
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);
|
||||
|
|
|
@ -333,7 +333,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c
|
|||
taosRemoveDir(defaultPath);
|
||||
}
|
||||
|
||||
code = downloadCheckpoint(key, chkpPath);
|
||||
code = streamTaskDownloadCheckpointData(key, chkpPath);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
@ -342,7 +342,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c
|
|||
return code;
|
||||
}
|
||||
int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
|
||||
int32_t code = downloadCheckpoint(key, chkpPath);
|
||||
int32_t code = streamTaskDownloadCheckpointData(key, chkpPath);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
@ -2110,8 +2110,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
|
|||
return code;
|
||||
}
|
||||
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
|
||||
STaskDbWrapper* pDb = arg;
|
||||
ECHECKPOINT_BACKUP_TYPE utype = type;
|
||||
STaskDbWrapper* pDb = arg;
|
||||
ECHECKPOINT_BACKUP_TYPE utype = type;
|
||||
|
||||
if (utype == DATA_UPLOAD_RSYNC) {
|
||||
return taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
|
||||
|
|
|
@ -27,7 +27,9 @@ typedef struct {
|
|||
} SAsyncUploadArg;
|
||||
|
||||
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
|
||||
static int32_t deleteCheckpointFile(char* id, char* name);
|
||||
static int32_t deleteCheckpointFile(const char* id, const char* name);
|
||||
static int32_t streamTaskBackupCheckpoint(char* id, char* path);
|
||||
static int32_t deleteCheckpoint(char* id);
|
||||
|
||||
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
|
@ -644,9 +646,9 @@ int32_t downloadCheckpointDataByName(const char* id, const char* fname, const ch
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t downloadCheckpoint(char* id, char* path) {
|
||||
int32_t streamTaskDownloadCheckpointData(char* id, char* path) {
|
||||
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
||||
stError("downloadCheckpoint parameters invalid");
|
||||
stError("streamTaskDownloadCheckpointData parameters invalid");
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -672,7 +674,7 @@ int32_t deleteCheckpoint(char* id) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t deleteCheckpointFile(char* id, char* name) {
|
||||
int32_t deleteCheckpointFile(const char* id, const char* name) {
|
||||
char object[128] = {0};
|
||||
snprintf(object, sizeof(object), "%s/%s", id, name);
|
||||
char* tmp = object;
|
||||
|
|
|
@ -59,7 +59,7 @@ TEST(testCase, checkpointUpload_Test) {
|
|||
|
||||
TEST(testCase, checkpointDownload_Test) {
|
||||
char* id = "2013892036";
|
||||
// downloadCheckpoint(id, "/root/offset/download/");
|
||||
// streamTaskDownloadCheckpointData(id, "/root/offset/download/");
|
||||
}
|
||||
|
||||
TEST(testCase, checkpointDelete_Test) {
|
||||
|
|
Loading…
Reference in New Issue