refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-05-22 16:57:09 +08:00
parent 5aae69338c
commit 96e1487787
1 changed files with 12 additions and 2 deletions

View File

@ -151,10 +151,10 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
int8_t ret = STREAM_STATA_COMPATIBLE; int8_t ret = STREAM_STATA_COMPATIBLE;
TBC* pCur = NULL; TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { // no task info, no stream
// no task info, no stream
return ret; return ret;
} }
void* pKey = NULL; void* pKey = NULL;
int32_t kLen = 0; int32_t kLen = 0;
void* pVal = NULL; void* pVal = NULL;
@ -165,20 +165,24 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
if (pVal == NULL || vLen == 0) { if (pVal == NULL || vLen == 0) {
break; break;
} }
SDecoder decoder; SDecoder decoder;
SCheckpointInfo info; SCheckpointInfo info;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) { if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) {
continue; continue;
} }
if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) { if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
ret = STREAM_STATA_NO_COMPATIBLE; ret = STREAM_STATA_NO_COMPATIBLE;
} else if (info.msgVer >= SSTREAM_TASK_NEED_CONVERT_VER) { } else if (info.msgVer >= SSTREAM_TASK_NEED_CONVERT_VER) {
ret = STREAM_STATA_NEED_CONVERT; ret = STREAM_STATA_NEED_CONVERT;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
break; break;
} }
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
tdbTbcClose(pCur); tdbTbcClose(pCur);
@ -193,6 +197,7 @@ int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
if (exist == false) { if (exist == false) {
return code; return code;
} }
SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId, pMeta->vgId); SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId, pMeta->vgId);
void* pIter = taosHashIterate(pBackend->cfInst, NULL); void* pIter = taosHashIterate(pBackend->cfInst, NULL);
@ -219,6 +224,7 @@ _EXIT:
return code; return code;
} }
int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
int8_t compatible = streamMetaCheckBackendCompatible(pMeta); int8_t compatible = streamMetaCheckBackendCompatible(pMeta);
if (compatible == STREAM_STATA_COMPATIBLE) { if (compatible == STREAM_STATA_COMPATIBLE) {
@ -854,6 +860,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
tdbTbcMoveToFirst(pCur); tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
if (pVal == NULL || vLen == 0) {
break;
}
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;