commit
11dfb0cc77
|
@ -338,7 +338,7 @@ typedef struct SStreamTask {
|
|||
int32_t recoverWaitingUpstream;
|
||||
int64_t checkReqId;
|
||||
SArray* checkReqIds; // shuffle
|
||||
|
||||
int32_t refCnt;
|
||||
} SStreamTask;
|
||||
|
||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
|
||||
|
@ -565,6 +565,7 @@ typedef struct SStreamMeta {
|
|||
TXN txn;
|
||||
FTaskExpand* expandFunc;
|
||||
int32_t vgId;
|
||||
SRWLatch lock;
|
||||
} SStreamMeta;
|
||||
|
||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
|
||||
|
@ -575,6 +576,10 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, c
|
|||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
|
||||
|
||||
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
|
||||
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
void streamMetaRemoveTask1(SStreamMeta* pMeta, int32_t taskId);
|
||||
|
||||
int32_t streamMetaBegin(SStreamMeta* pMeta);
|
||||
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
||||
int32_t streamMetaRollBack(SStreamMeta* pMeta);
|
||||
|
|
|
@ -738,6 +738,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
|
||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
|
|
@ -36,13 +36,14 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t taskId = req.taskId;
|
||||
|
||||
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId);
|
||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
|
||||
if (pTask) {
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(pTask, &req, &rsp, false);
|
||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
return;
|
||||
|
@ -63,6 +64,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
|||
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
|
||||
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
|
||||
|
||||
pTask->refCnt = 1;
|
||||
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||
pTask->inputQueue = streamQueueOpen();
|
||||
pTask->outputQueue = streamQueueOpen();
|
||||
|
@ -166,15 +168,17 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
|
|||
|
||||
int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
|
||||
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
|
||||
return streamMetaRemoveTask(pSnode->pMeta, pReq->taskId);
|
||||
streamMetaRemoveTask1(pSnode->pMeta, pReq->taskId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
SStreamTaskRunReq *pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId);
|
||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
|
||||
if (pTask) {
|
||||
streamProcessRunReq(pTask);
|
||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
|
@ -191,13 +195,14 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
|
|||
tDecodeStreamDispatchReq(&decoder, &req);
|
||||
int32_t taskId = req.taskId;
|
||||
|
||||
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId);
|
||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
|
||||
if (pTask) {
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(pTask, &req, &rsp, exec);
|
||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
|
@ -215,13 +220,14 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
tDecodeStreamRetrieveReq(&decoder, &req);
|
||||
tDecoderClear(&decoder);
|
||||
int32_t taskId = req.dstTaskId;
|
||||
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId);
|
||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
|
||||
if (pTask) {
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessRetrieveReq(pTask, &req, &rsp);
|
||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||
tDeleteStreamRetrieveReq(&req);
|
||||
return 0;
|
||||
} else {
|
||||
|
@ -232,9 +238,10 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t taskId = ntohl(pRsp->upstreamTaskId);
|
||||
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId);
|
||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
|
||||
if (pTask) {
|
||||
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
|
||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
|
@ -274,15 +281,17 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
tDecoderClear(&decoder);
|
||||
|
||||
// find task
|
||||
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, req.taskId);
|
||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.taskId);
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
// do process request
|
||||
if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
|
||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -882,6 +882,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
|
||||
}
|
||||
|
||||
pTask->refCnt = 1;
|
||||
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||
|
||||
pTask->inputQueue = streamQueueOpen();
|
||||
|
@ -975,13 +976,15 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
.upstreamNodeId = req.upstreamNodeId,
|
||||
.upstreamTaskId = req.upstreamTaskId,
|
||||
};
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
||||
if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
|
||||
rsp.status = 1;
|
||||
} else {
|
||||
rsp.status = 0;
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
|
||||
tqDebug("tq recv task check req(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d",
|
||||
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||
|
||||
|
@ -1027,12 +1030,14 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_
|
|||
tqDebug("tq recv task check rsp(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d",
|
||||
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, rsp.upstreamTaskId);
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return streamProcessTaskCheckRsp(pTask, &rsp, version);
|
||||
code = streamProcessTaskCheckRsp(pTask, &rsp, version);
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
|
@ -1077,15 +1082,17 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t msgLen = pMsg->contLen;
|
||||
|
||||
SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg;
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, pReq->taskId);
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
ASSERT(pReq->taskId == pTask->taskId);
|
||||
|
||||
// check param
|
||||
int64_t fillVer1 = pTask->startVer;
|
||||
if (fillVer1 <= 0) {
|
||||
ASSERT(0);
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1096,10 +1103,11 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SStreamRecoverStep2Req req;
|
||||
code = streamBuildSourceRecover2Req(pTask, &req);
|
||||
if (code < 0) {
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(pReq->taskId == pTask->taskId);
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
|
||||
// serialize msg
|
||||
int32_t len = sizeof(SStreamRecoverStep1Req);
|
||||
|
@ -1127,7 +1135,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
int32_t code;
|
||||
SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, pReq->taskId);
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -1135,27 +1143,33 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m
|
|||
// do recovery step 2
|
||||
code = streamSourceRecoverScanStep2(pTask, version);
|
||||
if (code < 0) {
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// restore param
|
||||
code = streamRestoreParam(pTask);
|
||||
if (code < 0) {
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// set status normal
|
||||
code = streamSetStatusNormal(pTask);
|
||||
if (code < 0) {
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// dispatch recover finish req to all related downstream task
|
||||
code = streamDispatchRecoverFinishReq(pTask);
|
||||
if (code < 0) {
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1172,15 +1186,17 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
tDecoderClear(&decoder);
|
||||
|
||||
// find task
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, req.taskId);
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
// do process request
|
||||
if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1354,9 +1370,10 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
|||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamTaskRunReq* pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
||||
if (pTask) {
|
||||
streamProcessRunReq(pTask);
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
|
@ -1373,13 +1390,14 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
|
|||
tDecodeStreamDispatchReq(&decoder, &req);
|
||||
int32_t taskId = req.taskId;
|
||||
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
||||
if (pTask) {
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(pTask, &req, &rsp, exec);
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
|
@ -1389,10 +1407,11 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
|
|||
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t taskId = ntohl(pRsp->upstreamTaskId);
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
||||
tqDebug("recv dispatch rsp, code: %x", pMsg->code);
|
||||
if (pTask) {
|
||||
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
|
@ -1401,7 +1420,8 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||
return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
|
||||
streamMetaRemoveTask1(pTq->pStreamMeta, pReq->taskId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
|
@ -1414,13 +1434,14 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
tDecodeStreamRetrieveReq(&decoder, &req);
|
||||
tDecoderClear(&decoder);
|
||||
int32_t taskId = req.dstTaskId;
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
||||
if (pTask) {
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessRetrieveReq(pTask, &req, &rsp);
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
tDeleteStreamRetrieveReq(&req);
|
||||
return 0;
|
||||
} else {
|
||||
|
@ -1452,13 +1473,14 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
|||
|
||||
int32_t taskId = req.taskId;
|
||||
|
||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
||||
if (pTask) {
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(pTask, &req, &rsp, false);
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
return 0;
|
||||
|
|
|
@ -2385,6 +2385,8 @@ static void destroyStreamScanOperatorInfo(void* param) {
|
|||
taosMemoryFree(pStreamScan->pPseudoExpr);
|
||||
}
|
||||
|
||||
cleanupExprSupp(&pStreamScan->tbnameCalSup);
|
||||
|
||||
updateInfoDestroy(pStreamScan->pUpdateInfo);
|
||||
blockDataDestroy(pStreamScan->pRes);
|
||||
blockDataDestroy(pStreamScan->pUpdateRes);
|
||||
|
|
|
@ -169,6 +169,47 @@ SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) {
|
|||
}
|
||||
}
|
||||
|
||||
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||
taosRLockLatch(&pMeta->lock);
|
||||
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||
if (ppTask) {
|
||||
SStreamTask* pTask = *ppTask;
|
||||
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__DROPPING) {
|
||||
atomic_add_fetch_32(&pTask->refCnt, 1);
|
||||
taosRUnLockLatch(&pMeta->lock);
|
||||
return pTask;
|
||||
} else {
|
||||
taosRUnLockLatch(&pMeta->lock);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
taosRUnLockLatch(&pMeta->lock);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||
int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1);
|
||||
ASSERT(left >= 0);
|
||||
if (left == 0) {
|
||||
ASSERT(atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING);
|
||||
tFreeSStreamTask(pTask);
|
||||
}
|
||||
}
|
||||
|
||||
void streamMetaRemoveTask1(SStreamMeta* pMeta, int32_t taskId) {
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||
if (ppTask) {
|
||||
SStreamTask* pTask = *ppTask;
|
||||
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
|
||||
|
||||
taosWLockLatch(&pMeta->lock);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
taosWUnLockLatch(&pMeta->lock);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||
if (ppTask) {
|
||||
|
|
|
@ -225,6 +225,7 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
|
|||
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
|
||||
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
|
||||
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
|
||||
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h);
|
||||
|
||||
// raft state change --------------
|
||||
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
|
||||
|
|
|
@ -192,13 +192,34 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
SSyncRaftEntry* pAppendEntry = syncEntryBuildFromAppendEntries(pMsg);
|
||||
ASSERT(pAppendEntry != NULL);
|
||||
|
||||
SyncIndex appendIndex = pMsg->prevLogIndex + 1;
|
||||
SyncIndex appendIndex = pMsg->prevLogIndex + 1;
|
||||
|
||||
LRUHandle* hLocal = NULL;
|
||||
LRUHandle* hAppend = NULL;
|
||||
|
||||
int32_t code = 0;
|
||||
SSyncRaftEntry* pLocalEntry = NULL;
|
||||
int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry);
|
||||
SLRUCache* pCache = ths->pLogStore->pCache;
|
||||
hLocal = taosLRUCacheLookup(pCache, &appendIndex, sizeof(appendIndex));
|
||||
if (hLocal) {
|
||||
pLocalEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, hLocal);
|
||||
code = 0;
|
||||
|
||||
sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", appendIndex, pLocalEntry->bytes, pLocalEntry);
|
||||
|
||||
} else {
|
||||
sNTrace(ths, "miss cache index:%" PRId64, appendIndex);
|
||||
|
||||
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry);
|
||||
}
|
||||
|
||||
if (code == 0) {
|
||||
// get local entry success
|
||||
|
||||
if (pLocalEntry->term == pAppendEntry->term) {
|
||||
// do nothing
|
||||
sNTrace(ths, "log match, do nothing, index:%" PRId64, appendIndex);
|
||||
|
||||
} else {
|
||||
// truncate
|
||||
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
|
||||
|
@ -207,8 +228,18 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
snprintf(logBuf, sizeof(logBuf), "ignore, truncate error, append-index:%" PRId64, appendIndex);
|
||||
syncLogRecvAppendEntries(ths, pMsg, logBuf);
|
||||
|
||||
syncEntryDestory(pLocalEntry);
|
||||
syncEntryDestory(pAppendEntry);
|
||||
if (hLocal) {
|
||||
taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
|
||||
} else {
|
||||
syncEntryDestory(pLocalEntry);
|
||||
}
|
||||
|
||||
if (hAppend) {
|
||||
taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
|
||||
} else {
|
||||
syncEntryDestory(pAppendEntry);
|
||||
}
|
||||
|
||||
goto _IGNORE;
|
||||
}
|
||||
|
||||
|
@ -219,10 +250,22 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
snprintf(logBuf, sizeof(logBuf), "ignore, append error, append-index:%" PRId64, appendIndex);
|
||||
syncLogRecvAppendEntries(ths, pMsg, logBuf);
|
||||
|
||||
syncEntryDestory(pLocalEntry);
|
||||
syncEntryDestory(pAppendEntry);
|
||||
if (hLocal) {
|
||||
taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
|
||||
} else {
|
||||
syncEntryDestory(pLocalEntry);
|
||||
}
|
||||
|
||||
if (hAppend) {
|
||||
taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
|
||||
} else {
|
||||
syncEntryDestory(pAppendEntry);
|
||||
}
|
||||
|
||||
goto _IGNORE;
|
||||
}
|
||||
|
||||
syncCacheEntry(ths->pLogStore, pAppendEntry, &hAppend);
|
||||
}
|
||||
|
||||
} else {
|
||||
|
@ -248,20 +291,42 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, append error, append-index:%" PRId64, appendIndex);
|
||||
syncLogRecvAppendEntries(ths, pMsg, logBuf);
|
||||
|
||||
syncEntryDestory(pLocalEntry);
|
||||
syncEntryDestory(pAppendEntry);
|
||||
if (hLocal) {
|
||||
taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
|
||||
} else {
|
||||
syncEntryDestory(pLocalEntry);
|
||||
}
|
||||
|
||||
if (hAppend) {
|
||||
taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
|
||||
} else {
|
||||
syncEntryDestory(pAppendEntry);
|
||||
}
|
||||
|
||||
goto _IGNORE;
|
||||
}
|
||||
|
||||
syncCacheEntry(ths->pLogStore, pAppendEntry, &hAppend);
|
||||
|
||||
} else {
|
||||
// error
|
||||
// get local entry success
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "ignore, get local entry error, append-index:%" PRId64 " err:%d", appendIndex,
|
||||
terrno);
|
||||
syncLogRecvAppendEntries(ths, pMsg, logBuf);
|
||||
|
||||
syncEntryDestory(pLocalEntry);
|
||||
syncEntryDestory(pAppendEntry);
|
||||
if (hLocal) {
|
||||
taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
|
||||
} else {
|
||||
syncEntryDestory(pLocalEntry);
|
||||
}
|
||||
|
||||
if (hAppend) {
|
||||
taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
|
||||
} else {
|
||||
syncEntryDestory(pAppendEntry);
|
||||
}
|
||||
|
||||
goto _IGNORE;
|
||||
}
|
||||
}
|
||||
|
@ -269,8 +334,17 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
// update match index
|
||||
pReply->matchIndex = pAppendEntry->index;
|
||||
|
||||
syncEntryDestory(pLocalEntry);
|
||||
syncEntryDestory(pAppendEntry);
|
||||
if (hLocal) {
|
||||
taosLRUCacheRelease(ths->pLogStore->pCache, hLocal, false);
|
||||
} else {
|
||||
syncEntryDestory(pLocalEntry);
|
||||
}
|
||||
|
||||
if (hAppend) {
|
||||
taosLRUCacheRelease(ths->pLogStore->pCache, hAppend, false);
|
||||
} else {
|
||||
syncEntryDestory(pAppendEntry);
|
||||
}
|
||||
|
||||
} else {
|
||||
// no append entries, do nothing
|
||||
|
|
|
@ -116,7 +116,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index));
|
||||
if (h) {
|
||||
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
|
||||
|
||||
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", index, pEntry->bytes, pEntry);
|
||||
|
||||
} else {
|
||||
sNTrace(pSyncNode, "miss cache index:%" PRId64, index);
|
||||
|
||||
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
|
||||
if (code != 0) {
|
||||
sNError(pSyncNode, "advance commit index error, read wal index:%" PRId64, index);
|
||||
|
|
|
@ -383,15 +383,33 @@ bool syncIsReadyForRead(int64_t rid) {
|
|||
|
||||
} else {
|
||||
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
|
||||
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||
SSyncRaftEntry* pEntry = NULL;
|
||||
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(
|
||||
pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
|
||||
SLRUCache* pCache = pSyncNode->pLogStore->pCache;
|
||||
LRUHandle* h = taosLRUCacheLookup(pCache, &lastIndex, sizeof(lastIndex));
|
||||
int32_t code = 0;
|
||||
if (h) {
|
||||
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
|
||||
code = 0;
|
||||
|
||||
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pEntry->bytes, pEntry);
|
||||
|
||||
} else {
|
||||
sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex);
|
||||
|
||||
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pEntry);
|
||||
}
|
||||
|
||||
if (code == 0 && pEntry != NULL) {
|
||||
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
|
||||
ready = true;
|
||||
}
|
||||
|
||||
syncEntryDestory(pEntry);
|
||||
if (h) {
|
||||
taosLRUCacheRelease(pCache, h, false);
|
||||
} else {
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1754,10 +1772,24 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
SyncTerm preTerm = 0;
|
||||
SyncIndex preIndex = index - 1;
|
||||
SyncTerm preTerm = 0;
|
||||
SyncIndex preIndex = index - 1;
|
||||
|
||||
SSyncRaftEntry* pPreEntry = NULL;
|
||||
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
|
||||
SLRUCache* pCache = pSyncNode->pLogStore->pCache;
|
||||
LRUHandle* h = taosLRUCacheLookup(pCache, &preIndex, sizeof(preIndex));
|
||||
int32_t code = 0;
|
||||
if (h) {
|
||||
pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
|
||||
code = 0;
|
||||
|
||||
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
|
||||
|
||||
} else {
|
||||
sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);
|
||||
|
||||
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
|
||||
}
|
||||
|
||||
SSnapshot snapshot = {.data = NULL,
|
||||
.lastApplyIndex = SYNC_INDEX_INVALID,
|
||||
|
@ -1767,7 +1799,13 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
|
|||
if (code == 0) {
|
||||
ASSERT(pPreEntry != NULL);
|
||||
preTerm = pPreEntry->term;
|
||||
taosMemoryFree(pPreEntry);
|
||||
|
||||
if (h) {
|
||||
taosLRUCacheRelease(pCache, h, false);
|
||||
} else {
|
||||
syncEntryDestory(pPreEntry);
|
||||
}
|
||||
|
||||
return preTerm;
|
||||
} else {
|
||||
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||
|
@ -1813,9 +1851,6 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pPingTimer);
|
||||
} else {
|
||||
sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRId64 ", pingTimerLogicClockUser:%" PRId64,
|
||||
pNode->pingTimerLogicClock, pNode->pingTimerLogicClockUser);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1842,16 +1877,6 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
|||
sError("failed to sync enqueue elect msg since %s", terrstr());
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
}
|
||||
|
||||
#if 0
|
||||
// reset timer ms
|
||||
if (syncIsInit() && pNode->electBaseLine > 0) {
|
||||
pNode->electTimerMS = syncUtilElectRandomMS(pNode->electBaseLine, 2 * pNode->electBaseLine);
|
||||
taosTmrReset(syncNodeEqElectTimer, pNode->electTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pElectTimer);
|
||||
} else {
|
||||
sError("sync env is stop, syncNodeEqElectTimer");
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
|
||||
|
@ -1965,7 +1990,10 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) {
|
|||
|
||||
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }
|
||||
|
||||
static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
|
||||
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
|
||||
SSyncLogStoreData* pData = pLogStore->data;
|
||||
sNTrace(pData->pSyncNode, "in cache index:%" PRId64 ", bytes:%u, %p", pEntry->index, pEntry->bytes, pEntry);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t entryLen = sizeof(*pEntry) + pEntry->dataLen;
|
||||
LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen,
|
||||
|
@ -1986,7 +2014,6 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
|||
ASSERT(pEntry != NULL);
|
||||
|
||||
LRUHandle* h = NULL;
|
||||
syncCacheEntry(ths->pLogStore, pEntry, &h);
|
||||
|
||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
||||
|
@ -1994,6 +2021,8 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
|||
sError("append noop error");
|
||||
return -1;
|
||||
}
|
||||
|
||||
syncCacheEntry(ths->pLogStore, pEntry, &h);
|
||||
}
|
||||
|
||||
if (h) {
|
||||
|
@ -2129,7 +2158,6 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
|
|||
}
|
||||
|
||||
LRUHandle* h = NULL;
|
||||
syncCacheEntry(ths->pLogStore, pEntry, &h);
|
||||
|
||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||
// append entry
|
||||
|
@ -2169,6 +2197,8 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
|
|||
}
|
||||
}
|
||||
|
||||
syncCacheEntry(ths->pLogStore, pEntry, &h);
|
||||
|
||||
// if mulit replica, start replicate right now
|
||||
if (ths->replicaNum > 1) {
|
||||
syncNodeReplicate(ths);
|
||||
|
@ -2335,7 +2365,12 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
|
|||
LRUHandle* h = taosLRUCacheLookup(pCache, &i, sizeof(i));
|
||||
if (h) {
|
||||
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
|
||||
|
||||
sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", i, pEntry->bytes, pEntry);
|
||||
|
||||
} else {
|
||||
sNTrace(ths, "miss cache index:%" PRId64, i);
|
||||
|
||||
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
|
||||
// ASSERT(code == 0);
|
||||
// ASSERT(pEntry != NULL);
|
||||
|
|
|
@ -92,6 +92,8 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId)
|
|||
void syncEntryDestory(SSyncRaftEntry* pEntry) {
|
||||
if (pEntry != NULL) {
|
||||
taosMemoryFree(pEntry);
|
||||
|
||||
sTrace("free entry: %p", pEntry);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -37,7 +37,8 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pLogStore->pCache = taosLRUCacheInit(10 * 1024 * 1024, 1, .5);
|
||||
// pLogStore->pCache = taosLRUCacheInit(10 * 1024 * 1024, 1, .5);
|
||||
pLogStore->pCache = taosLRUCacheInit(100 * 1024 * 1024, 1, .5);
|
||||
if (pLogStore->pCache == NULL) {
|
||||
taosMemoryFree(pLogStore);
|
||||
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||
|
@ -321,6 +322,17 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
|
|||
return 0;
|
||||
}
|
||||
|
||||
// delete from cache
|
||||
for (SyncIndex index = fromIndex; index <= wallastVer; ++index) {
|
||||
SLRUCache* pCache = pData->pSyncNode->pLogStore->pCache;
|
||||
LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index));
|
||||
if (h) {
|
||||
sNTrace(pData->pSyncNode, "cache delete index:%" PRId64, index);
|
||||
|
||||
taosLRUCacheRelease(pData->pSyncNode->pLogStore->pCache, h, true);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t code = walRollback(pWal, fromIndex);
|
||||
if (code != 0) {
|
||||
int32_t err = terrno;
|
||||
|
|
|
@ -73,7 +73,20 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
|
|||
SyncAppendEntries* pMsg = NULL;
|
||||
|
||||
SSyncRaftEntry* pEntry = NULL;
|
||||
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);
|
||||
SLRUCache* pCache = pSyncNode->pLogStore->pCache;
|
||||
LRUHandle* h = taosLRUCacheLookup(pCache, &nextIndex, sizeof(nextIndex));
|
||||
int32_t code = 0;
|
||||
if (h) {
|
||||
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
|
||||
code = 0;
|
||||
|
||||
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", nextIndex, pEntry->bytes, pEntry);
|
||||
|
||||
} else {
|
||||
sNTrace(pSyncNode, "miss cache index:%" PRId64, nextIndex);
|
||||
|
||||
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);
|
||||
}
|
||||
|
||||
if (code == 0) {
|
||||
ASSERT(pEntry != NULL);
|
||||
|
@ -99,7 +112,11 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
|
|||
}
|
||||
}
|
||||
|
||||
syncEntryDestory(pEntry);
|
||||
if (h) {
|
||||
taosLRUCacheRelease(pCache, h, false);
|
||||
} else {
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
|
||||
// prepare msg
|
||||
ASSERT(pMsg != NULL);
|
||||
|
|
|
@ -212,7 +212,11 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
|
|||
}
|
||||
|
||||
char cfgStr[1024];
|
||||
syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr));
|
||||
if (pNode->pRaftCfg != NULL) {
|
||||
syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr));
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
|
||||
char peerStr[1024] = "{";
|
||||
syncPeerState2Str(pNode, peerStr, sizeof(peerStr));
|
||||
|
@ -230,17 +234,19 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
|
|||
// restore error code
|
||||
terrno = errCode;
|
||||
|
||||
taosPrintLog(flags, level, dflag,
|
||||
"vgId:%d, sync %s "
|
||||
"%s"
|
||||
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
|
||||
", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64
|
||||
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
|
||||
pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex,
|
||||
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
|
||||
pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum,
|
||||
pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
|
||||
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
|
||||
if (pNode != NULL && pNode->pRaftCfg != NULL) {
|
||||
taosPrintLog(flags, level, dflag,
|
||||
"vgId:%d, sync %s "
|
||||
"%s"
|
||||
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
|
||||
", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64
|
||||
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
|
||||
pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex,
|
||||
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
|
||||
pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum,
|
||||
pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
|
||||
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
|
||||
}
|
||||
}
|
||||
|
||||
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
|
||||
|
@ -364,9 +370,9 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
|
|||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||
|
||||
sNTrace(pSyncNode,
|
||||
"send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
|
||||
"}, %s",
|
||||
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
|
||||
"send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64
|
||||
", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
|
||||
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
|
||||
}
|
||||
|
||||
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
|
||||
|
|
|
@ -32,7 +32,7 @@ SWalRef *walOpenRef(SWal *pWal) {
|
|||
return pRef;
|
||||
}
|
||||
|
||||
#if 0
|
||||
#if 1
|
||||
void walCloseRef(SWal *pWal, int64_t refId) {
|
||||
SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t));
|
||||
if (ppRef == NULL) return;
|
||||
|
@ -67,7 +67,7 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
#if 1
|
||||
void walUnrefVer(SWalRef *pRef) {
|
||||
pRef->refId = -1;
|
||||
pRef->refFile = -1;
|
||||
|
|
|
@ -70,8 +70,8 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
|||
taosArrayClear(pWal->fileInfoSet);
|
||||
pWal->vers.firstVer = -1;
|
||||
pWal->vers.lastVer = ver;
|
||||
pWal->vers.commitVer = ver - 1;
|
||||
pWal->vers.snapshotVer = ver - 1;
|
||||
pWal->vers.commitVer = ver;
|
||||
pWal->vers.snapshotVer = ver;
|
||||
pWal->vers.verInSnapshotting = -1;
|
||||
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim
|
||||
,,y,script,./test.sh -f tsim/db/alter_option.sim
|
||||
,,,script,./test.sh -f tsim/db/alter_replica_13.sim
|
||||
,,,script,./test.sh -f tsim/db/alter_replica_31.sim
|
||||
,,y,script,./test.sh -f tsim/db/alter_replica_31.sim
|
||||
,,y,script,./test.sh -f tsim/db/basic1.sim
|
||||
,,y,script,./test.sh -f tsim/db/basic2.sim
|
||||
,,y,script,./test.sh -f tsim/db/basic3.sim
|
||||
|
@ -152,27 +152,27 @@
|
|||
,,y,script,./test.sh -f tsim/parser/selectResNum.sim
|
||||
,,y,script,./test.sh -f tsim/parser/set_tag_vals.sim
|
||||
,,y,script,./test.sh -f tsim/parser/single_row_in_tb.sim
|
||||
,,,script,./test.sh -f tsim/parser/sliding.sim
|
||||
,,y,script,./test.sh -f tsim/parser/sliding.sim
|
||||
,,y,script,./test.sh -f tsim/parser/slimit_alter_tags.sim
|
||||
,,y,script,./test.sh -f tsim/parser/slimit.sim
|
||||
,,y,script,./test.sh -f tsim/parser/slimit1.sim
|
||||
,,y,script,./test.sh -f tsim/parser/stableOp.sim
|
||||
,,y,script,./test.sh -f tsim/parser/tags_dynamically_specifiy.sim
|
||||
,,,script,./test.sh -f tsim/parser/tags_filter.sim
|
||||
,,,script,./test.sh -f tsim/parser/tbnameIn.sim
|
||||
,,y,script,./test.sh -f tsim/parser/tags_filter.sim
|
||||
,,y,script,./test.sh -f tsim/parser/tbnameIn.sim
|
||||
,,y,script,./test.sh -f tsim/parser/timestamp.sim
|
||||
,,y,script,./test.sh -f tsim/parser/top_groupby.sim
|
||||
,,y,script,./test.sh -f tsim/parser/topbot.sim
|
||||
,,,script,./test.sh -f tsim/parser/union.sim
|
||||
,,y,script,./test.sh -f tsim/parser/union_sysinfo.sim
|
||||
,,,script,./test.sh -f tsim/parser/where.sim
|
||||
,,y,script,./test.sh -f tsim/parser/where.sim
|
||||
,,y,script,./test.sh -f tsim/query/charScalarFunction.sim
|
||||
,,y,script,./test.sh -f tsim/query/explain.sim
|
||||
,,y,script,./test.sh -f tsim/query/interval-offset.sim
|
||||
,,y,script,./test.sh -f tsim/query/interval.sim
|
||||
,,y,script,./test.sh -f tsim/query/scalarFunction.sim
|
||||
,,y,script,./test.sh -f tsim/query/scalarNull.sim
|
||||
,,,script,./test.sh -f tsim/query/session.sim
|
||||
,,y,script,./test.sh -f tsim/query/session.sim
|
||||
,,,script,./test.sh -f tsim/query/udf.sim
|
||||
,,y,script,./test.sh -f tsim/qnode/basic1.sim
|
||||
,,y,script,./test.sh -f tsim/snode/basic1.sim
|
||||
|
@ -218,13 +218,13 @@
|
|||
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic2.sim
|
||||
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic3.sim
|
||||
,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim
|
||||
,,,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
|
||||
,,,script,./test.sh -f tsim/stream/distributeSession0.sim
|
||||
,,,script,./test.sh -f tsim/stream/session0.sim
|
||||
,,,script,./test.sh -f tsim/stream/session1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/session1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/state0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim
|
||||
,,,script,./test.sh -f tsim/stream/triggerSession0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/triggerSession0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionby.sim
|
||||
,,y,script,./test.sh -f tsim/stream/partitionby1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/schedSnode.sim
|
||||
|
@ -254,7 +254,7 @@
|
|||
,,y,script,./test.sh -f tsim/tmq/basic3Of2Cons.sim
|
||||
,,y,script,./test.sh -f tsim/tmq/basic4Of2Cons.sim
|
||||
,,y,script,./test.sh -f tsim/tmq/basic2Of2ConsOverlap.sim
|
||||
,,,script,./test.sh -f tsim/tmq/topic.sim
|
||||
,,y,script,./test.sh -f tsim/tmq/topic.sim
|
||||
,,y,script,./test.sh -f tsim/tmq/snapshot.sim
|
||||
,,y,script,./test.sh -f tsim/tmq/snapshot1.sim
|
||||
,,y,script,./test.sh -f tsim/stable/alter_comment.sim
|
||||
|
@ -281,7 +281,7 @@
|
|||
,,,script,./test.sh -f tsim/sma/drop_sma.sim
|
||||
,,,script,./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
|
||||
,,y,script,./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
|
||||
,,,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
|
||||
,,y,script,./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
|
||||
,,n,script,./test.sh -f tsim/valgrind/checkError1.sim
|
||||
,,n,script,./test.sh -f tsim/valgrind/checkError2.sim
|
||||
,,n,script,./test.sh -f tsim/valgrind/checkError3.sim
|
||||
|
@ -300,8 +300,8 @@
|
|||
,,y,script,./test.sh -f tsim/vnode/stable_dnode2_stop.sim
|
||||
,,y,script,./test.sh -f tsim/vnode/stable_dnode2.sim
|
||||
,,y,script,./test.sh -f tsim/vnode/stable_dnode3.sim
|
||||
,,,script,./test.sh -f tsim/vnode/stable_replica3_dnode6.sim
|
||||
,,,script,./test.sh -f tsim/vnode/stable_replica3_vnode3.sim
|
||||
,,y,script,./test.sh -f tsim/vnode/stable_replica3_dnode6.sim
|
||||
,,y,script,./test.sh -f tsim/vnode/stable_replica3_vnode3.sim
|
||||
,,y,script,./test.sh -f tsim/sync/3Replica1VgElect.sim
|
||||
,,y,script,./test.sh -f tsim/sync/3Replica5VgElect.sim
|
||||
,,y,script,./test.sh -f tsim/sync/oneReplica1VgElect.sim
|
||||
|
|
Loading…
Reference in New Issue