From dd4333b14058cc378d0070a5c21659defdd2f64a Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 11 Nov 2022 14:11:42 +0800 Subject: [PATCH 01/12] enh: optimize the error message when illegally modifying the column length --- source/libs/parser/src/parUtil.c | 2 +- source/util/src/terror.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 00a72a1946..e8c3f2fa8d 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -138,7 +138,7 @@ static char* getSyntaxErrFormat(int32_t errCode) { case TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY: return "Primary timestamp column cannot be dropped"; case TSDB_CODE_PAR_INVALID_MODIFY_COL: - return "Only binary/nchar column length could be modified"; + return "Only binary/nchar column length could be modified, and the length can only be increased, not decreased"; case TSDB_CODE_PAR_INVALID_TBNAME: return "Invalid tbname pseudo column"; case TSDB_CODE_PAR_INVALID_FUNCTION_NAME: diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0e6568d692..a1162d2e94 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -521,7 +521,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TIMELINE_FUNC, "Invalid timeline fu TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PASSWD, "Invalid password") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ALTER_TABLE, "Invalid alter table statement") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY, "Primary timestamp column cannot be dropped") -TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_MODIFY_COL, "Only binary/nchar column length could be modified") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_MODIFY_COL, "Only binary/nchar column length could be modified, and the length can only be increased, not decreased") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TBNAME, "Invalid tbname pseudo column") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FUNCTION_NAME, "Invalid function name") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COMMENT_TOO_LONG, "Comment too long") From b4f8e45401bcdb2ab8f82b910a7ee1c1b0d6e803 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 11 Nov 2022 14:20:46 +0800 Subject: [PATCH 02/12] refact: adjust head file and sync log --- source/libs/sync/inc/syncAppendEntries.h | 1 - source/libs/sync/inc/syncAppendEntriesReply.h | 1 - source/libs/sync/inc/syncIndexMgr.h | 21 ++--- source/libs/sync/inc/syncMessage.h | 41 -------- source/libs/sync/src/syncAppendEntries.c | 7 +- source/libs/sync/src/syncAppendEntriesReply.c | 6 +- source/libs/sync/src/syncCommit.c | 3 +- source/libs/sync/src/syncElection.c | 1 - source/libs/sync/src/syncEnv.c | 2 + source/libs/sync/src/syncIndexMgr.c | 83 +---------------- .../test/syncConfigChangeSnapshotTest.cpp | 2 +- .../sync/test/sync_test_lib/inc/syncTest.h | 13 +++ .../sync_test_lib/src/syncIndexMgrDebug.c | 93 +++++++++++++++++++ 13 files changed, 120 insertions(+), 154 deletions(-) delete mode 100644 source/libs/sync/inc/syncMessage.h create mode 100644 source/libs/sync/test/sync_test_lib/src/syncIndexMgrDebug.c diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index 0a67939d35..a87a28baf5 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -21,7 +21,6 @@ extern "C" { #endif #include "syncInt.h" -#include "syncMessage.h" // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index d2dff92d43..09750864d5 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -21,7 +21,6 @@ extern "C" { #endif #include "syncInt.h" -#include "syncMessage.h" // TLA+ Spec // HandleAppendEntriesResponse(i, j, m) == diff --git a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h index bd88f5cdce..79b4fa0fbf 100644 --- a/source/libs/sync/inc/syncIndexMgr.h +++ b/source/libs/sync/inc/syncIndexMgr.h @@ -41,22 +41,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index); SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); -cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); -char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); -void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime); -int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); -void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime); -int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); - -// void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term); -// SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); - -// for debug ------------------- -void syncIndexMgrPrint(SSyncIndexMgr *pObj); -void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj); -void syncIndexMgrLog(SSyncIndexMgr *pObj); -void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj); +void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime); +int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); +void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime); +int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); +void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term); +SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h deleted file mode 100644 index 936081c7b2..0000000000 --- a/source/libs/sync/inc/syncMessage.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_LIBS_SYNC_MESSAGE_H -#define _TD_LIBS_SYNC_MESSAGE_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include "syncInt.h" - -// --------------------------------------------- -cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); -cJSON* syncRpcUnknownMsg2Json(); -char* syncRpcMsg2Str(SRpcMsg* pRpcMsg); - -// for debug ---------------------- -void syncRpcMsgPrint(SRpcMsg* pMsg); -void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg); -void syncRpcMsgLog(SRpcMsg* pMsg); -void syncRpcMsgLog2(char* s, SRpcMsg* pMsg); -// --------------------------------------------- - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_LIBS_SYNC_MESSAGE_H*/ diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 792ce67cd4..2dbe157c16 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -13,15 +13,10 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncAppendEntries.h" -#include "syncInt.h" -#include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" -#include "syncSnapshot.h" -#include "syncUtil.h" -#include "syncVoteMgr.h" -#include "wal.h" // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index cf7c391a1d..89661987a8 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -13,17 +13,13 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncAppendEntriesReply.h" #include "syncCommit.h" #include "syncIndexMgr.h" -#include "syncInt.h" -#include "syncRaftCfg.h" -#include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncReplication.h" #include "syncSnapshot.h" -#include "syncUtil.h" -#include "syncVoteMgr.h" // TLA+ Spec // HandleAppendEntriesResponse(i, j, m) == diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index a951b78e1e..d2320fc6be 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -13,10 +13,9 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncCommit.h" #include "syncIndexMgr.h" -#include "syncInt.h" -#include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncUtil.h" diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 85e4572877..95f7a638f6 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -15,7 +15,6 @@ #define _DEFAULT_SOURCE #include "syncElection.h" -#include "syncMessage.h" #include "syncRaftCfg.h" #include "syncRaftStore.h" #include "syncVoteMgr.h" diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index 3f3b794f46..a0e0a5a2c2 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -105,6 +105,7 @@ void syncEnvStopTimer() { #endif static void syncEnvTick(void *param, void *tmrId) { +#if 0 SSyncEnv *pSyncEnv = param; if (atomic_load_64(&gSyncEnv.envTickTimerLogicClockUser) <= atomic_load_64(&gSyncEnv.envTickTimerLogicClock)) { gSyncEnv.envTickTimerCounter++; @@ -121,4 +122,5 @@ static void syncEnvTick(void *param, void *tmrId) { gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter, gSyncEnv.envTickTimerMS, tmrId); } +#endif } diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 8e78aeedc3..ca5e531528 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -13,18 +13,16 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncIndexMgr.h" #include "syncUtil.h" -// SMatchIndex ----------------------------- - SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) { - SSyncIndexMgr *pSyncIndexMgr = taosMemoryMalloc(sizeof(SSyncIndexMgr)); + SSyncIndexMgr *pSyncIndexMgr = taosMemoryCalloc(1, sizeof(SSyncIndexMgr)); if (pSyncIndexMgr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - memset(pSyncIndexMgr, 0, sizeof(SSyncIndexMgr)); pSyncIndexMgr->replicas = &(pSyncNode->replicasId); pSyncIndexMgr->replicaNum = pSyncNode->replicaNum; @@ -97,54 +95,6 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaf return SYNC_INDEX_INVALID; } -cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { - char u64buf[128] = {0}; - cJSON *pRoot = cJSON_CreateObject(); - - if (pSyncIndexMgr != NULL) { - cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum); - cJSON *pReplicas = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "replicas", pReplicas); - for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { - cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pSyncIndexMgr->replicas))[i])); - } - - { - int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum); - for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { - arr[i] = pSyncIndexMgr->index[i]; - } - cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum); - taosMemoryFree(arr); - cJSON_AddItemToObject(pRoot, "index", pIndex); - } - - { - int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum); - for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { - arr[i] = pSyncIndexMgr->privateTerm[i]; - } - cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum); - taosMemoryFree(arr); - cJSON_AddItemToObject(pRoot, "privateTerm", pIndex); - } - - snprintf(u64buf, sizeof(u64buf), "%p", pSyncIndexMgr->pSyncNode); - cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); - } - - cJSON *pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "pSyncIndexMgr", pRoot); - return pJson; -} - -char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) { - cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr); - char *serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime) { for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { @@ -201,35 +151,6 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRa return -1; } -// for debug ------------------- -void syncIndexMgrPrint(SSyncIndexMgr *pObj) { - char *serialized = syncIndexMgr2Str(pObj); - printf("syncIndexMgrPrint | len:%" PRIu64 " | %s \n", (uint64_t)strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) { - char *serialized = syncIndexMgr2Str(pObj); - printf("syncIndexMgrPrint2 | len:%" PRIu64 " | %s | %s \n", (uint64_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncIndexMgrLog(SSyncIndexMgr *pObj) { - char *serialized = syncIndexMgr2Str(pObj); - sTrace("syncIndexMgrLog | len:%" PRIu64 " | %s", (uint64_t)strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) { - if (gRaftDetailLog) { - char *serialized = syncIndexMgr2Str(pObj); - sTrace("syncIndexMgrLog2 | len:%" PRIu64 " | %s | %s", (uint64_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); - } -} - void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term) { for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index fb227dfad3..c29def9ca3 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -143,7 +143,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) { - sTrace("==callback== ==ReConfigCb== flag:0x%" PRIx64 ", index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 + sTrace("==callback== ==ReConfigCb== flag:%" PRIx64 ", index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64, cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term); } diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index 50d9d12eee..ff4535ec87 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -110,7 +110,20 @@ char* snapshotSender2Str(SSyncSnapshotSender* pSender); cJSON* snapshotReceiver2Json(SSyncSnapshotReceiver* pReceiver); char* snapshotReceiver2Str(SSyncSnapshotReceiver* pReceiver); +cJSON* syncIndexMgr2Json(SSyncIndexMgr* pSyncIndexMgr); +char* syncIndexMgr2Str(SSyncIndexMgr* pSyncIndexMgr); +void syncIndexMgrPrint(SSyncIndexMgr* pObj); +void syncIndexMgrPrint2(char* s, SSyncIndexMgr* pObj); +void syncIndexMgrLog(SSyncIndexMgr* pObj); +void syncIndexMgrLog2(char* s, SSyncIndexMgr* pObj); +cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); +cJSON* syncRpcUnknownMsg2Json(); +char* syncRpcMsg2Str(SRpcMsg* pRpcMsg); +void syncRpcMsgPrint(SRpcMsg* pMsg); +void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg); +void syncRpcMsgLog(SRpcMsg* pMsg); +void syncRpcMsgLog2(char* s, SRpcMsg* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/test/sync_test_lib/src/syncIndexMgrDebug.c b/source/libs/sync/test/sync_test_lib/src/syncIndexMgrDebug.c new file mode 100644 index 0000000000..1d3198c51d --- /dev/null +++ b/source/libs/sync/test/sync_test_lib/src/syncIndexMgrDebug.c @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "syncTest.h" + +void syncIndexMgrPrint(SSyncIndexMgr *pObj) { + char *serialized = syncIndexMgr2Str(pObj); + printf("syncIndexMgrPrint | len:%" PRIu64 " | %s \n", (uint64_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) { + char *serialized = syncIndexMgr2Str(pObj); + printf("syncIndexMgrPrint2 | len:%" PRIu64 " | %s | %s \n", (uint64_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncIndexMgrLog(SSyncIndexMgr *pObj) { + char *serialized = syncIndexMgr2Str(pObj); + sTrace("syncIndexMgrLog | len:%" PRIu64 " | %s", (uint64_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) { + if (gRaftDetailLog) { + char *serialized = syncIndexMgr2Str(pObj); + sTrace("syncIndexMgrLog2 | len:%" PRIu64 " | %s | %s", (uint64_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} + +cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { + char u64buf[128] = {0}; + cJSON *pRoot = cJSON_CreateObject(); + + if (pSyncIndexMgr != NULL) { + cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum); + cJSON *pReplicas = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "replicas", pReplicas); + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pSyncIndexMgr->replicas))[i])); + } + + { + int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum); + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + arr[i] = pSyncIndexMgr->index[i]; + } + cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum); + taosMemoryFree(arr); + cJSON_AddItemToObject(pRoot, "index", pIndex); + } + + { + int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum); + for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { + arr[i] = pSyncIndexMgr->privateTerm[i]; + } + cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum); + taosMemoryFree(arr); + cJSON_AddItemToObject(pRoot, "privateTerm", pIndex); + } + + snprintf(u64buf, sizeof(u64buf), "%p", pSyncIndexMgr->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + } + + cJSON *pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "pSyncIndexMgr", pRoot); + return pJson; +} + +char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) { + cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr); + char *serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} From fd152f00ece7b192a9c826f799f3085ca95c3461 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 11 Nov 2022 14:35:16 +0800 Subject: [PATCH 03/12] refact: adjust head file and sync log --- source/libs/sync/inc/syncInt.h | 25 ++++++++++++++++++- .../sync/inc/{syncTools.h => syncMessage.h} | 22 +++------------- source/libs/sync/src/syncAppendEntries.c | 1 + source/libs/sync/src/syncAppendEntriesReply.c | 1 + source/libs/sync/src/syncElection.c | 3 ++- source/libs/sync/src/syncMessage.c | 1 + source/libs/sync/src/syncRequestVote.c | 1 + source/libs/sync/src/syncRequestVoteReply.c | 1 + source/libs/sync/src/syncVoteMgr.c | 1 + 9 files changed, 36 insertions(+), 20 deletions(-) rename source/libs/sync/inc/{syncTools.h => syncMessage.h} (97%) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8a951ba38d..46bbb14421 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -21,7 +21,6 @@ extern "C" { #endif #include "sync.h" -#include "syncTools.h" #include "taosdef.h" #include "tlog.h" #include "trpc.h" @@ -85,9 +84,33 @@ typedef struct SSyncSnapshotSender SSyncSnapshotSender; typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver; typedef struct SSyncTimer SSyncTimer; typedef struct SSyncHbTimerData SSyncHbTimerData; +typedef struct SyncSnapshotSend SyncSnapshotSend; +typedef struct SyncSnapshotRsp SyncSnapshotRsp; +typedef struct SyncLocalCmd SyncLocalCmd; +typedef struct SyncAppendEntriesBatch SyncAppendEntriesBatch; +typedef struct SyncPreSnapshotReply SyncPreSnapshotReply; +typedef struct SyncHeartbeatReply SyncHeartbeatReply; +typedef struct SyncHeartbeat SyncHeartbeat; +typedef struct SyncPreSnapshot SyncPreSnapshot; + +typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg); +typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg); +typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex); +typedef int32_t (*FpOnRequestVoteCb)(SSyncNode* ths, SyncRequestVote* pMsg); +typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* pMsg); +typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg); +typedef int32_t (*FpOnAppendEntriesReplyCb)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +typedef int32_t (*FpOnTimeoutCb)(SSyncNode* pSyncNode, SyncTimeout* pMsg); +typedef int32_t (*FpOnSnapshotCb)(SSyncNode* ths, SyncSnapshotSend* pMsg); +typedef int32_t (*FpOnSnapshotReplyCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg); extern bool gRaftDetailLog; +typedef struct SRaftId { + SyncNodeId addr; + SyncGroupId vgId; +} SRaftId; + typedef struct SSyncHbTimerData { SSyncNode* pSyncNode; SSyncTimer* pTimer; diff --git a/source/libs/sync/inc/syncTools.h b/source/libs/sync/inc/syncMessage.h similarity index 97% rename from source/libs/sync/inc/syncTools.h rename to source/libs/sync/inc/syncMessage.h index 3fb4a5ba0c..93a674e170 100644 --- a/source/libs/sync/inc/syncTools.h +++ b/source/libs/sync/inc/syncMessage.h @@ -13,18 +13,14 @@ * along with this program. If not, see . */ -#ifndef _TD_LIBS_SYNC_TOOLS_H -#define _TD_LIBS_SYNC_TOOLS_H +#ifndef _TD_LIBS_SYNC_MESSAGE_H +#define _TD_LIBS_SYNC_MESSAGE_H #ifdef __cplusplus extern "C" { #endif -// ------------------ ds ------------------- -typedef struct SRaftId { - SyncNodeId addr; - SyncGroupId vgId; -} SRaftId; +#include "syncInt.h" // ------------------ for debug ------------------- void syncRpcMsgPrint(SRpcMsg* pMsg); @@ -731,16 +727,6 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg); // ----------------------------------------- -typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg); -typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg); -typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex); -typedef int32_t (*FpOnRequestVoteCb)(SSyncNode* ths, SyncRequestVote* pMsg); -typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* pMsg); -typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg); -typedef int32_t (*FpOnAppendEntriesReplyCb)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); -typedef int32_t (*FpOnTimeoutCb)(SSyncNode* pSyncNode, SyncTimeout* pMsg); -typedef int32_t (*FpOnSnapshotCb)(SSyncNode* ths, SyncSnapshotSend* pMsg); -typedef int32_t (*FpOnSnapshotReplyCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg); // option ---------------------------------- bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); @@ -752,4 +738,4 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); } #endif -#endif /*_TD_LIBS_SYNC_TOOLS_H*/ +#endif /*_TD_LIBS_SYNC_MESSAGE_H*/ diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 2dbe157c16..c9c1baa4bc 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "syncAppendEntries.h" +#include "syncMessage.h" #include "syncRaftLog.h" #include "syncRaftStore.h" diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 89661987a8..53d6b5d92f 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "syncAppendEntriesReply.h" +#include "syncMessage.h" #include "syncCommit.h" #include "syncIndexMgr.h" #include "syncRaftStore.h" diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 95f7a638f6..123ce5b581 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -15,10 +15,11 @@ #define _DEFAULT_SOURCE #include "syncElection.h" +#include "syncMessage.h" #include "syncRaftCfg.h" #include "syncRaftStore.h" -#include "syncVoteMgr.h" #include "syncUtil.h" +#include "syncVoteMgr.h" // TLA+ Spec // RequestVote(i, j) == diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 3fcb563f3b..e3434aba73 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncMessage.h" #include "syncRaftCfg.h" #include "syncRaftEntry.h" diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index f2b75def6b..bf44341acd 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "syncRequestVote.h" +#include "syncMessage.h" #include "syncRaftCfg.h" #include "syncRaftStore.h" #include "syncUtil.h" diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 02b9bb40ac..1acf16507a 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "syncRequestVoteReply.h" +#include "syncMessage.h" #include "syncRaftStore.h" #include "syncVoteMgr.h" diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 4ca4e26bec..8a0a35ce33 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "syncVoteMgr.h" +#include "syncMessage.h" #include "syncUtil.h" static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) { From 57f1b593e4c0184c77778c7df9e9e7cec4ee2abe Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 11 Nov 2022 14:40:21 +0800 Subject: [PATCH 04/12] refact: adjust head file and sync log --- include/libs/sync/sync.h | 1 - source/libs/sync/inc/syncMessage.h | 34 ------------------------------ 2 files changed, 35 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a27be95049..0b997690a1 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -58,7 +58,6 @@ typedef int64_t SyncIndex; typedef uint64_t SyncTerm; typedef struct SSyncNode SSyncNode; -typedef struct SSyncBuffer SSyncBuffer; typedef struct SWal SWal; typedef struct SSyncRaftEntry SSyncRaftEntry; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 93a674e170..77dfdf282d 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -22,40 +22,6 @@ extern "C" { #include "syncInt.h" -// ------------------ for debug ------------------- -void syncRpcMsgPrint(SRpcMsg* pMsg); -void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg); -void syncRpcMsgLog(SRpcMsg* pMsg); -void syncRpcMsgLog2(char* s, SRpcMsg* pMsg); - -// ------------------ for compile ------------------- -typedef struct SSyncBuffer { - void* data; - size_t len; -} SSyncBuffer; - -typedef struct SNodesRole { - int32_t replicaNum; - SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; - ESyncState role[TSDB_MAX_REPLICA]; -} SNodesRole; - -typedef struct SStateMgr { - void* data; - - int32_t (*getCurrentTerm)(struct SStateMgr* pMgr, SyncTerm* pCurrentTerm); - int32_t (*persistCurrentTerm)(struct SStateMgr* pMgr, SyncTerm pCurrentTerm); - - int32_t (*getVoteFor)(struct SStateMgr* pMgr, SyncNodeId* pVoteFor); - int32_t (*persistVoteFor)(struct SStateMgr* pMgr, SyncNodeId voteFor); - - int32_t (*getSyncCfg)(struct SStateMgr* pMgr, SSyncCfg* pSyncCfg); - int32_t (*persistSyncCfg)(struct SStateMgr* pMgr, SSyncCfg* pSyncCfg); - -} SStateMgr; - -// ------------------ for message process ------------------- - // --------------------------------------------- typedef struct SyncPing { uint32_t bytes; From 603cc02671df200b81d142ecb8f6ed1dbcdb5dc4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 11 Nov 2022 14:37:31 +0800 Subject: [PATCH 05/12] refactor(tmq): add more strict drop condition check --- include/common/tmsg.h | 21 +++++++++------ source/dnode/mnode/impl/src/mndTopic.c | 37 +++++++++++++++++++------- source/dnode/vnode/src/tq/tq.c | 6 ++--- 3 files changed, 44 insertions(+), 20 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b16b5a2d4b..99c5c72e2f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -418,13 +418,17 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) { buf = taosDecodeVariantI32(buf, &pSW->nCols); buf = taosDecodeVariantI32(buf, &pSW->version); - pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); - if (pSW->pSchema == NULL) { - return NULL; - } + if (pSW->nCols > 0) { + pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); + if (pSW->pSchema == NULL) { + return NULL; + } - for (int32_t i = 0; i < pSW->nCols; i++) { - buf = taosDecodeSSchema(buf, &pSW->pSchema[i]); + for (int32_t i = 0; i < pSW->nCols; i++) { + buf = taosDecodeSSchema(buf, &pSW->pSchema[i]); + } + } else { + pSW->pSchema = NULL; } return (void*)buf; } @@ -839,7 +843,7 @@ typedef struct { int64_t dbId; int32_t vgVersion; int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT - int64_t stateTs; // ms + int64_t stateTs; // ms } SUseDbReq; int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); @@ -2990,7 +2994,8 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE } static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { - if (pSubTopicEp->schema.nCols) taosMemoryFreeClear(pSubTopicEp->schema.pSchema); + taosMemoryFreeClear(pSubTopicEp->schema.pSchema); + pSubTopicEp->schema.nCols = 0; taosArrayDestroy(pSubTopicEp->vgs); } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index e1ca1d2708..522036afa2 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -637,6 +637,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { if (pIter == NULL) break; if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue; + int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { char *name = taosArrayGetP(pConsumer->assignedTopics, i); @@ -649,6 +650,33 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { return -1; } } + + sz = taosArrayGetSize(pConsumer->rebNewTopics); + for (int32_t i = 0; i < sz; i++) { + char *name = taosArrayGetP(pConsumer->rebNewTopics, i); + if (strcmp(name, pTopic->name) == 0) { + mndReleaseConsumer(pMnode, pConsumer); + mndReleaseTopic(pMnode, pTopic); + terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", + dropReq.name, pConsumer->consumerId, pConsumer->cgroup); + return -1; + } + } + + sz = taosArrayGetSize(pConsumer->rebRemovedTopics); + for (int32_t i = 0; i < sz; i++) { + char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i); + if (strcmp(name, pTopic->name) == 0) { + mndReleaseConsumer(pMnode, pConsumer); + mndReleaseTopic(pMnode, pTopic); + terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)", + dropReq.name, pConsumer->consumerId, pConsumer->cgroup); + return -1; + } + } + sdbRelease(pSdb, pConsumer); } @@ -675,15 +703,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); -#if 0 - if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) { - ASSERT(0); - mndTransDrop(pTrans); - mndReleaseTopic(pMnode, pTopic); - return -1; - } -#endif - // TODO check if rebalancing if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { /*ASSERT(0);*/ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9c377fe7f5..f75dce8231 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -582,10 +582,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = -1; } - tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid:%" PRId64 - ", version:%" PRId64 "", + tqDebug("tmq poll: consumer %" PRId64 + ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, - dataRsp.rspOffset.uid, dataRsp.rspOffset.version); + dataRsp.rspOffset.uid, dataRsp.rspOffset.ts); tDeleteSMqDataRsp(&dataRsp); return code; From 1336ed11b28dc623f20189c56407c489f7e82f9c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 11 Nov 2022 15:06:47 +0800 Subject: [PATCH 06/12] refact: adjust sync ping message handler --- source/libs/sync/inc/syncMessage.h | 17 +- source/libs/sync/src/syncMessage.c | 236 ----------------- .../sync/test/sync_test_lib/inc/syncTest.h | 17 ++ .../test/sync_test_lib/src/syncMessageDebug.c | 238 ++++++++++++++++++ 4 files changed, 256 insertions(+), 252 deletions(-) diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 77dfdf282d..7dc04d0b86 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -34,27 +34,12 @@ typedef struct SyncPing { char data[]; } SyncPing; -SyncPing* syncPingBuild(uint32_t dataLen); -SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str); -SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId); + void syncPingDestroy(SyncPing* pMsg); void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen); void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg); -char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len); SyncPing* syncPingDeserialize2(const char* buf, uint32_t len); -int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen); -SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen); -void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg); -void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg); SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncPing2Json(const SyncPing* pMsg); -char* syncPing2Str(const SyncPing* pMsg); - -// for debug ---------------------- -void syncPingPrint(const SyncPing* pMsg); -void syncPingPrint2(char* s, const SyncPing* pMsg); -void syncPingLog(const SyncPing* pMsg); -void syncPingLog2(char* s, const SyncPing* pMsg); // --------------------------------------------- typedef struct SyncPingReply { diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index e3434aba73..565b21eb55 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -151,31 +151,6 @@ void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) { } } -// ---- message process SyncPing---- -SyncPing* syncPingBuild(uint32_t dataLen) { - uint32_t bytes = sizeof(SyncPing) + dataLen; - SyncPing* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->msgType = TDMT_SYNC_PING; - pMsg->dataLen = dataLen; - return pMsg; -} - -SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str) { - uint32_t dataLen = strlen(str) + 1; - SyncPing* pMsg = syncPingBuild(dataLen); - pMsg->vgId = vgId; - pMsg->srcId = *srcId; - pMsg->destId = *destId; - snprintf(pMsg->data, pMsg->dataLen, "%s", str); - return pMsg; -} - -SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId) { - SyncPing* pMsg = syncPingBuild2(srcId, destId, vgId, "ping"); - return pMsg; -} void syncPingDestroy(SyncPing* pMsg) { if (pMsg != NULL) { @@ -194,16 +169,6 @@ void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) { ASSERT(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen); } -char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len) { - char* buf = taosMemoryMalloc(pMsg->bytes); - ASSERT(buf != NULL); - syncPingSerialize(pMsg, buf, pMsg->bytes); - if (len != NULL) { - *len = pMsg->bytes; - } - return buf; -} - SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncPing* pMsg = taosMemoryMalloc(bytes); @@ -213,117 +178,6 @@ SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) { return pMsg; } -int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen) { - SEncoder encoder = {0}; - tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) { - return -1; - } - - if (tEncodeU32(&encoder, pMsg->bytes) < 0) { - return -1; - } - if (tEncodeI32(&encoder, pMsg->vgId) < 0) { - return -1; - } - if (tEncodeU32(&encoder, pMsg->msgType) < 0) { - return -1; - } - if (tEncodeU64(&encoder, pMsg->srcId.addr) < 0) { - return -1; - } - if (tEncodeI32(&encoder, pMsg->srcId.vgId) < 0) { - return -1; - } - if (tEncodeU64(&encoder, pMsg->destId.addr) < 0) { - return -1; - } - if (tEncodeI32(&encoder, pMsg->destId.vgId) < 0) { - return -1; - } - if (tEncodeU32(&encoder, pMsg->dataLen) < 0) { - return -1; - } - if (tEncodeBinary(&encoder, pMsg->data, pMsg->dataLen)) { - return -1; - } - - tEndEncode(&encoder); - int32_t tlen = encoder.pos; - tEncoderClear(&encoder); - return tlen; -} - -SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) { - SDecoder decoder = {0}; - tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) { - return NULL; - } - - SyncPing* pMsg = NULL; - uint32_t bytes; - if (tDecodeU32(&decoder, &bytes) < 0) { - return NULL; - } - - pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - pMsg->bytes = bytes; - - if (tDecodeI32(&decoder, &pMsg->vgId) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeU32(&decoder, &pMsg->msgType) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - uint32_t len; - char* data = NULL; - if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) { - taosMemoryFree(pMsg); - return NULL; - } - ASSERT(len == pMsg->dataLen); - memcpy(pMsg->data, data, len); - - tEndDecode(&decoder); - tDecoderClear(&decoder); - return pMsg; -} - -void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pMsg->msgType; - pRpcMsg->contLen = pMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncPingSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); -} - -void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) { - syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); -} SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncPing* pMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); @@ -331,96 +185,6 @@ SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) { return pMsg; } -cJSON* syncPing2Json(const SyncPing* pMsg) { - char u64buf[128] = {0}; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); - cJSON_AddStringToObject(pDestId, "addr", u64buf); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); - char* s; - s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data", s); - taosMemoryFree(s); - s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data2", s); - taosMemoryFree(s); - } - - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncPing", pRoot); - return pJson; -} - -char* syncPing2Str(const SyncPing* pMsg) { - cJSON* pJson = syncPing2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - -// for debug ---------------------- -void syncPingPrint(const SyncPing* pMsg) { - char* serialized = syncPing2Str(pMsg); - printf("syncPingPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncPingPrint2(char* s, const SyncPing* pMsg) { - char* serialized = syncPing2Str(pMsg); - printf("syncPingPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncPingLog(const SyncPing* pMsg) { - char* serialized = syncPing2Str(pMsg); - sTrace("syncPingLog | len:%d | %s", (int32_t)strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void syncPingLog2(char* s, const SyncPing* pMsg) { - if (gRaftDetailLog) { - char* serialized = syncPing2Str(pMsg); - sTrace("syncPingLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); - } -} - // ---- message process SyncPingReply---- SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { uint32_t bytes = sizeof(SyncPingReply) + dataLen; diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index ff4535ec87..93da2df5a2 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -125,6 +125,23 @@ void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg); void syncRpcMsgLog(SRpcMsg* pMsg); void syncRpcMsgLog2(char* s, SRpcMsg* pMsg); + +// origin syncMessage +SyncPing* syncPingBuild(uint32_t dataLen); +SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str); +SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId); +char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len); +int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen); +SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen); +void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg); +void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg); +cJSON* syncPing2Json(const SyncPing* pMsg); +char* syncPing2Str(const SyncPing* pMsg); +void syncPingPrint(const SyncPing* pMsg); +void syncPingPrint2(char* s, const SyncPing* pMsg); +void syncPingLog(const SyncPing* pMsg); +void syncPingLog2(char* s, const SyncPing* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index 012382d69d..cdd2ae045d 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -16,6 +16,244 @@ #define _DEFAULT_SOURCE #include "syncTest.h" +// ---- message process SyncPing---- +SyncPing* syncPingBuild(uint32_t dataLen) { + uint32_t bytes = sizeof(SyncPing) + dataLen; + SyncPing* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = TDMT_SYNC_PING; + pMsg->dataLen = dataLen; + return pMsg; +} + +SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str) { + uint32_t dataLen = strlen(str) + 1; + SyncPing* pMsg = syncPingBuild(dataLen); + pMsg->vgId = vgId; + pMsg->srcId = *srcId; + pMsg->destId = *destId; + snprintf(pMsg->data, pMsg->dataLen, "%s", str); + return pMsg; +} + +SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId) { + SyncPing* pMsg = syncPingBuild2(srcId, destId, vgId, "ping"); + return pMsg; +} + +char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncPingSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncPingSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) { + syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + if (tStartDecode(&decoder) < 0) { + return NULL; + } + + SyncPing* pMsg = NULL; + uint32_t bytes; + if (tDecodeU32(&decoder, &bytes) < 0) { + return NULL; + } + + pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + pMsg->bytes = bytes; + + if (tDecodeI32(&decoder, &pMsg->vgId) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeU32(&decoder, &pMsg->msgType) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + uint32_t len; + char* data = NULL; + if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) { + taosMemoryFree(pMsg); + return NULL; + } + ASSERT(len == pMsg->dataLen); + memcpy(pMsg->data, data, len); + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return pMsg; +} + +int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) { + return -1; + } + + if (tEncodeU32(&encoder, pMsg->bytes) < 0) { + return -1; + } + if (tEncodeI32(&encoder, pMsg->vgId) < 0) { + return -1; + } + if (tEncodeU32(&encoder, pMsg->msgType) < 0) { + return -1; + } + if (tEncodeU64(&encoder, pMsg->srcId.addr) < 0) { + return -1; + } + if (tEncodeI32(&encoder, pMsg->srcId.vgId) < 0) { + return -1; + } + if (tEncodeU64(&encoder, pMsg->destId.addr) < 0) { + return -1; + } + if (tEncodeI32(&encoder, pMsg->destId.vgId) < 0) { + return -1; + } + if (tEncodeU32(&encoder, pMsg->dataLen) < 0) { + return -1; + } + if (tEncodeBinary(&encoder, pMsg->data, pMsg->dataLen)) { + return -1; + } + + tEndEncode(&encoder); + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +cJSON* syncPing2Json(const SyncPing* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + char* s; + s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + taosMemoryFree(s); + s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + taosMemoryFree(s); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPing", pRoot); + return pJson; +} + +char* syncPing2Str(const SyncPing* pMsg) { + cJSON* pJson = syncPing2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncPingPrint(const SyncPing* pMsg) { + char* serialized = syncPing2Str(pMsg); + printf("syncPingPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncPingPrint2(char* s, const SyncPing* pMsg) { + char* serialized = syncPing2Str(pMsg); + printf("syncPingPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncPingLog(const SyncPing* pMsg) { + char* serialized = syncPing2Str(pMsg); + sTrace("syncPingLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncPingLog2(char* s, const SyncPing* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncPing2Str(pMsg); + sTrace("syncPingLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} + // --------------------------------------------- cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { cJSON* pRoot; From 265bb81b49b7a5013faad078d5c24fb0fda3cdff Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 11 Nov 2022 15:35:17 +0800 Subject: [PATCH 07/12] fix(query): set offset for empty record --- source/libs/executor/src/scanoperator.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 945753cc53..a3dc128ced 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1773,6 +1773,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1); if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) { + tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer); return NULL; } ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1); From 7ea790021fb021bf46ed35491a32ba094ec3073b Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 11 Nov 2022 16:15:16 +0800 Subject: [PATCH 08/12] fix:avoid duplicate results --- source/libs/executor/src/executorimpl.c | 1 + source/libs/executor/src/timewindowoperator.c | 35 +++- source/libs/stream/src/streamState.c | 10 +- tests/script/tsim/stream/state0.sim | 188 ++++++++++++++++++ 4 files changed, 220 insertions(+), 14 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 34f462cb3d..7fd288cd57 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3423,6 +3423,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta ASSERT(code == 0); if (code == -1) { // coverity scan + pGroupResInfo->index += 1; continue; } SResultRow* pRow = (SResultRow*)pVal; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 4c369e8802..40742087ea 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3580,6 +3580,11 @@ static void removeSessionResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessio tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey)); } +static void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) { + *pHashKey = *pKey; + pHashKey->win.ekey = pKey->win.skey; +} + static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) { if (tSimpleHashGetSize(pHashMap) == 0) { return; @@ -3588,8 +3593,8 @@ static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) { for (int32_t i = 0; i < size; i++) { SSessionKey* pWin = taosArrayGet(pWins, i); if (!pWin) continue; - SSessionKey key = *pWin; - key.win.ekey = key.win.skey; + SSessionKey key = {0}; + getSessionHashKey(pWin, &key); tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey)); } } @@ -3642,7 +3647,9 @@ static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindo static bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) { streamStateSessionDel(pAggSup->pState, pKey); - tSimpleHashRemove(pAggSup->pResultRows, pKey, sizeof(SSessionKey)); + SSessionKey hashKey = {0}; + getSessionHashKey(pKey, &hashKey); + tSimpleHashRemove(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey)); return true; } @@ -3753,8 +3760,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - SSessionKey key = winInfo.sessionWin; - key.win.ekey = key.win.skey; + SSessionKey key = {0}; + getSessionHashKey(&winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); } @@ -3896,8 +3903,8 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j); SStreamSessionAggOperatorInfo* pChInfo = pChild->info; SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup; - SSessionKey chWinKey = *pWinKey; - chWinKey.win.ekey = chWinKey.win.skey; + SSessionKey chWinKey = {0}; + getSessionHashKey(pWinKey, &chWinKey); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &chWinKey); SResultRow* pResult = NULL; SResultRow* pChResult = NULL; @@ -3978,8 +3985,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) { for (int32_t i = 0; i < size; i++) { SSessionKey* pWinKey = taosArrayGet(pResWins, i); if (!pWinKey) continue; - SSessionKey winInfo = *pWinKey; - winInfo.win.ekey = winInfo.win.skey; + SSessionKey winInfo = {0}; + getSessionHashKey(pWinKey, &winInfo); tSimpleHashPut(pStDeleted, &winInfo, sizeof(SSessionKey), NULL, 0); } } @@ -4561,8 +4568,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - SSessionKey key = curWin.winInfo.sessionWin; - key.win.ekey = key.win.skey; + SSessionKey key = {0}; + getSessionHashKey(&curWin.winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); } } @@ -4645,6 +4652,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); +#if 0 + char* pBuf = streamStateSessionDump(pInfo->streamAggSup.pState); + qDebug("===stream===final session%s", pBuf); + taosMemoryFree(pBuf); +#endif + doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, "single state delete"); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index ccb0dd4a92..88c39c1157 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -521,9 +521,13 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa void* tmp = NULL; int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen); if (code == 0) { - *key = resKey; - *pVal = tdbRealloc(NULL, *pVLen); - memcpy(*pVal, tmp, *pVLen); + if (key->win.skey != resKey.win.skey) { + code = -1; + } else { + *key = resKey; + *pVal = tdbRealloc(NULL, *pVLen); + memcpy(*pVal, tmp, *pVLen); + } } streamStateFreeCur(pCur); return code; diff --git a/tests/script/tsim/stream/state0.sim b/tests/script/tsim/stream/state0.sim index dc7d9bc407..87d7d4b7fc 100644 --- a/tests/script/tsim/stream/state0.sim +++ b/tests/script/tsim/stream/state0.sim @@ -544,4 +544,192 @@ if $rows != 10 then endi +sql drop stream if exists streams4; +sql drop database if exists test4; +sql drop stable if exists streamt4; +sql create database if not exists test4 vgroups 10 precision "ms" ; +sql use test4; +sql create table st (ts timestamp, c1 tinyint, c2 smallint) tags (t1 tinyint) ; +sql create table t1 using st tags (-81) ; +sql create table t2 using st tags (-81) ; +sql create stream if not exists streams4 trigger window_close into streamt4 as select _wstart AS start, min(c1),count(c1) from t1 state_window(c1); + +sql insert into t1 (ts, c1) values (1668073288209, 11); +sql insert into t1 (ts, c1) values (1668073288210, 11); +sql insert into t1 (ts, c1) values (1668073288211, 11); +sql insert into t1 (ts, c1) values (1668073288212, 11); +sql insert into t1 (ts, c1) values (1668073288213, 11); +sql insert into t1 (ts, c1) values (1668073288214, 11); +sql insert into t1 (ts, c1) values (1668073288215, 29); + +$loop_count = 0 +loop7: + +sleep 200 + +sql select * from streamt4 order by start; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop7 +endi + +if $data01 != 11 then + print =====data01=$data01 + goto loop7 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop7 +endi + +sql delete from t1 where ts = cast(1668073288214 as timestamp); +sql insert into t1 (ts, c1) values (1668073288216, 29); +sql delete from t1 where ts = cast(1668073288215 as timestamp); +sql insert into t1 (ts, c1) values (1668073288217, 29); +sql delete from t1 where ts = cast(1668073288216 as timestamp); +sql insert into t1 (ts, c1) values (1668073288218, 29); +sql delete from t1 where ts = cast(1668073288217 as timestamp); +sql insert into t1 (ts, c1) values (1668073288219, 29); +sql delete from t1 where ts = cast(1668073288218 as timestamp); +sql insert into t1 (ts, c1) values (1668073288220, 29); +sql delete from t1 where ts = cast(1668073288219 as timestamp); + +$loop_count = 0 +loop8: + +sleep 200 + +sql select * from streamt4 order by start; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop8 +endi + +if $data01 != 11 then + print =====data01=$data01 + goto loop8 +endi + +if $data02 != 5 then + print =====data02=$data02 + goto loop8 +endi + +sql insert into t1 (ts, c1) values (1668073288221, 65); +sql insert into t1 (ts, c1) values (1668073288222, 65); +sql insert into t1 (ts, c1) values (1668073288223, 65); +sql insert into t1 (ts, c1) values (1668073288224, 65); +sql insert into t1 (ts, c1) values (1668073288225, 65); +sql insert into t1 (ts, c1) values (1668073288226, 65); + +$loop_count = 0 +loop8: + +sleep 200 + +sql select * from streamt4 order by start; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows + goto loop8 +endi + +if $data01 != 11 then + print =====data01=$data01 + goto loop8 +endi + +if $data02 != 5 then + print =====data02=$data02 + goto loop8 +endi + +if $data11 != 29 then + print =====data11=$data11 + goto loop8 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop8 +endi + +sql insert into t1 (ts, c1) values (1668073288224, 64); + +$loop_count = 0 +loop9: + +sleep 200 + +sql select * from streamt4 order by start; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 4 then + print =====rows=$rows + goto loop9 +endi + +if $data01 != 11 then + print =====data01=$data01 + goto loop9 +endi + +if $data02 != 5 then + print =====data02=$data02 + goto loop9 +endi + +if $data11 != 29 then + print =====data11=$data11 + goto loop9 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop9 +endi + +if $data21 != 65 then + print =====data21=$data21 + goto loop9 +endi + +if $data22 != 3 then + print =====data22=$data22 + goto loop9 +endi + +if $data31 != 64 then + print =====data31=$data31 + goto loop9 +endi + +if $data32 != 1 then + print =====data32=$data32 + goto loop9 +endi + + system sh/exec.sh -n dnode1 -s stop -x SIGINT From e8e1d64a9d16c68610c2f8b63174909aacfd9105 Mon Sep 17 00:00:00 2001 From: wade zhang <95411902+gccgdb1234@users.noreply.github.com> Date: Fri, 11 Nov 2022 16:28:02 +0800 Subject: [PATCH 09/12] Update 10-cpp.mdx --- docs/zh/08-connector/10-cpp.mdx | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/zh/08-connector/10-cpp.mdx b/docs/zh/08-connector/10-cpp.mdx index cc7991da74..8a4f4946a7 100644 --- a/docs/zh/08-connector/10-cpp.mdx +++ b/docs/zh/08-connector/10-cpp.mdx @@ -115,6 +115,7 @@ TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤) 订阅和消费 ```c + {{#include examples/c/tmq.c}} ``` From 0ee7ccaf71581ffe615696c6918eca2d926cf58b Mon Sep 17 00:00:00 2001 From: Pan YANG Date: Fri, 11 Nov 2022 16:30:39 +0800 Subject: [PATCH 10/12] fix: import image before using in md/mdx file --- docs/zh/05-get-started/index.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/zh/05-get-started/index.md b/docs/zh/05-get-started/index.md index 33661580ee..dec4d800bc 100644 --- a/docs/zh/05-get-started/index.md +++ b/docs/zh/05-get-started/index.md @@ -3,6 +3,8 @@ title: 立即开始 description: '快速设置 TDengine 环境并体验其高效写入和查询' --- +import xiaot from './tdengine.webp' + TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../reference/taosadapter) 提供 [RESTful 接口](../connector/rest-api)。 本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。 @@ -18,4 +20,4 @@ import {useCurrentSidebarCategory} from '@docusaurus/theme-common'; 微信扫描下面二维码,加“小 T”为好友,即可加入“物联网大数据技术前沿群”,与大家共同交流物联网大数据技术应用、TDengine 使用问题和技巧等话题。 - +小 T 的二维码 From 9aeac303598e96451aad6bc49b840680165d66fa Mon Sep 17 00:00:00 2001 From: Pan YANG Date: Fri, 11 Nov 2022 17:25:57 +0800 Subject: [PATCH 11/12] docs: optimize search result of download --- docs/zh/28-releases/01-tdengine.md | 2 ++ docs/zh/28-releases/02-tools.md | 2 ++ 2 files changed, 4 insertions(+) diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md index 31093ce557..83ca096d32 100644 --- a/docs/zh/28-releases/01-tdengine.md +++ b/docs/zh/28-releases/01-tdengine.md @@ -4,6 +4,8 @@ title: TDengine 发布历史 description: TDengine 发布历史、Release Notes 及下载链接 --- +各版本 TDengine 安装包下载链接如下: + import Release from "/components/ReleaseV3"; ## 3.0.1.6 diff --git a/docs/zh/28-releases/02-tools.md b/docs/zh/28-releases/02-tools.md index 2623391fb9..2152dede93 100644 --- a/docs/zh/28-releases/02-tools.md +++ b/docs/zh/28-releases/02-tools.md @@ -4,6 +4,8 @@ title: taosTools 发布历史 description: taosTools 的发布历史、Release Notes 和下载链接 --- +各版本 taosTools 安装包下载链接如下: + import Release from "/components/ReleaseV3"; ## 2.2.7 From f56148a450691cef2cbb0d5db609f67c6c27affd Mon Sep 17 00:00:00 2001 From: Pan YANG Date: Fri, 11 Nov 2022 17:35:08 +0800 Subject: [PATCH 12/12] docs: add download keyword to title --- docs/zh/28-releases/01-tdengine.md | 3 +-- docs/zh/28-releases/02-tools.md | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md index 83ca096d32..f72735d903 100644 --- a/docs/zh/28-releases/01-tdengine.md +++ b/docs/zh/28-releases/01-tdengine.md @@ -1,6 +1,6 @@ --- sidebar_label: TDengine 发布历史 -title: TDengine 发布历史 +title: TDengine 发布历史及下载链接 description: TDengine 发布历史、Release Notes 及下载链接 --- @@ -35,4 +35,3 @@ import Release from "/components/ReleaseV3"; ## 3.0.1.0 - diff --git a/docs/zh/28-releases/02-tools.md b/docs/zh/28-releases/02-tools.md index 2152dede93..ac4a884f8b 100644 --- a/docs/zh/28-releases/02-tools.md +++ b/docs/zh/28-releases/02-tools.md @@ -1,6 +1,6 @@ --- sidebar_label: taosTools 发布历史 -title: taosTools 发布历史 +title: taosTools 发布历史及下载链接 description: taosTools 的发布历史、Release Notes 和下载链接 ---