Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_refact

This commit is contained in:
Hongze Cheng 2022-06-11 01:13:58 +00:00
commit 9534265107
32 changed files with 1084 additions and 332 deletions

View File

@ -221,7 +221,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_SYNC_MSG)
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timeout", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL)

View File

@ -83,8 +83,10 @@ typedef struct SReConfigCbMeta {
SyncTerm term;
SyncTerm currentTerm;
SSyncCfg oldCfg;
SSyncCfg newCfg;
bool isDrop;
uint64_t flag;
uint64_t seqNum;
} SReConfigCbMeta;
typedef struct SSnapshot {
@ -106,7 +108,7 @@ typedef struct SSyncFSM {
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
@ -184,7 +186,6 @@ int64_t syncOpen(const SSyncInfo* pSyncInfo);
void syncStart(int64_t rid);
void syncStop(int64_t rid);
int32_t syncSetStandby(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
ESyncState syncGetMyRole(int64_t rid);
const char* syncGetMyRoleStr(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid);
@ -194,8 +195,10 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
bool syncEnvIsStart();
const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg);
// to be moved to static
void syncStartNormal(int64_t rid);

View File

@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode);
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub);
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
int32_t mndScheduleStream1(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr,
int32_t* pLen, double filesFactor);

View File

@ -403,6 +403,10 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
char logBuf[512] = {0};
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
static int64_t mndTick = 0;
if (++mndTick % 10 == 1) {
mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
}
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);

View File

@ -127,6 +127,61 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
return 0;
}
int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) {
pTask->dispatchType = TASK_DISPATCH__NONE;
// sink
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
}
return 0;
}
int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) {
pTask->sinkType = TASK_SINK__NONE;
if (pStream->fixedSinkVgId == 0) {
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
sdbRelease(pMnode->pSdb, pDb);
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t sz = taosArrayGetSize(pVgs);
SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
int32_t sinkLvSize = taosArrayGetSize(sinkLv);
for (int32_t i = 0; i < sz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
for (int32_t j = 0; j < sinkLvSize; j++) {
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pLastLevelTask->taskId;
break;
}
}
}
} else {
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
}
}
return 0;
}
int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
int32_t msgLen;
pTask->nodeId = pVgroup->vgId;
@ -139,6 +194,7 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE || pTask->sinkType != TASK_SINK__NONE);
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId);
return 0;
}
@ -182,7 +238,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
return pVgroup;
}
int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb;
void* pIter = NULL;
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
@ -234,7 +290,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
return 0;
}
int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
ASSERT(pStream->fixedSinkVgId != 0);
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
@ -279,6 +335,146 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
return 0;
}
int32_t mndScheduleStream1(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
ASSERT(pStream->vgNum == 0);
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
ASSERT(totLevel <= 2);
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
bool hasExtraSink = false;
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
if (totLevel == 2 || externalTargetDB) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink
hasExtraSink = true;
if (pStream->fixedSinkVgId == 0) {
mndAddShuffleSinkTasksToStream(pMnode, pTrans, pStream);
} else {
mndAddFixedSinkTaskToStream(pMnode, pTrans, pStream);
}
}
if (totLevel > 1) {
SStreamTask* pFinalTask;
// inner plan
{
SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskInnerLevel);
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
pFinalTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskInnerLevel, pFinalTask);
// input
pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
// dispatch
mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask);
// exec
pFinalTask->execType = TASK_EXEC__PIPE;
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid);
if (mndAssignTaskToVg(pMnode, pTrans, pFinalTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
}
// source plan
SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskSourceLevel);
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 1);
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
void* pIter = NULL;
while (1) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->dbUid) {
sdbRelease(pSdb, pVgroup);
continue;
}
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskSourceLevel, pTask);
// input
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
// add fixed vg dispatch
pTask->sinkType = TASK_SINK__NONE;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->fixedEpDispatcher.taskId = pFinalTask->taskId;
pTask->fixedEpDispatcher.nodeId = pFinalTask->nodeId;
pTask->fixedEpDispatcher.epSet = pFinalTask->epSet;
// exec
pTask->execType = TASK_EXEC__PIPE;
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
}
}
if (totLevel == 1) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel);
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);
ASSERT(LIST_LENGTH(inner->pNodeList) == 1);
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
void* pIter = NULL;
while (1) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->dbUid) {
sdbRelease(pSdb, pVgroup);
continue;
}
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskOneLevel, pTask);
// input
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
// sink or dispatch
if (hasExtraSink) {
mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pTask);
} else {
mndAddSinkToTask(pMnode, pTrans, pStream, pTask);
}
// exec
pTask->execType = TASK_EXEC__PIPE;
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
}
}
return 0;
}
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
@ -300,14 +496,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// add extra sink
hasExtraSink = true;
if (pStream->fixedSinkVgId == 0) {
mndAddShuffledSinkToStream(pMnode, pTrans, pStream);
mndAddShuffleSinkTasksToStream(pMnode, pTrans, pStream);
} else {
mndAddFixedSinkToStream(pMnode, pTrans, pStream);
mndAddFixedSinkTaskToStream(pMnode, pTrans, pStream);
}
}
for (int32_t level = 0; level < totLevel; level++) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel);
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
ASSERT(LIST_LENGTH(inner->pNodeList) == 1);
@ -357,18 +554,17 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
}
// dispatch part
if (level == 0) {
if (level == 0 && !hasExtraSink) {
pTask->dispatchType = TASK_DISPATCH__NONE;
} else {
// add fixed ep dispatcher
int32_t lastLevel = level - 1;
ASSERT(lastLevel == 0);
if (hasExtraSink) lastLevel++;
ASSERT(lastLevel == 0);
SArray* pArray = taosArrayGetP(pStream->tasks, lastLevel);
// one merge only
ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
/*pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;*/
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
pTask->dispatchType = TASK_DISPATCH__FIXED;
@ -465,8 +661,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
}
sdbRelease(pSdb, pVgroup);
}
taosArrayPush(pStream->tasks, &taskOneLevel);
}
#if 0

View File

@ -269,7 +269,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
return -1;
}
if (mndScheduleStream(pMnode, pTrans, pStream) < 0) {
if (mndScheduleStream1(pMnode, pTrans, pStream) < 0) {
mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr());
return -1;
}

View File

@ -96,10 +96,18 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
}
}
void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
SMnode *pMnode = pFsm->data;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
#if 0
// send response
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info);
#endif
pMgmt->errCode = cbMeta.code;
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
cbMeta.code, cbMeta.index, cbMeta.term);

View File

@ -58,7 +58,7 @@ struct STqReadHandle {
SArray* pColIdList; // SArray<int16_t>
int32_t cachedSchemaVer;
int64_t cachedSchemaUid;
int64_t cachedSchemaSuid;
SSchemaWrapper* pSchemaWrapper;
STSchema* pSchema;
};

View File

@ -67,7 +67,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
pReadHandle->ver = -1;
pReadHandle->pColIdList = NULL;
pReadHandle->cachedSchemaVer = -1;
pReadHandle->cachedSchemaUid = -1;
pReadHandle->cachedSchemaSuid = -1;
pReadHandle->pSchema = NULL;
pReadHandle->pSchemaWrapper = NULL;
pReadHandle->tbIdHash = NULL;
@ -130,7 +130,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
// TODO set to real sversion
/*int32_t sversion = 1;*/
int32_t sversion = htonl(pHandle->pBlock->sversion);
if (pHandle->cachedSchemaVer != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) {
if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion ||
pHandle->cachedSchemaSuid != pHandle->msgIter.suid) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
if (pHandle->pSchema == NULL) {
tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table",
@ -150,7 +151,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
return -1;
}
pHandle->cachedSchemaVer = sversion;
pHandle->cachedSchemaUid = pHandle->msgIter.suid;
pHandle->cachedSchemaSuid = pHandle->msgIter.suid;
}
STSchema* pTschema = pHandle->pSchema;

View File

@ -305,6 +305,10 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
char logBuf[512] = {0};
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
static int64_t vndTick = 0;
if (++vndTick % 10 == 1) {
vTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
}
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);
@ -902,4 +906,4 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo
// 2. adjust hash range / compact / remove wals / rename vgroups
// 3. reload sync
return 0;
}
}

View File

@ -180,10 +180,18 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
return 0;
}
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, sync reconfig is confirmed", TD_VID(pVnode));
#if 0
// send response
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
#endif
// todo rpc response here
// build rpc msg
// put into apply queue
@ -213,6 +221,7 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf),

View File

@ -809,6 +809,11 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream);
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
@ -907,6 +912,9 @@ int64_t getSmaWaterMark(int64_t interval, double filesFactor);
bool isSmaStream(int8_t triggerType);
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
#ifdef __cplusplus
}

View File

@ -1956,6 +1956,57 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_
}
}
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
const int32_t* rowCellOffset, SSDataBlock* pBlock,
SExecTaskInfo* pTaskInfo) {
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);
doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
if (pRow->numOfRows == 0) {
releaseBufPage(pBuf, page);
return 0;
}
while (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.capacity * 1.25);
if (TAOS_FAILED(code)) {
releaseBufPage(pBuf, page);
qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
longjmp(pTaskInfo->env, code);
}
}
for (int32_t j = 0; j < numOfExprs; ++j) {
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset);
if (pCtx[j].fpSet.finalize) {
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
longjmp(pTaskInfo->env, code);
}
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
} else {
// expand the result into multiple rows. E.g., _wstartts, top(k, 20)
// the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
}
}
}
releaseBufPage(pBuf, page);
pBlock->info.rows += pRow->numOfRows;
return 0;
}
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx,
int32_t numOfExprs) {
@ -4689,6 +4740,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr =
createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
SMergeIntervalPhysiNode * pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset,
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
qDebug("[******]create Semi");
int32_t children = 0;

View File

@ -1881,8 +1881,8 @@ _error:
return NULL;
}
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock,
int32_t tableGroupId, SArray* pUpdated) {
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId,
SArray* pUpdated) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
@ -1897,7 +1897,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
} else {
return ;
return;
}
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
@ -1914,13 +1914,14 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*)pos->key = pResult->win.skey;
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos,
nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
saveResult(pResult, tableGroupId, pUpdated);
saveResult(pResult, tableGroupId, pUpdated);
}
// window start(end) key interpolation
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardRows);
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
// forwardRows);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
@ -2049,10 +2050,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SIntervalAggOperatorInfo* pChildInfo = pChildOp->info;
doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval,
pChildInfo->primaryTsIndex, pChildOp->numOfExprs, pBlock, NULL);
rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId,
pOperator->numOfExprs, pOperator->pTaskInfo);
doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval, pChildInfo->primaryTsIndex,
pChildOp->numOfExprs, pBlock, NULL);
rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId, pOperator->numOfExprs,
pOperator->pTaskInfo);
taosArrayDestroy(pUpWins);
continue;
}
@ -2062,7 +2063,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
break;
}
if (isFinalInterval(pInfo)) {
int32_t chIndex = getChildIndex(pBlock);
int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren);
// if chIndex + 1 - size > 0, add new child
for (int32_t i = 0; i < chIndex + 1 - size; i++) {
@ -2072,7 +2073,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
taosArrayPush(pInfo->pChildren, &pChildOp);
}
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
setInputDataBlock(pChildOp, pChInfo->binfo.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
@ -2080,12 +2081,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
}
if (isFinalInterval(pInfo)) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup,
&pInfo->interval, pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed,
pInfo->binfo.rowCellInfoOffset);
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
taosArrayAddAll(pUpdated, pClosed);
}
@ -2109,31 +2108,32 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes;
}
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->order = TSDB_ORDER_ASC;
pInfo->interval = (SInterval) {.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset,
.precision =
((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset,
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN,
.winMap = NULL, };
.winMap = NULL,
};
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols,
pResBlock, keyBufSize, pTaskInfo->id.str);
@ -2160,7 +2160,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
if (!isFinalInterval(pInfo)) {
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
}
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);\
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->pUpdateRes->info.type = STREAM_REPROCESS;
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pInfo->pPhyNode = nodesCloneNode(pPhyNode);
@ -2174,9 +2174,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL,
destroyStreamFinalIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow,
NULL);
pOperator->fpSet =
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -2216,8 +2216,7 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
}
}
int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo,
int32_t numOfCols, SSDataBlock* pResultBlock) {
int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) {
pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
pBasicInfo->pRes = pResultBlock;
for (int32_t i = 0; i < numOfCols; ++i) {
@ -3195,3 +3194,281 @@ _error:
pTaskInfo->code = code;
return NULL;
}
typedef struct SMergeIntervalAggOperatorInfo {
SIntervalAggOperatorInfo intervalAggOperatorInfo;
SHashObj* groupIntervalHash;
bool hasGroupId;
uint64_t groupId;
SSDataBlock* prefetchedBlock;
bool inputBlocksFinished;
} SMergeIntervalAggOperatorInfo;
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
taosHashCleanup(miaInfo->groupIntervalHash);
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
}
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
STimeWindow* newWin) {
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
STimeWindow* prevWin = taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
if (prevWin == NULL) {
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
return 0;
}
if (newWin == NULL || (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey) ) {
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr,
pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock,
pTaskInfo);
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
if (newWin == NULL) {
taosHashRemove(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
} else {
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
}
}
return 0;
}
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
int32_t scanFlag, SSDataBlock* pResultBlock) {
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
int32_t startPos = 0;
int32_t numOfOutput = pOperatorInfo->numOfExprs;
int64_t* tsCols = extractTsCol(pBlock, iaInfo);
uint64_t tableGroupId = pBlock->info.groupId;
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL;
STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
iaInfo->interval.precision, &iaInfo->win);
int32_t ret =
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx,
numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
TSKEY ekey = ascScan ? win.ekey : win.skey;
int32_t forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order);
ASSERT(forwardRows > 0);
// prev time window not interpolation yet.
if (iaInfo->timeWindowInterpo) {
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup,
pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// window start key interpolation
doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &win, startPos, forwardRows);
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
pBlock->info.rows, numOfOutput, iaInfo->order);
doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&win) is closed
outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
STimeWindow nextWin = win;
while (1) {
int32_t prevEndPos = forwardRows - 1 + startPos;
startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->order);
if (startPos < 0) {
break;
}
// null data, failed to allocate more memory buffer
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset,
&iaInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order);
// window start(end) key interpolation
doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &nextWin, startPos,
forwardRows);
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
tsCols, pBlock->info.rows, numOfOutput, iaInfo->order);
doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&nextWin) is closed
outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
}
if (iaInfo->timeWindowInterpo) {
saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
}
}
static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SSDataBlock* pRes = iaInfo->binfo.pRes;
blockDataCleanup(pRes);
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
if (!miaInfo->inputBlocksFinished) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
int32_t scanFlag = MAIN_SCAN;
while (1) {
SSDataBlock* pBlock = NULL;
if (miaInfo->prefetchedBlock == NULL) {
pBlock = downstream->fpSet.getNextFn(downstream);
} else {
pBlock = miaInfo->prefetchedBlock;
miaInfo->groupId = pBlock->info.groupId;
}
if (pBlock == NULL) {
miaInfo->inputBlocksFinished = true;
break;
}
if (!miaInfo->hasGroupId) {
miaInfo->hasGroupId = true;
miaInfo->groupId = pBlock->info.groupId;
} else if (miaInfo->groupId != pBlock->info.groupId) {
miaInfo->prefetchedBlock = pBlock;
break;
}
getTableScanInfo(pOperator, &iaInfo->order, &scanFlag);
setInputDataBlock(pOperator, iaInfo->binfo.pCtx, pBlock, iaInfo->order, scanFlag, true);
STableQueryInfo* pTableQueryInfo = iaInfo->pCurrent;
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;
}
}
pRes->info.groupId = miaInfo->groupId;
} else {
void* p = taosHashIterate(miaInfo->groupIntervalHash, NULL);
if (p != NULL) {
size_t len = 0;
uint64_t* pKey = taosHashGetKey(p, &len);
outputPrevIntervalResult(pOperator, *pKey, pRes, NULL);
}
}
if (pRes->info.rows == 0) {
doSetOperatorCompleted(pOperator);
}
size_t rows = pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
return (rows == 0) ? NULL : pRes;
}
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
SExecTaskInfo* pTaskInfo) {
SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (miaInfo == NULL || pOperator == NULL) {
goto _error;
}
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
iaInfo->win = pTaskInfo->window;
iaInfo->order = TSDB_ORDER_ASC;
iaInfo->interval = *pInterval;
iaInfo->execModel = pTaskInfo->execModel;
iaInfo->primaryTsIndex = primaryTsSlotId;
miaInfo->groupIntervalHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096);
int32_t code =
initAggInfo(&iaInfo->binfo, &iaInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(iaInfo->binfo.pCtx, numOfCols, iaInfo);
if (iaInfo->timeWindowInterpo) {
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
}
// iaInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS /* || iaInfo->pTableQueryInfo == NULL*/) {
goto _error;
}
initResultRowInfo(&iaInfo->binfo.resultRowInfo, (int32_t)1);
pOperator->name = "TimeMergeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfExprs = numOfCols;
pOperator->info = miaInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL,
destroyMergeIntervalOperatorInfo, NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
destroyMergeIntervalOperatorInfo(miaInfo, numOfCols);
taosMemoryFreeClear(miaInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}

View File

@ -2103,8 +2103,49 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo* pOutput) {
pOutput->percent = pInput->percent;
pOutput->algo = pInput->algo;
if (pOutput->algo == APERCT_ALGO_TDIGEST) {
buildTDigestInfo(pInput);
tdigestAutoFill(pInput->pTDigest, COMPRESSION);
if(pInput->pTDigest->num_centroids == 0 && pInput->pTDigest->num_buffered_pts == 0) {
return;
}
buildTDigestInfo(pOutput);
TDigest *pTDigest = pOutput->pTDigest;
if(pTDigest->num_centroids <= 0) {
memcpy(pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION));
tdigestAutoFill(pTDigest, COMPRESSION);
} else {
tdigestMerge(pTDigest, pInput->pTDigest);
}
} else {
buildHistogramInfo(pInput);
if (pInput->pHisto->numOfElems <= 0) {
return;
}
buildHistogramInfo(pOutput);
SHistogramInfo *pHisto = pOutput->pHisto;
if (pHisto->numOfElems <= 0) {
memcpy(pHisto, pInput->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
} else {
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
SHistogramInfo *pRes = tHistogramMerge(pHisto, pInput->pHisto, MAX_HISTOGRAM_BIN);
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
tHistogramDestroy(&pRes);
}
}
}
int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
@ -2113,60 +2154,14 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SAPercentileInfo* pInputInfo;
int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
//if (colDataIsNull_s(pCol, i)) {
// continue;
//}
numOfElems += 1;
char* data = colDataGetData(pCol, i);
char* data = colDataGetData(pCol, start);
SAPercentileInfo* pInputInfo = (SAPercentileInfo *)varDataVal(data);
pInputInfo = (SAPercentileInfo *)varDataVal(data);
}
apercentileTransferInfo(pInputInfo, pInfo);
pInfo->percent = pInputInfo->percent;
pInfo->algo = pInputInfo->algo;
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
buildTDigestInfo(pInputInfo);
tdigestAutoFill(pInputInfo->pTDigest, COMPRESSION);
if(pInputInfo->pTDigest->num_centroids == 0 && pInputInfo->pTDigest->num_buffered_pts == 0) {
return TSDB_CODE_SUCCESS;
}
buildTDigestInfo(pInfo);
TDigest *pTDigest = pInfo->pTDigest;
if(pTDigest->num_centroids <= 0) {
memcpy(pTDigest, pInputInfo->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION));
tdigestAutoFill(pTDigest, COMPRESSION);
} else {
tdigestMerge(pTDigest, pInputInfo->pTDigest);
}
} else {
buildHistogramInfo(pInputInfo);
if (pInputInfo->pHisto->numOfElems <= 0) {
return TSDB_CODE_SUCCESS;
}
buildHistogramInfo(pInfo);
SHistogramInfo *pHisto = pInfo->pHisto;
if (pHisto->numOfElems <= 0) {
memcpy(pHisto, pInputInfo->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
} else {
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
SHistogramInfo *pRes = tHistogramMerge(pHisto, pInputInfo->pHisto, MAX_HISTOGRAM_BIN);
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
tHistogramDestroy(&pRes);
}
}
SET_VAL(pResInfo, numOfElems, 1);
SET_VAL(pResInfo, 1, 1);
return TSDB_CODE_SUCCESS;
}
@ -3049,6 +3044,17 @@ _spread_over:
return TSDB_CODE_SUCCESS;
}
static void spreadTransferInfo(SSpreadInfo* pInput, SSpreadInfo* pOutput) {
pOutput->hasResult = pInput->hasResult;
if (pInput->max > pOutput->max) {
pOutput->max = pInput->max;
}
if (pInput->min < pOutput->min) {
pOutput->min = pInput->min;
}
}
int32_t spreadFunctionMerge(SqlFunctionCtx *pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
@ -3061,14 +3067,7 @@ int32_t spreadFunctionMerge(SqlFunctionCtx *pCtx) {
char* data = colDataGetData(pCol, start);
pInputInfo = (SSpreadInfo *)varDataVal(data);
pInfo->hasResult = pInputInfo->hasResult;
if (pInputInfo->max > pInfo->max) {
pInfo->max = pInputInfo->max;
}
if (pInputInfo->min < pInfo->min) {
pInfo->min = pInputInfo->min;
}
spreadTransferInfo(pInputInfo, pInfo);
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
@ -3206,6 +3205,17 @@ _elapsed_over:
return TSDB_CODE_SUCCESS;
}
static void elapsedTransferInfo(SElapsedInfo* pInput, SElapsedInfo* pOutput) {
pOutput->timeUnit = pInput->timeUnit;
if (pOutput->min > pInput->min) {
pOutput->min = pInput->min;
}
if (pOutput->max < pInput->max) {
pOutput->max = pInput->max;
}
}
int32_t elapsedFunctionMerge(SqlFunctionCtx *pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
@ -3217,14 +3227,7 @@ int32_t elapsedFunctionMerge(SqlFunctionCtx *pCtx) {
char* data = colDataGetData(pCol, start);
SElapsedInfo* pInputInfo = (SElapsedInfo *)varDataVal(data);
pInfo->timeUnit = pInputInfo->timeUnit;
if (pInfo->min > pInputInfo->min) {
pInfo->min = pInputInfo->min;
}
if (pInfo->max < pInputInfo->max) {
pInfo->max = pInputInfo->max;
}
elapsedTransferInfo(pInputInfo, pInfo);
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
return TSDB_CODE_SUCCESS;
@ -3470,6 +3473,17 @@ int32_t histogramFunction(SqlFunctionCtx *pCtx) {
return TSDB_CODE_SUCCESS;
}
static void histogramTransferInfo(SHistoFuncInfo* pInput, SHistoFuncInfo* pOutput) {
pOutput->normalized = pInput->normalized;
pOutput->numOfBins = pInput->numOfBins;
pOutput->totalCount += pInput->totalCount;
for (int32_t k = 0; k < pOutput->numOfBins; ++k) {
pOutput->bins[k].lower = pInput->bins[k].lower;
pOutput->bins[k].upper = pInput->bins[k].upper;
pOutput->bins[k].count += pInput->bins[k].count;
}
}
int32_t histogramFunctionMerge(SqlFunctionCtx *pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
@ -3481,14 +3495,7 @@ int32_t histogramFunctionMerge(SqlFunctionCtx *pCtx) {
char* data = colDataGetData(pCol, start);
SHistoFuncInfo* pInputInfo = (SHistoFuncInfo *)varDataVal(data);
pInfo->normalized = pInputInfo->normalized;
pInfo->numOfBins = pInputInfo->numOfBins;
pInfo->totalCount += pInputInfo->totalCount;
for (int32_t k = 0; k < pInfo->numOfBins; ++k) {
pInfo->bins[k].lower = pInputInfo->bins[k].lower;
pInfo->bins[k].upper = pInputInfo->bins[k].upper;
pInfo->bins[k].count += pInputInfo->bins[k].count;
}
histogramTransferInfo(pInputInfo, pInfo);
SET_VAL(GET_RES_INFO(pCtx), pInfo->numOfBins, pInfo->numOfBins);
return TSDB_CODE_SUCCESS;
@ -3676,6 +3683,14 @@ int32_t hllFunction(SqlFunctionCtx *pCtx) {
return TSDB_CODE_SUCCESS;
}
static void hllTransferInfo(SHLLInfo* pInput, SHLLInfo* pOutput) {
for (int32_t k = 0; k < HLL_BUCKETS; ++k) {
if (pOutput->buckets[k] < pInput->buckets[k]) {
pOutput->buckets[k] = pInput->buckets[k];
}
}
}
int32_t hllFunctionMerge(SqlFunctionCtx *pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
@ -3687,11 +3702,7 @@ int32_t hllFunctionMerge(SqlFunctionCtx *pCtx) {
char* data = colDataGetData(pCol, start);
SHLLInfo* pInputInfo = (SHLLInfo *)varDataVal(data);
for (int32_t k = 0; k < HLL_BUCKETS; ++k) {
if (pInfo->buckets[k] < pInputInfo->buckets[k]) {
pInfo->buckets[k] = pInputInfo->buckets[k];
}
}
hllTransferInfo(pInputInfo, pInfo);
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
return TSDB_CODE_SUCCESS;

View File

@ -201,8 +201,8 @@ void syncNodeRelease(SSyncNode* pNode);
// raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeBecomeFollower(SSyncNode* pSyncNode);
void syncNodeBecomeLeader(SSyncNode* pSyncNode);
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
void syncNodeFollower2Candidate(SSyncNode* pSyncNode);

View File

@ -49,8 +49,8 @@ void raftStoreClearVote(SRaftStore *pRaftStore);
void raftStoreNextTerm(SRaftStore *pRaftStore);
void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term);
int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson);
cJSON *raftStore2Json(SRaftStore *pRaftStore);
char *raftStore2Str(SRaftStore *pRaftStore);
cJSON * raftStore2Json(SRaftStore *pRaftStore);
char * raftStore2Str(SRaftStore *pRaftStore);
// for debug -------------------
void raftStorePrint(SRaftStore *pObj);

View File

@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender {
bool start;
int32_t seq;
int32_t ack;
void *pReader;
void *pCurrentBlock;
void * pReader;
void * pCurrentBlock;
int32_t blockLen;
SSnapshot snapshot;
int64_t sendingMS;
@ -58,28 +58,29 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender);
void snapshotSenderStop(SSyncSnapshotSender *pSender);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char *snapshotSender2Str(SSyncSnapshotSender *pSender);
cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender);
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver {
bool start;
int32_t ack;
void *pWriter;
void * pWriter;
SyncTerm term;
SyncTerm privateTerm;
SSyncNode *pSyncNode;
int32_t replicaIndex;
SRaftId fromId;
} SSyncSnapshotReceiver;
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId);
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg);
int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg);

View File

@ -150,7 +150,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
"ths->state:%d, logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
syncNodeBecomeFollower(ths);
syncNodeBecomeFollower(ths, "from candidate by append entries");
// ret or reply?
return ret;
@ -380,17 +380,19 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// change isStandBy to normal
if (!isDrop) {
if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(ths);
syncNodeBecomeLeader(ths, "config change");
} else {
syncNodeBecomeFollower(ths);
syncNodeBecomeFollower(ths, "config change");
}
}
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
if (gRaftDetailLog) {
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
}
}
// always call FpReConfigCb
@ -399,10 +401,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.index = pEntry->index;
cbMeta.term = pEntry->term;
cbMeta.newCfg = newSyncCfg;
cbMeta.oldCfg = oldSyncCfg;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.flag = 0x11;
cbMeta.isDrop = isDrop;
ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta);
ths->pFsm->FpReConfigCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
@ -469,7 +473,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0);
sInfo("sync event log truncate, from %ld to %ld", delBegin, delEnd);
sInfo("sync event vgId:%d log truncate, from %ld to %ld", ths->vgId, delBegin, delEnd);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
return code;
@ -571,7 +575,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
if (condition) {
sTrace("recv SyncAppendEntries, candidate to follower");
syncNodeBecomeFollower(ths);
syncNodeBecomeFollower(ths, "from candidate by append entries");
// do not reply?
return ret;
}
@ -742,6 +746,18 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
if (pMsg->commitIndex > ths->commitIndex) {
// has commit entry in local
if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
// advance commit index to sanpshot first
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) {
SyncIndex commitBegin = ths->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex;
ths->commitIndex = snapshot.lastApplyIndex;
sInfo("sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, commitBegin,
commitEnd, syncUtilState2String(ths->state));
}
SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = pMsg->commitIndex;

View File

@ -121,7 +121,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex);
{
if (gRaftDetailLog) {
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
@ -147,7 +147,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if (pMsg->success) {
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
sTrace("update next match, index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success);
if (gRaftDetailLog) {
sTrace("update next match, index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success);
}
// matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
@ -159,7 +162,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
} else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
sTrace("update next not match, begin, index:%ld, success:%d", nextIndex, pMsg->success);
if (gRaftDetailLog) {
sTrace("update next index not match, begin, index:%ld, success:%d", nextIndex, pMsg->success);
}
// notice! int64, uint64
if (nextIndex > SYNC_INDEX_BEGIN) {
@ -178,9 +183,23 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
pMsg->privateTerm < pSender->privateTerm) {
snapshotSenderStart(pSender);
char* s = snapshotSender2Str(pSender);
sInfo("sync event snapshot send start sender first time, sender:%s", s);
taosMemoryFree(s);
char host[128];
uint16_t port;
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
if (gRaftDetailLog) {
char* s = snapshotSender2Str(pSender);
sInfo(
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu "
"sender:%s",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, s);
taosMemoryFree(s);
} else {
sInfo(
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"lastApplyTerm:%lu",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm);
}
}
SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1;
@ -195,12 +214,14 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
}
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
sTrace("update next not match, end, index:%ld, success:%d", nextIndex, pMsg->success);
if (gRaftDetailLog) {
sTrace("update next index not match, end, index:%ld, success:%d", nextIndex, pMsg->success);
}
}
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex);
{
if (gRaftDetailLog) {
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",

View File

@ -48,13 +48,28 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex);
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex);
// advance commit index to sanpshot first
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex > 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) {
SyncIndex commitBegin = pSyncNode->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex;
pSyncNode->commitIndex = snapshot.lastApplyIndex;
sInfo("sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId,
pSyncNode->commitIndex, snapshot.lastApplyIndex, syncUtilState2String(pSyncNode->state));
}
// update commit index
SyncIndex newCommitIndex = pSyncNode->commitIndex;
for (SyncIndex index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); index > pSyncNode->commitIndex;
--index) {
for (SyncIndex index = syncNodeGetLastIndex(pSyncNode); index > pSyncNode->commitIndex; --index) {
bool agree = syncAgree(pSyncNode, index);
sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%ld, pSyncNode->commitIndex:%ld", agree, index,
pSyncNode->commitIndex);
if (gRaftDetailLog) {
sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%ld, pSyncNode->commitIndex:%ld", agree, index,
pSyncNode->commitIndex);
}
if (agree) {
// term
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
@ -64,16 +79,21 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
if (pEntry->term == pSyncNode->pRaftStore->currentTerm) {
// update commit index
newCommitIndex = index;
sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld",
newCommitIndex, pSyncNode->commitIndex);
if (gRaftDetailLog) {
sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld",
newCommitIndex, pSyncNode->commitIndex);
}
syncEntryDestory(pEntry);
break;
} else {
sTrace(
"syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%lu, "
"pSyncNode->pRaftStore->currentTerm:%lu",
pEntry->term, pSyncNode->pRaftStore->currentTerm);
if (gRaftDetailLog) {
sTrace(
"syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%lu, "
"pSyncNode->pRaftStore->currentTerm:%lu",
pEntry->term, pSyncNode->pRaftStore->currentTerm);
}
}
syncEntryDestory(pEntry);
@ -84,7 +104,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex beginIndex = pSyncNode->commitIndex + 1;
SyncIndex endIndex = newCommitIndex;
sTrace("syncMaybeAdvanceCommitIndex sync commit %ld", newCommitIndex);
if (gRaftDetailLog) {
sTrace("syncMaybeAdvanceCommitIndex sync commit %ld", newCommitIndex);
}
// update commit index
pSyncNode->commitIndex = newCommitIndex;

View File

@ -40,7 +40,7 @@ int32_t syncEnvStart() {
// gSyncEnv = doSyncEnvStart(gSyncEnv);
gSyncEnv = doSyncEnvStart();
assert(gSyncEnv != NULL);
sTrace("syncEnvStart ok!");
sTrace("sync env start ok");
return ret;
}

View File

@ -119,7 +119,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}

View File

@ -87,7 +87,9 @@ int64_t syncOpen(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
assert(pSyncNode != NULL);
syncNodeLog2("syncNodeOpen open success", pSyncNode);
if (gRaftDetailLog) {
syncNodeLog2("syncNodeOpen open success", pSyncNode);
}
pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode);
if (pSyncNode->rid < 0) {
@ -173,20 +175,37 @@ int32_t syncSetStandby(int64_t rid) {
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
int32_t ret = 0;
char* configChange = syncCfg2Str((SSyncCfg*)pSyncCfg);
sInfo("==syncReconfig== newconfig:%s", configChange);
char* newconfig = syncCfg2Str((SSyncCfg*)pSyncCfg);
if (gRaftDetailLog) {
sInfo("==syncReconfig== newconfig:%s", newconfig);
}
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE;
rpcMsg.info.noResp = 1;
rpcMsg.contLen = strlen(configChange) + 1;
rpcMsg.contLen = strlen(newconfig) + 1;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", configChange);
taosMemoryFree(configChange);
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", newconfig);
taosMemoryFree(newconfig);
ret = syncPropose(rid, &rpcMsg, false);
return ret;
}
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
int32_t ret = 0;
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
pRpcMsg->msgType = TDMT_SYNC_CONFIG_CHANGE;
pRpcMsg->info.noResp = 1;
pRpcMsg->contLen = strlen(newconfig) + 1;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig);
taosMemoryFree(newconfig);
return ret;
}
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = syncPropose(rid, pMsg, isWeak);
return ret;
@ -374,13 +393,14 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
}
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
sTrace("syncPropose msgType:%d ", pMsg->msgType);
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) return TAOS_SYNC_PROPOSE_OTHER_ERROR;
if (pSyncNode == NULL) {
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
assert(rid == pSyncNode->rid);
sTrace("sync event vgId:%d propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SRespStub stub;
@ -411,6 +431,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;
sInfo("sync event vgId:%d sync open", pSyncInfo->vgId);
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
assert(pSyncNode != NULL);
memset(pSyncNode, 0, sizeof(SSyncNode));
@ -439,9 +461,11 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
assert(pSyncNode->pRaftCfg != NULL);
pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
char* seralized = raftCfg2Str(pSyncNode->pRaftCfg);
sInfo("syncNodeOpen update config :%s", seralized);
taosMemoryFree(seralized);
if (gRaftDetailLog) {
char* seralized = raftCfg2Str(pSyncNode->pRaftCfg);
sInfo("syncNodeOpen update config :%s", seralized);
taosMemoryFree(seralized);
}
raftCfgClose(pSyncNode->pRaftCfg);
}
@ -612,7 +636,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
}
// snapshot receivers
pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, 100);
pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
// start in syncNodeStart
// start raft
@ -628,9 +652,7 @@ void syncNodeStart(SSyncNode* pSyncNode) {
// start raft
if (pSyncNode->replicaNum == 1) {
raftStoreNextTerm(pSyncNode->pRaftStore);
syncNodeBecomeLeader(pSyncNode);
syncNodeLog2("==state change become leader immediately==", pSyncNode);
syncNodeBecomeLeader(pSyncNode, "one replica start");
// Raft 3.6.2 Committing entries from previous terms
@ -638,41 +660,22 @@ void syncNodeStart(SSyncNode* pSyncNode) {
syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
/*
sInfo("==syncNodeStart== RestoreFinish begin 1 replica tsem_wait %p", pSyncNode);
tsem_wait(&pSyncNode->restoreSem);
sInfo("==syncNodeStart== RestoreFinish end 1 replica tsem_wait %p", pSyncNode);
*/
/*
while (pSyncNode->restoreFinish != true) {
taosMsleep(10);
if (gRaftDetailLog) {
syncNodeLog2("==state change become leader immediately==", pSyncNode);
}
*/
sInfo("==syncNodeStart== restoreFinish ok 1 replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
return;
}
syncNodeBecomeFollower(pSyncNode);
syncNodeBecomeFollower(pSyncNode, "first start");
// for test
int32_t ret = 0;
// int32_t ret = 0;
// ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
// assert(ret == 0);
/*
sInfo("==syncNodeStart== RestoreFinish begin multi replica tsem_wait %p", pSyncNode);
tsem_wait(&pSyncNode->restoreSem);
sInfo("==syncNodeStart== RestoreFinish end multi replica tsem_wait %p", pSyncNode);
*/
/*
while (pSyncNode->restoreFinish != true) {
taosMsleep(10);
if (gRaftDetailLog) {
syncNodeLog2("==state change become leader immediately==", pSyncNode);
}
*/
sInfo("==syncNodeStart== restoreFinish ok multi replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
}
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
@ -687,6 +690,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
}
void syncNodeClose(SSyncNode* pSyncNode) {
sInfo("sync event vgId:%d sync close", pSyncNode->vgId);
int32_t ret;
assert(pSyncNode != NULL);
@ -1061,7 +1066,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
int len = 256;
char* s = (char*)taosMemoryMalloc(len);
snprintf(s, len,
"syncNode2SimpleStr vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, "
"syncNode: vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, "
"electTimerLogicClock:%lu, "
"electTimerLogicClockUser:%lu, "
"electTimerMS:%d, replicaNum:%d",
@ -1131,7 +1136,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDro
}
raftCfgPersist(pSyncNode->pRaftCfg);
syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
if (gRaftDetailLog) {
syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
}
}
SSyncNode* syncNodeAcquire(int64_t rid) {
@ -1149,12 +1157,14 @@ void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid)
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->pRaftStore->currentTerm) {
raftStoreSetTerm(pSyncNode->pRaftStore, term);
syncNodeBecomeFollower(pSyncNode);
syncNodeBecomeFollower(pSyncNode, "update term");
raftStoreClearVote(pSyncNode->pRaftStore);
}
}
void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
sInfo("sync event vgId:%d become follower, %s", pSyncNode->vgId, debugStr);
// maybe clear leader cache
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
pSyncNode->leaderCache = EMPTY_RAFT_ID;
@ -1186,7 +1196,9 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
// evoterLog |-> voterLog[i]]}
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
sInfo("sync event vgId:%d become leader, %s", pSyncNode->vgId, debugStr);
// state change
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
@ -1237,7 +1249,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
assert(voteGrantedMajority(pSyncNode->pVotesGranted));
syncNodeBecomeLeader(pSyncNode);
syncNodeBecomeLeader(pSyncNode, "candidate to leader");
syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);
@ -1260,14 +1272,14 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
syncNodeBecomeFollower(pSyncNode);
syncNodeBecomeFollower(pSyncNode, "leader to follower");
syncNodeLog2("==state change syncNodeLeader2Follower==", pSyncNode);
}
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
syncNodeBecomeFollower(pSyncNode);
syncNodeBecomeFollower(pSyncNode, "candidate to follower");
syncNodeLog2("==state change syncNodeCandidate2Follower==", pSyncNode);
}
@ -1467,9 +1479,11 @@ void syncNodeLog(SSyncNode* pObj) {
}
void syncNodeLog2(char* s, SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
sTraceLong("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized);
if (gRaftDetailLog) {
char* serialized = syncNode2Str(pObj);
sTraceLong("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
// ------ local funciton ---------
@ -1724,17 +1738,19 @@ const char* syncStr(ESyncState state) {
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0;
ESyncState state = flag;
sInfo("sync event commit from index:%" PRId64 " to index:%" PRId64 ", %s", beginIndex, endIndex,
syncUtilState2String(state));
sInfo("sync event vgId:%d commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex,
endIndex, syncUtilState2String(state));
// maybe execute by leader, skip snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (ths->pFsm->FpGetSnapshot != NULL) {
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
}
if (beginIndex <= snapshot.lastApplyIndex) {
beginIndex = snapshot.lastApplyIndex + 1;
}
/*
// maybe execute by leader, skip snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (ths->pFsm->FpGetSnapshot != NULL) {
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
}
if (beginIndex <= snapshot.lastApplyIndex) {
beginIndex = snapshot.lastApplyIndex + 1;
}
*/
// execute fsm
if (ths->pFsm != NULL) {
@ -1791,17 +1807,19 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
// change isStandBy to normal
if (!isDrop) {
if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(ths);
syncNodeBecomeLeader(ths, "config change");
} else {
syncNodeBecomeFollower(ths);
syncNodeBecomeFollower(ths, "config change");
}
}
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
if (gRaftDetailLog) {
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
}
}
// always call FpReConfigCb
@ -1810,10 +1828,12 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.index = pEntry->index;
cbMeta.term = pEntry->term;
cbMeta.newCfg = newSyncCfg;
cbMeta.oldCfg = oldSyncCfg;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.flag = 0x11;
cbMeta.isDrop = isDrop;
ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta);
ths->pFsm->FpReConfigCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
@ -1824,7 +1844,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths->pFsm->FpRestoreFinishCb(ths->pFsm);
}
ths->restoreFinish = true;
sInfo("restore finish %p vgId:%d", ths, ths->vgId);
sInfo("sync event vgId:%d restore finish", ths->vgId);
}
}
@ -1853,4 +1873,4 @@ SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId)
}
}
return pSender;
}
}

View File

@ -14,6 +14,7 @@
*/
#include "syncRaftLog.h"
#include "syncRaftCfg.h"
#include "wal.h"
// refactor, log[0 .. n] ==> log[m .. n]
@ -161,7 +162,9 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync(pWal, true);
sTrace("sync event write index:%" PRId64, pEntry->index);
sTrace("sync event vgId:%d write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s", pData->pSyncNode->vgId,
pEntry->index, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy,
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType));
return code;
}

View File

@ -122,7 +122,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
syncIndexMgrLog2("begin append entries peers pNextIndex:", pSyncNode->pNextIndex);
syncIndexMgrLog2("begin append entries peers pMatchIndex:", pSyncNode->pMatchIndex);
logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore);
{
if (gRaftDetailLog) {
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
@ -201,7 +201,6 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
}
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
sTrace("syncNodeAppendEntries pSyncNode:%p ", pSyncNode);
int32_t ret = 0;
SRpcMsg rpcMsg;

View File

@ -20,7 +20,7 @@
#include "syncUtil.h"
#include "wal.h"
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm);
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId);
//----------------------------------
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
@ -105,13 +105,23 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
char *msgStr = syncSnapshotSend2Str(pMsg);
char host[128];
uint16_t port;
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
sTrace("sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s", host,
port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, msgStr);
taosMemoryFree(msgStr);
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace(
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send "
"msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm);
}
syncSnapshotSendDestroy(pMsg);
}
@ -183,9 +193,11 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender) {
pSender->start = false;
char *s = snapshotSender2Str(pSender);
sInfo("snapshotSenderStop %s", s);
taosMemoryFree(s);
if (gRaftDetailLog) {
char *s = snapshotSender2Str(pSender);
sInfo("snapshotSenderStop %s", s);
taosMemoryFree(s);
}
}
// when sender receiver ack, call this function to send msg from seq
@ -225,20 +237,29 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
char *msgStr = syncSnapshotSend2Str(pMsg);
char host[128];
uint16_t port;
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
sTrace("sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s",
host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
msgStr);
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace(
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send "
"msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm);
}
} else {
sTrace("sync event snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s",
host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
msgStr);
sTrace("sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm);
}
taosMemoryFree(msgStr);
syncSnapshotSendDestroy(pMsg);
return 0;
@ -260,13 +281,19 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
char *msgStr = syncSnapshotSend2Str(pMsg);
char host[128];
uint16_t port;
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
sTrace("sync event snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", host, port, pSender->seq, pSender->ack,
msgStr);
taosMemoryFree(msgStr);
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("sync event vgId:%d snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", pSender->pSyncNode->vgId,
host, port, pSender->seq, pSender->ack, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot send to %s:%d resend seq:%d ack:%d", pSender->pSyncNode->vgId, host, port,
pSender->seq, pSender->ack);
}
syncSnapshotSendDestroy(pMsg);
}
@ -331,7 +358,7 @@ char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
}
// -------------------------------------
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
(pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
@ -345,7 +372,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
pReceiver->pWriter = NULL;
pReceiver->pSyncNode = pSyncNode;
pReceiver->replicaIndex = replicaIndex;
pReceiver->fromId = fromId;
pReceiver->term = pSyncNode->pRaftStore->currentTerm;
pReceiver->privateTerm = 0;
@ -365,10 +392,11 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
// begin receive snapshot msg (current term, seq begin)
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm) {
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
pReceiver->privateTerm = privateTerm;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
pReceiver->fromId = fromId;
ASSERT(pReceiver->pWriter == NULL);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
@ -377,14 +405,15 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm) {
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
if (!snapshotReceiverIsStart(pReceiver)) {
// start
snapshotReceiverDoStart(pReceiver, privateTerm);
snapshotReceiverDoStart(pReceiver, privateTerm, fromId);
pReceiver->start = true;
} else {
// already start
sInfo("snapshot recv, receiver already start");
// force close, abandon incomplete data
int32_t ret =
@ -393,15 +422,15 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTer
pReceiver->pWriter = NULL;
// start again
snapshotReceiverDoStart(pReceiver, privateTerm);
snapshotReceiverDoStart(pReceiver, privateTerm, fromId);
pReceiver->start = true;
ASSERT(0);
}
char *s = snapshotReceiver2Str(pReceiver);
sInfo("snapshotReceiverStart %s", s);
taosMemoryFree(s);
if (gRaftDetailLog) {
char *s = snapshotReceiver2Str(pReceiver);
sInfo("snapshotReceiverStart %s", s);
taosMemoryFree(s);
}
}
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
@ -418,9 +447,11 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
++(pReceiver->privateTerm);
}
char *s = snapshotReceiver2Str(pReceiver);
sInfo("snapshotReceiverStop %s", s);
taosMemoryFree(s);
if (gRaftDetailLog) {
char *s = snapshotReceiver2Str(pReceiver);
sInfo("snapshotReceiverStop %s", s);
taosMemoryFree(s);
}
}
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
@ -436,7 +467,22 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
cJSON_AddNumberToObject(pRoot, "replicaIndex", pReceiver->replicaIndex);
cJSON *pFromId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->fromId.addr);
cJSON_AddStringToObject(pFromId, "addr", u64buf);
{
uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId);
cJSON_AddItemToObject(pRoot, "fromId", pFromId);
snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
@ -468,17 +514,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
// begin
snapshotReceiverStart(pReceiver, pMsg->privateTerm);
snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg->srcId);
pReceiver->ack = pMsg->seq;
needRsp = true;
char *msgStr = syncSnapshotSend2Str(pMsg);
char host[128];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sTrace("sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, port,
pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("sync event vgId:%d snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm);
}
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
// end, finish FSM
@ -486,29 +538,46 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
ASSERT(writeCode == 0);
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
char host[128];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sInfo(
"sync event snapshot recv from %s:%d finish, update log begin index:%ld, snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, raft log:%s",
host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr);
taosMemoryFree(logSimpleStr);
if (gRaftDetailLog) {
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
sInfo(
"sync event vgId:%d snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, raft log:%s",
pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
logSimpleStr);
taosMemoryFree(logSimpleStr);
} else {
sInfo(
"sync event vgId:%d snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu",
pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
}
pReceiver->pWriter = NULL;
snapshotReceiverStop(pReceiver, true);
pReceiver->ack = pMsg->seq;
needRsp = true;
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, port,
pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("sync event vgId:%d snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm);
}
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
@ -519,11 +588,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host,
port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace(
"sync event vgId:%d snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv "
"msg:%s",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm);
}
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
// transfering
@ -535,13 +610,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
}
needRsp = true;
char *msgStr = syncSnapshotSend2Str(pMsg);
char host[128];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sTrace("sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host,
port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace(
"sync event vgId:%d snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm);
}
} else {
ASSERT(0);

View File

@ -146,7 +146,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) {
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu",
cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
}

