add self check info

This commit is contained in:
Yihao Deng 2024-06-24 12:26:10 +00:00
parent 64b79b30f4
commit 7cab27110a
4 changed files with 197 additions and 68 deletions

View File

@ -136,7 +136,7 @@ void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId);
void streamBackendCleanup(void* arg); void streamBackendCleanup(void* arg);
void streamBackendHandleCleanup(void* arg); void streamBackendHandleCleanup(void* arg);
int32_t streamBackendLoadCheckpointInfo(void* pMeta); int32_t streamBackendLoadCheckpointInfo(void* pMeta);
int32_t streamBackendDoCheckpoint(void* pMeta, int64_t checkpointId); int32_t streamBackendDoCheckpoint(void* pMeta, int64_t checkpointId, int64_t processver);
SListNode* streamBackendAddCompare(void* backend, void* arg); SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst); int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst);
@ -144,7 +144,6 @@ int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst);
STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId); STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId);
void taskDbDestroy(void* pBackend, bool flush); void taskDbDestroy(void* pBackend, bool flush);
void taskDbDestroy2(void* pBackend); void taskDbDestroy2(void* pBackend);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId);
@ -249,7 +248,7 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
int32_t taskDbBuildSnap(void* arg, SArray* pSnap); int32_t taskDbBuildSnap(void* arg, SArray* pSnap);
int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo); int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId);
SBkdMgt* bkdMgtCreate(char* path); SBkdMgt* bkdMgtCreate(char* path);
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path); int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path);

View File

