refactor: do some internal refactor.
This commit is contained in:
parent
815a9e953b
commit
ee4e4c0e6d
|
@ -304,7 +304,7 @@ typedef struct SSTaskBasicInfo {
|
|||
int32_t totalLevel;
|
||||
int8_t taskLevel;
|
||||
int8_t fillHistory; // is fill history task or not
|
||||
int64_t triggerParam; // in msec
|
||||
int64_t delaySchedParam; // in msec
|
||||
} SSTaskBasicInfo;
|
||||
|
||||
typedef struct SStreamRetrieveReq SStreamRetrieveReq;
|
||||
|
|
|
@ -46,18 +46,8 @@ typedef struct SVgroupChangeInfo {
|
|||
SArray *pUpdateNodeList; // SArray<SNodeUpdateInfo>
|
||||
} SVgroupChangeInfo;
|
||||
|
||||
// time to generated the checkpoint, if now() - checkpointTs >= tsCheckpointInterval, this checkpoint will be discard
|
||||
// to avoid too many checkpoints for a taskk in the waiting list
|
||||
typedef struct SCheckpointCandEntry {
|
||||
char *pName;
|
||||
int64_t streamId;
|
||||
int64_t checkpointTs;
|
||||
int64_t checkpointId;
|
||||
} SCheckpointCandEntry;
|
||||
|
||||
typedef struct SStreamTransMgmt {
|
||||
SHashObj *pDBTrans;
|
||||
SHashObj *pWaitingList; // stream id list, of which timed checkpoint failed to be issued due to the trans conflict.
|
||||
} SStreamTransMgmt;
|
||||
|
||||
typedef struct SStreamExecInfo {
|
||||
|
@ -98,7 +88,6 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
|||
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId);
|
||||
int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt);
|
||||
int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId);
|
||||
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock);
|
||||
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
|
|||
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
|
||||
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq);
|
||||
|
||||
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq);
|
||||
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq);
|
||||
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
|
||||
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
|
@ -112,7 +112,7 @@ int32_t mndInitStream(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
|
||||
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
|
||||
|
@ -141,7 +141,6 @@ void mndCleanupStream(SMnode *pMnode) {
|
|||
taosArrayDestroy(execInfo.pTaskList);
|
||||
taosHashCleanup(execInfo.pTaskMap);
|
||||
taosHashCleanup(execInfo.transMgmt.pDBTrans);
|
||||
taosHashCleanup(execInfo.transMgmt.pWaitingList);
|
||||
taosHashCleanup(execInfo.pTransferStateStreams);
|
||||
taosThreadMutexDestroy(&execInfo.lock);
|
||||
mDebug("mnd stream exec info cleanup");
|
||||
|
@ -967,7 +966,6 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
|
|||
|
||||
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
|
||||
if (conflict) {
|
||||
mndAddtoCheckpointWaitingList(pStream, checkpointId);
|
||||
mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb,
|
||||
pStream->name, pStream->uid);
|
||||
return -1;
|
||||
|
@ -1131,7 +1129,7 @@ static int32_t streamWaitComparFn(const void* p1, const void* p2) {
|
|||
return pInt1->duration > pInt2->duration? -1:1;
|
||||
}
|
||||
|
||||
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
|
|
|
@ -151,28 +151,6 @@ int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndAddtoCheckpointWaitingList(SStreamObj* pStream, int64_t checkpointId) {
|
||||
SCheckpointCandEntry* pEntry = taosHashGet(execInfo.transMgmt.pWaitingList, &pStream->uid, sizeof(pStream->uid));
|
||||
if (pEntry == NULL) {
|
||||
SCheckpointCandEntry entry = {.streamId = pStream->uid,
|
||||
.checkpointTs = taosGetTimestampMs(),
|
||||
.checkpointId = checkpointId,
|
||||
.pName = taosStrdup(pStream->name)};
|
||||
|
||||
taosHashPut(execInfo.transMgmt.pWaitingList, &pStream->uid, sizeof(pStream->uid), &entry, sizeof(entry));
|
||||
int32_t size = taosHashGetSize(execInfo.transMgmt.pWaitingList);
|
||||
|
||||
mDebug("stream:%" PRIx64 " add into waiting list due to conflict, ts:%" PRId64 " , checkpointId: %" PRId64
|
||||
", total in waitingList:%d",
|
||||
pStream->uid, entry.checkpointTs, checkpointId, size);
|
||||
} else {
|
||||
mDebug("stream:%" PRIx64 " ts:%" PRId64 ", checkpointId:%" PRId64 " already in waiting list, no need to add into",
|
||||
pStream->uid, pEntry->checkpointTs, checkpointId);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
|
||||
if (pTrans == NULL) {
|
||||
|
|
|
@ -558,11 +558,6 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void freeCheckpointCandEntry(void *param) {
|
||||
SCheckpointCandEntry *pEntry = param;
|
||||
taosMemoryFreeClear(pEntry->pName);
|
||||
}
|
||||
|
||||
static void freeTaskList(void* param) {
|
||||
SArray** pList = (SArray **)param;
|
||||
taosArrayDestroy(*pList);
|
||||
|
@ -575,9 +570,7 @@ void mndInitExecInfo() {
|
|||
execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
|
||||
execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
|
||||
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
||||
execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
||||
execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
||||
|
||||
taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
|
||||
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
|
||||
}
|
||||
|
|
|
@ -62,14 +62,14 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
|
|||
" child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms",
|
||||
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
||||
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
|
||||
(int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam);
|
||||
(int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam);
|
||||
} else {
|
||||
sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
||||
" nextProcessVer:%" PRId64
|
||||
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
|
||||
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
||||
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
|
||||
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam);
|
||||
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -244,7 +244,7 @@ static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTask
|
|||
SStreamTask **ppTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if (ppTask && *ppTask) {
|
||||
pItem->submitReqVer = (*ppTask)->chkInfo.checkpointVer;
|
||||
pItem->fetchResultVer = (*ppTask)->info.triggerParam;
|
||||
pItem->fetchResultVer = (*ppTask)->info.delaySchedParam;
|
||||
}
|
||||
streamMetaRUnLock(pMeta);
|
||||
}
|
||||
|
@ -1289,7 +1289,7 @@ _checkpoint:
|
|||
|
||||
pTask->chkInfo.checkpointId = checkpointId; // 1pTask->checkpointingId;
|
||||
pTask->chkInfo.checkpointVer = pItem->submitReqVer;
|
||||
pTask->info.triggerParam = pItem->fetchResultVer;
|
||||
pTask->info.delaySchedParam = pItem->fetchResultVer;
|
||||
pTask->info.taskLevel = TASK_LEVEL_SMA;
|
||||
|
||||
if (!checkpointBuilt) {
|
||||
|
|
|
@ -770,19 +770,19 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
|||
tqInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
||||
" nextProcessVer:%" PRId64
|
||||
" child id:%d, level:%d, cur-status:%s, next-status:%s fill-history:%d, related stream task:0x%x "
|
||||
"trigger:%" PRId64 " ms, inputVer:%" PRId64,
|
||||
"delaySched:%" PRId64 " ms, inputVer:%" PRId64,
|
||||
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
||||
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
|
||||
(int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam, nextProcessVer);
|
||||
(int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer);
|
||||
} else {
|
||||
tqInfo(
|
||||
"vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
||||
" nextProcessVer:%" PRId64
|
||||
" child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64
|
||||
" child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x delaySched:%" PRId64
|
||||
" ms, inputVer:%" PRId64,
|
||||
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
||||
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
|
||||
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam, nextProcessVer);
|
||||
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer);
|
||||
|
||||
ASSERT(pChkInfo->checkpointVer <= pChkInfo->nextProcessVer);
|
||||
}
|
||||
|
|
|
@ -145,9 +145,6 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
|
|||
int32_t getNumOfDispatchBranch(SStreamTask* pTask);
|
||||
void clearBufferedDispatchMsg(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskBuildAndSendTriggerMsg(SStreamTask* pTask, const SStreamDataBlock* pData, int32_t dstTaskId,
|
||||
int32_t vgId, SEpSet* pEpset);
|
||||
|
||||
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
||||
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
|
||||
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize,
|
||||
|
|
|
@ -227,28 +227,6 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) {
|
|||
pMsgInfo->dispatchMsgType = 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskBuildAndSendTriggerMsg(SStreamTask* pTask, const SStreamDataBlock* pData, int32_t dstTaskId,
|
||||
int32_t vgId, SEpSet* pEpset) {
|
||||
SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq));
|
||||
|
||||
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
|
||||
int32_t code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, dstTaskId, pData->type);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
|
||||
code = streamAddBlockIntoDispatchMsg(pDataBlock, pReq);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
destroyDispatchMsg(pReq, 1);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
return doSendDispatchMsg(pTask, pReq, vgId, pEpset);
|
||||
}
|
||||
|
||||
static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
|
||||
int32_t code = 0;
|
||||
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
|
||||
|
|
|
@ -458,10 +458,10 @@ void streamMetaClear(SStreamMeta* pMeta) {
|
|||
SStreamTask* p = *(SStreamTask**)pIter;
|
||||
|
||||
// release the ref by timer
|
||||
if (p->info.triggerParam != 0 && p->info.fillHistory == 0) { // one more ref in timer
|
||||
if (p->info.delaySchedParam != 0 && p->info.fillHistory == 0) { // one more ref in timer
|
||||
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt);
|
||||
taosTmrStop(p->schedInfo.pDelayTimer);
|
||||
p->info.triggerParam = 0;
|
||||
p->info.delaySchedParam = 0;
|
||||
streamMetaReleaseTask(pMeta, p);
|
||||
}
|
||||
|
||||
|
@ -752,10 +752,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
|
||||
ASSERT(pTask->status.timerActive == 0);
|
||||
|
||||
if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) {
|
||||
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
|
||||
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);
|
||||
taosTmrStop(pTask->schedInfo.pDelayTimer);
|
||||
pTask->info.triggerParam = 0;
|
||||
pTask->info.delaySchedParam = 0;
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
}
|
||||
|
||||
|
|
|
@ -330,7 +330,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
|||
}
|
||||
|
||||
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&
|
||||
(pTask->info.triggerParam != 0)) {
|
||||
(pTask->info.delaySchedParam != 0)) {
|
||||
atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
|
||||
stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr, pTask->schedInfo.status);
|
||||
}
|
||||
|
|
|
@ -20,13 +20,13 @@ static void streamTaskResumeHelper(void* param, void* tmrId);
|
|||
static void streamTaskSchedHelper(void* param, void* tmrId);
|
||||
|
||||
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
|
||||
if (pTask->info.triggerParam != 0 && pTask->info.fillHistory == 0) {
|
||||
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
|
||||
ASSERT(ref == 2 && pTask->schedInfo.pDelayTimer == NULL);
|
||||
|
||||
stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam);
|
||||
stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.delaySchedParam);
|
||||
|
||||
pTask->schedInfo.pDelayTimer = taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.triggerParam, pTask, streamTimer);
|
||||
pTask->schedInfo.pDelayTimer = taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer);
|
||||
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
|
||||
}
|
||||
|
||||
|
@ -119,7 +119,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) {
|
|||
void streamTaskSchedHelper(void* param, void* tmrId) {
|
||||
SStreamTask* pTask = (void*)param;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t nextTrigger = (int32_t)pTask->info.triggerParam;
|
||||
int32_t nextTrigger = (int32_t)pTask->info.delaySchedParam;
|
||||
|
||||
int8_t status = atomic_load_8(&pTask->schedInfo.status);
|
||||
stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
|
||||
|
|
|
@ -107,7 +107,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset,
|
|||
|
||||
pTask->info.taskLevel = taskLevel;
|
||||
pTask->info.fillHistory = fillHistory;
|
||||
pTask->info.triggerParam = triggerParam;
|
||||
pTask->info.delaySchedParam = triggerParam;
|
||||
pTask->subtableWithoutMd5 = subtableWithoutMd5;
|
||||
|
||||
pTask->status.pSM = streamCreateStateMachine(pTask);
|
||||
|
|
|
@ -505,7 +505,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
|||
if (tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pTask->info.delaySchedParam) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pTask->subtableWithoutMd5) < 0) return -1;
|
||||
if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1;
|
||||
|
||||
|
@ -588,7 +588,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
if (tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
|
||||
}
|
||||
if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pTask->info.delaySchedParam) < 0) return -1;
|
||||
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||
if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue