diff --git a/include/common/systable.h b/include/common/systable.h index bd8aae998f..506bdcfa8a 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -23,7 +23,6 @@ extern "C" { #define TDENGINE_SYSTABLE_H #define TSDB_INFORMATION_SCHEMA_DB "information_schema" -#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" #define TSDB_INS_TABLE_DNODES "dnodes" #define TSDB_INS_TABLE_MNODES "mnodes" #define TSDB_INS_TABLE_MODULES "modules" @@ -44,27 +43,27 @@ extern "C" { #define TSDB_INS_TABLE_VNODES "vnodes" #define TSDB_INS_TABLE_CONFIGS "configs" -#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" -#define TSDB_PERFS_TABLE_SMAS "smas" -#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes" -#define TSDB_PERFS_TABLE_CONNECTIONS "connections" -#define TSDB_PERFS_TABLE_QUERIES "queries" -#define TSDB_PERFS_TABLE_TOPICS "topics" -#define TSDB_PERFS_TABLE_CONSUMERS "consumers" -#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions" -#define TSDB_PERFS_TABLE_OFFSETS "offsets" -#define TSDB_PERFS_TABLE_TRANS "trans" -#define TSDB_PERFS_TABLE_STREAMS "streams" +#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" +#define TSDB_PERFS_TABLE_SMAS "smas" +#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes" +#define TSDB_PERFS_TABLE_CONNECTIONS "connections" +#define TSDB_PERFS_TABLE_QUERIES "queries" +#define TSDB_PERFS_TABLE_TOPICS "topics" +#define TSDB_PERFS_TABLE_CONSUMERS "consumers" +#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions" +#define TSDB_PERFS_TABLE_OFFSETS "offsets" +#define TSDB_PERFS_TABLE_TRANS "trans" +#define TSDB_PERFS_TABLE_STREAMS "streams" typedef struct SSysDbTableSchema { - const char *name; + const char* name; const int32_t type; const int32_t bytes; } SSysDbTableSchema; typedef struct SSysTableMeta { - const char *name; - const SSysDbTableSchema *schema; + const char* name; + const SSysDbTableSchema* schema; const int32_t colNum; } SSysTableMeta; diff --git a/include/common/tglobal.h b/include/common/tglobal.h index f253d31963..da5158abb5 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -17,8 +17,8 @@ #define _TD_COMMON_GLOBAL_H_ #include "tarray.h" -#include "tdef.h" #include "tconfig.h" +#include "tdef.h" #ifdef __cplusplus extern "C" { @@ -121,15 +121,16 @@ extern char tsCompressor[]; extern int32_t tsDiskCfgNum; extern SDiskCfg tsDiskCfg[]; -// internal -extern int32_t tsTransPullupMs; -extern int32_t tsMaRebalanceMs; +// internal +extern int32_t tsTransPullupInterval; +extern int32_t tsMqRebalanceInterval; #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) -int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, const char *envFile, - char *apolloUrl, SArray *pArgs, bool tsc); -int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc); +int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, + const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc); +int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs, + bool tsc); void taosCleanupCfg(); void taosCfgDynamicOptions(const char *option, const char *value); void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 36bef5e85a..8e918c40f9 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -204,7 +204,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) - // sync integration TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "vnode-sync-timeout", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING, "vnode-sync-ping", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING_REPLY, "vnode-sync-ping-reply", NULL, NULL) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 4e843aeb59..6a57b8d53e 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -411,7 +411,6 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909) #define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A) -// sync integration #define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x0910) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 51f924280a..fba14bbaf5 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -211,6 +211,7 @@ static const SSysDbTableSchema transSchema[] = { {.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "type", .bytes = TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "last_error", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, }; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 87a9c521af..c878109711 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -170,8 +170,8 @@ uint32_t tsCurRange = 100; // range char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR // internal -int32_t tsTransPullupMs = 6000; -int32_t tsMaRebalanceMs = 2000; +int32_t tsTransPullupInterval = 6; +int32_t tsMqRebalanceInterval = 2; void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) { tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN); @@ -438,6 +438,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, 0) != 0) return -1; if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1; + return 0; } @@ -575,6 +578,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; + tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; + tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; + if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index a55cf41262..e522be0629 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -335,7 +335,6 @@ static int32_t mndProcessStatusReq(SNodeMsg *pReq) { } bool roleChanged = false; for (int32_t vg = 0; vg < pVgroup->replica; ++vg) { - // sync integration if (pVgroup->vnodeGid[vg].dnodeId == statusReq.dnodeId) { if (pVgroup->vnodeGid[vg].role != pVload->syncState) { roleChanged = true; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 3d4254da03..5085de8610 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1405,15 +1405,18 @@ static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB colDataAppend(pColInfo, numOfRows, (const char *)dbname, false); char type[TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndTransType(pTrans->type), pShow->pMeta->pSchemas[cols].bytes); + STR_WITH_MAXSIZE_TO_VARSTR(type, mndTransType(pTrans->type), pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)type, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false); char lastError[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(dbname, pTrans->lastError, pShow->pMeta->pSchemas[cols].bytes); + STR_WITH_MAXSIZE_TO_VARSTR(lastError, pTrans->lastError, pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)lastError, false); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index e2814e95f0..690399f099 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -65,7 +65,7 @@ static void mndPullupTrans(void *param, void *tmrId) { tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } - taosTmrReset(mndPullupTrans, tsTransPullupMs, pMnode, pMnode->timer, &pMnode->transTimer); + taosTmrReset(mndPullupTrans, tsTransPullupInterval * 1000, pMnode, pMnode->timer, &pMnode->transTimer); } static void mndCalMqRebalance(void *param, void *tmrId) { @@ -81,7 +81,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } - taosTmrReset(mndCalMqRebalance, tsMaRebalanceMs, pMnode, pMnode->timer, &pMnode->mqTimer); + taosTmrReset(mndCalMqRebalance, tsMqRebalanceInterval * 1000, pMnode, pMnode->timer, &pMnode->mqTimer); } static void mndPullupTelem(void *param, void *tmrId) { @@ -103,12 +103,12 @@ static int32_t mndInitTimer(SMnode *pMnode) { return -1; } - if (taosTmrReset(mndPullupTrans, tsTransPullupMs, pMnode, pMnode->timer, &pMnode->transTimer)) { + if (taosTmrReset(mndPullupTrans, tsTransPullupInterval * 1000, pMnode, pMnode->timer, &pMnode->transTimer)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (taosTmrReset(mndCalMqRebalance, tsMaRebalanceMs, pMnode, pMnode->timer, &pMnode->mqTimer)) { + if (taosTmrReset(mndCalMqRebalance, tsMqRebalanceInterval * 1000, pMnode, pMnode->timer, &pMnode->mqTimer)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } diff --git a/source/dnode/mnode/impl/test/trans/trans2.cpp b/source/dnode/mnode/impl/test/trans/trans2.cpp index b3cbcb6898..974c86b423 100644 --- a/source/dnode/mnode/impl/test/trans/trans2.cpp +++ b/source/dnode/mnode/impl/test/trans/trans2.cpp @@ -58,7 +58,7 @@ class MndTestTrans2 : public ::testing::Test { strcpy(opt.replicas[0].fqdn, "localhost"); opt.msgCb = msgCb; - tsTransPullupMs = 1000; + tsTransPullupInterval = 1; const char *mnodepath = "/tmp/mnode_test_trans"; taosRemoveDir(mnodepath); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 8850b35397..ebf49c644b 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -157,7 +157,7 @@ struct SVnodeCfg { bool isWeak; STsdbCfg tsdbCfg; SWalCfg walCfg; - SSyncCfg syncCfg; // sync integration + SSyncCfg syncCfg; uint32_t hashBegin; uint32_t hashEnd; int8_t hashMethod; diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 5e21abb404..32866d7469 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -97,7 +97,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1; - // sync integration if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1; SJson *pNodeInfoArr = tjsonCreateArray(); @@ -157,7 +156,6 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { if (tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1; if (tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1; - // sync integration if (tjsonGetNumberValue(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1; if (tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 7e80eacf8f..3e869650bf 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -124,8 +124,7 @@ _exit: int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->vgId = TD_VID(pVnode); - // pLoad->syncState = TAOS_SYNC_STATE_LEADER; - pLoad->syncState = syncGetMyRole(pVnode->sync); // sync integration + pLoad->syncState = syncGetMyRole(pVnode->sync); pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTimeSeries = 400; pLoad->totalStorage = 300; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 8f7c11436d..ee6bdc1a59 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -198,7 +198,6 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data); } -// sync integration int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { if (syncEnvIsStart()) { SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 26393394e9..1260f9a3e7 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -14,12 +14,6 @@ */ #include "vnd.h" -// #include "sync.h" -// #include "syncTools.h" -// #include "tmsgcb.h" -// #include "vnodeInt.h" - -// sync integration int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { SSyncInfo syncInfo; diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 8cce169d99..f2b9e0caab 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -62,6 +62,8 @@ # ---- tstream ./test.sh -f tsim/tstream/basic0.sim +# ---- transaction + ./test.sh -f tsim/trans/create_db.sim # ---- tmq ./test.sh -f tsim/tmq/basic1.sim diff --git a/tests/script/tsim/trans/create_db.sim b/tests/script/tsim/trans/create_db.sim new file mode 100644 index 0000000000..0db5add88a --- /dev/null +++ b/tests/script/tsim/trans/create_db.sim @@ -0,0 +1,166 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/cfg.sh -n dnode1 -c transPullupInterval -v 1 +system sh/cfg.sh -n dnode2 -c transPullupInterval -v 1 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +sql connect + +print =============== show dnodes +sql show dnodes; +if $rows != 1 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +sql show mnodes; +if $rows != 1 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +if $data02 != LEADER then + return -1 +endi + +print =============== create dnodes +sql create dnode $hostname port 7200 +sleep 2000 + +sql show dnodes; +if $rows != 2 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +if $data10 != 2 then + return -1 +endi + +print =============== kill dnode2 +system sh/exec.sh -n dnode2 -s stop -x SIGINT + +print =============== create database +sql show transactions +if $rows != 0 then + return -1 +endi + +sql_error create database d1 vgroups 2; + +print =============== show transactions +sql show transactions +if $rows != 1 then + return -1 +endi + +if $data[0][0] != 2 then + return -1 +endi + +if $data[0][2] != undoAction then + return -1 +endi + +if $data[0][3] != d1 then + return -1 +endi + +if $data[0][4] != create-db then + return -1 +endi + +if $data[0][7] != @Unable to establish connection@ then + return -1 +endi + +sql_error create database d1 vgroups 2; + +print =============== start dnode2 +system sh/exec.sh -n dnode2 -s start +sleep 3000 + +sql show transactions +if $rows != 0 then + return -1 +endi + +sql create database d1 vgroups 2; + +print =============== kill dnode2 +system sh/exec.sh -n dnode2 -s stop -x SIGINT + +print =============== create database +sql show transactions +if $rows != 0 then + return -1 +endi + +sql_error create database d2 vgroups 2; + +print =============== show transactions +sql show transactions +if $rows != 1 then + return -1 +endi + +if $data[0][0] != 4 then + return -1 +endi + +if $data[0][2] != undoAction then + return -1 +endi + +if $data[0][3] != d2 then + return -1 +endi + +if $data[0][4] != create-db then + return -1 +endi + +if $data[0][7] != @Unable to establish connection@ then + return -1 +endi + +sql_error create database d2 vgroups 2; + +print =============== kill transaction +sql kill transaction 4; +sleep 2000 + +sql show transactions +if $rows != 0 then + return -1 +endi + +print =============== start dnode2 +system sh/exec.sh -n dnode2 -s start +sleep 3000 + +sql show transactions +if $rows != 0 then + return -1 +endi + +sql create database d2 vgroups 2; +sql_error kill transaction 1; +sql_error kill transaction 2; +sql_error kill transaction 3; +sql_error kill transaction 4; +sql_error kill transaction 5; + +return +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT \ No newline at end of file