refactor(stream): recover and fill history
This commit is contained in:
parent
52b4b510a8
commit
8d8fd2b2bc
|
@ -53,8 +53,7 @@ typedef struct {
|
|||
|
||||
void* sContext; // SSnapContext*
|
||||
|
||||
void* pStateBackend;
|
||||
int64_t fillHistoryVer1;
|
||||
void* pStateBackend;
|
||||
} SReadHandle;
|
||||
|
||||
// in queue mode, data streams are seperated by msg
|
||||
|
|
|
@ -445,6 +445,15 @@ typedef struct {
|
|||
int32_t taskId;
|
||||
} SStreamRecoverStep1Req, SStreamRecoverStep2Req;
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int32_t childId;
|
||||
} SStreamRecoverFinishReq;
|
||||
|
||||
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
|
||||
int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);
|
||||
|
||||
#if 0
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
|
|
|
@ -4853,6 +4853,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
|
|||
if (tEncodeCStr(&encoder, pReq->sourceDB) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->targetStbFullName) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->fillHistory) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, astLen) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->triggerType) < 0) return -1;
|
||||
|
@ -4889,6 +4890,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
|
|||
if (tDecodeCStrTo(&decoder, pReq->sourceDB) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->targetStbFullName) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->fillHistory) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->triggerType) < 0) return -1;
|
||||
|
|
|
@ -905,7 +905,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
.vnode = pTq->pVnode,
|
||||
.initTqReader = 1,
|
||||
.pStateBackend = pTask->pState,
|
||||
.fillHistoryVer1 = pTask->fillHistory ? ver : -1,
|
||||
};
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
ASSERT(pTask->exec.executor);
|
||||
|
@ -1090,9 +1089,24 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
int32_t code;
|
||||
|
||||
// deserialize
|
||||
int32_t len;
|
||||
SStreamRecoverFinishReq req;
|
||||
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msg, sizeof(SStreamRecoverFinishReq));
|
||||
tDecodeSStreamRecoverFinishReq(&decoder, &req);
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
// find task
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, req.taskId);
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
// do process request
|
||||
//
|
||||
if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -988,8 +988,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
|
|||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL,
|
||||
destroyBlockDistScanOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, destroyBlockDistScanOperatorInfo, NULL);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
|
@ -2235,7 +2235,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pInfo->pGroupTags = pTableScanNode->pGroupTags;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
int32_t code = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
int32_t code =
|
||||
extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
|
||||
int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
|
||||
SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
|
||||
|
@ -2330,7 +2331,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
} else {
|
||||
taosArrayDestroy(pColIds);
|
||||
}
|
||||
pTaskInfo->streamInfo.fillHistoryVer1 = pHandle->fillHistoryVer1;
|
||||
|
||||
// create the pseduo columns info
|
||||
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
|
||||
|
@ -2361,8 +2361,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL);
|
||||
|
||||
return pOperator;
|
||||
|
||||
|
@ -3930,8 +3929,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
|||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL);
|
||||
|
||||
return pOperator;
|
||||
|
||||
|
@ -4043,7 +4041,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
|||
int32_t num = 0;
|
||||
int32_t numOfExprs = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
|
||||
int32_t code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
int32_t code =
|
||||
extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
|
||||
code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -4066,8 +4065,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
|||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL);
|
||||
|
||||
return pOperator;
|
||||
|
||||
|
@ -4563,7 +4561,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
|
||||
int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
|
||||
&pInfo->matchInfo);
|
||||
|
||||
code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -1832,7 +1832,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL, destroyIntervalOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL, destroyIntervalOperatorInfo, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2638,7 +2639,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyTimeSliceOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyTimeSliceOperatorInfo, NULL);
|
||||
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
|
@ -2708,8 +2710,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
|
|||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL,
|
||||
destroyStateWindowOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2782,8 +2784,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL,
|
||||
destroySWindowOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo, NULL);
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -4233,13 +4235,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
.minTs = INT64_MAX,
|
||||
};
|
||||
|
||||
if (pTaskInfo->streamInfo.fillHistoryVer1 != -1) {
|
||||
pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger;
|
||||
pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark;
|
||||
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
||||
pInfo->twAggSup.deleteMark = INT64_MAX;
|
||||
}
|
||||
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
||||
pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
||||
|
@ -4264,9 +4259,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo,
|
||||
NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL,
|
||||
destroyStreamSessionAggOperatorInfo, NULL);
|
||||
if (downstream) {
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
|
||||
pInfo->primaryTsIndex);
|
||||
|
@ -4411,9 +4405,8 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
|||
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
|
||||
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
|
||||
pOperator->name = "StreamSessionSemiAggOperator";
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL,
|
||||
destroyStreamSessionAggOperatorInfo, NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL,
|
||||
destroyStreamSessionAggOperatorInfo, NULL);
|
||||
}
|
||||
|
||||
pInfo->pGroupIdTbNameMap =
|
||||
|
@ -4747,13 +4740,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
.minTs = INT64_MAX,
|
||||
};
|
||||
|
||||
if (pTaskInfo->streamInfo.fillHistoryVer1 != -1) {
|
||||
pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger;
|
||||
pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark;
|
||||
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
||||
pInfo->twAggSup.deleteMark = INT64_MAX;
|
||||
}
|
||||
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
@ -4789,8 +4775,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL,
|
||||
destroyStreamStateOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL, destroyStreamStateOperatorInfo, NULL);
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
|
||||
pInfo->primaryTsIndex);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
@ -5066,8 +5052,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = miaInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL,
|
||||
destroyMAIOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL, destroyMAIOperatorInfo, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -5378,8 +5364,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
|
|||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pMergeIntervalInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL,
|
||||
destroyMergeIntervalOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL, destroyMergeIntervalOperatorInfo, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -5544,13 +5530,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
|
||||
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
|
||||
|
||||
if (pTaskInfo->streamInfo.fillHistoryVer1 != -1) {
|
||||
pTaskInfo->streamInfo.triggerSaved = twAggSupp.calTrigger;
|
||||
pTaskInfo->streamInfo.deleteMarkSaved = twAggSupp.deleteMark;
|
||||
twAggSupp.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
||||
twAggSupp.deleteMark = INT64_MAX;
|
||||
}
|
||||
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pInfo->interval = interval;
|
||||
pInfo->twAggSup = twAggSupp;
|
||||
|
@ -5618,9 +5597,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
|
||||
NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL,
|
||||
destroyStreamFinalIntervalOperatorInfo, NULL);
|
||||
|
||||
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
|
|
@ -210,6 +210,46 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
|
||||
SEpSet* pEpSet) {
|
||||
void* buf = NULL;
|
||||
int32_t code = -1;
|
||||
SRpcMsg msg = {0};
|
||||
|
||||
int32_t tlen;
|
||||
tEncodeSize(tEncodeSStreamRecoverFinishReq, pReq, tlen, code);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
||||
if (buf == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
((SMsgHead*)buf)->vgId = htonl(vgId);
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, tlen);
|
||||
if ((code = tEncodeSStreamRecoverFinishReq(&encoder, pReq)) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
msg.contLen = tlen + sizeof(SMsgHead);
|
||||
msg.pCont = buf;
|
||||
msg.msgType = TDMT_VND_STREAM_RECOVER_FINISH;
|
||||
|
||||
tmsgSendReq(pEpSet, &msg);
|
||||
|
||||
code = 0;
|
||||
return 0;
|
||||
FAIL:
|
||||
if (buf) rpcFreeCont(buf);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamDispatchOneReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
|
||||
void* buf = NULL;
|
||||
int32_t code = -1;
|
||||
|
@ -244,9 +284,10 @@ int32_t streamDispatchOneReq(SStreamTask* pTask, const SStreamDispatchReq* pReq,
|
|||
tmsgSendReq(pEpSet, &msg);
|
||||
|
||||
code = 0;
|
||||
FAIL:
|
||||
if (code < 0 && buf) rpcFreeCont(buf);
|
||||
return 0;
|
||||
FAIL:
|
||||
if (buf) rpcFreeCont(buf);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
|
||||
|
|
|
@ -141,9 +141,10 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
SEncoder encoder;
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, len);
|
||||
tEncodeSStreamTask(&encoder, pTask);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, &pMeta->txn) < 0) {
|
||||
ASSERT(0);
|
||||
|
|
|
@ -62,8 +62,12 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
|
|||
}
|
||||
|
||||
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
|
||||
SStreamRecoverFinishReq req = {
|
||||
.streamId = pTask->streamId,
|
||||
.taskId = pTask->taskId,
|
||||
.childId = pTask->selfChildId,
|
||||
};
|
||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
/*SStreamFillFinish*/
|
||||
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
}
|
||||
return 0;
|
||||
|
@ -87,6 +91,7 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
|
|||
if (qStreamRecoverFinish(exec) < 0) {
|
||||
return -1;
|
||||
}
|
||||
streamSetStatusNormal(pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -99,6 +104,22 @@ int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
|
||||
tEndEncode(pEncoder);
|
||||
return pEncoder->pos;
|
||||
}
|
||||
int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) {
|
||||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
#if 0
|
||||
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
|
@ -216,6 +237,7 @@ int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCh
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq) {
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
|
||||
|
@ -258,6 +280,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||
#if 0
|
||||
|
@ -353,6 +376,7 @@ int32_t streamSaveAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t streamFetchRecoverStatus(SStreamTask* pTask, const SVgroupInfo* pVgInfo) {
|
||||
int32_t taskId = pVgInfo->taskId;
|
||||
int32_t nodeId = pVgInfo->vgId;
|
||||
|
@ -423,6 +447,7 @@ int32_t streamFetchDownstreamStatus(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {
|
||||
|
|
Loading…
Reference in New Issue