Merge pull request #15920 from taosdata/feature/stream
enh(stream): recover process
This commit is contained in:
commit
a62950b441
|
@ -154,7 +154,10 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
|
||||||
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
|
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; }
|
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
|
||||||
|
//
|
||||||
|
return queue->qItem;
|
||||||
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
|
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
|
||||||
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
|
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
|
||||||
|
@ -226,9 +229,7 @@ typedef struct {
|
||||||
int32_t nodeId;
|
int32_t nodeId;
|
||||||
int32_t childId;
|
int32_t childId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
// int64_t checkpointVer;
|
SEpSet epSet;
|
||||||
// int64_t processedVer;
|
|
||||||
SEpSet epSet;
|
|
||||||
} SStreamChildEpInfo;
|
} SStreamChildEpInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -372,15 +373,6 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int32_t reserved;
|
|
||||||
} SStreamTaskDeployRsp;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
// SMsgHead head;
|
|
||||||
SStreamTask* task;
|
|
||||||
} SStreamTaskDeployReq;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
|
@ -478,7 +470,18 @@ typedef struct {
|
||||||
} SStreamRecoverDownstreamRsp;
|
} SStreamRecoverDownstreamRsp;
|
||||||
|
|
||||||
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
|
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
|
||||||
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, const SStreamRecoverDownstreamRsp* pRsp);
|
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
|
||||||
|
|
||||||
|
int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp);
|
||||||
|
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t streamId;
|
||||||
|
int32_t taskId;
|
||||||
|
int32_t waitingRspCnt;
|
||||||
|
int32_t totReq;
|
||||||
|
SArray* info; // SArray<SArray<SStreamCheckpointInfo>*>
|
||||||
|
} SStreamRecoverStatus;
|
||||||
|
|
||||||
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
||||||
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
||||||
|
@ -504,7 +507,7 @@ typedef struct SStreamMeta {
|
||||||
TTB* pTaskDb;
|
TTB* pTaskDb;
|
||||||
TTB* pStateDb;
|
TTB* pStateDb;
|
||||||
SHashObj* pTasks;
|
SHashObj* pTasks;
|
||||||
SHashObj* pRecoveringState;
|
SHashObj* pRecoverStatus;
|
||||||
void* ahandle;
|
void* ahandle;
|
||||||
TXN txn;
|
TXN txn;
|
||||||
FTaskExpand* expandFunc;
|
FTaskExpand* expandFunc;
|
||||||
|
|
|
@ -33,7 +33,7 @@ typedef struct {
|
||||||
static SStreamGlobalEnv streamEnv;
|
static SStreamGlobalEnv streamEnv;
|
||||||
|
|
||||||
int32_t streamExec(SStreamTask* pTask);
|
int32_t streamExec(SStreamTask* pTask);
|
||||||
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum);
|
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch);
|
||||||
|
|
||||||
int32_t streamDispatch(SStreamTask* pTask);
|
int32_t streamDispatch(SStreamTask* pTask);
|
||||||
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
|
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
|
||||||
|
|
|
@ -104,7 +104,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
||||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||||
int8_t status;
|
int8_t status;
|
||||||
|
|
||||||
|
@ -136,7 +136,6 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
|
||||||
pRsp->pCont = buf;
|
pRsp->pCont = buf;
|
||||||
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
|
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
|
||||||
tmsgSendRsp(pRsp);
|
tmsgSendRsp(pRsp);
|
||||||
tFreeStreamDispatchReq(pReq);
|
|
||||||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -183,6 +182,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
pReq->upstreamTaskId);
|
pReq->upstreamTaskId);
|
||||||
|
|
||||||
streamTaskEnqueue(pTask, pReq, pRsp);
|
streamTaskEnqueue(pTask, pReq, pRsp);
|
||||||
|
tFreeStreamDispatchReq(pReq);
|
||||||
|
|
||||||
if (exec) {
|
if (exec) {
|
||||||
streamTryExec(pTask);
|
streamTryExec(pTask);
|
||||||
|
@ -246,24 +246,20 @@ int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) {
|
int32_t streamProcessRecoverRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {
|
||||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__NORMAL) {
|
streamProcessRunReq(pTask);
|
||||||
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
|
||||||
|
|
||||||
streamProcessRunReq(pTask);
|
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
|
// scan data to recover
|
||||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
pTask->inputStatus = TASK_INPUT_STATUS__RECOVER;
|
||||||
// scan data to recover
|
pTask->taskStatus = TASK_STATUS__RECOVER_SELF;
|
||||||
pTask->inputStatus = TASK_INPUT_STATUS__RECOVER;
|
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
|
||||||
pTask->taskStatus = TASK_STATUS__RECOVERING;
|
if (streamPipelineExec(pTask, 100, true) < 0) {
|
||||||
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
|
return -1;
|
||||||
if (streamPipelineExec(pTask, 100) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
|
||||||
pTask->taskStatus = TASK_STATUS__NORMAL;
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||||
|
pTask->taskStatus = TASK_STATUS__NORMAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -93,7 +93,7 @@ static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
|
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) {
|
||||||
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
|
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
|
||||||
|
|
||||||
void* exec = pTask->exec.executor;
|
void* exec = pTask->exec.executor;
|
||||||
|
@ -125,24 +125,26 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
|
||||||
taosArrayDestroy(pRes);
|
taosArrayDestroy(pRes);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
if (dispatch) {
|
||||||
if (qRes == NULL) {
|
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
if (qRes == NULL) {
|
||||||
return -1;
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
}
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||||
qRes->blocks = pRes;
|
qRes->blocks = pRes;
|
||||||
qRes->childId = pTask->selfChildId;
|
qRes->childId = pTask->selfChildId;
|
||||||
|
|
||||||
if (streamTaskOutput(pTask, qRes) < 0) {
|
if (streamTaskOutput(pTask, qRes) < 0) {
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
taosFreeQitem(qRes);
|
taosFreeQitem(qRes);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
streamDispatch(pTask);
|
streamDispatch(pTask);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -132,6 +132,49 @@ int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCh
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq) {
|
||||||
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq) {
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp) {
|
||||||
|
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
|
||||||
|
int32_t sz = taosArrayGetSize(pRsp->checkpointVer);
|
||||||
|
if (tEncodeI32(pEncoder, sz) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SStreamCheckpointInfo* pInfo = taosArrayGet(pRsp->checkpointVer, i);
|
||||||
|
if (tEncodeSStreamCheckpointInfo(pEncoder, pInfo) < 0) return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp) {
|
||||||
|
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->taskId) < 0) return -1;
|
||||||
|
int32_t sz;
|
||||||
|
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
|
||||||
|
pRsp->checkpointVer = taosArrayInit(sz, sizeof(SStreamCheckpointInfo));
|
||||||
|
if (pRsp->checkpointVer == NULL) return -1;
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SStreamCheckpointInfo info;
|
||||||
|
if (tDecodeSStreamCheckpointInfo(pDecoder, &info) < 0) return -1;
|
||||||
|
taosArrayPush(pRsp->checkpointVer, &info);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
|
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
|
|
||||||
|
@ -223,25 +266,129 @@ int32_t streamSaveAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamFetchDownstreamStatus(SStreamTask* pTask) {
|
int32_t streamFetchRecoverStatus(SStreamTask* pTask, const SVgroupInfo* pVgInfo) {
|
||||||
|
int32_t taskId = pVgInfo->taskId;
|
||||||
|
int32_t nodeId = pVgInfo->vgId;
|
||||||
|
SStreamRecoverDownstreamReq req = {
|
||||||
|
.streamId = pTask->taskId,
|
||||||
|
.downstreamTaskId = taskId,
|
||||||
|
.taskId = pTask->taskId,
|
||||||
|
};
|
||||||
|
int32_t tlen;
|
||||||
|
int32_t code;
|
||||||
|
tEncodeSize(tEncodeSStreamTaskRecoverReq, &req, tlen, code);
|
||||||
|
if (code < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
SEncoder encoder;
|
||||||
|
tEncoderInit(&encoder, abuf, tlen);
|
||||||
|
if (tEncodeSStreamTaskRecoverReq(&encoder, &req) < 0) {
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
|
((SMsgHead*)buf)->vgId = htonl(nodeId);
|
||||||
|
SRpcMsg msg = {
|
||||||
|
.pCont = buf, .contLen = sizeof(SMsgHead) + tlen,
|
||||||
|
/*.msgType = */
|
||||||
|
};
|
||||||
|
tmsgSendReq(&pVgInfo->epSet, &msg);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamFetchDownstreamStatus(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
// set self status to recover_phase1
|
// set self status to recover_phase1
|
||||||
// build fetch status msg
|
SStreamRecoverStatus* pRecover;
|
||||||
// send fetch msg
|
|
||||||
atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_DOWNSTREAM);
|
atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_DOWNSTREAM);
|
||||||
|
pRecover = taosHashGet(pMeta->pRecoverStatus, &pTask->taskId, sizeof(int32_t));
|
||||||
|
if (pRecover == NULL) {
|
||||||
|
pRecover = taosMemoryCalloc(1, sizeof(SStreamRecoverStatus));
|
||||||
|
if (pRecover == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pRecover->info = taosArrayInit(0, sizeof(void*));
|
||||||
|
if (pRecover->info == NULL) {
|
||||||
|
taosMemoryFree(pRecover);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taosHashPut(pMeta->pRecoverStatus, &pTask->taskId, sizeof(int32_t), &pRecover, sizeof(void*));
|
||||||
|
}
|
||||||
|
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
pRecover->totReq = 1;
|
||||||
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
int32_t numOfDownstream = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||||
|
pRecover->totReq = numOfDownstream;
|
||||||
|
for (int32_t i = 0; i < numOfDownstream; i++) {
|
||||||
|
SVgroupInfo* pVgInfo = taosArrayGet(pTask->shuffleDispatcher.dbInfo.pVgroupInfos, i);
|
||||||
|
streamFetchRecoverStatus(pTask, pVgInfo);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, void* msg) {
|
int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {
|
||||||
// if failed, set timer and retry
|
// if failed, set timer and retry
|
||||||
// if successful
|
// if successful
|
||||||
// add rsp state to partial recover hash
|
int32_t taskId = pTask->taskId;
|
||||||
// if complete, begin actual recover
|
SStreamRecoverStatus* pRecover = taosHashGet(pMeta->pRecoverStatus, &taskId, sizeof(int32_t));
|
||||||
|
if (pRecover == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pRecover->info, &pRsp->checkpointVer);
|
||||||
|
|
||||||
|
int32_t leftRsp = atomic_sub_fetch_32(&pRecover->waitingRspCnt, 1);
|
||||||
|
ASSERT(leftRsp >= 0);
|
||||||
|
|
||||||
|
if (leftRsp == 0) {
|
||||||
|
ASSERT(taosArrayGetSize(pRecover->info) == pRecover->totReq);
|
||||||
|
|
||||||
|
// srcNodeId -> SStreamCheckpointInfo*
|
||||||
|
SHashObj* pFinalChecks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||||
|
if (pFinalChecks == NULL) return -1;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pRecover->totReq; i++) {
|
||||||
|
SArray* pChecks = taosArrayGetP(pRecover->info, i);
|
||||||
|
int32_t sz = taosArrayGetSize(pChecks);
|
||||||
|
for (int32_t j = 0; j < sz; j++) {
|
||||||
|
SStreamCheckpointInfo* pOneCheck = taosArrayGet(pChecks, j);
|
||||||
|
SStreamCheckpointInfo* pCheck = taosHashGet(pFinalChecks, &pOneCheck->srcNodeId, sizeof(int32_t));
|
||||||
|
if (pCheck == NULL) {
|
||||||
|
pCheck = taosMemoryCalloc(1, sizeof(SStreamCheckpointInfo));
|
||||||
|
pCheck->srcNodeId = pOneCheck->srcNodeId;
|
||||||
|
pCheck->srcChildId = pOneCheck->srcChildId;
|
||||||
|
pCheck->stateProcessedVer = pOneCheck->stateProcessedVer;
|
||||||
|
taosHashPut(pFinalChecks, &pCheck->srcNodeId, sizeof(int32_t), &pCheck, sizeof(void*));
|
||||||
|
} else {
|
||||||
|
pCheck->stateProcessedVer = TMIN(pCheck->stateProcessedVer, pOneCheck->stateProcessedVer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// load local state
|
||||||
|
//
|
||||||
|
// recover
|
||||||
|
//
|
||||||
|
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
|
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
|
||||||
|
if (streamPipelineExec(pTask, 10000, true) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosHashCleanup(pFinalChecks);
|
||||||
|
taosHashRemove(pMeta->pRecoverStatus, &taskId, sizeof(int32_t));
|
||||||
|
atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL);
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue