refactor: do some internal refactor, and add some logs.
This commit is contained in:
parent
94a0f9ab8f
commit
732e52d64d
|
@ -896,7 +896,7 @@ void streamMetaWUnLock(SStreamMeta* pMeta);
|
||||||
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
|
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
|
||||||
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
|
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
|
||||||
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
|
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
|
||||||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
void streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
|
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
|
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||||
|
|
|
@ -93,9 +93,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamMetaLoadAllTasks(pSnode->pMeta) < 0) {
|
streamMetaLoadAllTasks(pSnode->pMeta);
|
||||||
goto FAIL;
|
|
||||||
}
|
|
||||||
|
|
||||||
stopRsync();
|
stopRsync();
|
||||||
startRsync();
|
startRsync();
|
||||||
|
|
|
@ -92,7 +92,7 @@ int32_t tqInitialize(STQ* pTq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta);
|
streamMetaLoadAllTasks(pTq->pStreamMeta);
|
||||||
|
|
||||||
if (tqMetaTransform(pTq) < 0) {
|
if (tqMetaTransform(pTq) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -179,5 +179,6 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId)
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
|
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
|
||||||
return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta);
|
streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -117,31 +117,22 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t chkpId;
|
|
||||||
char* path;
|
|
||||||
char* taskId;
|
|
||||||
|
|
||||||
SArray* pChkpSave;
|
|
||||||
SArray* pChkpInUse;
|
|
||||||
int8_t chkpCap;
|
|
||||||
void* backend;
|
|
||||||
|
|
||||||
} StreamMetaTaskState;
|
|
||||||
|
|
||||||
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
|
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
|
||||||
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL) < 0) {
|
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL) < 0) {
|
||||||
|
stError("vgId:%d open file:%s failed, stream meta open failed", pMeta->vgId, pMeta->path);
|
||||||
return -1;
|
return -1;
|
||||||
// goto _err;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
|
if (tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
|
||||||
|
stError("vgId:%d, open task.db failed, stream meta open failed", pMeta->vgId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
|
if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
|
||||||
|
stError("vgId:%d, open checkpoint.db failed, stream meta open failed", pMeta->vgId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,17 +222,18 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
|
||||||
if (compatible == STREAM_STATA_COMPATIBLE) {
|
if (compatible == STREAM_STATA_COMPATIBLE) {
|
||||||
return 0;
|
return 0;
|
||||||
} else if (compatible == STREAM_STATA_NEED_CONVERT) {
|
} else if (compatible == STREAM_STATA_NEED_CONVERT) {
|
||||||
stInfo("stream state need covert backend format");
|
stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);
|
||||||
|
|
||||||
return streamMetaCvtDbFormat(pMeta);
|
return streamMetaCvtDbFormat(pMeta);
|
||||||
} else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
|
} else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
|
||||||
stError(
|
stError(
|
||||||
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
|
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
|
||||||
"manually",
|
"manually",
|
||||||
tsDataDir);
|
pMeta->vgId, tsDataDir);
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -324,33 +316,40 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
if (streamMetaMayCvtDbFormat(pMeta) < 0) {
|
if (streamMetaMayCvtDbFormat(pMeta) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamMetaBegin(pMeta) < 0) {
|
if (streamMetaBegin(pMeta) < 0) {
|
||||||
|
stError("vgId:%d begin trans for stream meta failed", pMeta->vgId);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
|
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
|
||||||
pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
|
pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
|
||||||
if (pMeta->pTasksMap == NULL) {
|
if (pMeta->pTasksMap == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK);
|
pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK);
|
||||||
if (pMeta->updateInfo.pTasks == NULL) {
|
if (pMeta->updateInfo.pTasks == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
|
pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
|
||||||
if (pMeta->startInfo.pReadyTaskSet == NULL) {
|
if (pMeta->startInfo.pReadyTaskSet == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
|
pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
|
||||||
if (pMeta->startInfo.pFailedTaskSet == NULL) {
|
if (pMeta->startInfo.pFailedTaskSet == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
|
pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
|
||||||
if (pMeta->pHbInfo == NULL) {
|
if (pMeta->pHbInfo == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -824,7 +823,8 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
|
||||||
return chkpId;
|
return chkpId;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
// not allowed to return error code
|
||||||
|
void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
TBC* pCur = NULL;
|
TBC* pCur = NULL;
|
||||||
void* pKey = NULL;
|
void* pKey = NULL;
|
||||||
int32_t kLen = 0;
|
int32_t kLen = 0;
|
||||||
|
@ -833,7 +833,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
|
|
||||||
if (pMeta == NULL) {
|
if (pMeta == NULL) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId));
|
SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId));
|
||||||
|
@ -844,7 +844,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
|
stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
|
||||||
taosArrayDestroy(pRecycleList);
|
taosArrayDestroy(pRecycleList);
|
||||||
return TSDB_CODE_SUCCESS;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
tdbTbcMoveToFirst(pCur);
|
tdbTbcMoveToFirst(pCur);
|
||||||
|
@ -937,7 +937,6 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
|
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
|
||||||
|
|
||||||
taosArrayDestroy(pRecycleList);
|
taosArrayDestroy(pRecycleList);
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
||||||
|
@ -1644,7 +1643,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
||||||
|
|
||||||
if (pStartInfo->startAllTasks != 1) {
|
if (pStartInfo->startAllTasks != 1) {
|
||||||
int64_t el = endTs - startTs;
|
int64_t el = endTs - startTs;
|
||||||
qDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms",
|
stDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms",
|
||||||
pMeta->vgId, taskId, ready, el);
|
pMeta->vgId, taskId, ready, el);
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1652,7 +1651,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
||||||
|
|
||||||
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (p == NULL) { // task does not exists in current vnode, not record the complete info
|
if (p == NULL) { // task does not exists in current vnode, not record the complete info
|
||||||
qError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId);
|
stError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId);
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue