commit
0fcc57b52a
|
@ -138,6 +138,7 @@ int32_t taosGetErrSize();
|
||||||
#define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C)
|
#define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C)
|
||||||
#define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D)
|
#define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D)
|
||||||
#define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E)
|
#define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E)
|
||||||
|
#define TSDB_CODE_THIRDPARTY_ERROR TAOS_DEF_ERROR_CODE(0, 0x012F)
|
||||||
|
|
||||||
#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130)
|
#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130)
|
||||||
#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131)
|
#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131)
|
||||||
|
|
|
@ -51,10 +51,9 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
|
||||||
|
|
||||||
SStreamSnapReader* pSnapReader = NULL;
|
SStreamSnapReader* pSnapReader = NULL;
|
||||||
|
|
||||||
if (streamSnapReaderOpen(meta, sver, chkpId, meta->path, &pSnapReader) == 0) {
|
if ((code = streamSnapReaderOpen(meta, sver, chkpId, meta->path, &pSnapReader)) == 0) {
|
||||||
pReader->complete = 1;
|
pReader->complete = 1;
|
||||||
} else {
|
} else {
|
||||||
code = -1;
|
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -75,7 +74,7 @@ _err:
|
||||||
int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) {
|
int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
tqDebug("vgId:%d, vnode %s snapshot reader closed", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER);
|
tqDebug("vgId:%d, vnode %s snapshot reader closed", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER);
|
||||||
streamSnapReaderClose(pReader->pReaderImpl);
|
code = streamSnapReaderClose(pReader->pReaderImpl);
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -138,32 +137,36 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
|
||||||
pWriter->sver = sver;
|
pWriter->sver = sver;
|
||||||
pWriter->ever = ever;
|
pWriter->ever = ever;
|
||||||
|
|
||||||
taosMkDir(pTq->pStreamMeta->path);
|
if (taosMkDir(pTq->pStreamMeta->path) != 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
SStreamSnapWriter* pSnapWriter = NULL;
|
tqError("vgId:%d, vnode %s snapshot writer failed to create directory %s since %s", TD_VID(pTq->pVnode),
|
||||||
if (streamSnapWriterOpen(pTq, sver, ever, pTq->pStreamMeta->path, &pSnapWriter) < 0) {
|
STREAM_STATE_TRANSFER, pTq->pStreamMeta->path, tstrerror(code));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, pTq->pStreamMeta->path);
|
SStreamSnapWriter* pSnapWriter = NULL;
|
||||||
|
if ((code = streamSnapWriterOpen(pTq, sver, ever, pTq->pStreamMeta->path, &pSnapWriter)) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
|
||||||
|
pTq->pStreamMeta->path);
|
||||||
pWriter->pWriterImpl = pSnapWriter;
|
pWriter->pWriterImpl = pSnapWriter;
|
||||||
|
|
||||||
*ppWriter = pWriter;
|
*ppWriter = pWriter;
|
||||||
return code;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
|
tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
taosMemoryFree(pWriter);
|
taosMemoryFree(pWriter);
|
||||||
*ppWriter = NULL;
|
*ppWriter = NULL;
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) {
|
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) {
|
||||||
int32_t code = 0;
|
|
||||||
tqDebug("vgId:%d, vnode %s snapshot writer closed", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
|
tqDebug("vgId:%d, vnode %s snapshot writer closed", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
|
||||||
code = streamSnapWriterClose(pWriter->pWriterImpl, rollback);
|
return streamSnapWriterClose(pWriter->pWriterImpl, rollback);
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
|
|
|
@ -131,20 +131,21 @@ typedef struct {
|
||||||
TdThreadRwlock rwLock;
|
TdThreadRwlock rwLock;
|
||||||
} SBkdMgt;
|
} SBkdMgt;
|
||||||
|
|
||||||
bool streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId);
|
#define META_ON_S3_FORMATE "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 ""
|
||||||
|
|
||||||
|
bool streamBackendDataIsExist(const char* path, int64_t chkpId);
|
||||||
void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId);
|
void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId);
|
||||||
void streamBackendCleanup(void* arg);
|
void streamBackendCleanup(void* arg);
|
||||||
void streamBackendHandleCleanup(void* arg);
|
void streamBackendHandleCleanup(void* arg);
|
||||||
int32_t streamBackendLoadCheckpointInfo(void* pMeta);
|
int32_t streamBackendLoadCheckpointInfo(void* pMeta);
|
||||||
int32_t streamBackendDoCheckpoint(void* pMeta, int64_t checkpointId);
|
int32_t streamBackendDoCheckpoint(void* pMeta, int64_t checkpointId, int64_t processver);
|
||||||
SListNode* streamBackendAddCompare(void* backend, void* arg);
|
SListNode* streamBackendAddCompare(void* backend, void* arg);
|
||||||
void streamBackendDelCompare(void* backend, void* arg);
|
void streamBackendDelCompare(void* backend, void* arg);
|
||||||
int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst);
|
int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst);
|
||||||
|
|
||||||
STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId);
|
STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId, int64_t* processVer);
|
||||||
void taskDbDestroy(void* pBackend, bool flush);
|
void taskDbDestroy(void* pBackend, bool flush);
|
||||||
void taskDbDestroy2(void* pBackend);
|
void taskDbDestroy2(void* pBackend);
|
||||||
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);
|
|
||||||
|
|
||||||
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId);
|
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId);
|
||||||
|
|
||||||
|
@ -249,7 +250,7 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
|
||||||
int32_t taskDbBuildSnap(void* arg, SArray* pSnap);
|
int32_t taskDbBuildSnap(void* arg, SArray* pSnap);
|
||||||
int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo);
|
int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo);
|
||||||
|
|
||||||
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);
|
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId);
|
||||||
|
|
||||||
SBkdMgt* bkdMgtCreate(char* path);
|
SBkdMgt* bkdMgtCreate(char* path);
|
||||||
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path);
|
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path);
|
||||||
|
@ -259,6 +260,7 @@ void bkdMgtDestroy(SBkdMgt* bm);
|
||||||
|
|
||||||
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list,
|
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list,
|
||||||
const char* id);
|
const char* id);
|
||||||
|
int32_t remoteChkpGetDelFile(char* path, SArray* toDel);
|
||||||
|
|
||||||
void* taskAcquireDb(int64_t refId);
|
void* taskAcquireDb(int64_t refId);
|
||||||
void taskReleaseDb(int64_t refId);
|
void taskReleaseDb(int64_t refId);
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -57,6 +57,13 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint
|
||||||
pBlock->info.childId = pTask->info.selfChildId;
|
pBlock->info.childId = pTask->info.selfChildId;
|
||||||
|
|
||||||
pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock;
|
pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock;
|
||||||
|
if (pChkpoint->blocks == NULL) {
|
||||||
|
taosMemoryFree(pBlock);
|
||||||
|
taosFreeQitem(pChkpoint);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayPush(pChkpoint->blocks, pBlock);
|
taosArrayPush(pChkpoint->blocks, pBlock);
|
||||||
|
|
||||||
taosMemoryFree(pBlock);
|
taosMemoryFree(pBlock);
|
||||||
|
@ -112,7 +119,12 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri
|
||||||
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
|
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
|
||||||
SRpcHandleInfo* pRpcInfo, int32_t code) {
|
SRpcHandleInfo* pRpcInfo, int32_t code) {
|
||||||
int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
|
int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
|
||||||
|
|
||||||
void* pBuf = rpcMallocCont(size);
|
void* pBuf = rpcMallocCont(size);
|
||||||
|
if (pBuf == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
|
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
|
||||||
|
|
||||||
|
@ -133,6 +145,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId
|
||||||
|
|
||||||
SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo};
|
SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo};
|
||||||
tmsgSendRsp(&rspMsg);
|
tmsgSendRsp(&rspMsg);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -522,65 +535,57 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
|
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
|
||||||
char buf[128] = {0};
|
int32_t code = 0;
|
||||||
|
int32_t cap = strlen(path) + 64;
|
||||||
|
|
||||||
char* file = taosMemoryCalloc(1, strlen(path) + 32);
|
char* filePath = taosMemoryCalloc(1, cap);
|
||||||
sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP");
|
if (filePath == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = downloadCheckpointDataByName(id, "META", file);
|
int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP");
|
||||||
|
if (nBytes <= 0 || nBytes >= cap) {
|
||||||
|
taosMemoryFree(filePath);
|
||||||
|
return TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = downloadCheckpointDataByName(id, "META", filePath);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
stDebug("%s chkp failed to download meta file:%s", id, file);
|
stError("%s chkp failed to download meta file:%s", id, filePath);
|
||||||
taosMemoryFree(file);
|
taosMemoryFree(filePath);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
|
code = remoteChkpGetDelFile(filePath, list);
|
||||||
if (pFile == NULL) {
|
if (code != 0) {
|
||||||
stError("%s failed to open meta file:%s for checkpoint", id, file);
|
stError("%s chkp failed to get to del:%s", id, filePath);
|
||||||
code = -1;
|
taosMemoryFree(filePath);
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
|
|
||||||
stError("%s failed to read meta file:%s for checkpoint", id, file);
|
|
||||||
code = -1;
|
|
||||||
} else {
|
|
||||||
int32_t len = strnlen(buf, tListLen(buf));
|
|
||||||
for (int i = 0; i < len; i++) {
|
|
||||||
if (buf[i] == '\n') {
|
|
||||||
char* item = taosMemoryCalloc(1, i + 1);
|
|
||||||
memcpy(item, buf, i);
|
|
||||||
taosArrayPush(list, &item);
|
|
||||||
|
|
||||||
item = taosMemoryCalloc(1, len - i);
|
|
||||||
memcpy(item, buf + i + 1, len - i - 1);
|
|
||||||
taosArrayPush(list, &item);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosCloseFile(&pFile);
|
|
||||||
taosRemoveFile(file);
|
|
||||||
taosMemoryFree(file);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
|
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
|
||||||
char* path = NULL;
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
|
char* path = NULL;
|
||||||
int64_t now = taosGetTimestampMs();
|
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
const char* idStr = pTask->id.idStr;
|
const char* idStr = pTask->id.idStr;
|
||||||
|
int64_t now = taosGetTimestampMs();
|
||||||
|
|
||||||
|
SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
if (toDelFiles == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles,
|
if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles,
|
||||||
pTask->id.idStr)) != 0) {
|
pTask->id.idStr)) != 0) {
|
||||||
stError("s-task:%s failed to gen upload checkpoint:%" PRId64, idStr, checkpointId);
|
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (type == DATA_UPLOAD_S3) {
|
if (type == DATA_UPLOAD_S3) {
|
||||||
if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) {
|
if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) {
|
||||||
stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", idStr, checkpointId);
|
stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 ", reason:%s", idStr, checkpointId,
|
||||||
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -589,7 +594,8 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
|
stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
|
||||||
} else {
|
} else {
|
||||||
stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", idStr, checkpointId, path);
|
stError("s-task:%s failed to upload checkpointId:%" PRId64 " path:%s,reason:%s", idStr, checkpointId, path,
|
||||||
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -662,7 +668,8 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||||
stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);
|
stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);
|
||||||
|
|
||||||
code = streamBackendDoCheckpoint(pTask->pBackend, ckId);
|
int64_t ver = pTask->chkInfo.processedVer;
|
||||||
|
code = streamBackendDoCheckpoint(pTask->pBackend, ckId, ver);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno));
|
stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno));
|
||||||
}
|
}
|
||||||
|
@ -770,6 +777,11 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
||||||
SArray* pList = pTask->upstreamInfo.pList;
|
SArray* pList = pTask->upstreamInfo.pList;
|
||||||
ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE);
|
ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE);
|
||||||
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
|
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
|
||||||
|
if (pNotSendList == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
||||||
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i);
|
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i);
|
||||||
|
@ -976,52 +988,77 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t uploadCheckpointToS3(const char* id, const char* path) {
|
static int32_t uploadCheckpointToS3(const char* id, const char* path) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t nBytes = 0;
|
||||||
|
|
||||||
|
if (s3Init() != 0) {
|
||||||
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
TdDirPtr pDir = taosOpenDir(path);
|
TdDirPtr pDir = taosOpenDir(path);
|
||||||
if (pDir == NULL) return -1;
|
if (pDir == NULL) {
|
||||||
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
|
}
|
||||||
|
|
||||||
TdDirEntryPtr de = NULL;
|
TdDirEntryPtr de = NULL;
|
||||||
s3Init();
|
|
||||||
while ((de = taosReadDir(pDir)) != NULL) {
|
while ((de = taosReadDir(pDir)) != NULL) {
|
||||||
char* name = taosGetDirEntryName(de);
|
char* name = taosGetDirEntryName(de);
|
||||||
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
|
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
|
||||||
|
|
||||||
char filename[PATH_MAX] = {0};
|
char filename[PATH_MAX] = {0};
|
||||||
if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) {
|
if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) {
|
||||||
snprintf(filename, sizeof(filename), "%s%s", path, name);
|
nBytes = snprintf(filename, sizeof(filename), "%s%s", path, name);
|
||||||
|
if (nBytes <= 0 || nBytes >= sizeof(filename)) {
|
||||||
|
code = TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
|
nBytes = snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
|
||||||
|
if (nBytes <= 0 || nBytes >= sizeof(filename)) {
|
||||||
|
code = TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
char object[PATH_MAX] = {0};
|
char object[PATH_MAX] = {0};
|
||||||
snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
|
nBytes = snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
|
||||||
|
if (nBytes <= 0 || nBytes >= sizeof(object)) {
|
||||||
if (s3PutObjectFromFile2(filename, object, 0) != 0) {
|
code = TSDB_CODE_OUT_OF_RANGE;
|
||||||
taosCloseDir(&pDir);
|
break;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = s3PutObjectFromFile2(filename, object, 0);
|
||||||
|
if (code != 0) {
|
||||||
|
stError("[s3] failed to upload checkpoint:%s, reason:%s", filename, tstrerror(code));
|
||||||
|
} else {
|
||||||
stDebug("[s3] upload checkpoint:%s", filename);
|
stDebug("[s3] upload checkpoint:%s", filename);
|
||||||
// break;
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
taosCloseDir(&pDir);
|
taosCloseDir(&pDir);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
|
int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
|
||||||
int32_t code = 0;
|
int32_t nBytes;
|
||||||
char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4);
|
int32_t cap = strlen(id) + strlen(dstName) + 16;
|
||||||
|
|
||||||
|
char* buf = taosMemoryCalloc(1, cap);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
code = terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
sprintf(buf, "%s/%s", id, fname);
|
|
||||||
if (s3GetObjectToFile(buf, dstName) != 0) {
|
|
||||||
code = errno;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nBytes = snprintf(buf, cap, "%s/%s", id, fname);
|
||||||
|
if (nBytes <= 0 || nBytes >= cap) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
return code;
|
return TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
}
|
||||||
|
int32_t code = s3GetObjectToFile(buf, dstName);
|
||||||
|
if (code != 0) {
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
|
}
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
|
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
|
||||||
|
@ -1035,13 +1072,17 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskUploadCheckpoint(const char* id, const char* path) {
|
int32_t streamTaskUploadCheckpoint(const char* id, const char* path) {
|
||||||
|
int32_t code = 0;
|
||||||
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
||||||
stError("invalid parameters in upload checkpoint, %s", id);
|
stError("invalid parameters in upload checkpoint, %s", id);
|
||||||
return -1;
|
return TSDB_CODE_INVALID_CFG;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strlen(tsSnodeAddress) != 0) {
|
if (strlen(tsSnodeAddress) != 0) {
|
||||||
return uploadByRsync(id, path);
|
code = uploadByRsync(id, path);
|
||||||
|
if (code != 0) {
|
||||||
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
|
}
|
||||||
} else if (tsS3StreamEnabled) {
|
} else if (tsS3StreamEnabled) {
|
||||||
return uploadCheckpointToS3(id, path);
|
return uploadCheckpointToS3(id, path);
|
||||||
}
|
}
|
||||||
|
@ -1053,7 +1094,7 @@ int32_t streamTaskUploadCheckpoint(const char* id, const char* path) {
|
||||||
int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
|
int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
|
||||||
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
|
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
|
||||||
stError("down load checkpoint data parameters invalid");
|
stError("down load checkpoint data parameters invalid");
|
||||||
return -1;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strlen(tsSnodeAddress) != 0) {
|
if (strlen(tsSnodeAddress) != 0) {
|
||||||
|
@ -1083,7 +1124,7 @@ int32_t streamTaskDownloadCheckpointData(const char* id, char* path) {
|
||||||
int32_t deleteCheckpoint(const char* id) {
|
int32_t deleteCheckpoint(const char* id) {
|
||||||
if (id == NULL || strlen(id) == 0) {
|
if (id == NULL || strlen(id) == 0) {
|
||||||
stError("deleteCheckpoint parameters invalid");
|
stError("deleteCheckpoint parameters invalid");
|
||||||
return -1;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
if (strlen(tsSnodeAddress) != 0) {
|
if (strlen(tsSnodeAddress) != 0) {
|
||||||
return deleteRsync(id);
|
return deleteRsync(id);
|
||||||
|
@ -1095,11 +1136,18 @@ int32_t deleteCheckpoint(const char* id) {
|
||||||
|
|
||||||
int32_t deleteCheckpointFile(const char* id, const char* name) {
|
int32_t deleteCheckpointFile(const char* id, const char* name) {
|
||||||
char object[128] = {0};
|
char object[128] = {0};
|
||||||
snprintf(object, sizeof(object), "%s/%s", id, name);
|
|
||||||
|
int32_t nBytes = snprintf(object, sizeof(object), "%s/%s", id, name);
|
||||||
|
if (nBytes <= 0 || nBytes >= sizeof(object)) {
|
||||||
|
return TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
}
|
||||||
|
|
||||||
char* tmp = object;
|
char* tmp = object;
|
||||||
s3DeleteObjects((const char**)&tmp, 1);
|
int32_t code = s3DeleteObjects((const char**)&tmp, 1);
|
||||||
return 0;
|
if (code != 0) {
|
||||||
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
|
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
|
||||||
|
@ -1134,14 +1182,14 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
|
||||||
tEncodeSize(tEncodeRestoreCheckpointInfo, &req, tlen, code);
|
tEncodeSize(tEncodeRestoreCheckpointInfo, &req, tlen, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id failed, code:%s", id, vgId, tstrerror(code));
|
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id failed, code:%s", id, vgId, tstrerror(code));
|
||||||
return -1;
|
return TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* buf = rpcMallocCont(tlen);
|
void* buf = rpcMallocCont(tlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId,
|
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId,
|
||||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
return -1;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
|
|
|
@ -182,9 +182,10 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
|
||||||
int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
|
int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int64_t chkpId = streamMetaGetLatestCheckpointId(pMeta);
|
int64_t chkpId = streamMetaGetLatestCheckpointId(pMeta);
|
||||||
|
terrno = 0;
|
||||||
bool exist = streamBackendDataIsExist(pMeta->path, chkpId, pMeta->vgId);
|
bool exist = streamBackendDataIsExist(pMeta->path, chkpId);
|
||||||
if (exist == false) {
|
if (exist == false) {
|
||||||
|
code = terrno;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -252,8 +253,9 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
|
||||||
}
|
}
|
||||||
|
|
||||||
STaskDbWrapper* pBackend = NULL;
|
STaskDbWrapper* pBackend = NULL;
|
||||||
|
int64_t processVer = -1;
|
||||||
while (1) {
|
while (1) {
|
||||||
pBackend = taskDbOpen(pMeta->path, key, chkpId);
|
pBackend = taskDbOpen(pMeta->path, key, chkpId, &processVer);
|
||||||
if (pBackend != NULL) {
|
if (pBackend != NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -271,6 +273,8 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
|
||||||
pBackend->pTask = pTask;
|
pBackend->pTask = pTask;
|
||||||
pBackend->pMeta = pMeta;
|
pBackend->pMeta = pMeta;
|
||||||
|
|
||||||
|
if (processVer != -1) pTask->chkInfo.processedVer = processVer;
|
||||||
|
|
||||||
taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
|
taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
|
||||||
taosThreadMutexUnlock(&pMeta->backendMutex);
|
taosThreadMutexUnlock(&pMeta->backendMutex);
|
||||||
|
|
||||||
|
@ -308,7 +312,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamMetaMayCvtDbFormat(pMeta) < 0) {
|
if (streamMetaMayCvtDbFormat(pMeta) < 0) {
|
||||||
stError("vgId:%d convert sub info format failed, open stream meta failed", pMeta->vgId);
|
stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId,
|
||||||
|
tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -393,6 +398,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas
|
||||||
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
|
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
|
||||||
|
|
||||||
pMeta->bkdChkptMgt = bkdMgtCreate(tpath);
|
pMeta->bkdChkptMgt = bkdMgtCreate(tpath);
|
||||||
|
if (pMeta->bkdChkptMgt == NULL) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
taosThreadMutexInit(&pMeta->backendMutex, NULL);
|
taosThreadMutexInit(&pMeta->backendMutex, NULL);
|
||||||
|
|
||||||
return pMeta;
|
return pMeta;
|
||||||
|
@ -408,9 +416,10 @@ _err:
|
||||||
if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
|
if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
|
||||||
if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
|
if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
|
||||||
if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
|
if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
|
||||||
|
if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt);
|
||||||
taosMemoryFree(pMeta);
|
taosMemoryFree(pMeta);
|
||||||
|
|
||||||
stError("failed to open stream meta");
|
stError("failed to open stream meta, reason:%s", tstrerror(terrno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ enum SBackendFileType {
|
||||||
ROCKSDB_SST_TYPE = 3,
|
ROCKSDB_SST_TYPE = 3,
|
||||||
ROCKSDB_CURRENT_TYPE = 4,
|
ROCKSDB_CURRENT_TYPE = 4,
|
||||||
ROCKSDB_CHECKPOINT_META_TYPE = 5,
|
ROCKSDB_CHECKPOINT_META_TYPE = 5,
|
||||||
|
ROCKSDB_CHECKPOINT_SELFCHECK_TYPE = 6,
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct SBackendFileItem {
|
typedef struct SBackendFileItem {
|
||||||
|
@ -49,6 +50,7 @@ typedef struct SBackendSnapFiles2 {
|
||||||
char* pOptions;
|
char* pOptions;
|
||||||
SArray* pSst;
|
SArray* pSst;
|
||||||
char* pCheckpointMeta;
|
char* pCheckpointMeta;
|
||||||
|
char* pCheckpointSelfcheck;
|
||||||
char* path;
|
char* path;
|
||||||
|
|
||||||
int64_t checkpointId;
|
int64_t checkpointId;
|
||||||
|
@ -111,6 +113,7 @@ const char* ROCKSDB_MAINFEST = "MANIFEST";
|
||||||
const char* ROCKSDB_SST = "sst";
|
const char* ROCKSDB_SST = "sst";
|
||||||
const char* ROCKSDB_CURRENT = "CURRENT";
|
const char* ROCKSDB_CURRENT = "CURRENT";
|
||||||
const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT";
|
const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT";
|
||||||
|
const char* ROCKSDB_CHECKPOINT_SELF_CHECK = "info";
|
||||||
static int64_t kBlockSize = 64 * 1024;
|
static int64_t kBlockSize = 64 * 1024;
|
||||||
|
|
||||||
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, void* pMeta);
|
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, void* pMeta);
|
||||||
|
@ -127,6 +130,7 @@ int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
|
char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
|
||||||
|
|
||||||
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name);
|
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name);
|
||||||
|
|
||||||
ret = taosStatFile(fullname, sz, NULL, NULL);
|
ret = taosStatFile(fullname, sz, NULL, NULL);
|
||||||
|
@ -148,8 +152,20 @@ int32_t streamDestroyTaskDbSnapInfo(void* arg, SArray* snap) { return taskDbDest
|
||||||
|
|
||||||
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
|
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
|
||||||
if (qDebugFlag & DEBUG_DEBUG) {
|
if (qDebugFlag & DEBUG_DEBUG) {
|
||||||
char* buf = taosMemoryCalloc(1, 512);
|
int16_t cap = 512;
|
||||||
sprintf(buf + strlen(buf), "[");
|
|
||||||
|
char* buf = taosMemoryCalloc(1, cap);
|
||||||
|
if (buf == NULL) {
|
||||||
|
stError("%s failed to alloc memory, reason:%s", STREAM_STATE_TRANSFER, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t nBytes = snprintf(buf + strlen(buf), cap, "[");
|
||||||
|
if (nBytes <= 0 || nBytes >= cap) {
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
stError("%s failed to write buf, reason:%s", STREAM_STATE_TRANSFER, tstrerror(TSDB_CODE_OUT_OF_RANGE));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (pSnapFile->pCurrent) sprintf(buf, "current: %s,", pSnapFile->pCurrent);
|
if (pSnapFile->pCurrent) sprintf(buf, "current: %s,", pSnapFile->pCurrent);
|
||||||
if (pSnapFile->pMainfest) sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest);
|
if (pSnapFile->pMainfest) sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest);
|
||||||
|
@ -157,10 +173,10 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
|
||||||
if (pSnapFile->pSst) {
|
if (pSnapFile->pSst) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
|
||||||
char* name = taosArrayGetP(pSnapFile->pSst, i);
|
char* name = taosArrayGetP(pSnapFile->pSst, i);
|
||||||
sprintf(buf + strlen(buf), "%s,", name);
|
if (strlen(buf) + strlen(name) < cap) sprintf(buf + strlen(buf), "%s,", name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sprintf(buf + strlen(buf) - 1, "]");
|
if ((strlen(buf)) < cap) sprintf(buf + strlen(buf) - 1, "]");
|
||||||
|
|
||||||
stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId,
|
stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId,
|
||||||
pSnapFile->snapInfo.taskId, buf);
|
pSnapFile->snapInfo.taskId, buf);
|
||||||
|
@ -199,16 +215,25 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
|
||||||
// meta
|
// meta
|
||||||
item.name = pSnapFile->pCheckpointMeta;
|
item.name = pSnapFile->pCheckpointMeta;
|
||||||
item.type = ROCKSDB_CHECKPOINT_META_TYPE;
|
item.type = ROCKSDB_CHECKPOINT_META_TYPE;
|
||||||
|
if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) {
|
||||||
|
taosArrayPush(pSnapFile->pFileList, &item);
|
||||||
|
}
|
||||||
|
|
||||||
|
item.name = pSnapFile->pCheckpointSelfcheck;
|
||||||
|
item.type = ROCKSDB_CHECKPOINT_SELFCHECK_TYPE;
|
||||||
|
|
||||||
if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) {
|
if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) {
|
||||||
taosArrayPush(pSnapFile->pFileList, &item);
|
taosArrayPush(pSnapFile->pFileList, &item);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
|
int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
|
||||||
|
int32_t code = 0;
|
||||||
TdDirPtr pDir = taosOpenDir(pSnapFile->path);
|
TdDirPtr pDir = taosOpenDir(pSnapFile->path);
|
||||||
if (NULL == pDir) {
|
if (NULL == pDir) {
|
||||||
stError("%s failed to open %s", STREAM_STATE_TRANSFER, pSnapFile->path);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
stError("%s failed to open %s, reason:%s", STREAM_STATE_TRANSFER, pSnapFile->path, tstrerror(code));
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
TdDirEntryPtr pDirEntry;
|
TdDirEntryPtr pDirEntry;
|
||||||
|
@ -216,43 +241,88 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
|
||||||
char* name = taosGetDirEntryName(pDirEntry);
|
char* name = taosGetDirEntryName(pDirEntry);
|
||||||
if (strlen(name) >= strlen(ROCKSDB_CURRENT) && 0 == strncmp(name, ROCKSDB_CURRENT, strlen(ROCKSDB_CURRENT))) {
|
if (strlen(name) >= strlen(ROCKSDB_CURRENT) && 0 == strncmp(name, ROCKSDB_CURRENT, strlen(ROCKSDB_CURRENT))) {
|
||||||
pSnapFile->pCurrent = taosStrdup(name);
|
pSnapFile->pCurrent = taosStrdup(name);
|
||||||
|
if (pSnapFile->pCurrent == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (strlen(name) >= strlen(ROCKSDB_MAINFEST) && 0 == strncmp(name, ROCKSDB_MAINFEST, strlen(ROCKSDB_MAINFEST))) {
|
if (strlen(name) >= strlen(ROCKSDB_MAINFEST) && 0 == strncmp(name, ROCKSDB_MAINFEST, strlen(ROCKSDB_MAINFEST))) {
|
||||||
pSnapFile->pMainfest = taosStrdup(name);
|
pSnapFile->pMainfest = taosStrdup(name);
|
||||||
|
if (pSnapFile->pMainfest == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) {
|
if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) {
|
||||||
pSnapFile->pOptions = taosStrdup(name);
|
pSnapFile->pOptions = taosStrdup(name);
|
||||||
|
if (pSnapFile->pOptions == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_META) &&
|
if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_META) &&
|
||||||
0 == strncmp(name, ROCKSDB_CHECKPOINT_META, strlen(ROCKSDB_CHECKPOINT_META))) {
|
0 == strncmp(name, ROCKSDB_CHECKPOINT_META, strlen(ROCKSDB_CHECKPOINT_META))) {
|
||||||
pSnapFile->pCheckpointMeta = taosStrdup(name);
|
pSnapFile->pCheckpointMeta = taosStrdup(name);
|
||||||
|
if (pSnapFile->pCheckpointMeta == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_SELF_CHECK) &&
|
||||||
|
0 == strncmp(name, ROCKSDB_CHECKPOINT_SELF_CHECK, strlen(ROCKSDB_CHECKPOINT_SELF_CHECK))) {
|
||||||
|
pSnapFile->pCheckpointSelfcheck = taosStrdup(name);
|
||||||
|
if (pSnapFile->pCheckpointSelfcheck == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (strlen(name) >= strlen(ROCKSDB_SST) &&
|
if (strlen(name) >= strlen(ROCKSDB_SST) &&
|
||||||
0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) {
|
0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) {
|
||||||
char* sst = taosStrdup(name);
|
char* sst = taosStrdup(name);
|
||||||
|
if (sst == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
taosArrayPush(pSnapFile->pSst, &sst);
|
taosArrayPush(pSnapFile->pSst, &sst);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosCloseDir(&pDir);
|
taosCloseDir(&pDir);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) {
|
int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) {
|
||||||
int32_t code = -1;
|
int32_t code = 0;
|
||||||
|
int32_t nBytes = 0;
|
||||||
|
int32_t cap = strlen(pSnap->dbPrefixPath) + 256;
|
||||||
|
|
||||||
|
char* path = taosMemoryCalloc(1, cap);
|
||||||
|
if (path == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
nBytes = snprintf(path, cap, "%s%s%s%s%s%" PRId64 "", pSnap->dbPrefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP,
|
||||||
|
"checkpoint", pSnap->chkpId);
|
||||||
|
if (nBytes <= 0 || nBytes >= cap) {
|
||||||
|
code = TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
goto _ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
char* path = taosMemoryCalloc(1, strlen(pSnap->dbPrefixPath) + 256);
|
|
||||||
// char idstr[64] = {0};
|
|
||||||
sprintf(path, "%s%s%s%s%s%" PRId64 "", pSnap->dbPrefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint",
|
|
||||||
pSnap->chkpId);
|
|
||||||
if (!taosIsDir(path)) {
|
if (!taosIsDir(path)) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
goto _ERROR;
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSnapFile->pSst = taosArrayInit(16, sizeof(void*));
|
pSnapFile->pSst = taosArrayInit(16, sizeof(void*));
|
||||||
pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem));
|
pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem));
|
||||||
|
if (pSnapFile->pSst == NULL || pSnapFile->pFileList == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
pSnapFile->path = path;
|
pSnapFile->path = path;
|
||||||
pSnapFile->snapInfo = *pSnap;
|
pSnapFile->snapInfo = *pSnap;
|
||||||
if ((code = snapFileReadMeta(pSnapFile)) != 0) {
|
if ((code = snapFileReadMeta(pSnapFile)) != 0) {
|
||||||
|
@ -264,7 +334,6 @@ int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBacke
|
||||||
|
|
||||||
snapFileDebugInfo(pSnapFile);
|
snapFileDebugInfo(pSnapFile);
|
||||||
path = NULL;
|
path = NULL;
|
||||||
code = 0;
|
|
||||||
|
|
||||||
_ERROR:
|
_ERROR:
|
||||||
taosMemoryFree(path);
|
taosMemoryFree(path);
|
||||||
|
@ -276,6 +345,7 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
|
||||||
taosMemoryFree(pSnap->pMainfest);
|
taosMemoryFree(pSnap->pMainfest);
|
||||||
taosMemoryFree(pSnap->pOptions);
|
taosMemoryFree(pSnap->pOptions);
|
||||||
taosMemoryFree(pSnap->path);
|
taosMemoryFree(pSnap->path);
|
||||||
|
taosMemoryFree(pSnap->pCheckpointSelfcheck);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pSnap->pSst); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pSnap->pSst); i++) {
|
||||||
char* sst = taosArrayGetP(pSnap->pSst, i);
|
char* sst = taosArrayGetP(pSnap->pSst, i);
|
||||||
taosMemoryFree(sst);
|
taosMemoryFree(sst);
|
||||||
|
@ -295,14 +365,25 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
|
||||||
}
|
}
|
||||||
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) {
|
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) {
|
||||||
// impl later
|
// impl later
|
||||||
|
int32_t code = 0;
|
||||||
SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
|
SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
|
||||||
int32_t code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet);
|
if (pSnapInfoSet == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
stError("failed to do task db snap info, reason:%s", tstrerror(code));
|
||||||
taosArrayDestroy(pSnapInfoSet);
|
taosArrayDestroy(pSnapInfoSet);
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
|
SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
|
||||||
|
if (pDbSnapSet == NULL) {
|
||||||
|
taosArrayDestroy(pSnapInfoSet);
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pSnapInfoSet); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pSnapInfoSet); i++) {
|
||||||
SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfoSet, i);
|
SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfoSet, i);
|
||||||
|
@ -318,6 +399,10 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
|
||||||
pHandle->currIdx = 0;
|
pHandle->currIdx = 0;
|
||||||
pHandle->pMeta = pMeta;
|
pHandle->pMeta = pMeta;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
streamSnapHandleDestroy(pHandle);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
|
void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
|
||||||
|
@ -348,9 +433,10 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* pa
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamSnapHandleInit(&pReader->handle, (char*)path, pMeta) < 0) {
|
int32_t code = streamSnapHandleInit(&pReader->handle, (char*)path, pMeta);
|
||||||
|
if (code != 0) {
|
||||||
taosMemoryFree(pReader);
|
taosMemoryFree(pReader);
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
*ppReader = pReader;
|
*ppReader = pReader;
|
||||||
|
@ -410,10 +496,10 @@ _NEXT:
|
||||||
int64_t nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset);
|
int64_t nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset);
|
||||||
if (nread == -1) {
|
if (nread == -1) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
stError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name,
|
stError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name,
|
||||||
item->type, tstrerror(code));
|
item->type, tstrerror(code));
|
||||||
return -1;
|
return code;
|
||||||
} else if (nread > 0 && nread <= kBlockSize) {
|
} else if (nread > 0 && nread <= kBlockSize) {
|
||||||
// left bytes less than kBlockSize
|
// left bytes less than kBlockSize
|
||||||
stDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
|
stDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
|
||||||
|
@ -473,6 +559,7 @@ _NEXT:
|
||||||
// SMetaSnapWriter ========================================
|
// SMetaSnapWriter ========================================
|
||||||
int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapWriter** ppWriter) {
|
int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapWriter** ppWriter) {
|
||||||
// impl later
|
// impl later
|
||||||
|
int32_t code = 0;
|
||||||
SStreamSnapWriter* pWriter = taosMemoryCalloc(1, sizeof(SStreamSnapWriter));
|
SStreamSnapWriter* pWriter = taosMemoryCalloc(1, sizeof(SStreamSnapWriter));
|
||||||
if (pWriter == NULL) {
|
if (pWriter == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -480,11 +567,27 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
|
||||||
|
|
||||||
SStreamSnapHandle* pHandle = &pWriter->handle;
|
SStreamSnapHandle* pHandle = &pWriter->handle;
|
||||||
pHandle->currIdx = 0;
|
pHandle->currIdx = 0;
|
||||||
|
|
||||||
pHandle->metaPath = taosStrdup(path);
|
pHandle->metaPath = taosStrdup(path);
|
||||||
|
if (pHandle->metaPath == NULL) {
|
||||||
|
taosMemoryFree(pWriter);
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
pHandle->pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
|
pHandle->pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
|
||||||
|
if (pHandle->pDbSnapSet == NULL) {
|
||||||
|
streamSnapWriterClose(pWriter, 0);
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
SBackendSnapFile2 snapFile = {0};
|
SBackendSnapFile2 snapFile = {0};
|
||||||
taosArrayPush(pHandle->pDbSnapSet, &snapFile);
|
if (taosArrayPush(pHandle->pDbSnapSet, &snapFile) == NULL) {
|
||||||
|
streamSnapWriterClose(pWriter, 0);
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
*ppWriter = pWriter;
|
*ppWriter = pWriter;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -506,7 +609,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
|
||||||
if (pSnapFile->fd == 0) {
|
if (pSnapFile->fd == 0) {
|
||||||
pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||||
if (pSnapFile->fd == NULL) {
|
if (pSnapFile->fd == NULL) {
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pHandle->metaPath, TD_DIRSEP,
|
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pHandle->metaPath, TD_DIRSEP,
|
||||||
pHdr->name, tstrerror(code));
|
pHdr->name, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
@ -514,7 +617,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
|
||||||
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
|
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
|
||||||
int64_t bytes = taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset);
|
int64_t bytes = taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset);
|
||||||
if (bytes != pHdr->size) {
|
if (bytes != pHdr->size) {
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
|
stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
} else {
|
} else {
|
||||||
|
@ -535,12 +638,16 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
|
||||||
SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
|
SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
|
||||||
pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||||
if (pSnapFile->fd == NULL) {
|
if (pSnapFile->fd == NULL) {
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pSnapFile->path, TD_DIRSEP,
|
stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pSnapFile->path, TD_DIRSEP,
|
||||||
pHdr->name, tstrerror(code));
|
pHdr->name, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset);
|
if (taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset) != pHdr->size) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
stInfo("succ to write data %s", pItem->name);
|
stInfo("succ to write data %s", pItem->name);
|
||||||
pSnapFile->offset += pHdr->size;
|
pSnapFile->offset += pHdr->size;
|
||||||
}
|
}
|
||||||
|
|
|
@ -383,7 +383,7 @@ TEST_F(BackendEnv, checkOpen) {
|
||||||
streamStateDestroyBatch(pBatch);
|
streamStateDestroyBatch(pBatch);
|
||||||
}
|
}
|
||||||
// do checkpoint 2
|
// do checkpoint 2
|
||||||
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2);
|
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2, 0);
|
||||||
{
|
{
|
||||||
void *pBatch = streamStateCreateBatch();
|
void *pBatch = streamStateCreateBatch();
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
|
@ -400,7 +400,7 @@ TEST_F(BackendEnv, checkOpen) {
|
||||||
streamStateDestroyBatch(pBatch);
|
streamStateDestroyBatch(pBatch);
|
||||||
}
|
}
|
||||||
|
|
||||||
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3);
|
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3, 0);
|
||||||
|
|
||||||
const char *path = "/tmp/backend/stream";
|
const char *path = "/tmp/backend/stream";
|
||||||
const char *dump = "/tmp/backend/stream/dump";
|
const char *dump = "/tmp/backend/stream/dump";
|
||||||
|
@ -410,7 +410,7 @@ TEST_F(BackendEnv, checkOpen) {
|
||||||
SArray *result = taosArrayInit(4, sizeof(void *));
|
SArray *result = taosArrayInit(4, sizeof(void *));
|
||||||
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump);
|
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump);
|
||||||
|
|
||||||
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4);
|
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4, 0);
|
||||||
|
|
||||||
taosArrayClear(result);
|
taosArrayClear(result);
|
||||||
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 4, result, (char *)dump);
|
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 4, result, (char *)dump);
|
||||||
|
|
|
@ -98,6 +98,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space")
|
TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space")
|
TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_THIRDPARTY_ERROR, "third party error, please check the log")
|
||||||
|
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up")
|
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")
|
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")
|
||||||
|
|
Loading…
Reference in New Issue