diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 8af5945300..8565eea63b 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -25,7 +25,7 @@ extern "C" { // clang-format off #define TAOS_DEF_ERROR_CODE(mod, code) ((int32_t)((0x80000000 | ((mod)<<16) | (code)))) - + #define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code)) #define TAOS_SUCCEEDED(err) ((err) >= 0) #define TAOS_FAILED(err) ((err) < 0) @@ -35,7 +35,7 @@ const char* terrstr(); int32_t* taosGetErrno(); #define terrno (*taosGetErrno()) - + #define TSDB_CODE_SUCCESS 0 #define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 0820e884ce..f7e22cb151 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -191,6 +191,11 @@ bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t uniqueFunction(SqlFunctionCtx *pCtx); +bool getModeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool modeFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +int32_t modeFunction(SqlFunctionCtx *pCtx); +int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); + bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool twaFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t twaFunction(SqlFunctionCtx *pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index d41bc89a5f..699a050aea 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1045,20 +1045,28 @@ static int32_t translateFirstLastMerge(SFunctionNode* pFunc, char* pErrBuf, int3 return translateFirstLastImpl(pFunc, pErrBuf, len, false); } -static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { +static int32_t translateUniqueMode(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isUnique) { if (1 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); if (!nodesExprHasColumn(pPara)) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of UNIQUE must contain columns"); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of %s must contain columns", isUnique ? "UNIQUE" : "MODE"); } pFunc->node.resType = ((SExprNode*)pPara)->resType; return TSDB_CODE_SUCCESS; } +static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateUniqueMode(pFunc, pErrBuf, len, true); +} + +static int32_t translateMode(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateUniqueMode(pFunc, pErrBuf, len, false); +} + static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); if (numOfParams == 0 || numOfParams > 2) { @@ -2109,7 +2117,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "unique", .type = FUNCTION_TYPE_UNIQUE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_WINDOW_FUNC | FUNC_MGT_FORBID_GROUP_BY_FUNC, .translateFunc = translateUnique, .getEnvFunc = getUniqueFuncEnv, @@ -2117,6 +2125,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = uniqueFunction, .finalizeFunc = NULL }, + { + .name = "mode", + .type = FUNCTION_TYPE_MODE, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateMode, + .getEnvFunc = getModeFuncEnv, + .initFunc = modeFunctionSetup, + .processFunc = modeFunction, + .finalizeFunc = modeFinalize, + }, { .name = "abs", .type = FUNCTION_TYPE_ABS, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index c4d3a26ab4..06f24f39e7 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -32,6 +32,7 @@ #define TAIL_MAX_OFFSET 100 #define UNIQUE_MAX_RESULT_SIZE (1024 * 1024 * 10) +#define MODE_MAX_RESULT_SIZE UNIQUE_MAX_RESULT_SIZE #define HLL_BUCKET_BITS 14 // The bits of the bucket #define HLL_DATA_BITS (64 - HLL_BUCKET_BITS) @@ -246,6 +247,19 @@ typedef struct SUniqueInfo { char pItems[]; } SUniqueInfo; +typedef struct SModeItem { + int64_t count; + char data[]; +} SModeItem; + +typedef struct SModeInfo { + int32_t numOfPoints; + uint8_t colType; + int16_t colBytes; + SHashObj* pHash; + char pItems[]; +} SModeInfo; + typedef struct SDerivInfo { double prevValue; // previous value TSKEY prevTs; // previous timestamp @@ -4694,21 +4708,100 @@ int32_t uniqueFunction(SqlFunctionCtx* pCtx) { return pInfo->numOfPoints; } -int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { +bool getModeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SModeInfo) + MODE_MAX_RESULT_SIZE; + return true; +} + +bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { + if (!functionSetup(pCtx, pResInfo)) { + return false; + } + + SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + pInfo->numOfPoints = 0; + pInfo->colType = pCtx->resDataInfo.type; + pInfo->colBytes = pCtx->resDataInfo.bytes; + if (pInfo->pHash != NULL) { + taosHashClear(pInfo->pHash); + } else { + pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + } + return true; +} + +static void doModeAdd(SModeInfo* pInfo, char* data, bool isNull) { + // ignore null elements + if (isNull) { + return; + } + + int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes; + SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes); + if (pHashItem == NULL) { + int32_t size = sizeof(SModeItem) + pInfo->colBytes; + SModeItem* pItem = (SModeItem*)(pInfo->pItems + pInfo->numOfPoints * size); + memcpy(pItem->data, data, pInfo->colBytes); + pItem->count += 1; + + taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*)); + pInfo->numOfPoints++; + } else { + (*pHashItem)->count += 1; + } +} + +int32_t modeFunction(SqlFunctionCtx* pCtx) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + + SColumnInfoData* pInputCol = pInput->pData[0]; + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + + int32_t startOffset = pCtx->offset; + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + char* data = colDataGetData(pInputCol, i); + doModeAdd(pInfo, data, colDataIsNull_s(pInputCol, i)); + + if (sizeof(SModeInfo) + pInfo->numOfPoints * (sizeof(SModeItem) + pInfo->colBytes) >= MODE_MAX_RESULT_SIZE) { + taosHashCleanup(pInfo->pHash); + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + SET_VAL(pResInfo, 1, 1); + + return TSDB_CODE_SUCCESS; +} + +int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + int32_t currentRow = pBlock->info.rows; - for (int32_t i = 0; i < pResInfo->numOfRes; ++i) { - SUniqueItem* pItem = (SUniqueItem*)(pInfo->pItems + i * (sizeof(SUniqueItem) + pInfo->colBytes)); - colDataAppend(pCol, i, pItem->data, false); - // TODO: handle ts output + int32_t resIndex; + int32_t maxCount = 0; + for (int32_t i = 0; i < pInfo->numOfPoints; ++i) { + SModeItem* pItem = (SModeItem*)(pInfo->pItems + i * (sizeof(SModeItem) + pInfo->colBytes)); + if (pItem->count > maxCount) { + maxCount = pItem->count; + resIndex = i; + } else if (pItem->count == maxCount) { + resIndex = -1; + } } + SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes)); + colDataAppend(pCol, currentRow, pResItem->data, (resIndex == -1) ? true : false); + return pResInfo->numOfRes; } + bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(STwaInfo); return true; diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 55e0755a89..14688f8912 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -57,7 +57,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI void snapshotSenderDestroy(SSyncSnapshotSender *pSender); bool snapshotSenderIsStart(SSyncSnapshotSender *pSender); void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader); -void snapshotSenderStop(SSyncSnapshotSender *pSender); +void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish); int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); @@ -82,7 +82,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); -void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply); +void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index b7122b9c52..10f2745651 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -240,7 +240,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); ASSERT(pSender != NULL); - SSnapshot snapshot; + SSnapshot snapshot = {.data = NULL, + .lastApplyIndex = SYNC_INDEX_INVALID, + .lastApplyTerm = 0, + .lastConfigIndex = SYNC_INDEX_INVALID}; void* pReader = NULL; ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot, NULL, &pReader); if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 && @@ -249,10 +252,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ASSERT(pReader != NULL); snapshotSenderStart(pSender, snapshot, pReader); - char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start"); - syncNodeEventLog(ths, eventLog); - taosMemoryFree(eventLog); - } else { // no snapshot if (pReader != NULL) { @@ -260,23 +259,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries } } - /* - bool hasSnapshot = syncNodeHasSnapshot(ths); - SSnapshot snapshot; - ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); - - // start sending snapshot first time - // start here, stop by receiver - if (hasSnapshot && nextIndex <= snapshot.lastApplyIndex + 1 && !snapshotSenderIsStart(pSender) && - pMsg->privateTerm < pSender->privateTerm) { - snapshotSenderStart(pSender); - - char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start"); - syncNodeEventLog(ths, eventLog); - taosMemoryFree(eventLog); - } - */ - SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1; // update nextIndex to sentryIndex @@ -300,5 +282,5 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex); syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex); - return ret; + return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 39e972c4c4..7b15be2d14 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -24,6 +24,7 @@ //---------------------------------- static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg); +static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg); //---------------------------------- SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { @@ -50,7 +51,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->finish = false; } else { - sError("vgId:%d cannot create snapshot sender", pSyncNode->vgId); + sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId); } return pSender; @@ -127,7 +128,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void syncEntryDestory(pEntry); } else { if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) { - sTrace("vgId:%d sync sender get cfg from local", pSender->pSyncNode->vgId); + sTrace("vgId:%d, sync sender get cfg from local", pSender->pSyncNode->vgId); pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg; getLastConfig = true; } @@ -176,13 +177,13 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void // event log do { - char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender send"); + char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start"); syncNodeEventLog(pSender->pSyncNode, eventLog); taosMemoryFree(eventLog); } while (0); } -void snapshotSenderStop(SSyncSnapshotSender *pSender) { +void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { // close reader if (pSender->pReader != NULL) { int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); @@ -199,6 +200,14 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender) { // update flag pSender->start = false; + pSender->finish = finish; + + // event log + do { + char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender stop"); + syncNodeEventLog(pSender->pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); } // when sender receive ack, call this function to send msg from seq @@ -386,7 +395,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; } else { - sError("vgId:%d cannot create snapshot receiver", pSyncNode->vgId); + sError("vgId:%d, cannot create snapshot receiver", pSyncNode->vgId); } return pReceiver; @@ -409,9 +418,9 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; } -// static do start +// static do start by privateTerm, pBeginMsg // receive first snapshot data -// privateTerm, pBeginMsg +// write first block data static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) { // update state @@ -419,6 +428,7 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p pReceiver->privateTerm = privateTerm; pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; pReceiver->fromId = pBeginMsg->srcId; + pReceiver->start = true; // update snapshot pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex; @@ -429,8 +439,16 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p ASSERT(pReceiver->pWriter == NULL); int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter)); ASSERT(ret == 0); + + // event log + do { + char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver start"); + syncNodeEventLog(pReceiver->pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); } +// force stop static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { // force close, abandon incomplete data if (pReceiver->pWriter != NULL) { @@ -441,33 +459,35 @@ static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { } pReceiver->start = false; + + // event log + do { + char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force stop"); + syncNodeEventLog(pReceiver->pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); } // if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver // if already start, force close, start again void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) { if (!snapshotReceiverIsStart(pReceiver)) { - // start + // first start snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg); - pReceiver->start = true; } else { // already start - sInfo("snapshot recv, receiver already start"); + sInfo("vgId:%d, snapshot recv, receiver already start", pReceiver->pSyncNode->vgId); // force close, abandon incomplete data - int32_t ret = - pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); - ASSERT(ret == 0); - pReceiver->pWriter = NULL; + snapshotReceiverForceStop(pReceiver); // start again snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg); - pReceiver->start = true; } } -void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) { +void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { if (pReceiver->pWriter != NULL) { int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); @@ -477,8 +497,69 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) { pReceiver->start = false; - if (apply) { - // ++(pReceiver->privateTerm); + // event log + do { + SSnapshot snapshot; + pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot); + char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop"); + syncNodeEventLog(pReceiver->pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); +} + +static void snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { + ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END); + + if (pReceiver->pWriter != NULL) { + int32_t code = 0; + if (pMsg->dataLen > 0) { + code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, + pMsg->dataLen); + ASSERT(code == 0); + } + + code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true); + ASSERT(code == 0); + pReceiver->pWriter = NULL; + } + + pReceiver->ack = SYNC_SNAPSHOT_SEQ_END; + + // update commit index + if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) { + pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex; + } + + // reset wal + pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex); + + // event log + do { + SSnapshot snapshot; + pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot); + char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver got last data, finish, apply snapshot"); + syncNodeEventLog(pReceiver->pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); +} + +static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { + ASSERT(pMsg->seq == pReceiver->ack + 1); + + if (pReceiver->pWriter != NULL) { + if (pMsg->dataLen > 0) { + int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, + pMsg->data, pMsg->dataLen); + ASSERT(code == 0); + } + pReceiver->ack = pMsg->seq; + + // event log + do { + char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving"); + syncNodeEventLog(pReceiver->pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); } } @@ -560,33 +641,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // get receiver SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; bool needRsp = false; - int32_t writeCode = 0; // state, term, seq/ack if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { - // begin + // begin, no data snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg); - pReceiver->ack = pMsg->seq; needRsp = true; - char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver begin"); - syncNodeEventLog(pSyncNode, eventLog); - taosMemoryFree(eventLog); - } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { // end, finish FSM - writeCode = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); - ASSERT(writeCode == 0); - - pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true); - if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) { - pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex; - } - - // pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1); - pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, pMsg->lastIndex); + snapshotReceiverFinish(pReceiver, pMsg); + snapshotReceiverStop(pReceiver); + needRsp = true; // maybe update lastconfig if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { @@ -601,89 +669,74 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex); } - SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); - - do { - char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver finish, apply snapshot"); - syncNodeEventLog(pSyncNode, eventLog); - taosMemoryFree(eventLog); - } while (0); - - pReceiver->pWriter = NULL; - snapshotReceiverStop(pReceiver, true); - pReceiver->ack = pMsg->seq; - needRsp = true; - - do { - char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop"); - syncNodeEventLog(pSyncNode, eventLog); - taosMemoryFree(eventLog); - } while (0); - } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { - pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false); - snapshotReceiverStop(pReceiver, false); + // force close + snapshotReceiverForceStop(pReceiver); needRsp = false; - do { - char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force close"); - syncNodeEventLog(pSyncNode, eventLog); - taosMemoryFree(eventLog); - } while (0); - } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { // transfering if (pMsg->seq == pReceiver->ack + 1) { - writeCode = - pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); - ASSERT(writeCode == 0); - pReceiver->ack = pMsg->seq; + snapshotReceiverGotData(pReceiver, pMsg); } needRsp = true; - do { - char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving"); - syncNodeEventLog(pSyncNode, eventLog); - taosMemoryFree(eventLog); - } while (0); - } else { ASSERT(0); } + // send ack if (needRsp) { + // build msg SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->destId = pMsg->srcId; pRspMsg->term = pSyncNode->pRaftStore->currentTerm; pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; - pRspMsg->ack = pReceiver->ack; - pRspMsg->code = writeCode; - pRspMsg->privateTerm = pReceiver->privateTerm; + pRspMsg->ack = pReceiver->ack; // receiver maybe already closed + pRspMsg->code = 0; + pRspMsg->privateTerm = pReceiver->privateTerm; // receiver maybe already closed + // send msg SRpcMsg rpcMsg; syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); - syncSnapshotRspDestroy(pRspMsg); } + } else { + // error log + do { + char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver term not equal"); + syncNodeErrorLog(pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); } } else { - syncNodeLog2("syncNodeOnSnapshotSendCb not follower", pSyncNode); + // error log + do { + char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver not follower"); + syncNodeErrorLog(pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); } return 0; } +static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { + ASSERT(pMsg->ack == pSender->seq); + pSender->ack = pMsg->ack; + ++(pSender->seq); +} + // sender receives ack, set seq = ack + 1, send msg from seq // if ack == SYNC_SNAPSHOT_SEQ_END, stop sender int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { // if already drop replica, do not process if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - sInfo("recv SyncSnapshotRsp maybe replica already dropped"); - return 0; + sError("vgId:%d, recv sync-snapshot-rsp, maybe replica already dropped", pSyncNode->vgId); + return -1; } // get sender @@ -695,27 +748,49 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { // receiver ack is finish, close sender if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { - pSender->finish = true; - snapshotSenderStop(pSender); + snapshotSenderStop(pSender, true); return 0; } // send next msg if (pMsg->ack == pSender->seq) { // update sender ack - pSender->ack = pMsg->ack; - (pSender->seq)++; + snapshotSenderUpdateProgress(pSender, pMsg); snapshotSend(pSender); } else if (pMsg->ack == pSender->seq - 1) { snapshotReSend(pSender); } else { - ASSERT(0); + do { + char logBuf[64]; + snprintf(logBuf, sizeof(logBuf), "error ack, recv ack:%d, my seq:%d", pMsg->ack, pSender->seq); + char *eventLog = snapshotSender2SimpleStr(pSender, logBuf); + syncNodeErrorLog(pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); + + return -1; } + } else { + // error log + do { + char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender term not equal"); + syncNodeErrorLog(pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); + + return -1; } } else { - syncNodeLog2("syncNodeOnSnapshotRspCb not leader", pSyncNode); + // error log + do { + char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender not leader"); + syncNodeErrorLog(pSyncNode, eventLog); + taosMemoryFree(eventLog); + } while (0); + + return -1; } return 0;