Merge pull request #25614 from taosdata/fix/3_liaohj
fix(stream): close task if it's in checkdown stream procedure.
This commit is contained in:
commit
44100139a4
|
@ -896,7 +896,7 @@ void streamMetaWUnLock(SStreamMeta* pMeta);
|
|||
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
|
||||
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
|
||||
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
|
||||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
||||
void streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
||||
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
|
||||
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
|
||||
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||
|
|
|
@ -93,9 +93,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
|||
goto FAIL;
|
||||
}
|
||||
|
||||
if (streamMetaLoadAllTasks(pSnode->pMeta) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
streamMetaLoadAllTasks(pSnode->pMeta);
|
||||
|
||||
stopRsync();
|
||||
startRsync();
|
||||
|
|
|
@ -92,7 +92,7 @@ int32_t tqInitialize(STQ* pTq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
/*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta);
|
||||
streamMetaLoadAllTasks(pTq->pStreamMeta);
|
||||
|
||||
if (tqMetaTransform(pTq) < 0) {
|
||||
return -1;
|
||||
|
|
|
@ -179,5 +179,6 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId)
|
|||
}
|
||||
|
||||
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
|
||||
return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta);
|
||||
streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -27,6 +27,8 @@ typedef struct SMStreamCheckpointReadyRspMsg {
|
|||
SMsgHead head;
|
||||
} SMStreamCheckpointReadyRspMsg;
|
||||
|
||||
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
|
||||
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
|
||||
ASSERT(pTask->info.fillHistory);
|
||||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||
|
@ -490,16 +492,6 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
|
||||
}
|
||||
|
||||
static void setParam(SStreamTask* pTask, int64_t* startCheckTs, bool* hasHTask, STaskId* pId) {
|
||||
*startCheckTs = pTask->execInfo.checkTs;
|
||||
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
*hasHTask = true;
|
||||
pId->streamId = pTask->hTaskInfo.id.streamId;
|
||||
pId->taskId = pTask->hTaskInfo.id.taskId;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
|
||||
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
@ -772,13 +764,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
|||
int64_t el = taosGetTimestampMs() - st;
|
||||
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.);
|
||||
|
||||
code = streamMetaLoadAllTasks(pMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
|
||||
streamMetaWUnLock(pMeta);
|
||||
code = terrno;
|
||||
return code;
|
||||
}
|
||||
streamMetaLoadAllTasks(pMeta);
|
||||
|
||||
{
|
||||
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
||||
|
@ -1053,10 +1039,9 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
|
|||
|
||||
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { return taosArrayGetSize(pMeta->pTaskList); }
|
||||
|
||||
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -157,9 +157,7 @@ typedef enum ECHECKPOINT_BACKUP_TYPE {
|
|||
|
||||
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
|
||||
|
||||
int32_t streamTaskBackupCheckpoint(char* id, char* path);
|
||||
int32_t downloadCheckpoint(char* id, char* path);
|
||||
int32_t deleteCheckpoint(char* id);
|
||||
int32_t streamTaskDownloadCheckpointData(char* id, char* path);
|
||||
|
||||
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
|
||||
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);
|
||||
|
|
|
@ -333,7 +333,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c
|
|||
taosRemoveDir(defaultPath);
|
||||
}
|
||||
|
||||
code = downloadCheckpoint(key, chkpPath);
|
||||
code = streamTaskDownloadCheckpointData(key, chkpPath);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
@ -342,7 +342,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c
|
|||
return code;
|
||||
}
|
||||
int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
|
||||
int32_t code = downloadCheckpoint(key, chkpPath);
|
||||
int32_t code = streamTaskDownloadCheckpointData(key, chkpPath);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -29,7 +29,9 @@ typedef struct {
|
|||
} SAsyncUploadArg;
|
||||
|
||||
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
|
||||
static int32_t deleteCheckpointFile(char* id, char* name);
|
||||
static int32_t deleteCheckpointFile(const char* id, const char* name);
|
||||
static int32_t streamTaskBackupCheckpoint(char* id, char* path);
|
||||
static int32_t deleteCheckpoint(char* id);
|
||||
|
||||
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
|
@ -658,9 +660,9 @@ int32_t downloadCheckpointDataByName(const char* id, const char* fname, const ch
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t downloadCheckpoint(char* id, char* path) {
|
||||
int32_t streamTaskDownloadCheckpointData(char* id, char* path) {
|
||||
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
||||
stError("downloadCheckpoint parameters invalid");
|
||||
stError("streamTaskDownloadCheckpointData parameters invalid");
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -686,7 +688,7 @@ int32_t deleteCheckpoint(char* id) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t deleteCheckpointFile(char* id, char* name) {
|
||||
int32_t deleteCheckpointFile(const char* id, const char* name) {
|
||||
char object[128] = {0};
|
||||
snprintf(object, sizeof(object), "%s/%s", id, name);
|
||||
char* tmp = object;
|
||||
|
|
|
@ -117,31 +117,22 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
int64_t chkpId;
|
||||
char* path;
|
||||
char* taskId;
|
||||
|
||||
SArray* pChkpSave;
|
||||
SArray* pChkpInUse;
|
||||
int8_t chkpCap;
|
||||
void* backend;
|
||||
|
||||
} StreamMetaTaskState;
|
||||
|
||||
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
|
||||
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL) < 0) {
|
||||
stError("vgId:%d open file:%s failed, stream meta open failed", pMeta->vgId, pMeta->path);
|
||||
return -1;
|
||||
// goto _err;
|
||||
}
|
||||
|
||||
if (tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
|
||||
stError("vgId:%d, open task.db failed, stream meta open failed", pMeta->vgId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
|
||||
stError("vgId:%d, open checkpoint.db failed, stream meta open failed", pMeta->vgId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -231,17 +222,18 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
|
|||
if (compatible == STREAM_STATA_COMPATIBLE) {
|
||||
return 0;
|
||||
} else if (compatible == STREAM_STATA_NEED_CONVERT) {
|
||||
stInfo("stream state need covert backend format");
|
||||
stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);
|
||||
|
||||
return streamMetaCvtDbFormat(pMeta);
|
||||
} else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
|
||||
stError(
|
||||
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
|
||||
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
|
||||
"manually",
|
||||
tsDataDir);
|
||||
pMeta->vgId, tsDataDir);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -322,35 +314,43 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
}
|
||||
|
||||
if (streamMetaMayCvtDbFormat(pMeta) < 0) {
|
||||
stError("vgId:%d convert sub info format failed, open stream meta failed", pMeta->vgId);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (streamMetaBegin(pMeta) < 0) {
|
||||
stError("vgId:%d begin trans for stream meta failed", pMeta->vgId);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
|
||||
pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
|
||||
if (pMeta->pTasksMap == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK);
|
||||
if (pMeta->updateInfo.pTasks == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
|
||||
if (pMeta->startInfo.pReadyTaskSet == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
|
||||
if (pMeta->startInfo.pFailedTaskSet == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
|
||||
if (pMeta->pHbInfo == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
@ -373,8 +373,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
|
||||
pMeta->numOfPausedTasks = 0;
|
||||
pMeta->numOfStreamTasks = 0;
|
||||
stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
|
||||
stage);
|
||||
|
||||
stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage);
|
||||
|
||||
pMeta->rid = taosAddRef(streamMetaId, pMeta);
|
||||
|
||||
|
@ -824,7 +824,8 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
|
|||
return chkpId;
|
||||
}
|
||||
|
||||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||
// not allowed to return error code
|
||||
void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||
TBC* pCur = NULL;
|
||||
void* pKey = NULL;
|
||||
int32_t kLen = 0;
|
||||
|
@ -833,7 +834,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
SDecoder decoder;
|
||||
|
||||
if (pMeta == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return;
|
||||
}
|
||||
|
||||
SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId));
|
||||
|
@ -844,7 +845,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
|
||||
taosArrayDestroy(pRecycleList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return;
|
||||
}
|
||||
|
||||
tdbTbcMoveToFirst(pCur);
|
||||
|
@ -937,7 +938,6 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
|
||||
|
||||
taosArrayDestroy(pRecycleList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
||||
|
@ -1263,7 +1263,8 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
|
|||
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
if (pTask->status.timerActive >= 1) {
|
||||
stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart", pTask->id.idStr, pMeta->vgId);
|
||||
stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId);
|
||||
streamTaskStop(pTask);
|
||||
inTimer = true;
|
||||
}
|
||||
}
|
||||
|
@ -1643,7 +1644,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
|||
|
||||
if (pStartInfo->startAllTasks != 1) {
|
||||
int64_t el = endTs - startTs;
|
||||
qDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms",
|
||||
stDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms",
|
||||
pMeta->vgId, taskId, ready, el);
|
||||
streamMetaWUnLock(pMeta);
|
||||
return 0;
|
||||
|
@ -1651,7 +1652,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
|||
|
||||
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if (p == NULL) { // task does not exists in current vnode, not record the complete info
|
||||
qError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId);
|
||||
stError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId);
|
||||
streamMetaWUnLock(pMeta);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ TEST(testCase, checkpointUpload_Test) {
|
|||
|
||||
TEST(testCase, checkpointDownload_Test) {
|
||||
char* id = "2013892036";
|
||||
// downloadCheckpoint(id, "/root/offset/download/");
|
||||
// streamTaskDownloadCheckpointData(id, "/root/offset/download/");
|
||||
}
|
||||
|
||||
TEST(testCase, checkpointDelete_Test) {
|
||||
|
|
Loading…
Reference in New Issue