Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2

This commit is contained in:
Haojun Liao 2023-08-21 11:49:08 +08:00
commit 99024e9d78
2 changed files with 6 additions and 4 deletions

View File

@ -1884,7 +1884,7 @@ void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_
// todo extract method: traverse stream tasks
// build trans to update the epset
static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo* pInfo) {
static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, NULL, "stream-task-update");
if (pTrans == NULL) {
mError("failed to build stream task DAG update, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
@ -2154,6 +2154,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
}
typedef struct SMStreamNodeCheckMsg {
int8_t holder; // // to fix windows compile error, define place holder
} SMStreamNodeCheckMsg;
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {

View File

@ -871,14 +871,14 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
int32_t code = -1;
SArray* refs = taosArrayInit(16, sizeof(int64_t));
SArray* pCf = taosArrayInit(16, POINTER_BYTES);
rocksdb_column_family_handle_t** ppCf = NULL;
char* pChkpDir = NULL;
char* pChkpIdDir = NULL;
if (chkpPreCheckDir(pMeta->path, checkpointId, &pChkpDir, &pChkpIdDir) != 0) {
goto _ERROR;
taosArrayDestroy(refs);
return code;
}
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
@ -886,6 +886,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
goto _ERROR;
}
// Get all cf and acquire cfWrappter
int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf);
@ -901,7 +902,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
} else {
qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir);
}
// release all ref to cfWrapper;
for (int i = 0; i < taosArrayGetSize(refs); i++) {
int64_t id = *(int64_t*)taosArrayGet(refs, i);
taosReleaseRef(streamBackendCfWrapperId, id);