refactor backend

This commit is contained in:
Yihao Deng 2024-07-04 10:25:37 +00:00
parent 7171b6dd6d
commit 8428a5be37
6 changed files with 295 additions and 297 deletions

View File

@ -136,6 +136,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C) #define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C)
#define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D) #define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D)
#define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E) #define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E)
#define TSDB_CODE_THIRDPARTY_ERROR TAOS_DEF_ERROR_CODE(0, 0x012F)
#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) #define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130)
#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) #define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131)

View File

@ -132,8 +132,7 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
// alloc // alloc
pWriter = (SStreamStateWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); pWriter = (SStreamStateWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) { if (pWriter == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
goto _err; goto _err;
} }
pWriter->pTq = pTq; pWriter->pTq = pTq;
@ -141,14 +140,14 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
pWriter->ever = ever; pWriter->ever = ever;
if (taosMkDir(pTq->pStreamMeta->path) != 0) { if (taosMkDir(pTq->pStreamMeta->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
tqError("vgId:%d, vnode %s snapshot writer failed to create directory %s since %s", TD_VID(pTq->pVnode), tqError("vgId:%d, vnode %s snapshot writer failed to create directory %s since %s", TD_VID(pTq->pVnode),
STREAM_STATE_TRANSFER, pTq->pStreamMeta->path, tstrerror(terrno)); STREAM_STATE_TRANSFER, pTq->pStreamMeta->path, tstrerror(terrno));
goto _err; goto _err;
} }
SStreamSnapWriter* pSnapWriter = NULL; SStreamSnapWriter* pSnapWriter = NULL;
if (streamSnapWriterOpen(pTq, sver, ever, pTq->pStreamMeta->path, &pSnapWriter) < 0) { if ((code = streamSnapWriterOpen(pTq, sver, ever, pTq->pStreamMeta->path, &pSnapWriter)) < 0) {
goto _err; goto _err;
} }
@ -157,14 +156,14 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
pWriter->pWriterImpl = pSnapWriter; pWriter->pWriterImpl = pSnapWriter;
*ppWriter = pWriter; *ppWriter = pWriter;
return code; return 0;
_err: _err:
tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
tstrerror(terrno)); tstrerror(terrno));
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
*ppWriter = NULL; *ppWriter = NULL;
return -1; return code;
} }
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) { int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) {

File diff suppressed because it is too large Load Diff

View File

@ -548,15 +548,13 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
char* filePath = taosMemoryCalloc(1, cap); char* filePath = taosMemoryCalloc(1, cap);
if (filePath == NULL) { if (filePath == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP"); int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP");
if (nBytes <= 0 || nBytes >= cap) { if (nBytes <= 0 || nBytes >= cap) {
taosMemoryFree(filePath); taosMemoryFree(filePath);
terrno = TSDB_CODE_OUT_OF_RANGE; return TSDB_CODE_OUT_OF_RANGE;
return -1;
} }
code = downloadCheckpointDataByName(id, "META", filePath); code = downloadCheckpointDataByName(id, "META", filePath);
@ -584,19 +582,18 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES); SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
if (toDelFiles == NULL) { if (toDelFiles == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles, if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles,
pTask->id.idStr)) != 0) { pTask->id.idStr)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, tstrerror(terrno)); stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, tstrerror(code));
} }
if (type == DATA_UPLOAD_S3) { if (type == DATA_UPLOAD_S3) {
if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) { if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 ", reason:%s", idStr, checkpointId, stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 ", reason:%s", idStr, checkpointId,
tstrerror(terrno)); tstrerror(code));
} }
} }
@ -1003,11 +1000,13 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) {
int32_t nBytes = 0; int32_t nBytes = 0;
if (s3Init() != 0) { if (s3Init() != 0) {
return -1; return TSDB_CODE_THIRDPARTY_ERROR;
} }
TdDirPtr pDir = taosOpenDir(path); TdDirPtr pDir = taosOpenDir(path);
if (pDir == NULL) return -1; if (pDir == NULL) {
return TAOS_SYSTEM_ERROR(errno);
}
TdDirEntryPtr de = NULL; TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) { while ((de = taosReadDir(pDir)) != NULL) {
@ -1018,13 +1017,13 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) {
if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) { if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) {
nBytes = snprintf(filename, sizeof(filename), "%s%s", path, name); nBytes = snprintf(filename, sizeof(filename), "%s%s", path, name);
if (nBytes <= 0 || nBytes >= sizeof(filename)) { if (nBytes <= 0 || nBytes >= sizeof(filename)) {
code = -1; code = TSDB_CODE_OUT_OF_RANGE;
break; break;
} }
} else { } else {
nBytes = snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name); nBytes = snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
if (nBytes <= 0 || nBytes >= sizeof(filename)) { if (nBytes <= 0 || nBytes >= sizeof(filename)) {
code = -1; code = TSDB_CODE_OUT_OF_RANGE;
break; break;
} }
} }
@ -1032,14 +1031,13 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) {
char object[PATH_MAX] = {0}; char object[PATH_MAX] = {0};
nBytes = snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name); nBytes = snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
if (nBytes <= 0 || nBytes >= sizeof(object)) { if (nBytes <= 0 || nBytes >= sizeof(object)) {
code = -1; code = TSDB_CODE_OUT_OF_RANGE;
break; break;
} }
if (s3PutObjectFromFile2(filename, object, 0) != 0) { code = s3PutObjectFromFile2(filename, object, 0);
terrno = TAOS_SYSTEM_ERROR(errno); if (code != 0) {
code = -1; stError("[s3] failed to upload checkpoint:%s, reason:%s", filename, tstrerror(code));
stError("[s3] failed to upload checkpoint:%s", filename);
} else { } else {
stDebug("[s3] upload checkpoint:%s", filename); stDebug("[s3] upload checkpoint:%s", filename);
} }
@ -1054,21 +1052,18 @@ int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char
char* buf = taosMemoryCalloc(1, cap); char* buf = taosMemoryCalloc(1, cap);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
nBytes = snprintf(buf, cap, "%s/%s", id, fname); nBytes = snprintf(buf, cap, "%s/%s", id, fname);
if (nBytes <= 0 || nBytes >= cap) { if (nBytes <= 0 || nBytes >= cap) {
taosMemoryFree(buf); taosMemoryFree(buf);
terrno = TSDB_CODE_OUT_OF_RANGE; return TSDB_CODE_OUT_OF_RANGE;
return -1;
} }
int32_t code = s3GetObjectToFile(buf, dstName);
if (s3GetObjectToFile(buf, dstName) != 0) { if (code != 0) {
taosMemoryFree(buf); taosMemoryFree(buf);
terrno = TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
return -1;
} }
taosMemoryFree(buf); taosMemoryFree(buf);
return 0; return 0;
@ -1102,9 +1097,8 @@ int32_t streamTaskUploadCheckpoint(const char* id, const char* path) {
// fileName: CURRENT // fileName: CURRENT
int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) { int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) { if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
terrno = TSDB_CODE_INVALID_PARA;
stError("down load checkpoint data parameters invalid"); stError("down load checkpoint data parameters invalid");
return -1; return TSDB_CODE_INVALID_PARA;
} }
if (strlen(tsSnodeAddress) != 0) { if (strlen(tsSnodeAddress) != 0) {
@ -1133,9 +1127,8 @@ int32_t streamTaskDownloadCheckpointData(const char* id, char* path) {
int32_t deleteCheckpoint(const char* id) { int32_t deleteCheckpoint(const char* id) {
if (id == NULL || strlen(id) == 0) { if (id == NULL || strlen(id) == 0) {
terrno = TSDB_CODE_INVALID_PARA;
stError("deleteCheckpoint parameters invalid"); stError("deleteCheckpoint parameters invalid");
return terrno; return TSDB_CODE_INVALID_PARA;
} }
if (strlen(tsSnodeAddress) != 0) { if (strlen(tsSnodeAddress) != 0) {
return deleteRsync(id); return deleteRsync(id);
@ -1156,8 +1149,9 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
char* tmp = object; char* tmp = object;
int32_t code = s3DeleteObjects((const char**)&tmp, 1); int32_t code = s3DeleteObjects((const char**)&tmp, 1);
if (code != 0) { if (code != 0) {
return code; return TSDB_CODE_THIRDPARTY_ERROR;
} }
return code;
} }
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
@ -1180,14 +1174,14 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
tEncodeSize(tEncodeRestoreCheckpointInfo, &req, tlen, code); tEncodeSize(tEncodeRestoreCheckpointInfo, &req, tlen, code);
if (code < 0) { if (code < 0) {
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id failed, code:%s", id, vgId, tstrerror(code)); stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id failed, code:%s", id, vgId, tstrerror(code));
return -1; return TSDB_CODE_INVALID_MSG;
} }
void* buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId, stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId,
tstrerror(TSDB_CODE_OUT_OF_MEMORY)); tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1; return TSDB_CODE_OUT_OF_MEMORY;
} }
SEncoder encoder; SEncoder encoder;

View File

@ -228,12 +228,12 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
return 0; return 0;
} }
int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
terrno = 0; int32_t code = 0;
TdDirPtr pDir = taosOpenDir(pSnapFile->path); TdDirPtr pDir = taosOpenDir(pSnapFile->path);
if (NULL == pDir) { if (NULL == pDir) {
terrno = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
stError("%s failed to open %s", STREAM_STATE_TRANSFER, pSnapFile->path); stError("%s failed to open %s, reason:%s", STREAM_STATE_TRANSFER, pSnapFile->path, tstrerror(code));
return terrno; return code;
} }
TdDirEntryPtr pDirEntry; TdDirEntryPtr pDirEntry;
@ -242,7 +242,7 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
if (strlen(name) >= strlen(ROCKSDB_CURRENT) && 0 == strncmp(name, ROCKSDB_CURRENT, strlen(ROCKSDB_CURRENT))) { if (strlen(name) >= strlen(ROCKSDB_CURRENT) && 0 == strncmp(name, ROCKSDB_CURRENT, strlen(ROCKSDB_CURRENT))) {
pSnapFile->pCurrent = taosStrdup(name); pSnapFile->pCurrent = taosStrdup(name);
if (pSnapFile->pCurrent == NULL) { if (pSnapFile->pCurrent == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
break; break;
} }
continue; continue;
@ -250,7 +250,7 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
if (strlen(name) >= strlen(ROCKSDB_MAINFEST) && 0 == strncmp(name, ROCKSDB_MAINFEST, strlen(ROCKSDB_MAINFEST))) { if (strlen(name) >= strlen(ROCKSDB_MAINFEST) && 0 == strncmp(name, ROCKSDB_MAINFEST, strlen(ROCKSDB_MAINFEST))) {
pSnapFile->pMainfest = taosStrdup(name); pSnapFile->pMainfest = taosStrdup(name);
if (pSnapFile->pMainfest == NULL) { if (pSnapFile->pMainfest == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
break; break;
} }
continue; continue;
@ -258,7 +258,7 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) { if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) {
pSnapFile->pOptions = taosStrdup(name); pSnapFile->pOptions = taosStrdup(name);
if (pSnapFile->pOptions == NULL) { if (pSnapFile->pOptions == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
break; break;
} }
continue; continue;
@ -267,7 +267,7 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
0 == strncmp(name, ROCKSDB_CHECKPOINT_META, strlen(ROCKSDB_CHECKPOINT_META))) { 0 == strncmp(name, ROCKSDB_CHECKPOINT_META, strlen(ROCKSDB_CHECKPOINT_META))) {
pSnapFile->pCheckpointMeta = taosStrdup(name); pSnapFile->pCheckpointMeta = taosStrdup(name);
if (pSnapFile->pCheckpointMeta == NULL) { if (pSnapFile->pCheckpointMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
break; break;
} }
continue; continue;
@ -276,7 +276,7 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
0 == strncmp(name, ROCKSDB_CHECKPOINT_SELF_CHECK, strlen(ROCKSDB_CHECKPOINT_SELF_CHECK))) { 0 == strncmp(name, ROCKSDB_CHECKPOINT_SELF_CHECK, strlen(ROCKSDB_CHECKPOINT_SELF_CHECK))) {
pSnapFile->pCheckpointSelfcheck = taosStrdup(name); pSnapFile->pCheckpointSelfcheck = taosStrdup(name);
if (pSnapFile->pCheckpointSelfcheck == NULL) { if (pSnapFile->pCheckpointSelfcheck == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
break; break;
} }
continue; continue;
@ -285,17 +285,17 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) { 0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) {
char* sst = taosStrdup(name); char* sst = taosStrdup(name);
if (sst == NULL) { if (sst == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
break; break;
} }
taosArrayPush(pSnapFile->pSst, &sst); taosArrayPush(pSnapFile->pSst, &sst);
} }
} }
taosCloseDir(&pDir); taosCloseDir(&pDir);
return terrno; return code;
} }
int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) { int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) {
terrno = 0; int32_t code = 0;
int32_t nBytes = 0; int32_t nBytes = 0;
int32_t cap = strlen(pSnap->dbPrefixPath) + 256; int32_t cap = strlen(pSnap->dbPrefixPath) + 256;
@ -307,28 +307,28 @@ int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBacke
nBytes = snprintf(path, cap, "%s%s%s%s%s%" PRId64 "", pSnap->dbPrefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, nBytes = snprintf(path, cap, "%s%s%s%s%s%" PRId64 "", pSnap->dbPrefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP,
"checkpoint", pSnap->chkpId); "checkpoint", pSnap->chkpId);
if (nBytes <= 0 || nBytes >= cap) { if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE; code = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR; goto _ERROR;
} }
if (!taosIsDir(path)) { if (!taosIsDir(path)) {
terrno = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _ERROR; goto _ERROR;
} }
pSnapFile->pSst = taosArrayInit(16, sizeof(void*)); pSnapFile->pSst = taosArrayInit(16, sizeof(void*));
pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem));
if (pSnapFile->pSst == NULL || pSnapFile->pFileList == NULL) { if (pSnapFile->pSst == NULL || pSnapFile->pFileList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _ERROR; goto _ERROR;
} }
pSnapFile->path = path; pSnapFile->path = path;
pSnapFile->snapInfo = *pSnap; pSnapFile->snapInfo = *pSnap;
if ((terrno = snapFileReadMeta(pSnapFile)) != 0) { if ((code = snapFileReadMeta(pSnapFile)) != 0) {
goto _ERROR; goto _ERROR;
} }
if ((terrno = snapFileGenMeta(pSnapFile)) != 0) { if ((code = snapFileGenMeta(pSnapFile)) != 0) {
goto _ERROR; goto _ERROR;
} }
@ -337,7 +337,7 @@ int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBacke
_ERROR: _ERROR:
taosMemoryFree(path); taosMemoryFree(path);
return terrno; return code;
} }
void snapFileDestroy(SBackendSnapFile2* pSnap) { void snapFileDestroy(SBackendSnapFile2* pSnap) {
taosMemoryFree(pSnap->pCheckpointMeta); taosMemoryFree(pSnap->pCheckpointMeta);
@ -365,34 +365,32 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
} }
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) { int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) {
// impl later // impl later
int32_t code = 0;
terrno = 0;
SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap)); SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
if (pSnapInfoSet == NULL) { if (pSnapInfoSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
} }
terrno = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet); code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet);
if (terrno != 0) { if (code != 0) {
stError("failed to do task db snap info, reason:%s", tstrerror(terrno)); stError("failed to do task db snap info, reason:%s", tstrerror(code));
taosArrayDestroy(pSnapInfoSet); taosArrayDestroy(pSnapInfoSet);
return terrno; return code;
} }
SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
if (pDbSnapSet == NULL) { if (pDbSnapSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(pSnapInfoSet); taosArrayDestroy(pSnapInfoSet);
return -1; code = TSDB_CODE_OUT_OF_MEMORY;
return code;
} }
for (int32_t i = 0; i < taosArrayGetSize(pSnapInfoSet); i++) { for (int32_t i = 0; i < taosArrayGetSize(pSnapInfoSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfoSet, i); SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfoSet, i);
SBackendSnapFile2 snapFile = {0}; SBackendSnapFile2 snapFile = {0};
terrno = streamBackendSnapInitFile(path, pSnap, &snapFile); code = streamBackendSnapInitFile(path, pSnap, &snapFile);
ASSERT(terrno == 0); ASSERT(code == 0);
taosArrayPush(pDbSnapSet, &snapFile); taosArrayPush(pDbSnapSet, &snapFile);
} }
pHandle->pDbSnapSet = pDbSnapSet; pHandle->pDbSnapSet = pDbSnapSet;
@ -403,7 +401,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
_err: _err:
streamSnapHandleDestroy(pHandle); streamSnapHandleDestroy(pHandle);
return terrno; return code;
} }
void streamSnapHandleDestroy(SStreamSnapHandle* handle) { void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
@ -431,8 +429,7 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* pa
// impl later // impl later
SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader)); SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader));
if (pReader == NULL) { if (pReader == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
int32_t code = streamSnapHandleInit(&pReader->handle, (char*)path, pMeta); int32_t code = streamSnapHandleInit(&pReader->handle, (char*)path, pMeta);
@ -498,10 +495,10 @@ _NEXT:
int64_t nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset); int64_t nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset);
if (nread == -1) { if (nread == -1) {
taosMemoryFree(buf); taosMemoryFree(buf);
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(errno);
stError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name, stError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name,
item->type, tstrerror(code)); item->type, tstrerror(code));
return -1; return code;
} else if (nread > 0 && nread <= kBlockSize) { } else if (nread > 0 && nread <= kBlockSize) {
// left bytes less than kBlockSize // left bytes less than kBlockSize
stDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER, stDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
@ -558,6 +555,7 @@ _NEXT:
// SMetaSnapWriter ======================================== // SMetaSnapWriter ========================================
int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapWriter** ppWriter) { int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapWriter** ppWriter) {
// impl later // impl later
int32_t code = 0;
SStreamSnapWriter* pWriter = taosMemoryCalloc(1, sizeof(SStreamSnapWriter)); SStreamSnapWriter* pWriter = taosMemoryCalloc(1, sizeof(SStreamSnapWriter));
if (pWriter == NULL) { if (pWriter == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -568,23 +566,23 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
pHandle->metaPath = taosStrdup(path); pHandle->metaPath = taosStrdup(path);
if (pHandle->metaPath == NULL) { if (pHandle->metaPath == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
return terrno; code = TSDB_CODE_OUT_OF_MEMORY;
return code;
} }
pHandle->pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); pHandle->pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
if (pHandle->pDbSnapSet == NULL) { if (pHandle->pDbSnapSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
streamSnapWriterClose(pWriter, 0); streamSnapWriterClose(pWriter, 0);
return terrno; code = TSDB_CODE_OUT_OF_MEMORY;
return code;
} }
SBackendSnapFile2 snapFile = {0}; SBackendSnapFile2 snapFile = {0};
if (taosArrayPush(pHandle->pDbSnapSet, &snapFile) == NULL) { if (taosArrayPush(pHandle->pDbSnapSet, &snapFile) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
streamSnapWriterClose(pWriter, 0); streamSnapWriterClose(pWriter, 0);
return terrno; code = TSDB_CODE_OUT_OF_MEMORY;
return code;
} }
*ppWriter = pWriter; *ppWriter = pWriter;
@ -607,7 +605,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
if (pSnapFile->fd == 0) { if (pSnapFile->fd == 0) {
pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pSnapFile->fd == NULL) { if (pSnapFile->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(errno);
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pHandle->metaPath, TD_DIRSEP, stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pHandle->metaPath, TD_DIRSEP,
pHdr->name, tstrerror(code)); pHdr->name, tstrerror(code));
} }
@ -615,7 +613,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
int64_t bytes = taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset); int64_t bytes = taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset);
if (bytes != pHdr->size) { if (bytes != pHdr->size) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(errno);
stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code)); stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
return code; return code;
} else { } else {
@ -636,12 +634,16 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pSnapFile->fd == NULL) { if (pSnapFile->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(errno);
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pSnapFile->path, TD_DIRSEP, stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pSnapFile->path, TD_DIRSEP,
pHdr->name, tstrerror(code)); pHdr->name, tstrerror(code));
} }
taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset); if (taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset) != pHdr->size) {
code = TAOS_SYSTEM_ERROR(errno);
stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
return code;
}
stInfo("succ to write data %s", pItem->name); stInfo("succ to write data %s", pItem->name);
pSnapFile->offset += pHdr->size; pSnapFile->offset += pHdr->size;
} }

View File

@ -96,6 +96,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space") TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space")
TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout") TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space") TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space")
TAOS_DEFINE_ERROR(TSDB_CODE_THIRDPARTY_ERROR, "third party error, please check the log")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")