View File

@ -77,7 +77,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) {
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu",
cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
}

View File

@ -41,7 +41,11 @@ SSyncSnapshotReceiver* createReceiver() {
pSyncNode->pFsm->FpSnapshotStopWrite = SnapshotStopWrite;
pSyncNode->pFsm->FpSnapshotDoWrite = SnapshotDoWrite;
SSyncSnapshotReceiver* pReceiver = snapshotReceiverCreate(pSyncNode, 2);
SRaftId id;
id.addr = syncUtilAddr2U64("1.2.3.4", 99);
id.vgId = 100;
SSyncSnapshotReceiver* pReceiver = snapshotReceiverCreate(pSyncNode, id);
pReceiver->start = true;
pReceiver->ack = 20;
pReceiver->pWriter = (void*)0x11;

View File

@ -146,8 +146,8 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb== pFsm:%p", pFsm); }
void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
char* s = syncCfg2Str(&newCfg);
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) {
char* s = syncCfg2Str(&(cbMeta.newCfg));
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu, newCfg:%s",
cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term, s);
taosMemoryFree(s);
@ -235,7 +235,6 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
}
}
int64_t rid = syncOpen(&syncInfo);
assert(rid > 0);

@ -1 +1 @@
Subproject commit 717f5aaa5f0a1b4d92bb2ae68858fec554fb5eda
Subproject commit 932da0f4cac013c2eded824d1d4d01cfa6168fa3