diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 24ed795a02..544af9b6ee 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1764,17 +1764,15 @@ int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp); typedef struct { const char* tbName; int8_t action; + const char* colName; // TSDB_ALTER_TABLE_ADD_COLUMN - int8_t type; - int32_t bytes; - const char* colAddName; + int8_t type; + int8_t flags; + int32_t bytes; // TSDB_ALTER_TABLE_DROP_COLUMN - const char* colDropName; // TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES - const char* colModName; - int32_t colModBytes; + int32_t colModBytes; // TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME - const char* colOldName; const char* colNewName; // TSDB_ALTER_TABLE_UPDATE_TAG_VAL const char* tagName; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index ca10465ed4..551e0fc7b8 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -144,6 +144,7 @@ int32_t syncInit(); void syncCleanUp(); int64_t syncOpen(const SSyncInfo* pSyncInfo); void syncStart(int64_t rid); +void syncStartStandBy(int64_t rid); void syncStop(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); ESyncState syncGetMyRole(int64_t rid); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 8b160f54f6..cc10348d24 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -323,6 +323,9 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0516) #define TSDB_CODE_VND_HASH_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x0517) #define TSDB_CODE_VND_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0518) +#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519) +#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a) +#define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 666e9e0ffa..100ec42899 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -518,8 +518,9 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t if (pRequest->code != TSDB_CODE_SUCCESS) { const char* errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code); - printf("failed to connect to server, reason: %s\n\n", errorMsg); + fprintf(stderr,"failed to connect to server, reason: %s\n\n", errorMsg); + terrno = pRequest->code; destroyRequest(pRequest); taos_close(pTscObj); pTscObj = NULL; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e539aa3467..52edd79f3b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4171,19 +4171,20 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) { if (tEncodeI8(pEncoder, pReq->action) < 0) return -1; switch (pReq->action) { case TSDB_ALTER_TABLE_ADD_COLUMN: + if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; if (tEncodeI8(pEncoder, pReq->type) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->flags) < 0) return -1; if (tEncodeI32v(pEncoder, pReq->bytes) < 0) return -1; - if (tEncodeCStr(pEncoder, pReq->colAddName) < 0) return -1; break; case TSDB_ALTER_TABLE_DROP_COLUMN: - if (tEncodeCStr(pEncoder, pReq->colDropName) < 0) return -1; + if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: - if (tEncodeCStr(pEncoder, pReq->colModName) < 0) return -1; + if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; if (tEncodeI32v(pEncoder, pReq->colModBytes) < 0) return -1; break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: - if (tEncodeCStr(pEncoder, pReq->colOldName) < 0) return -1; + if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; if (tEncodeCStr(pEncoder, pReq->colNewName) < 0) return -1; break; case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: @@ -4216,21 +4217,23 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) { if (tDecodeCStr(pDecoder, &pReq->tbName) < 0) return -1; if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1; + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; switch (pReq->action) { case TSDB_ALTER_TABLE_ADD_COLUMN: + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; if (tDecodeI8(pDecoder, &pReq->type) < 0) return -1; + if (tDecodeI8(pDecoder, &pReq->flags) < 0) return -1; if (tDecodeI32v(pDecoder, &pReq->bytes) < 0) return -1; - if (tDecodeCStr(pDecoder, &pReq->colAddName) < 0) return -1; break; case TSDB_ALTER_TABLE_DROP_COLUMN: - if (tDecodeCStr(pDecoder, &pReq->colDropName) < 0) return -1; + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: - if (tDecodeCStr(pDecoder, &pReq->colModName) < 0) return -1; + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; if (tDecodeI32v(pDecoder, &pReq->colModBytes) < 0) return -1; break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: - if (tDecodeCStr(pDecoder, &pReq->colOldName) < 0) return -1; + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; if (tDecodeCStr(pDecoder, &pReq->colNewName) < 0) return -1; break; case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index d6e99c0899..d8309850b3 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -192,7 +192,8 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO rpcFreeCont(originalRpcMsg.pCont); // if leader, send response - if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) { + //if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) { + if (pMsg->rpcMsg.handle != NULL) { rsp.ahandle = pMsg->rpcMsg.ahandle; rsp.handle = pMsg->rpcMsg.handle; rsp.refId = pMsg->rpcMsg.refId; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index a2be393eb2..4431a2c48b 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -189,6 +189,7 @@ struct SMetaEntry { struct { int64_t ctime; int32_t ttlDays; + int32_t ncid; // next column id SSchemaWrapper schema; } ntbEntry; struct { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index a217a46036..2018f0a68c 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -83,6 +83,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq); +int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); int metaGetTableEntryByName(SMetaReader* pReader, const char* name); @@ -97,7 +98,7 @@ int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); // tsdb -int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg *pKeepCfg); +int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); int tsdbClose(STsdb** pTsdb); int tsdbBegin(STsdb* pTsdb); int tsdbCommit(STsdb* pTsdb); @@ -182,7 +183,7 @@ typedef enum { TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2 } ETsdbType; -struct STsdbKeepCfg{ +struct STsdbKeepCfg { int8_t precision; // precision always be used with below keep cfgs int32_t days; int32_t keep0; diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index dd36906b19..2bc0d7517d 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -34,6 +34,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { } else if (pME->type == TSDB_NORMAL_TABLE) { if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1; if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1; + if (tEncodeI32v(pCoder, pME->ntbEntry.ncid) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1; } else if (pME->type == TSDB_TSMA_TABLE) { if (tEncodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1; @@ -65,10 +66,11 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { } else if (pME->type == TSDB_NORMAL_TABLE) { if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; + if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1; } else if (pME->type == TSDB_TSMA_TABLE) { if (tDecodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1; - } else { + } else { ASSERT(0); } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index e5377d543a..e61064fe67 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -240,6 +240,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { me.ntbEntry.ctime = pReq->ctime; me.ntbEntry.ttlDays = pReq->ttl; me.ntbEntry.schema = pReq->ntb.schema; + me.ntbEntry.ncid = me.ntbEntry.schema.pSchema[me.ntbEntry.schema.nCols - 1].colId + 1; } if (metaHandleEntry(pMeta, &me) < 0) goto _err; @@ -374,6 +375,170 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { return 0; } +static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { + void *pVal = NULL; + int nVal = 0; + const void *pData = NULL; + int nData = 0; + int ret = 0; + tb_uid_t uid; + int64_t oversion; + SSchema *pColumn = NULL; + SMetaEntry entry = {0}; + SSchemaWrapper *pSchema; + int c; + + // search name index + ret = tdbDbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal); + if (ret < 0) { + terrno = TSDB_CODE_VND_TABLE_NOT_EXIST; + return -1; + } + + uid = *(tb_uid_t *)pVal; + tdbFree(pVal); + pVal = NULL; + + // search uid index + TDBC *pUidIdxc = NULL; + + tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn); + tdbDbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c); + ASSERT(c == 0); + + tdbDbcGet(pUidIdxc, NULL, NULL, &pData, &nData); + oversion = *(int64_t *)pData; + + // search table.db + TDBC *pTbDbc = NULL; + + tdbDbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn); + tdbDbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c); + ASSERT(c == 0); + tdbDbcGet(pTbDbc, NULL, NULL, &pData, &nData); + + // get table entry + SDecoder dc = {0}; + tDecoderInit(&dc, pData, nData); + metaDecodeEntry(&dc, &entry); + + if (entry.type != TSDB_NORMAL_TABLE) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + + // search the column to add/drop/update + pSchema = &entry.ntbEntry.schema; + int32_t iCol = 0; + for (;;) { + pColumn = NULL; + + if (iCol >= pSchema->nCols) break; + pColumn = &pSchema->pSchema[iCol]; + + if (strcmp(pColumn->name, pAlterTbReq->colName) == 0) break; + iCol++; + } + + entry.version = version; + int tlen; + switch (pAlterTbReq->action) { + case TSDB_ALTER_TABLE_ADD_COLUMN: + if (pColumn) { + terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS; + goto _err; + } + pSchema->sver++; + pSchema->nCols++; + pSchema->pSchema = + taosMemoryRealloc(entry.ntbEntry.schema.pSchema, sizeof(SSchema) * entry.ntbEntry.schema.nCols); + pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].bytes = pAlterTbReq->bytes; + pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].type = pAlterTbReq->type; + pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].flags = pAlterTbReq->flags; + pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].colId = entry.ntbEntry.ncid++; + strcpy(pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].name, pAlterTbReq->colName); + break; + case TSDB_ALTER_TABLE_DROP_COLUMN: + if (pColumn == NULL) { + terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS; + goto _err; + } + if (pColumn->colId == 0) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + pSchema->sver++; + pSchema->nCols--; + tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema); + if (tlen) { + memmove(pColumn, pColumn + 1, tlen); + } + break; + case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: + if (pColumn == NULL) { + terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS; + goto _err; + } + if (!IS_VAR_DATA_TYPE(pColumn->type) || pColumn->bytes <= pAlterTbReq->bytes) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + pSchema->sver++; + pColumn->bytes = pAlterTbReq->bytes; + break; + case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: + if (pColumn == NULL) { + terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS; + goto _err; + } + pSchema->sver++; + strcpy(pColumn->name, pAlterTbReq->colNewName); + break; + } + + entry.version = version; + + tDecoderClear(&dc); + tdbDbcClose(pTbDbc); + tdbDbcClose(pUidIdxc); + return 0; + +_err: + tDecoderClear(&dc); + tdbDbcClose(pTbDbc); + tdbDbcClose(pUidIdxc); + return -1; +} + +static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { + // TODO + return 0; +} + +static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { + // TODO + ASSERT(0); + return 0; +} + +int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) { + switch (pReq->action) { + case TSDB_ALTER_TABLE_ADD_COLUMN: + case TSDB_ALTER_TABLE_DROP_COLUMN: + case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: + case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: + return metaAlterTableColumn(pMeta, version, pReq); + case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: + return metaUpdateTableTagVal(pMeta, version, pReq); + case TSDB_ALTER_TABLE_UPDATE_OPTIONS: + return metaUpdateTableOptions(pMeta, version, pReq); + default: + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + return -1; + break; + } +} + static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { STbDbKey tbDbKey; void *pKey = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0a3e5c3d63..55fe8a3945 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2034,11 +2034,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf lastKeyAppend = key; if (rv1 != TD_ROW_SVER(row1)) { - // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); rv1 = TD_ROW_SVER(row1); } if (row2 && rv2 != TD_ROW_SVER(row2)) { - // pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); rv2 = TD_ROW_SVER(row2); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index d4363f1ea0..ac49a0b078 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -19,7 +19,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); -static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); +static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); @@ -82,7 +82,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_ALTER_TABLE: - if (vnodeProcessAlterTbReq(pVnode, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_DROP_TABLE: if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; @@ -455,9 +455,32 @@ _exit: return 0; } -static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) { - // TODO - ASSERT(0); +static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { + SVAlterTbReq vAlterTbReq = {0}; + SDecoder dc = {0}; + + pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP; + pRsp->pCont = NULL; + pRsp->contLen = 0; + pRsp->code = TSDB_CODE_SUCCESS; + + tDecoderInit(&dc, pReq, len); + + // decode + if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) { + pRsp->code = TSDB_CODE_INVALID_MSG; + tDecoderClear(&dc); + return -1; + } + + // process + if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq) < 0) { + pRsp->code = terrno; + tDecoderClear(&dc); + return -1; + } + + tDecoderClear(&dc); return 0; } @@ -668,7 +691,7 @@ _exit: static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) { SVCreateTSmaReq req = {0}; - SDecoder coder; + SDecoder coder; pRsp->msgType = TDMT_VND_CREATE_SMA_RSP; pRsp->code = TSDB_CODE_SUCCESS; @@ -682,7 +705,7 @@ static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq pRsp->code = terrno; goto _err; } - + if (metaCreateTSma(pVnode->pMeta, version, &req) < 0) { pRsp->code = terrno; goto _err; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 717958c033..6a25b23c6b 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -812,13 +812,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu } if (ctx->rspCode) { - QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, - tstrerror(ctx->rspCode)); + QW_TASK_ELOG("task already failed at phase %s, code:%s", qwPhaseStr(phase), tstrerror(ctx->rspCode)); QW_ERR_JRET(ctx->rspCode); } _return: - if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); @@ -836,7 +834,11 @@ _return: QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code)); } - QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code)); + if (code != TSDB_CODE_SUCCESS) { + QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code)); + } else { + QW_TASK_DLOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code)); + } QW_RET(code); } diff --git a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h index 63f24b104f..0a6e2428fe 100644 --- a/source/libs/sync/inc/syncIndexMgr.h +++ b/source/libs/sync/inc/syncIndexMgr.h @@ -35,6 +35,7 @@ typedef struct SSyncIndexMgr { } SSyncIndexMgr; SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode); +void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode); void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8a21eea7b7..9b655fb0fa 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -247,6 +247,7 @@ typedef struct SSyncNode { // open/close -------------- SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); void syncNodeStart(SSyncNode* pSyncNode); +void syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); // ping -------------- @@ -271,7 +272,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S cJSON* syncNode2Json(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode); char* syncNode2SimpleStr(const SSyncNode* pSyncNode); -void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig); +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig); SSyncNode* syncNodeAcquire(int64_t rid); void syncNodeRelease(SSyncNode* pNode); diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 159af1610e..1b08d3f7a1 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -62,7 +62,6 @@ bool syncUtilUserPreCommit(tmsg_t msgType); bool syncUtilUserCommit(tmsg_t msgType); bool syncUtilUserRollback(tmsg_t msgType); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index aed19d042e..1a5d418e75 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -15,11 +15,11 @@ #include "syncAppendEntries.h" #include "syncInt.h" +#include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncUtil.h" #include "syncVoteMgr.h" -#include "syncRaftCfg.h" // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == @@ -200,7 +200,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); assert(pRollBackEntry != NULL); - //if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) { + // if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) { if (syncUtilUserRollback(pRollBackEntry->msgType)) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); @@ -229,7 +229,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - //if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pAppendEntry->index; @@ -261,7 +261,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - //if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pAppendEntry->index; @@ -324,7 +324,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - //if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -338,10 +338,15 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // config change if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { SSyncCfg newSyncCfg; - int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); + int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); ASSERT(ret == 0); syncNodeUpdateConfig(ths, &newSyncCfg); + if (ths->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(ths); + } else { + syncNodeBecomeFollower(ths); + } } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 620f0e9cd2..0f17cf267e 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -16,10 +16,10 @@ #include "syncCommit.h" #include "syncIndexMgr.h" #include "syncInt.h" +#include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncUtil.h" -#include "syncRaftCfg.h" // \* Leader i advances its commitIndex. // \* This is done as a separate step from handling AppendEntries responses, @@ -102,7 +102,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - //if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -114,12 +114,17 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { } // config change - if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { - SSyncCfg newSyncCfg; - int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); - ASSERT(ret == 0); + if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { + SSyncCfg newSyncCfg; + int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); + ASSERT(ret == 0); - syncNodeUpdateConfig(pSyncNode, &newSyncCfg); + syncNodeUpdateConfig(pSyncNode, &newSyncCfg); + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(pSyncNode); + } else { + syncNodeBecomeFollower(pSyncNode); + } } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index a3d1717c3b..8b7411c563 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -15,6 +15,7 @@ #include "syncIO.h" #include +#include "os.h" #include "syncMessage.h" #include "syncUtil.h" #include "tglobal.h" @@ -198,6 +199,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) { { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); + snprintf(rpcInit.localFqdn, sizeof(rpcInit.localFqdn), "%s", "127.0.0.1"); rpcInit.localPort = io->myAddr.eps[0].port; rpcInit.label = "SYNC-IO-SERVER"; rpcInit.numOfThreads = 1; diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index d33075054a..5809cedb90 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -31,6 +31,13 @@ SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) { return pSyncIndexMgr; } +void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode) { + pSyncIndexMgr->replicas = &(pSyncNode->replicasId); + pSyncIndexMgr->replicaNum = pSyncNode->replicaNum; + pSyncIndexMgr->pSyncNode = pSyncNode; + syncIndexMgrClear(pSyncIndexMgr); +} + void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) { if (pSyncIndexMgr != NULL) { taosMemoryFree(pSyncIndexMgr); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index da23d8415b..469682a7b5 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -103,6 +103,16 @@ void syncStart(int64_t rid) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); } +void syncStartStandBy(int64_t rid) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return; + } + syncNodeStartStandBy(pSyncNode); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); +} + void syncStop(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { @@ -116,7 +126,7 @@ void syncStop(int64_t rid) { int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { int32_t ret = 0; - char *configChange = syncCfg2Str((SSyncCfg*)pSyncCfg); + char* configChange = syncCfg2Str((SSyncCfg*)pSyncCfg); SRpcMsg rpcMsg = {0}; rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE; rpcMsg.noResp = 1; @@ -188,10 +198,9 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { (pEpSet->numOfEps)++; sInfo("syncGetEpSet index:%d %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); - } pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex; - + sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -524,6 +533,17 @@ void syncNodeStart(SSyncNode* pSyncNode) { assert(ret == 0); } +void syncNodeStartStandBy(SSyncNode* pSyncNode) { + // state change + pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; + syncNodeStopHeartbeatTimer(pSyncNode); + + // reset elect timer, long enough + int32_t electMS = TIMER_MAX_MS; + int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS); + ASSERT(ret == 0); +} + void syncNodeClose(SSyncNode* pSyncNode) { int32_t ret; assert(pSyncNode != NULL); @@ -858,7 +878,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { return s; } -void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig) { +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { pSyncNode->pRaftCfg->cfg = *newConfig; int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg); ASSERT(ret == 0); @@ -885,6 +905,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig) { for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); } + + syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode); + syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode); + + syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode); } SSyncNode* syncNodeAcquire(int64_t rid) { @@ -1245,7 +1270,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - //if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -1267,7 +1292,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - //if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 8aeb9c4856..07a9397a58 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -58,14 +58,15 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { syncMeta.term = pEntry->term; code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); if (code != 0) { - int32_t err = terrno; - const char *errStr = tstrerror(err); - int32_t linuxErr = errno; - const char *linuxErrMsg = strerror(errno); - sError("walWriteWithSyncInfo error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t linuxErr = errno; + const char* linuxErrMsg = strerror(errno); + sError("walWriteWithSyncInfo error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, + linuxErrMsg); ASSERT(0); - } - //assert(code == 0); + } + // assert(code == 0); walFsync(pWal, true); return code; @@ -77,16 +78,17 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) { SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); - int32_t code = walReadWithHandle(pWalHandle, index); + int32_t code = walReadWithHandle(pWalHandle, index); if (code != 0) { - int32_t err = terrno; - const char *errStr = tstrerror(err); - int32_t linuxErr = errno; - const char *linuxErrMsg = strerror(errno); - sError("walReadWithHandle error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t linuxErr = errno; + const char* linuxErrMsg = strerror(errno); + sError("walReadWithHandle error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, + linuxErrMsg); ASSERT(0); - } - //assert(walReadWithHandle(pWalHandle, index) == 0); + } + // assert(walReadWithHandle(pWalHandle, index) == 0); SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); assert(pEntry != NULL); @@ -112,16 +114,17 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - //assert(walRollback(pWal, fromIndex) == 0); + // assert(walRollback(pWal, fromIndex) == 0); int32_t code = walRollback(pWal, fromIndex); if (code != 0) { - int32_t err = terrno; - const char *errStr = tstrerror(err); - int32_t linuxErr = errno; - const char *linuxErrMsg = strerror(errno); - sError("walRollback error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t linuxErr = errno; + const char* linuxErrMsg = strerror(errno); + sError("walRollback error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, + linuxErrMsg); ASSERT(0); - } + } return 0; // to avoid compiler error } @@ -145,16 +148,16 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - //assert(walCommit(pWal, index) == 0); + // assert(walCommit(pWal, index) == 0); int32_t code = walCommit(pWal, index); if (code != 0) { - int32_t err = terrno; - const char *errStr = tstrerror(err); - int32_t linuxErr = errno; - const char *linuxErrMsg = strerror(errno); + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t linuxErr = errno; + const char* linuxErrMsg = strerror(errno); sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); ASSERT(0); - } + } return 0; // to avoid compiler error } diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 8afe9ff2a7..cfbdf0e961 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -37,6 +37,7 @@ add_executable(syncRaftCfgTest "") add_executable(syncRespMgrTest "") add_executable(syncSnapshotTest "") add_executable(syncApplyMsgTest "") +add_executable(syncConfigChangeTest "") target_sources(syncTest @@ -195,6 +196,10 @@ target_sources(syncApplyMsgTest PRIVATE "syncApplyMsgTest.cpp" ) +target_sources(syncConfigChangeTest + PRIVATE + "syncConfigChangeTest.cpp" +) target_include_directories(syncTest @@ -392,6 +397,11 @@ target_include_directories(syncApplyMsgTest "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncConfigChangeTest + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -550,6 +560,10 @@ target_link_libraries(syncApplyMsgTest sync gtest_main ) +target_link_libraries(syncConfigChangeTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp new file mode 100644 index 0000000000..9a2d9a6b34 --- /dev/null +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -0,0 +1,259 @@ +#include +#include +#include "os.h" +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncUtil.h" +#include "wal.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t gPorts[] = {7010, 7110, 7210, 7310, 7410}; +const char* gDir = "./syncReplicateTest"; +int32_t gVgId = 1234; +SyncIndex gSnapshotLastApplyIndex; + +void init() { + int code = walInit(); + assert(code == 0); + + code = syncInit(); + assert(code == 0); + + sprintf(tsTempDir, "%s", "."); +} + +void cleanup() { walCleanUp(); } + +void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { + SyncIndex beginIndex = SYNC_INDEX_INVALID; + if (pFsm->FpGetSnapshot != NULL) { + SSnapshot snapshot; + pFsm->FpGetSnapshot(pFsm, &snapshot); + beginIndex = snapshot.lastApplyIndex; + } + + if (cbMeta.index > beginIndex) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); + } else { + sTrace("==callback== ==CommitCb== do not apply again %ld", cbMeta.index); + } +} + +void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, + cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); +} + +void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); +} + +int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { + pSnapshot->data = NULL; + pSnapshot->lastApplyIndex = gSnapshotLastApplyIndex; + pSnapshot->lastApplyTerm = 100; + return 0; +} + +SSyncFSM* createFsm() { + SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); + pFsm->FpCommitCb = CommitCb; + pFsm->FpPreCommitCb = PreCommitCb; + pFsm->FpRollBackCb = RollBackCb; + pFsm->FpGetSnapshot = GetSnapshotCb; + return pFsm; +} + +SWal* createWal(char* path, int32_t vgId) { + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + SWal* pWal = walOpen(path, &walCfg); + assert(pWal != NULL); + return pWal; +} + +int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy) { + SSyncInfo syncInfo; + syncInfo.vgId = vgId; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = createFsm(); + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); + syncInfo.pWal = pWal; + + SSyncCfg* pCfg = &syncInfo.syncCfg; + + if (isStandBy) { + pCfg->myIndex = 0; + pCfg->replicaNum = 1; + pCfg->nodeInfo[0].nodePort = gPorts[myIndex]; + taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + + } else { + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = gPorts[i]; + taosGetFqdn(pCfg->nodeInfo[i].nodeFqdn); + // snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + } + } + + int64_t rid = syncOpen(&syncInfo); + assert(rid > 0); + + SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + gSyncIO->pSyncNode = pSyncNode; + syncNodeRelease(pSyncNode); + + return rid; +} + +void configChange(int64_t rid, int32_t replicaNum, int32_t myIndex) { + SSyncCfg syncCfg; + + syncCfg.myIndex = myIndex; + syncCfg.replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + syncCfg.nodeInfo[i].nodePort = gPorts[i]; + taosGetFqdn(syncCfg.nodeInfo[i].nodeFqdn); + } + + syncReconfig(rid, &syncCfg); +} + +void usage(char* exe) { + printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum isStandBy isConfigChange \n", exe); +} + +SRpcMsg* createRpcMsg(int i, int count, int myIndex) { + SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg)); + memset(pMsg, 0, sizeof(SRpcMsg)); + pMsg->msgType = 9999; + pMsg->contLen = 256; + pMsg->pCont = rpcMallocCont(pMsg->contLen); + snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%ld", myIndex, i, count, taosGetTimestampMs()); + return pMsg; +} + +int main(int argc, char** argv) { + tsAsyncLog = 0; + sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + if (argc != 7) { + usage(argv[0]); + exit(-1); + } + + int32_t replicaNum = atoi(argv[1]); + int32_t myIndex = atoi(argv[2]); + int32_t lastApplyIndex = atoi(argv[3]); + int32_t writeRecordNum = atoi(argv[4]); + bool isStandBy = atoi(argv[5]); + bool isConfigChange = atoi(argv[6]); + gSnapshotLastApplyIndex = lastApplyIndex; + + if (!isStandBy) { + assert(replicaNum >= 1 && replicaNum <= 5); + assert(myIndex >= 0 && myIndex < replicaNum); + assert(lastApplyIndex >= -1); + assert(writeRecordNum >= 0); + } + + init(); + int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]); + assert(ret == 0); + + char walPath[128]; + snprintf(walPath, sizeof(walPath), "%s_wal_replica%d_index%d", gDir, replicaNum, myIndex); + SWal* pWal = createWal(walPath, gVgId); + + int64_t rid = createSyncNode(replicaNum, myIndex, gVgId, pWal, (char*)gDir, isStandBy); + assert(rid > 0); + + if (isStandBy) { + syncStartStandBy(rid); + } else { + syncStart(rid); + } + + SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + if (isConfigChange) { + configChange(rid, 3, myIndex); + } + + //--------------------------- + int32_t alreadySend = 0; + while (1) { + char* s = syncNode2SimpleStr(pSyncNode); + + if (alreadySend < writeRecordNum) { + SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); + int32_t ret = syncPropose(rid, pRpcMsg, false); + if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { + sTrace("%s value%d write not leader", s, alreadySend); + } else { + assert(ret == 0); + sTrace("%s value%d write ok", s, alreadySend); + } + alreadySend++; + + rpcFreeCont(pRpcMsg->pCont); + taosMemoryFree(pRpcMsg); + } else { + sTrace("%s", s); + } + + taosMsleep(1000); + taosMemoryFree(s); + taosMsleep(1000); + } + + syncNodeRelease(pSyncNode); + syncStop(rid); + walClose(pWal); + syncIOStop(); + cleanup(); + return 0; +} diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8a2a60d3e2..b5e64242e4 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -322,6 +322,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TB_NOT_EXIST, "Table not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_SMA_NOT_EXIST, "SMA not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_HASH_MISMATCH, "Hash value mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists") + // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")