add self check

This commit is contained in:
Yihao Deng 2024-06-29 04:35:54 +00:00
parent f023e7780c
commit 7290920c6f
3 changed files with 64 additions and 11 deletions

View File

@ -67,7 +67,7 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
_err: _err:
tqError("vgId:%d, vnode %s snapshot reader failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, tqError("vgId:%d, vnode %s snapshot reader failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
tstrerror(code)); tstrerror(terrno));
*ppReader = NULL; *ppReader = NULL;
return code; return code;
} }
@ -145,14 +145,15 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
goto _err; goto _err;
} }
tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, pTq->pStreamMeta->path); tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
pTq->pStreamMeta->path);
pWriter->pWriterImpl = pSnapWriter; pWriter->pWriterImpl = pSnapWriter;
*ppWriter = pWriter; *ppWriter = pWriter;
return code; return code;
_err: _err:
tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
tstrerror(code)); tstrerror(terrno));
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
*ppWriter = NULL; *ppWriter = NULL;
return -1; return -1;

View File

@ -813,6 +813,11 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
SArray* pList = pTask->upstreamInfo.pList; SArray* pList = pTask->upstreamInfo.pList;
ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE); ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE);
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
if (pNotSendList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno));
return;
}
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i); SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i);
@ -1057,6 +1062,7 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) {
} }
if (s3PutObjectFromFile2(filename, object, 0) != 0) { if (s3PutObjectFromFile2(filename, object, 0) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
code = -1; code = -1;
stError("[s3] failed to upload checkpoint:%s", filename); stError("[s3] failed to upload checkpoint:%s", filename);
} else { } else {
@ -1152,6 +1158,7 @@ int32_t streamTaskDownloadCheckpointData(const char* id, char* path) {
int32_t deleteCheckpoint(const char* id) { int32_t deleteCheckpoint(const char* id) {
if (id == NULL || strlen(id) == 0) { if (id == NULL || strlen(id) == 0) {
terrno = TSDB_CODE_INVALID_PARA;
stError("deleteCheckpoint parameters invalid"); stError("deleteCheckpoint parameters invalid");
return -1; return -1;
} }

View File

@ -130,7 +130,7 @@ int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
int32_t ret = 0; int32_t ret = 0;
char* fullname = taosMemoryCalloc(1, strlen(path) + 32); char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name); sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name);
ret = taosStatFile(fullname, sz, NULL, NULL); ret = taosStatFile(fullname, sz, NULL, NULL);
@ -259,17 +259,33 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
} }
int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) { int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) {
int32_t code = -1; int32_t code = -1;
int32_t nBytes = 0;
int32_t cap = strlen(pSnap->dbPrefixPath) + 256;
char* path = taosMemoryCalloc(1, cap);
if (path == NULL) {
return -1;
}
nBytes = snprintf(path, cap, "%s%s%s%s%s%" PRId64 "", pSnap->dbPrefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP,
"checkpoint", pSnap->chkpId);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR;
}
char* path = taosMemoryCalloc(1, strlen(pSnap->dbPrefixPath) + 256);
// char idstr[64] = {0};
sprintf(path, "%s%s%s%s%s%" PRId64 "", pSnap->dbPrefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint",
pSnap->chkpId);
if (!taosIsDir(path)) { if (!taosIsDir(path)) {
terrno = TSDB_CODE_INVALID_MSG;
goto _ERROR; goto _ERROR;
} }
pSnapFile->pSst = taosArrayInit(16, sizeof(void*)); pSnapFile->pSst = taosArrayInit(16, sizeof(void*));
pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem));
if (pSnapFile->pSst == NULL || pSnapFile->pFileList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _ERROR;
}
pSnapFile->path = path; pSnapFile->path = path;
pSnapFile->snapInfo = *pSnap; pSnapFile->snapInfo = *pSnap;
if ((code = snapFileReadMeta(pSnapFile)) != 0) { if ((code = snapFileReadMeta(pSnapFile)) != 0) {
@ -313,8 +329,15 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
} }
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) { int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) {
// impl later // impl later
int32_t code = 0;
SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap)); SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
int32_t code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet); if (pSnapInfoSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet);
if (code != 0) { if (code != 0) {
stError("failed to do task db snap info, reason:%s", tstrerror(terrno)); stError("failed to do task db snap info, reason:%s", tstrerror(terrno));
taosArrayDestroy(pSnapInfoSet); taosArrayDestroy(pSnapInfoSet);
@ -322,6 +345,11 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
} }
SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
if (pDbSnapSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(pSnapInfoSet);
return -1;
}
for (int32_t i = 0; i < taosArrayGetSize(pSnapInfoSet); i++) { for (int32_t i = 0; i < taosArrayGetSize(pSnapInfoSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfoSet, i); SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfoSet, i);
@ -369,7 +397,8 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* pa
// impl later // impl later
SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader)); SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader));
if (pReader == NULL) { if (pReader == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
if (streamSnapHandleInit(&pReader->handle, (char*)path, pMeta) < 0) { if (streamSnapHandleInit(&pReader->handle, (char*)path, pMeta) < 0) {
@ -501,11 +530,27 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
SStreamSnapHandle* pHandle = &pWriter->handle; SStreamSnapHandle* pHandle = &pWriter->handle;
pHandle->currIdx = 0; pHandle->currIdx = 0;
pHandle->metaPath = taosStrdup(path); pHandle->metaPath = taosStrdup(path);
if (pHandle->metaPath == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pWriter);
}
pHandle->pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); pHandle->pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
if (pHandle->pDbSnapSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pHandle->metaPath);
taosMemoryFree(pWriter);
return -1;
}
SBackendSnapFile2 snapFile = {0}; SBackendSnapFile2 snapFile = {0};
taosArrayPush(pHandle->pDbSnapSet, &snapFile); if (taosArrayPush(pHandle->pDbSnapSet, &snapFile) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
streamSnapWriterClose(pWriter, 0);
return -1;
}
*ppWriter = pWriter; *ppWriter = pWriter;
return 0; return 0;