@ -140,7 +140,7 @@ int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest);
int32_t valueToString(void* k, char* buf); int32_t valueToString(void* k, char* buf);
int32_t valueIsStale(void* k, int64_t ts); int32_t valueIsStale(void* k, int64_t ts);
void destroyCompare(void* arg); void destroyCompare(void* arg);
static void cleanDir(const char* pPath, const char* id); static void cleanDir(const char* pPath, const char* id);
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len); static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
@ -194,9 +194,7 @@ int32_t getCfIdx(const char* cfName) {
return idx; return idx;
} }
bool isValidCheckpoint(const char* dir) { bool isValidCheckpoint(const char* dir) { return true; }
return true;
}
int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
// impl later // impl later
@ -486,9 +484,7 @@ _ERROR:
return code; return code;
} }
int32_t backendCopyFiles(const char* src, const char* dst) { int32_t backendCopyFiles(const char* src, const char* dst) { return backendFileCopyFilesImpl(src, dst); }
return backendFileCopyFilesImpl(src, dst);
}
static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId, static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId,
const char* defaultPath) { const char* defaultPath) {
@ -540,7 +536,8 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
char* chkptPath = taosMemoryCalloc(1, pathLen); char* chkptPath = taosMemoryCalloc(1, pathLen);
if (chkptId > 0) { if (chkptId > 0) {
snprintf(chkptPath, pathLen, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkptId); snprintf(chkptPath, pathLen, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint",
chkptId);
code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath); code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath);
if (code != 0) { if (code != 0) {
@ -549,11 +546,12 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
if (code != 0) { if (code != 0) {
stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s", chkptPath, stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s", chkptPath,
tstrerror(code), defaultPath); tstrerror(code), defaultPath);
code = 0; // reset the error code code = 0; // reset the error code
} }
} else { // no valid checkpoint id } else { // no valid checkpoint id
stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data, clean defaultPath:%s", key, defaultPath); stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data, clean defaultPath:%s", key,
defaultPath);
cleanDir(defaultPath, key); cleanDir(defaultPath, key);
} }
@ -1142,7 +1140,7 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
int64_t chkpId = pTaskDb->chkpId; int64_t chkpId = pTaskDb->chkpId;
taskDbRefChkp(pTaskDb, chkpId); taskDbRefChkp(pTaskDb, chkpId);
code = taskDbDoCheckpoint(pTaskDb, chkpId); code = taskDbDoCheckpoint(pTaskDb, chkpId, 0);
if (code != 0) { if (code != 0) {
taskDbUnRefChkp(pTaskDb, chkpId); taskDbUnRefChkp(pTaskDb, chkpId);
} }
@ -1230,7 +1228,106 @@ int64_t taskGetDBRef(void* arg) {
return pDb->refId; return pDb->refId;
} }
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) {
TdFilePtr pFile = NULL;
int32_t code = -1;
int32_t len = strlen(pChkpIdDir);
if (len == 0) {
terrno = TSDB_CODE_INVALID_PARA;
stError("failed to load extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(terrno));
return -1;
}
char* pDst = taosMemoryCalloc(1, len + 64);
if (pDst == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("failed to alloc memory to load extra info, dir:%s", pChkpIdDir);
goto _EXIT;
}
if (sprintf(pDst, "%s%sinfo", pChkpIdDir, TD_DIRSEP) <= 0) {
code = -1;
stError("failed to build dst to load extra info, dir:%s", pChkpIdDir);
goto _EXIT;
}
pFile = taosOpenFile(pDst, TD_FILE_READ);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
stError("failed to open file to load extra info, file:%s", pDst);
goto _EXIT;
}
char buf[256] = {0};
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
stError("failed to read file to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno));
code = -1;
goto _EXIT;
}
if (sscanf(buf, "%" PRId64 " %" PRId64 "", chkpId, processId) < 2) {
terrno = TSDB_CODE_INVALID_PARA;
stError("failed to read file content to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno));
}
code = 0;
_EXIT:
taosMemoryFree(pDst);
taosCloseFile(&pFile);
return code;
}
int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) {
TdFilePtr pFile = NULL;
int32_t code = -1;
int32_t len = strlen(pChkpIdDir);
if (len == 0) {
terrno = TSDB_CODE_INVALID_PARA;
stError("failed to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(terrno));
return -1;
}
char* pDst = taosMemoryCalloc(1, len + 64);
if (pDst == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("failed to alloc memory to add extra info, dir:%s", pChkpIdDir);
goto _EXIT;
}
if (sprintf(pDst, "%s%sinfo", pChkpIdDir, TD_DIRSEP) < 0) {
stError("failed to build dst to add extra info, dir:%s", pChkpIdDir);
goto _EXIT;
}
pFile = taosOpenFile(pDst, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
stError("failed to open file to add extra info, file:%s", pDst);
goto _EXIT;
}
char buf[256] = {0};
int n = snprintf(buf, sizeof(buf), "%" PRId64 " %" PRId64 "", chkpId, processId);
if (n <= 0 || n >= sizeof(buf)) {
code = -1;
stError("failed to build content to add extra info, dir:%s", pChkpIdDir);
goto _EXIT;
}
if (taosWriteFile(pFile, buf, strlen(buf)) <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
stError("failed to write file to add extra info, file:%s, reason:%s", pDst, tstrerror(terrno));
goto _EXIT;
}
code = 0;
_EXIT:
taosCloseFile(&pFile);
taosMemoryFree(pDst);
return code;
}
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId, int64_t processId) {
STaskDbWrapper* pTaskDb = arg; STaskDbWrapper* pTaskDb = arg;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
int32_t code = -1; int32_t code = -1;
@ -1254,32 +1351,58 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) {
int64_t written = atomic_load_64(&pTaskDb->dataWritten); int64_t written = atomic_load_64(&pTaskDb->dataWritten);
// flush db
if (written > 0) { if (written > 0) {
stDebug("stream backend:%p start to flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written); stDebug("stream backend:%p start to flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written);
code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf); code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf);
if (code != 0) goto _EXIT;
} else { } else {
stDebug("stream backend:%p not need flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written); stDebug("stream backend:%p not need flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written);
} }
// do checkpoint
if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) { if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) {
stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir); stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir);
goto _EXIT;
} else { } else {
stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir, stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir,
taosGetTimestampMs() - st); taosGetTimestampMs() - st);
} }
// add extra info to checkpoint
if ((code = chkpAddExtraInfo(pChkpIdDir, chkpId, processId)) != 0) {
stError("stream backend:%p failed to add extra info to checkpoint at:%s", pTaskDb, pChkpIdDir);
goto _EXIT;
}
// delete ttl checkpoint
code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir); code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir);
if (code < 0) {
goto _EXIT;
}
atomic_store_64(&pTaskDb->dataWritten, 0); atomic_store_64(&pTaskDb->dataWritten, 0);
pTaskDb->chkpId = chkpId; pTaskDb->chkpId = chkpId;
_EXIT: _EXIT:
taosMemoryFree(pChkpDir);
// clear checkpoint dir if failed
if (code != 0 && pChkpDir != NULL) {
if (taosDirExist(pChkpIdDir)) {
taosRemoveDir(pChkpIdDir);
}
}
taosMemoryFree(pChkpIdDir); taosMemoryFree(pChkpIdDir);
taosMemoryFree(pChkpDir);
taosReleaseRef(taskDbWrapperId, refId); taosReleaseRef(taskDbWrapperId, refId);
taosMemoryFree(ppCf); taosMemoryFree(ppCf);
return code; return code;
} }
int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId) { return taskDbDoCheckpoint(arg, chkpId); } int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId, int64_t processVer) {
return taskDbDoCheckpoint(arg, chkpId, processVer);
}
SListNode* streamBackendAddCompare(void* backend, void* arg) { SListNode* streamBackendAddCompare(void* backend, void* arg) {
SBackendWrapper* pHandle = (SBackendWrapper*)backend; SBackendWrapper* pHandle = (SBackendWrapper*)backend;
@ -2205,7 +2328,8 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
return code; return code;
} }
int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list, const char* idStr) { int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list,
const char* idStr) {
int32_t code = 0; int32_t code = 0;
SBkdMgt* p = (SBkdMgt*)bkdChkpMgt; SBkdMgt* p = (SBkdMgt*)bkdChkpMgt;
@ -2224,7 +2348,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
return code; return code;
} }
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list, const char* idStr) { int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list,
const char* idStr) {
int32_t code = -1; int32_t code = -1;
STaskDbWrapper* pDb = arg; STaskDbWrapper* pDb = arg;
ECHECKPOINT_BACKUP_TYPE utype = type; ECHECKPOINT_BACKUP_TYPE utype = type;

View File

@ -24,11 +24,13 @@ static int32_t streamTaskUploadCheckpoint(const char* id, const char* path);
static int32_t deleteCheckpoint(const char* id); static int32_t deleteCheckpoint(const char* id);
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId); static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId);
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList); static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
static void checkpointTriggerMonitorFn(void* param, void* tmrId); static void checkpointTriggerMonitorFn(void* param, void* tmrId);
static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId); static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId);
SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId) { int32_t transId) {
@ -96,7 +98,7 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri
if (pRsp->rspCode != TSDB_CODE_SUCCESS) { if (pRsp->rspCode != TSDB_CODE_SUCCESS) {
stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", pTask->id.idStr, stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", pTask->id.idStr,
pRsp->upstreamTaskId, tstrerror(pRsp->rspCode)); pRsp->upstreamTaskId, tstrerror(pRsp->rspCode));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -108,7 +110,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId
SRpcHandleInfo* pRpcInfo, int32_t code) { SRpcHandleInfo* pRpcInfo, int32_t code) {
int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp); int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
void* pBuf = rpcMallocCont(size); void* pBuf = rpcMallocCont(size);
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId); ((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId);
@ -162,15 +164,15 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
if (pTask->chkInfo.checkpointId > checkpointId) { if (pTask->chkInfo.checkpointId > checkpointId) {
stError("s-task:%s vgId:%d current checkpointId:%" PRId64 stError("s-task:%s vgId:%d current checkpointId:%" PRId64
" recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard", " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId); id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pTask->chkInfo.checkpointId == checkpointId) { if (pTask->chkInfo.checkpointId == checkpointId) {
{ // send checkpoint-ready msg to upstream { // send checkpoint-ready msg to upstream
SRpcMsg msg ={0}; SRpcMsg msg = {0};
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId); SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId);
initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg); initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg);
@ -362,7 +364,8 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
taosThreadMutexUnlock(&pInfo->lock); taosThreadMutexUnlock(&pInfo->lock);
if (notReady == 0) { if (notReady == 0) {
stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task", id); stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task",
id);
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId); appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId);
} }
@ -371,11 +374,11 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) { int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
int32_t numOfConfirmed = 0; int32_t numOfConfirmed = 0;
taosThreadMutexLock(&pInfo->lock); taosThreadMutexLock(&pInfo->lock);
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
if (pReadyInfo->upstreamTaskId == upstreamTaskId && pReadyInfo->checkpointId == checkpointId) { if (pReadyInfo->upstreamTaskId == upstreamTaskId && pReadyInfo->checkpointId == checkpointId) {
pReadyInfo->sendCompleted = 1; pReadyInfo->sendCompleted = 1;
@ -385,7 +388,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
} }
} }
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
if (pReadyInfo->sendCompleted == 1) { if (pReadyInfo->sendCompleted == 1) {
numOfConfirmed += 1; numOfConfirmed += 1;
@ -568,12 +571,12 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
} }
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) { int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
char* path = NULL; char* path = NULL;
int32_t code = 0; int32_t code = 0;
SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES); SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
const char* idStr = pTask->id.idStr; const char* idStr = pTask->id.idStr;
if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles, if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles,
pTask->id.idStr)) != 0) { pTask->id.idStr)) != 0) {
@ -619,8 +622,8 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
idStr, checkpointId, el, path); idStr, checkpointId, el, path);
taosRemoveDir(path); taosRemoveDir(path);
} else { } else {
stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs", stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs", idStr,
idStr, checkpointId, el); checkpointId, el);
} }
taosMemoryFree(path); taosMemoryFree(path);
@ -639,9 +642,10 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointI
} }
int64_t dbRefId = taskGetDBRef(pTask->pBackend); int64_t dbRefId = taskGetDBRef(pTask->pBackend);
void* pBackend = taskAcquireDb(dbRefId); void* pBackend = taskAcquireDb(dbRefId);
if (pBackend == NULL) { if (pBackend == NULL) {
stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData", pTask->id.idStr); stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData",
pTask->id.idStr);
return -1; return -1;
} }
@ -663,7 +667,8 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
if (pTask->info.taskLevel != TASK_LEVEL__SINK) { if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId); stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);
code = streamBackendDoCheckpoint(pTask->pBackend, ckId); int64_t ver = 0;
code = streamBackendDoCheckpoint(pTask->pBackend, ckId, ver);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno)); stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno));
} }
@ -773,11 +778,11 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE); ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE);
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i); SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i);
bool recved = false; bool recved = false;
for(int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) { for (int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) {
STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j); STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j);
if (pInfo->nodeId == pReady->upstreamNodeId) { if (pInfo->nodeId == pReady->upstreamNodeId) {
recved = true; recved = true;
@ -785,7 +790,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
} }
} }
if (!recved) { // make sure the inputQ is opened for not recv upstream checkpoint-trigger message if (!recved) { // make sure the inputQ is opened for not recv upstream checkpoint-trigger message
streamTaskOpenUpstreamInput(pTask, pInfo->taskId); streamTaskOpenUpstreamInput(pTask, pInfo->taskId);
taosArrayPush(pNotSendList, pInfo); taosArrayPush(pNotSendList, pInfo);
} }
@ -870,7 +875,7 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
return false; return false;
} }
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i); STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i);
if (pSendInfo->nodeId != downstreamNodeId) { if (pSendInfo->nodeId != downstreamNodeId) {
continue; continue;
@ -939,10 +944,10 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) {
int32_t num = 0; int32_t num = 0;
taosThreadMutexLock(&pInfo->lock); taosThreadMutexLock(&pInfo->lock);
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
if (p->recved) { if (p->recved) {
num ++; num++;
} }
} }
taosThreadMutexUnlock(&pInfo->lock); taosThreadMutexUnlock(&pInfo->lock);

