add self check

This commit is contained in:
Yihao Deng 2024-06-29 04:33:14 +00:00
parent bf656ec80f
commit f023e7780c
1 changed files with 66 additions and 22 deletions

View File

@ -56,6 +56,13 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint
pBlock->info.childId = pTask->info.selfChildId;
pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock;
if (pChkpoint->blocks == NULL) {
taosMemoryFree(pBlock);
taosFreeQitem(pChkpoint);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
taosArrayPush(pChkpoint->blocks, pBlock);
taosMemoryFree(pBlock);
@ -110,7 +117,12 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId
SRpcHandleInfo* pRpcInfo, int32_t code) {
int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
void* pBuf = rpcMallocCont(size);
void* pBuf = rpcMallocCont(size);
if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId);
@ -131,6 +143,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId
SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = size, .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
return 0;
}
@ -1006,52 +1019,78 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
}
static int32_t uploadCheckpointToS3(const char* id, const char* path) {
int32_t code = 0;
int32_t nBytes = 0;
if (s3Init() != 0) {
return -1;
}
TdDirPtr pDir = taosOpenDir(path);
if (pDir == NULL) return -1;
TdDirEntryPtr de = NULL;
s3Init();
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
char filename[PATH_MAX] = {0};
if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) {
snprintf(filename, sizeof(filename), "%s%s", path, name);
nBytes = snprintf(filename, sizeof(filename), "%s%s", path, name);
if (nBytes <= 0 || nBytes >= sizeof(filename)) {
code = -1;
break;
}
} else {
snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
nBytes = snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
if (nBytes <= 0 || nBytes >= sizeof(filename)) {
code = -1;
break;
}
}
char object[PATH_MAX] = {0};
snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
nBytes = snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
if (nBytes <= 0 || nBytes >= sizeof(object)) {
code = -1;
break;
}
if (s3PutObjectFromFile2(filename, object, 0) != 0) {
taosCloseDir(&pDir);
return -1;
code = -1;
stError("[s3] failed to upload checkpoint:%s", filename);
} else {
stDebug("[s3] upload checkpoint:%s", filename);
}
stDebug("[s3] upload checkpoint:%s", filename);
// break;
}
taosCloseDir(&pDir);
return 0;
return code;
}
int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
int32_t code = 0;
char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4);
int32_t nBytes;
int32_t cap = strlen(id) + strlen(dstName) + 16;
char* buf = taosMemoryCalloc(1, cap);
if (buf == NULL) {
code = terrno = TSDB_CODE_OUT_OF_MEMORY;
return code;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
nBytes = snprintf(buf, cap, "%s/%s", id, fname);
if (nBytes <= 0 || nBytes >= cap) {
taosMemoryFree(buf);
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
sprintf(buf, "%s/%s", id, fname);
if (s3GetObjectToFile(buf, dstName) != 0) {
code = errno;
taosMemoryFree(buf);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
taosMemoryFree(buf);
return code;
return 0;
}
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
@ -1082,6 +1121,7 @@ int32_t streamTaskUploadCheckpoint(const char* id, const char* path) {
// fileName: CURRENT
int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
terrno = TSDB_CODE_INVALID_PARA;
stError("down load checkpoint data parameters invalid");
return -1;
}
@ -1125,9 +1165,13 @@ int32_t deleteCheckpoint(const char* id) {
int32_t deleteCheckpointFile(const char* id, const char* name) {
char object[128] = {0};
snprintf(object, sizeof(object), "%s/%s", id, name);
int32_t nBytes = snprintf(object, sizeof(object), "%s/%s", id, name);
if (nBytes <= 0 || nBytes >= sizeof(object)) {
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
char* tmp = object;
s3DeleteObjects((const char**)&tmp, 1);
return 0;
return s3DeleteObjects((const char**)&tmp, 1);
}