View File

@ -29,7 +29,7 @@ class BackendEnv : public ::testing::Test {
void *backendCreate() { void *backendCreate() {
const char *streamPath = "/tmp"; const char *streamPath = "/tmp";
void * p = NULL; void *p = NULL;
// char *absPath = NULL; // char *absPath = NULL;
// // SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(streamPath, -1, 2); // // SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(streamPath, -1, 2);
@ -52,7 +52,7 @@ SStreamState *stateCreate(const char *path) {
} }
void *backendOpen() { void *backendOpen() {
streamMetaInit(); streamMetaInit();
const char * path = "/tmp/backend"; const char *path = "/tmp/backend";
SStreamState *p = stateCreate(path); SStreamState *p = stateCreate(path);
ASSERT(p != NULL); ASSERT(p != NULL);
@ -79,7 +79,7 @@ void *backendOpen() {
const char *val = "value data"; const char *val = "value data";
int32_t len = 0; int32_t len = 0;
char * newVal = NULL; char *newVal = NULL;
streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
ASSERT(len == strlen(val)); ASSERT(len == strlen(val));
} }
@ -100,7 +100,7 @@ void *backendOpen() {
const char *val = "value data"; const char *val = "value data";
int32_t len = 0; int32_t len = 0;
char * newVal = NULL; char *newVal = NULL;
int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
ASSERT(code != 0); ASSERT(code != 0);
} }
@ -130,7 +130,7 @@ void *backendOpen() {
winkey.groupId = 0; winkey.groupId = 0;
winkey.ts = tsArray[0]; winkey.ts = tsArray[0];
char * val = NULL; char *val = NULL;
int32_t len = 0; int32_t len = 0;
pCurr = streamStateSeekKeyNext_rocksdb(p, &winkey); pCurr = streamStateSeekKeyNext_rocksdb(p, &winkey);
@ -157,7 +157,7 @@ void *backendOpen() {
key.ts = tsArray[i]; key.ts = tsArray[i];
key.exprIdx = i; key.exprIdx = i;
char * val = NULL; char *val = NULL;
int32_t len = 0; int32_t len = 0;
streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len); streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len);
ASSERT(len == strlen("Value")); ASSERT(len == strlen("Value"));
@ -168,7 +168,7 @@ void *backendOpen() {
key.ts = tsArray[i]; key.ts = tsArray[i];
key.exprIdx = i; key.exprIdx = i;
char * val = NULL; char *val = NULL;
int32_t len = 0; int32_t len = 0;
streamStateFuncDel_rocksdb(p, &key); streamStateFuncDel_rocksdb(p, &key);
} }
@ -213,7 +213,7 @@ void *backendOpen() {
{ {
SSessionKey key; SSessionKey key;
memset(&key, 0, sizeof(key)); memset(&key, 0, sizeof(key));
char * val = NULL; char *val = NULL;
int32_t vlen = 0; int32_t vlen = 0;
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen); code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
ASSERT(code == 0); ASSERT(code == 0);
@ -260,7 +260,7 @@ void *backendOpen() {
SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]}; SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]};
key.groupId = (uint64_t)(i); key.groupId = (uint64_t)(i);
key.ts = tsArray[i]; key.ts = tsArray[i];
char * val = NULL; char *val = NULL;
int32_t vlen = 0; int32_t vlen = 0;
ASSERT(streamStateFillGet_rocksdb(p, &key, (void **)&val, &vlen) == 0); ASSERT(streamStateFillGet_rocksdb(p, &key, (void **)&val, &vlen) == 0);
taosMemoryFreeClear(val); taosMemoryFreeClear(val);
@ -272,7 +272,7 @@ void *backendOpen() {
SStreamStateCur *pCurr = streamStateFillGetCur_rocksdb(p, &key); SStreamStateCur *pCurr = streamStateFillGetCur_rocksdb(p, &key);
ASSERT(pCurr != NULL); ASSERT(pCurr != NULL);
char * val = NULL; char *val = NULL;
int32_t vlen = 0; int32_t vlen = 0;
ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen)); ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen));
ASSERT(vlen == strlen("Value")); ASSERT(vlen == strlen("Value"));
@ -296,7 +296,7 @@ void *backendOpen() {
SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]}; SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]};
key.groupId = (uint64_t)(i); key.groupId = (uint64_t)(i);
key.ts = tsArray[i]; key.ts = tsArray[i];
char * val = NULL; char *val = NULL;
int32_t vlen = 0; int32_t vlen = 0;
ASSERT(streamStateFillDel_rocksdb(p, &key) == 0); ASSERT(streamStateFillDel_rocksdb(p, &key) == 0);
taosMemoryFreeClear(val); taosMemoryFreeClear(val);
@ -338,7 +338,7 @@ void *backendOpen() {
char key[128] = {0}; char key[128] = {0};
sprintf(key, "tbname_%d", i); sprintf(key, "tbname_%d", i);
char * val = NULL; char *val = NULL;
int32_t len = 0; int32_t len = 0;
code = streamDefaultGet_rocksdb(p, key, (void **)&val, &len); code = streamDefaultGet_rocksdb(p, key, (void **)&val, &len);
ASSERT(code == 0); ASSERT(code == 0);
@ -354,7 +354,7 @@ TEST_F(BackendEnv, checkOpen) {
SStreamState *p = (SStreamState *)backendOpen(); SStreamState *p = (SStreamState *)backendOpen();
int64_t tsStart = taosGetTimestampMs(); int64_t tsStart = taosGetTimestampMs();
{ {
void * pBatch = streamStateCreateBatch(); void *pBatch = streamStateCreateBatch();
int32_t size = 0; int32_t size = 0;
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
char key[128] = {0}; char key[128] = {0};
@ -368,7 +368,7 @@ TEST_F(BackendEnv, checkOpen) {
streamStateDestroyBatch(pBatch); streamStateDestroyBatch(pBatch);
} }
{ {
void * pBatch = streamStateCreateBatch(); void *pBatch = streamStateCreateBatch();
int32_t size = 0; int32_t size = 0;
char valBuf[256] = {0}; char valBuf[256] = {0};
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
@ -383,9 +383,9 @@ TEST_F(BackendEnv, checkOpen) {
streamStateDestroyBatch(pBatch); streamStateDestroyBatch(pBatch);
} }
// do checkpoint 2 // do checkpoint 2
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2); taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2, 0);
{ {
void * pBatch = streamStateCreateBatch(); void *pBatch = streamStateCreateBatch();
int32_t size = 0; int32_t size = 0;
char valBuf[256] = {0}; char valBuf[256] = {0};
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
@ -400,17 +400,17 @@ TEST_F(BackendEnv, checkOpen) {
streamStateDestroyBatch(pBatch); streamStateDestroyBatch(pBatch);
} }
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3); taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3, 0);
const char *path = "/tmp/backend/stream"; const char *path = "/tmp/backend/stream";
const char *dump = "/tmp/backend/stream/dump"; const char *dump = "/tmp/backend/stream/dump";
// taosMkDir(dump); // taosMkDir(dump);
taosMulMkDir(dump); taosMulMkDir(dump);
SBkdMgt *mgt = bkdMgtCreate((char *)path); SBkdMgt *mgt = bkdMgtCreate((char *)path);
SArray * result = taosArrayInit(4, sizeof(void *)); SArray *result = taosArrayInit(4, sizeof(void *));
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump); bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump);
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4); taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4, 0);
taosArrayClear(result); taosArrayClear(result);
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 4, result, (char *)dump); bkdMgtGetDelta(mgt, p->pTdbState->idstr, 4, result, (char *)dump);