diff --git a/cmake/cmake.version b/cmake/cmake.version
index a30618157b..d0d455c73d 100644
--- a/cmake/cmake.version
+++ b/cmake/cmake.version
@@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
- SET(TD_VER_NUMBER "3.0.2.5")
+ SET(TD_VER_NUMBER "3.0.2.6")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in
index ae3b626f88..db2ae92f6e 100644
--- a/cmake/taostools_CMakeLists.txt.in
+++ b/cmake/taostools_CMakeLists.txt.in
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
- GIT_TAG 61cbfd2
+ GIT_TAG 1e15545
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
diff --git a/docs/en/28-releases/01-tdengine.md b/docs/en/28-releases/01-tdengine.md
index 0c5ccf3aef..6a62108062 100644
--- a/docs/en/28-releases/01-tdengine.md
+++ b/docs/en/28-releases/01-tdengine.md
@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://w
import Release from "/components/ReleaseV3";
+## 3.0.2.6
+
+
+
## 3.0.2.5
diff --git a/docs/en/28-releases/02-tools.md b/docs/en/28-releases/02-tools.md
index 29dd605c8c..91eb0c9b8d 100644
--- a/docs/en/28-releases/02-tools.md
+++ b/docs/en/28-releases/02-tools.md
@@ -10,6 +10,10 @@ For other historical version installers, please visit [here](https://www.taosdat
import Release from "/components/ReleaseV3";
+## 2.4.6
+
+
+
## 2.4.3
diff --git a/docs/zh/12-taos-sql/24-show.md b/docs/zh/12-taos-sql/24-show.md
index c8f4afc06b..901548e132 100644
--- a/docs/zh/12-taos-sql/24-show.md
+++ b/docs/zh/12-taos-sql/24-show.md
@@ -191,9 +191,9 @@ SHOW TABLE DISTRIBUTED table_name;
_block_dist: Total_Blocks=[5] Total_Size=[93.65 Kb] Average_size=[18.73 Kb] Compression_Ratio=[23.98 %]
-Total_Blocks : 表d0 占用的 block 个数为 5 个
+Total_Blocks: 表 d0 占用的 block 个数为 5 个
-Total_Size. : 表 d0 所有 block 在文件中占用的大小为 93.65 KB
+Total_Size: 表 d0 所有 block 在文件中占用的大小为 93.65 KB
Average_size: 平均每个 block 在文件中占用的空间大小为 18.73 KB
@@ -204,15 +204,15 @@ Compression_Ratio: 数据压缩率 23.98%
_block_dist: Total_Rows=[20000] Inmem_Rows=[0] MinRows=[3616] MaxRows=[4096] Average_Rows=[4000]
-Total_Rows: 统计表 d0 的所有行数 为20000 行
+Total_Rows: 统计表 d0 的所有行数 为20000 行(该数值仅供参考,不是精确的行数。获得精确的行数需要使用 count 函数)
-Inmem_Rows: 表示仍然还存放在内存中的行数,即没有落盘的行数,为 0行,表示没有
+Inmem_Rows: 存储在写缓存中的数据行数(没有落盘),0 行表示内存缓存中没有数据
-MinRows: BLOCK 中最小的行数,为 3616 行
+MinRows: BLOCK 中最小的行数,为 3616 行
-MaxRows: BLOCK 中最大的行数,为 4096行
+MaxRows: BLOCK 中最大的行数,为 4096 行
-Average_Rows: 每个 BLOCK 中的平均行数,为4000 行
+Average_Rows: 每个 BLOCK 中的平均行数,此时为 4000 行
*************************** 3.row ***************************
diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md
index cf7bf83164..c9505d95a5 100644
--- a/docs/zh/28-releases/01-tdengine.md
+++ b/docs/zh/28-releases/01-tdengine.md
@@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do
import Release from "/components/ReleaseV3";
+## 3.0.2.6
+
+
+
## 3.0.2.5
diff --git a/docs/zh/28-releases/02-tools.md b/docs/zh/28-releases/02-tools.md
index 5f277b4873..69d35f95c8 100644
--- a/docs/zh/28-releases/02-tools.md
+++ b/docs/zh/28-releases/02-tools.md
@@ -10,6 +10,10 @@ taosTools 各版本安装包下载链接如下:
import Release from "/components/ReleaseV3";
+## 2.4.6
+
+
+
## 2.4.3
diff --git a/examples/c/tmq.c b/examples/c/tmq.c
index 8a2112fbcc..7bb8aa84a4 100644
--- a/examples/c/tmq.c
+++ b/examples/c/tmq.c
@@ -189,46 +189,54 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
tmq_t* build_consumer() {
tmq_conf_res_t code;
- tmq_t* tmq = NULL;
+ tmq_t* tmq = NULL;
- tmq_conf_t* conf = tmq_conf_new();
+ tmq_conf_t* conf = tmq_conf_new();
code = tmq_conf_set(conf, "enable.auto.commit", "true");
if (TMQ_CONF_OK != code) {
- goto _end;
+ tmq_conf_destroy(conf);
+ return NULL;
}
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
if (TMQ_CONF_OK != code) {
- goto _end;
+ tmq_conf_destroy(conf);
+ return NULL;
}
code = tmq_conf_set(conf, "group.id", "cgrpName");
if (TMQ_CONF_OK != code) {
- goto _end;
+ tmq_conf_destroy(conf);
+ return NULL;
}
code = tmq_conf_set(conf, "client.id", "user defined name");
if (TMQ_CONF_OK != code) {
- goto _end;
+ tmq_conf_destroy(conf);
+ return NULL;
}
code = tmq_conf_set(conf, "td.connect.user", "root");
if (TMQ_CONF_OK != code) {
- goto _end;
+ tmq_conf_destroy(conf);
+ return NULL;
}
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
if (TMQ_CONF_OK != code) {
- goto _end;
+ tmq_conf_destroy(conf);
+ return NULL;
}
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (TMQ_CONF_OK != code) {
- goto _end;
+ tmq_conf_destroy(conf);
+ return NULL;
}
code = tmq_conf_set(conf, "experimental.snapshot.enable", "false");
if (TMQ_CONF_OK != code) {
- goto _end;
+ tmq_conf_destroy(conf);
+ return NULL;
}
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq = tmq_consumer_new(conf, NULL, 0);
- _end:
+_end:
tmq_conf_destroy(conf);
return tmq;
}
diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c
index c66d21b516..88eb3bdb97 100644
--- a/source/common/src/tdatablock.c
+++ b/source/common/src/tdatablock.c
@@ -1067,6 +1067,7 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock*
offset += pInfo->pColData->info.bytes;
}
+ taosMemoryFree(buf);
return phelper;
}
@@ -2404,7 +2405,11 @@ _end:
taosArrayDestroy(pVals);
if (terrno != 0) {
*ppReq = NULL;
- if (pReq) tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
+ if (pReq) {
+ tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
+ taosMemoryFreeClear(pReq);
+ }
+
return TSDB_CODE_FAILED;
}
*ppReq = pReq;
diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c
index 4cac0cbe14..2bb8708372 100644
--- a/source/common/src/tdataformat.c
+++ b/source/common/src/tdataformat.c
@@ -1989,8 +1989,7 @@ static FORCE_INLINE int32_t tColDataUpdateValue20(SColData *pColData, uint8_t *p
return 0;
}
static FORCE_INLINE int32_t tColDataUpdateValue30(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
- uint8_t bv = GET_BIT1(pColData->pBitMap, pColData->nVal - 1);
- if (bv == 0) { // NONE == > VALUE
+ if (GET_BIT1(pColData->pBitMap, pColData->nVal - 1) == 0) { // NONE ==> VALUE
pColData->numOfNone--;
pColData->nVal--;
if (pColData->numOfNone) {
@@ -1999,7 +1998,7 @@ static FORCE_INLINE int32_t tColDataUpdateValue30(SColData *pColData, uint8_t *p
pColData->flag = HAS_NULL;
return tColDataAppendValue20(pColData, pData, nData);
}
- } else if (forward) { // NULL == > VALUE
+ } else if (forward) { // NULL ==> VALUE
pColData->numOfNull--;
pColData->nVal--;
if (pColData->numOfNull) {
@@ -2024,36 +2023,43 @@ static FORCE_INLINE int32_t tColDataUpdateValue32(SColData *pColData, uint8_t *p
return 0;
}
static FORCE_INLINE int32_t tColDataUpdateValue40(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
- if (forward) {
- pColData->numOfValue--;
+ if (forward) { // VALUE ==> VALUE
pColData->nVal--;
- if (pColData->numOfValue) {
- return tColDataAppendValue40(pColData, pData, nData);
+ if (IS_VAR_DATA_TYPE(pColData->type)) {
+ pColData->nData = pColData->aOffset[pColData->nVal];
} else {
- pColData->flag = 0;
- return tColDataAppendValue00(pColData, pData, nData);
+ pColData->nData -= TYPE_BYTES[pColData->type];
}
+ return tColDataPutValue(pColData, pData, nData);
}
return 0;
}
static FORCE_INLINE int32_t tColDataUpdateValue42(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
- if (forward) {
+ if (forward) { // VALUE ==> NULL
pColData->numOfValue--;
pColData->nVal--;
if (pColData->numOfValue) {
+ if (IS_VAR_DATA_TYPE(pColData->type)) {
+ pColData->nData = pColData->aOffset[pColData->nVal];
+ } else {
+ pColData->nData -= TYPE_BYTES[pColData->type];
+ }
return tColDataAppendValue42(pColData, pData, nData);
} else {
pColData->flag = 0;
+ pColData->nData = 0;
return tColDataAppendValue02(pColData, pData, nData);
}
}
return 0;
}
static FORCE_INLINE int32_t tColDataUpdateValue50(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
- uint8_t bv = GET_BIT1(pColData->pBitMap, pColData->nVal - 1);
- if (bv == 0) { // NONE ==> VALUE
+ if (GET_BIT1(pColData->pBitMap, pColData->nVal - 1) == 0) { // NONE ==> VALUE
pColData->numOfNone--;
pColData->nVal--;
+ if (!IS_VAR_DATA_TYPE(pColData->type)) {
+ pColData->nData -= TYPE_BYTES[pColData->type];
+ }
if (pColData->numOfNone) {
return tColDataAppendValue50(pColData, pData, nData);
} else {
@@ -2061,35 +2067,42 @@ static FORCE_INLINE int32_t tColDataUpdateValue50(SColData *pColData, uint8_t *p
return tColDataAppendValue40(pColData, pData, nData);
}
} else if (forward) { // VALUE ==> VALUE
- pColData->numOfValue--;
pColData->nVal--;
- if (pColData->numOfValue) {
- return tColDataAppendValue50(pColData, pData, nData);
+ if (IS_VAR_DATA_TYPE(pColData->type)) {
+ pColData->nData = pColData->aOffset[pColData->nVal];
} else {
- pColData->flag = HAS_NONE;
- return tColDataAppendValue10(pColData, pData, nData);
+ pColData->nData -= TYPE_BYTES[pColData->type];
}
+ return tColDataPutValue(pColData, pData, nData);
}
return 0;
}
static FORCE_INLINE int32_t tColDataUpdateValue52(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
- uint8_t bv = GET_BIT1(pColData->pBitMap, pColData->nVal - 1);
- if (bv == 0) { // NONE ==> NULL
+ if (GET_BIT1(pColData->pBitMap, pColData->nVal - 1) == 0) { // NONE ==> NULL
pColData->numOfNone--;
pColData->nVal--;
+ if (!IS_VAR_DATA_TYPE(pColData->type)) {
+ pColData->nData -= TYPE_BYTES[pColData->type];
+ }
if (pColData->numOfNone) {
return tColDataAppendValue52(pColData, pData, nData);
} else {
- pColData->flag &= ~HAS_NONE;
+ pColData->flag = HAS_VALUE;
return tColDataAppendValue42(pColData, pData, nData);
}
} else if (forward) { // VALUE ==> NULL
pColData->numOfValue--;
pColData->nVal--;
if (pColData->numOfValue) {
+ if (IS_VAR_DATA_TYPE(pColData->type)) {
+ pColData->nData = pColData->aOffset[pColData->nVal];
+ } else {
+ pColData->nData -= TYPE_BYTES[pColData->type];
+ }
return tColDataAppendValue52(pColData, pData, nData);
} else {
- pColData->flag &= ~HAS_VALUE;
+ pColData->flag = HAS_NONE;
+ pColData->nData = 0;
return tColDataAppendValue12(pColData, pData, nData);
}
}
@@ -2100,6 +2113,9 @@ static FORCE_INLINE int32_t tColDataUpdateValue60(SColData *pColData, uint8_t *p
if (GET_BIT1(pColData->pBitMap, pColData->nVal - 1) == 0) { // NULL ==> VALUE
pColData->numOfNull--;
pColData->nVal--;
+ if (!IS_VAR_DATA_TYPE(pColData->type)) {
+ pColData->nData -= TYPE_BYTES[pColData->type];
+ }
if (pColData->numOfNull) {
return tColDataAppendValue60(pColData, pData, nData);
} else {
@@ -2107,14 +2123,13 @@ static FORCE_INLINE int32_t tColDataUpdateValue60(SColData *pColData, uint8_t *p
return tColDataAppendValue40(pColData, pData, nData);
}
} else { // VALUE ==> VALUE
- pColData->numOfValue--;
pColData->nVal--;
- if (pColData->numOfValue) {
- return tColDataAppendValue60(pColData, pData, nData);
+ if (IS_VAR_DATA_TYPE(pColData->type)) {
+ pColData->nData = pColData->aOffset[pColData->nVal];
} else {
- pColData->flag = HAS_NULL;
- return tColDataAppendValue20(pColData, pData, nData);
+ pColData->nData -= TYPE_BYTES[pColData->type];
}
+ return tColDataPutValue(pColData, pData, nData);
}
}
return 0;
@@ -2124,9 +2139,15 @@ static FORCE_INLINE int32_t tColDataUpdateValue62(SColData *pColData, uint8_t *p
pColData->numOfValue--;
pColData->nVal--;
if (pColData->numOfValue) {
+ if (IS_VAR_DATA_TYPE(pColData->type)) {
+ pColData->nData = pColData->aOffset[pColData->nVal];
+ } else {
+ pColData->nData -= TYPE_BYTES[pColData->type];
+ }
return tColDataAppendValue62(pColData, pData, nData);
} else {
pColData->flag = HAS_NULL;
+ pColData->nData = 0;
return tColDataAppendValue20(pColData, pData, nData);
}
}
@@ -2139,38 +2160,44 @@ static FORCE_INLINE int32_t tColDataUpdateValue70(SColData *pColData, uint8_t *p
if (bv == 0) { // NONE ==> VALUE
pColData->numOfNone--;
pColData->nVal--;
+ if (!IS_VAR_DATA_TYPE(pColData->type)) {
+ pColData->nData -= TYPE_BYTES[pColData->type];
+ }
if (pColData->numOfNone) {
return tColDataAppendValue70(pColData, pData, nData);
} else {
for (int32_t iVal = 0; iVal < pColData->nVal; ++iVal) {
SET_BIT1(pColData->pBitMap, iVal, GET_BIT2(pColData->pBitMap, iVal) - 1);
}
- pColData->flag &= ~HAS_NONE;
+ pColData->flag = (HAS_VALUE | HAS_NULL);
return tColDataAppendValue60(pColData, pData, nData);
}
} else if (bv == 1) { // NULL ==> VALUE
if (forward) {
pColData->numOfNull--;
pColData->nVal--;
+ if (!IS_VAR_DATA_TYPE(pColData->type)) {
+ pColData->nData -= TYPE_BYTES[pColData->type];
+ }
if (pColData->numOfNull) {
return tColDataAppendValue70(pColData, pData, nData);
} else {
for (int32_t iVal = 0; iVal < pColData->nVal; ++iVal) {
SET_BIT1(pColData->pBitMap, iVal, GET_BIT2(pColData->pBitMap, iVal) ? 1 : 0);
}
- pColData->flag &= ~HAS_NULL;
+ pColData->flag = (HAS_VALUE | HAS_NONE);
return tColDataAppendValue50(pColData, pData, nData);
}
}
} else if (bv == 2) { // VALUE ==> VALUE
if (forward) {
- pColData->numOfValue--;
pColData->nVal--;
- if (pColData->numOfValue) {
- return tColDataAppendValue70(pColData, pData, nData);
+ if (IS_VAR_DATA_TYPE(pColData->type)) {
+ pColData->nData = pColData->aOffset[pColData->nVal];
} else {
- return tColDataPutValue(pColData, pData, nData);
+ pColData->nData -= TYPE_BYTES[pColData->type];
}
+ return tColDataPutValue(pColData, pData, nData);
}
} else {
ASSERT(0);
@@ -2179,29 +2206,37 @@ static FORCE_INLINE int32_t tColDataUpdateValue70(SColData *pColData, uint8_t *p
}
static int32_t tColDataUpdateValue72(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
uint8_t bv = GET_BIT2(pColData->pBitMap, pColData->nVal - 1);
- ASSERT(bv < 3);
if (bv == 0) { // NONE ==> NULL
pColData->numOfNone--;
pColData->nVal--;
+ if (!IS_VAR_DATA_TYPE(pColData->type)) {
+ pColData->nData -= TYPE_BYTES[pColData->type];
+ }
if (pColData->numOfNone) {
return tColDataAppendValue72(pColData, pData, nData);
} else {
for (int32_t iVal = 0; iVal < pColData->nVal; ++iVal) {
SET_BIT1(pColData->pBitMap, iVal, GET_BIT2(pColData->pBitMap, iVal) - 1);
}
- pColData->flag &= ~HAS_NONE;
+ pColData->flag = (HAS_VALUE | HAS_NULL);
return tColDataAppendValue62(pColData, pData, nData);
}
} else if (bv == 2 && forward) { // VALUE ==> NULL
pColData->numOfValue--;
pColData->nVal--;
if (pColData->numOfValue) {
+ if (IS_STR_DATA_TYPE(pColData->type)) {
+ pColData->nData = pColData->aOffset[pColData->nVal];
+ } else {
+ pColData->nData -= TYPE_BYTES[pColData->type];
+ }
return tColDataAppendValue72(pColData, pData, nData);
} else {
for (int32_t iVal = 0; iVal < pColData->nVal; ++iVal) {
SET_BIT1(pColData->pBitMap, iVal, GET_BIT2(pColData->pBitMap, iVal));
}
- pColData->flag &= ~HAS_VALUE;
+ pColData->flag = (HAS_NULL | HAS_NONE);
+ pColData->nData = 0;
return tColDataAppendValue32(pColData, pData, nData);
}
}
diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c
index 8abce50cf0..23a047d49a 100644
--- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c
+++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c
@@ -93,18 +93,30 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
break;
}
- if (pDnode->status != DND_STAT_RUNNING) {
- if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
- dmProcessServerStartupStatus(pDnode, pRpc);
- return;
- } else {
- if (pDnode->status == DND_STAT_INIT) {
- terrno = TSDB_CODE_APP_IS_STARTING;
+/*
+pDnode is null, TD-22618
+at trans.c line 91
+before this line, dmProcessRpcMsg callback is set
+after this line, parent is set
+so when dmProcessRpcMsg is called, pDonde is still null.
+*/
+ if (pDnode != NULL){
+ if(pDnode->status != DND_STAT_RUNNING) {
+ if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
+ dmProcessServerStartupStatus(pDnode, pRpc);
+ return;
} else {
- terrno = TSDB_CODE_APP_IS_STOPPING;
+ if (pDnode->status == DND_STAT_INIT) {
+ terrno = TSDB_CODE_APP_IS_STARTING;
+ } else {
+ terrno = TSDB_CODE_APP_IS_STOPPING;
+ }
+ goto _OVER;
}
- goto _OVER;
- }
+ }
+ } else {
+ terrno = TSDB_CODE_APP_IS_STARTING;
+ goto _OVER;
}
if (pRpc->pCont == NULL && (IsReq(pRpc) || pRpc->contLen != 0)) {
diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c
index 3771cf28c2..2d55be018b 100644
--- a/source/dnode/mnode/impl/src/mndConsumer.c
+++ b/source/dnode/mnode/impl/src/mndConsumer.c
@@ -916,6 +916,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
if (!existing) {
taosArrayPush(pOldConsumer->currentTopics, &addedTopic);
taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
+ } else {
+ taosMemoryFree(addedTopic);
}
// set status
diff --git a/source/dnode/mnode/impl/src/mndDump.c b/source/dnode/mnode/impl/src/mndDump.c
index 7d0f5742f8..44a7d49fff 100644
--- a/source/dnode/mnode/impl/src/mndDump.c
+++ b/source/dnode/mnode/impl/src/mndDump.c
@@ -629,7 +629,7 @@ void mndDumpSdb() {
}
taosWriteFile(pFile, pCont, contLen);
taosWriteFile(pFile, "\n", 1);
- taosFsyncFile(pFile);
+ UNUSED(taosFsyncFile(pFile));
taosCloseFile(&pFile);
tjsonDelete(json);
taosMemoryFree(pCont);
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 089a93bea3..2a05511134 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -354,7 +354,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
int32_t dataIndex = 0;
for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) {
SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
- if (i < pos->slotId) {
+ if (nullIndex >= numOfNULL || i < pos->slotId) {
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
@@ -723,8 +723,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb);
- if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
-
+ if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
+ mndTransDrop(pTrans);
+ goto _OVER;
+ }
// create stb for stream
if (createStreamReq.createStb == STREAM_CREATE_STABLE_TRUE &&
mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c
index 91518f7a0b..4b1163bb11 100644
--- a/source/dnode/vnode/src/meta/metaQuery.c
+++ b/source/dnode/vnode/src/meta/metaQuery.c
@@ -706,7 +706,7 @@ int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sv
}
}
- if (ASSERTS(sver > 0, __FILE__, __LINE__, "failed to get table schema version: %d", sver)) {
+ if (ASSERTS(sver > 0, "failed to get table schema version: %d", sver)) {
code = TSDB_CODE_NOT_FOUND;
goto _exit;
}
diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c
index 7a8d899a19..8c478c52a7 100644
--- a/source/dnode/vnode/src/tq/tqSink.c
+++ b/source/dnode/vnode/src/tq/tqSink.c
@@ -497,7 +497,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
taosArrayPush(tagArray, &tagVal);
}
}
- pCreateTbReq->ctb.tagNum = size;
+ pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
@@ -510,15 +510,12 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
// set table name
- SColumnInfoData* pTbColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
- if (colDataIsNull_s(pTbColInfo, rowId)) {
+ if (!pDataBlock->info.parTbName[0]) {
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData);
} else {
- void* pTbData = colDataGetData(pTbColInfo, rowId);
- pCreateTbReq->name = taosMemoryCalloc(1, varDataLen(pTbData) + 1);
- memcpy(pCreateTbReq->name, varDataVal(pTbData), varDataLen(pTbData));
+ pCreateTbReq->name = strdup(pDataBlock->info.parTbName);
}
taosArrayPush(reqs.pArray, pCreateTbReq);
}
diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c
index c05206785b..79e0ccc3e3 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCache.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCache.c
@@ -813,7 +813,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (!state->pBlockData) {
state->pBlockData = &state->blockData;
- tBlockDataCreate(&state->blockData);
+ code = tBlockDataCreate(&state->blockData);
+ if (code) goto _err;
}
}
case SFSNEXTROW_BLOCKDATA:
diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
index cf1abd40e1..b2f72e9f2b 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
@@ -433,8 +433,10 @@ _end:
tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap, true);
taosThreadMutexUnlock(&pr->readerMutex);
- for (int32_t j = 0; j < pr->numOfCols; ++j) {
- taosMemoryFree(pRes[j]);
+ if (pRes != NULL) {
+ for (int32_t j = 0; j < pr->numOfCols; ++j) {
+ taosMemoryFree(pRes[j]);
+ }
}
taosMemoryFree(pRes);
diff --git a/source/dnode/vnode/src/tsdb/tsdbDataIter.c b/source/dnode/vnode/src/tsdb/tsdbDataIter.c
index 2f49781276..3299a2f497 100644
--- a/source/dnode/vnode/src/tsdb/tsdbDataIter.c
+++ b/source/dnode/vnode/src/tsdb/tsdbDataIter.c
@@ -219,7 +219,7 @@ static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo*
}
ASSERT(pIter->rowInfo.suid == pIter->dIter.bData.suid);
- ASSERT(pIter->rowInfo.uid = pIter->dIter.bData.uid);
+ ASSERT(pIter->rowInfo.uid == pIter->dIter.bData.uid);
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow);
pIter->dIter.iRow++;
goto _exit;
diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c
index 9b3dbcd8ea..d91475376b 100644
--- a/source/dnode/vnode/src/tsdb/tsdbFile.c
+++ b/source/dnode/vnode/src/tsdb/tsdbFile.c
@@ -151,7 +151,7 @@ int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) {
int64_t size = 0;
int64_t n;
TdFilePtr pFD;
- char fname[TSDB_FILENAME_LEN];
+ char fname[TSDB_FILENAME_LEN] = {0};
char hdr[TSDB_FHDR_SIZE] = {0};
// truncate
diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c
index 3db9ff2b42..554ec0f1f9 100644
--- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c
+++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c
@@ -473,7 +473,7 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN
int8_t forward) {
int32_t code = 0;
int8_t level;
- SMemSkipListNode *pNode;
+ SMemSkipListNode *pNode = NULL;
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
int64_t nSize;
@@ -591,7 +591,9 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
pBlockData->aColData = vnodeBufPoolMalloc(pPool, sizeof(SColData) * pBlockData->nColData);
if (pBlockData->aColData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
+ goto _exit;
}
+
for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) {
code = tColDataCopy(&aColData[iColData + 1], &pBlockData->aColData[iColData], (xMallocFn)vnodeBufPoolMalloc, pPool);
if (code) goto _exit;
diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index 2345c61607..7bb4a3def9 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -874,7 +874,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
pBlockNum->numOfBlocks += 1;
}
- if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
+ if ((pScanInfo->pBlockList != NULL )&& (taosArrayGetSize(pScanInfo->pBlockList) > 0)) {
numOfQTable += 1;
}
}
diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c
index cce182478d..c5405664c6 100644
--- a/source/libs/executor/src/groupoperator.c
+++ b/source/libs/executor/src/groupoperator.c
@@ -989,32 +989,42 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp*
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
pTmpBlock->info.id.groupId = groupId;
+ char* tbName = pSrcBlock->info.parTbName;
if (pTableSup->numOfExprs > 0) {
projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL);
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
- void* pData = colDataGetVarData(pTbCol, pDestBlock->info.rows - 1);
- char* tbName = pSrcBlock->info.parTbName;
memset(tbName, 0, TSDB_TABLE_NAME_LEN);
- int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
- memcpy(tbName, varDataVal(pData), len);
+ int32_t len = 0;
+ if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) {
+ len = TMIN(sizeof(TSDB_DATA_NULL_STR), TSDB_TABLE_NAME_LEN - 1);
+ memcpy(tbName, TSDB_DATA_NULL_STR, len);
+ } else {
+ void* pData = colDataGetData(pTbCol, pDestBlock->info.rows - 1);
+ len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
+ memcpy(tbName, varDataVal(pData), len);
+ }
streamStatePutParName(pState, groupId, tbName);
memcpy(pTmpBlock->info.parTbName, tbName, len);
pDestBlock->info.rows--;
} else {
void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
colDataSetNULL(pTbNameCol, pDestBlock->info.rows);
- pSrcBlock->info.parTbName[0] = 0;
+ tbName[0] = 0;
}
if (pTagSup->numOfExprs > 0) {
projectApplyFunctions(pTagSup->pExprInfo, pDestBlock, pTmpBlock, pTagSup->pCtx, pTagSup->numOfExprs, NULL);
pDestBlock->info.rows--;
+ } else {
+ memcpy(pDestBlock->info.parTbName, pTmpBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
}
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
pDestBlock->info.rows++;
blockDataDestroy(pTmpBlock);
+ } else {
+ memcpy(pSrcBlock->info.parTbName, pValue, TSDB_TABLE_NAME_LEN);
}
streamStateReleaseBuf(pState, NULL, pValue);
}
diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c
index 34ce67beb0..3ae114c656 100644
--- a/source/libs/executor/src/projectoperator.c
+++ b/source/libs/executor/src/projectoperator.c
@@ -275,7 +275,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// for stream interval
if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
- // printDataBlock1(pBlock, "project1");
return pBlock;
}
diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c
index dc3ab79afb..d84bdddd37 100644
--- a/source/libs/executor/src/sortoperator.c
+++ b/source/libs/executor/src/sortoperator.c
@@ -46,8 +46,9 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
pOperator->pTaskInfo = pTaskInfo;
SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
- int32_t numOfCols = 0;
- SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
+ int32_t numOfCols = 0;
+ pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
+ pOperator->exprSupp.numOfExprs = numOfCols;
int32_t numOfOutputCols = 0;
int32_t code =
@@ -56,7 +57,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
goto _error;
}
- pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
+ pOperator->exprSupp.pCtx =
+ createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
initResultSizeInfo(&pOperator->resultInfo, 1024);
code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
@@ -68,8 +70,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
setOperatorInfo(pOperator, "SortOperator", QUERY_NODE_PHYSICAL_PLAN_SORT, true, OP_NOT_OPENED, pInfo, pTaskInfo);
- pOperator->exprSupp.pExprInfo = pExprInfo;
- pOperator->exprSupp.numOfExprs = numOfCols;
+
// lazy evaluation for the following parameter since the input datablock is not known till now.
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c
index 2a7edea290..f090d5ac67 100644
--- a/source/libs/executor/src/sysscanoperator.c
+++ b/source/libs/executor/src/sysscanoperator.c
@@ -1607,8 +1607,7 @@ static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScan
if (pInfo->tbnameSlotId != -1) {
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
- memcpy(varDataVal(varTbName), name, strlen(name));
- varDataSetLen(varTbName, strlen(name));
+ STR_TO_VARSTR(varTbName, name);
colDataSetNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows);
}
diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c
index 14de05cf6e..b74c446575 100644
--- a/source/libs/executor/src/tsort.c
+++ b/source/libs/executor/src/tsort.c
@@ -212,6 +212,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
if (pPage == NULL) {
+ taosArrayDestroy(pPageIdList);
blockDataDestroy(p);
taosArrayDestroy(pPageIdList);
return terrno;
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index f838466afd..a04e67d8ab 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -1686,53 +1686,50 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
double v = 0;
tMemBucket* pMemBucket = ppInfo->pMemBucket;
- if (pMemBucket == NULL || pMemBucket->total == 0) { // check for null
- code = TSDB_CODE_FAILED;
- goto _fin_error;
- }
+ if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null
+ if (pCtx->numOfParams > 2) {
+ char buf[512] = {0};
+ size_t len = 1;
- if (pCtx->numOfParams > 2) {
- char buf[512] = {0};
- size_t len = 1;
+ varDataVal(buf)[0] = '[';
+ for (int32_t i = 1; i < pCtx->numOfParams; ++i) {
+ SVariant* pVal = &pCtx->param[i].param;
- varDataVal(buf)[0] = '[';
- for (int32_t i = 1; i < pCtx->numOfParams; ++i) {
- SVariant* pVal = &pCtx->param[i].param;
+ GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
+
+ int32_t code = getPercentile(pMemBucket, v, &ppInfo->result);
+ if (code != TSDB_CODE_SUCCESS) {
+ goto _fin_error;
+ }
+
+ if (i == pCtx->numOfParams - 1) {
+ len += snprintf(varDataVal(buf) + len, sizeof(buf) - VARSTR_HEADER_SIZE - len, "%.6lf]", ppInfo->result);
+ } else {
+ len += snprintf(varDataVal(buf) + len, sizeof(buf) - VARSTR_HEADER_SIZE - len, "%.6lf, ", ppInfo->result);
+ }
+ }
+
+ int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
+ SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
+
+ varDataSetLen(buf, len);
+ colDataAppend(pCol, pBlock->info.rows, buf, false);
+
+ tMemBucketDestroy(pMemBucket);
+ return pResInfo->numOfRes;
+ } else {
+ SVariant* pVal = &pCtx->param[1].param;
GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
- int32_t code = getPercentile(pMemBucket, v, &ppInfo->result);
+ code = getPercentile(pMemBucket, v, &ppInfo->result);
if (code != TSDB_CODE_SUCCESS) {
goto _fin_error;
}
- if (i == pCtx->numOfParams - 1) {
- len += snprintf(varDataVal(buf) + len, sizeof(buf) - VARSTR_HEADER_SIZE - len, "%.6lf]", ppInfo->result);
- } else {
- len += snprintf(varDataVal(buf) + len, sizeof(buf) - VARSTR_HEADER_SIZE - len, "%.6lf, ", ppInfo->result);
- }
+ tMemBucketDestroy(pMemBucket);
+ return functionFinalize(pCtx, pBlock);
}
-
- int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
- SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
-
- varDataSetLen(buf, len);
- colDataAppend(pCol, pBlock->info.rows, buf, false);
-
- tMemBucketDestroy(pMemBucket);
- return pResInfo->numOfRes;
- } else {
- SVariant* pVal = &pCtx->param[1].param;
-
- GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
-
- code = getPercentile(pMemBucket, v, &ppInfo->result);
- if (code != TSDB_CODE_SUCCESS) {
- goto _fin_error;
- }
-
- tMemBucketDestroy(pMemBucket);
- return functionFinalize(pCtx, pBlock);
}
_fin_error:
diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h
index a823cfda0b..a1de2ee71a 100644
--- a/source/libs/sync/inc/syncPipeline.h
+++ b/source/libs/sync/inc/syncPipeline.h
@@ -68,7 +68,7 @@ void syncNodeLogReplMgrDestroy(SSyncNode* pNode);
// access
static FORCE_INLINE int64_t syncLogGetRetryBackoffTimeMs(SSyncLogReplMgr* pMgr) {
- return (1 << pMgr->retryBackoff) * SYNC_LOG_REPL_RETRY_WAIT_MS;
+ return ((int64_t)1 << pMgr->retryBackoff) * SYNC_LOG_REPL_RETRY_WAIT_MS;
}
static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) {
diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h
index 58944eb08c..a39e043c52 100644
--- a/source/libs/sync/inc/syncRaftEntry.h
+++ b/source/libs/sync/inc/syncRaftEntry.h
@@ -49,39 +49,6 @@ static FORCE_INLINE bool syncLogIsReplicationBarrier(SSyncRaftEntry* pEntry) {
return pEntry->originalRpcType == TDMT_SYNC_NOOP;
}
-typedef struct SRaftEntryHashCache {
- SHashObj* pEntryHash;
- int32_t maxCount;
- int32_t currentCount;
- TdThreadMutex mutex;
- SSyncNode* pSyncNode;
-} SRaftEntryHashCache;
-
-SRaftEntryHashCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount);
-void raftCacheDestroy(SRaftEntryHashCache* pCache);
-int32_t raftCachePutEntry(struct SRaftEntryHashCache* pCache, SSyncRaftEntry* pEntry);
-int32_t raftCacheGetEntry(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
-int32_t raftCacheGetEntryP(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
-int32_t raftCacheDelEntry(struct SRaftEntryHashCache* pCache, SyncIndex index);
-int32_t raftCacheGetAndDel(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
-int32_t raftCacheClear(struct SRaftEntryHashCache* pCache);
-
-typedef struct SRaftEntryCache {
- SSkipList* pSkipList;
- int32_t maxCount;
- int32_t currentCount;
- int32_t refMgr;
- TdThreadMutex mutex;
- SSyncNode* pSyncNode;
-} SRaftEntryCache;
-
-SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount);
-void raftEntryCacheDestroy(SRaftEntryCache* pCache);
-int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry);
-int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
-int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
-int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count);
-
#ifdef __cplusplus
}
#endif
diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c
index b04bcb86c6..bd1dae54d9 100644
--- a/source/libs/sync/src/syncAppendEntries.c
+++ b/source/libs/sync/src/syncAppendEntries.c
@@ -104,6 +104,8 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncAppendEntries* pMsg = pRpcMsg->pCont;
SRpcMsg rpcRsp = {0};
bool accepted = false;
+ SSyncRaftEntry* pEntry = NULL;
+
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
syncLogRecvAppendEntries(ths, pMsg, "not in my config");
@@ -137,14 +139,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncNodeStepDown(ths, pMsg->term);
syncNodeResetElectTimer(ths);
- if (pMsg->dataLen < (int32_t)sizeof(SSyncRaftEntry)) {
+ if (pMsg->dataLen < sizeof(SSyncRaftEntry)) {
sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d",
ths->vgId, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
goto _IGNORE;
}
- SSyncRaftEntry* pEntry = syncBuildRaftEntryFromAppendEntries(pMsg);
-
+ pEntry = syncBuildRaftEntryFromAppendEntries(pMsg);
if (pEntry == NULL) {
sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
goto _IGNORE;
@@ -191,5 +192,6 @@ _out:
_IGNORE:
rpcFreeCont(rpcRsp.pCont);
+ syncEntryDestroy(pEntry);
return 0;
}
diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c
index f81699b9f6..a60f43cd5e 100644
--- a/source/libs/sync/src/syncAppendEntriesReply.c
+++ b/source/libs/sync/src/syncAppendEntriesReply.c
@@ -40,7 +40,7 @@
//
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
- SyncAppendEntriesReply* pMsg = pRpcMsg->pCont;
+ SyncAppendEntriesReply* pMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont;
int32_t ret = 0;
// if already drop replica, do not process
diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c
index a9571f9e3b..a439471143 100644
--- a/source/libs/sync/src/syncMain.c
+++ b/source/libs/sync/src/syncMain.c
@@ -1126,29 +1126,18 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
}
void syncNodePreClose(SSyncNode* pSyncNode) {
- if (pSyncNode != NULL && pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpApplyQueueItems != NULL) {
- while (1) {
- int32_t aqItems = pSyncNode->pFsm->FpApplyQueueItems(pSyncNode->pFsm);
- sTrace("vgId:%d, pre close, %d items in apply queue", pSyncNode->vgId, aqItems);
- if (aqItems == 0 || aqItems == -1) {
- break;
- }
- taosMsleep(20);
- }
- }
+ ASSERT(pSyncNode != NULL);
+ ASSERT(pSyncNode->pFsm != NULL);
+ ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);
-#if 0
- if (pSyncNode->pNewNodeReceiver != NULL) {
- if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
- snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
+ while (1) {
+ int32_t aqItems = pSyncNode->pFsm->FpApplyQueueItems(pSyncNode->pFsm);
+ sTrace("vgId:%d, pre close, %d items in apply queue", pSyncNode->vgId, aqItems);
+ if (aqItems == 0 || aqItems == -1) {
+ break;
}
-
- sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
- pSyncNode->pNewNodeReceiver);
- snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
- pSyncNode->pNewNodeReceiver = NULL;
+ taosMsleep(20);
}
-#endif
// stop elect timer
syncNodeStopElectTimer(pSyncNode);
@@ -1461,7 +1450,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
}
// log begin config change
- sNInfo(pSyncNode, "begin do config change, from %d to %d", pSyncNode->vgId, oldConfig.replicaNum,
+ sNInfo(pSyncNode, "begin do config change, from %d to %d, replicas:%d", pSyncNode->vgId, oldConfig.replicaNum,
pNewConfig->replicaNum);
if (IamInNew) {
@@ -1742,8 +1731,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
#endif
// close receiver
- if (pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL &&
- snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
+ if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
}
diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c
index e2b039a2e4..faa44a626c 100644
--- a/source/libs/sync/src/syncPipeline.c
+++ b/source/libs/sync/src/syncPipeline.c
@@ -910,7 +910,7 @@ int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNo
int64_t firstSentMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
int64_t lastSentMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
int64_t timeDiffMs = lastSentMs - firstSentMs;
- if (timeDiffMs > 0 && timeDiffMs < (SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) {
+ if (timeDiffMs > 0 && timeDiffMs < ((int64_t)SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) {
pMgr->retryBackoff -= 1;
}
}
@@ -937,10 +937,6 @@ SSyncLogReplMgr* syncLogReplMgrCreate() {
ASSERT(pMgr->size == TSDB_SYNC_LOG_BUFFER_SIZE);
return pMgr;
-
-_err:
- taosMemoryFree(pMgr);
- return NULL;
}
void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr) {
diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c
index 806949c81e..f780e255ce 100644
--- a/source/libs/sync/src/syncRaftCfg.c
+++ b/source/libs/sync/src/syncRaftCfg.c
@@ -224,7 +224,7 @@ _OVER:
int32_t syncAddCfgIndex(SSyncNode *pNode, SyncIndex cfgIndex) {
SRaftCfg *pCfg = &pNode->raftCfg;
- if (pCfg->configIndexCount <= MAX_CONFIG_INDEX_COUNT) {
+ if (pCfg->configIndexCount < MAX_CONFIG_INDEX_COUNT) {
return -1;
}
diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c
index 623f1b77a4..3e63e2fb8e 100644
--- a/source/libs/sync/src/syncRaftEntry.c
+++ b/source/libs/sync/src/syncRaftEntry.c
@@ -102,344 +102,3 @@ void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
}
-
-SRaftEntryHashCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
- SRaftEntryHashCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryHashCache));
- if (pCache == NULL) {
- sError("vgId:%d, raft cache create error", pSyncNode->vgId);
- return NULL;
- }
-
- pCache->pEntryHash =
- taosHashInit(sizeof(SyncIndex), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
- if (pCache->pEntryHash == NULL) {
- sError("vgId:%d, raft cache create hash error", pSyncNode->vgId);
- return NULL;
- }
-
- taosThreadMutexInit(&(pCache->mutex), NULL);
- pCache->maxCount = maxCount;
- pCache->currentCount = 0;
- pCache->pSyncNode = pSyncNode;
-
- return pCache;
-}
-
-void raftCacheDestroy(SRaftEntryHashCache* pCache) {
- if (pCache != NULL) {
- taosThreadMutexLock(&pCache->mutex);
- taosHashCleanup(pCache->pEntryHash);
- taosThreadMutexUnlock(&pCache->mutex);
- taosThreadMutexDestroy(&(pCache->mutex));
- taosMemoryFree(pCache);
- }
-}
-
-// success, return 1
-// max count, return 0
-// error, return -1
-int32_t raftCachePutEntry(struct SRaftEntryHashCache* pCache, SSyncRaftEntry* pEntry) {
- taosThreadMutexLock(&pCache->mutex);
-
- if (pCache->currentCount >= pCache->maxCount) {
- taosThreadMutexUnlock(&pCache->mutex);
- return 0;
- }
-
- taosHashPut(pCache->pEntryHash, &(pEntry->index), sizeof(pEntry->index), pEntry, pEntry->bytes);
- ++(pCache->currentCount);
-
- sNTrace(pCache->pSyncNode, "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d",
- TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType,
- pEntry->index, pEntry->bytes);
- taosThreadMutexUnlock(&pCache->mutex);
- return 1;
-}
-
-// success, return 0
-// error, return -1
-// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
-int32_t raftCacheGetEntry(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
- if (ppEntry == NULL) {
- return -1;
- }
- *ppEntry = NULL;
-
- taosThreadMutexLock(&pCache->mutex);
- void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
- if (pTmp != NULL) {
- SSyncRaftEntry* pEntry = pTmp;
- *ppEntry = taosMemoryMalloc(pEntry->bytes);
- memcpy(*ppEntry, pTmp, pEntry->bytes);
-
- sNTrace(pCache->pSyncNode, "raft cache get, type:%s,%d, type2:%s,%d, index:%" PRId64,
- TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
- (*ppEntry)->originalRpcType, (*ppEntry)->index);
- taosThreadMutexUnlock(&pCache->mutex);
- return 0;
- }
-
- taosThreadMutexUnlock(&pCache->mutex);
- terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
- return -1;
-}
-
-// success, return 0
-// error, return -1
-// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
-int32_t raftCacheGetEntryP(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
- if (ppEntry == NULL) {
- return -1;
- }
- *ppEntry = NULL;
-
- taosThreadMutexLock(&pCache->mutex);
- void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
- if (pTmp != NULL) {
- SSyncRaftEntry* pEntry = pTmp;
- *ppEntry = pEntry;
-
- sNTrace(pCache->pSyncNode, "raft cache get, type:%s,%d, type2:%s,%d, index:%" PRId64,
- TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
- (*ppEntry)->originalRpcType, (*ppEntry)->index);
- taosThreadMutexUnlock(&pCache->mutex);
- return 0;
- }
-
- taosThreadMutexUnlock(&pCache->mutex);
- terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
- return -1;
-}
-
-int32_t raftCacheDelEntry(struct SRaftEntryHashCache* pCache, SyncIndex index) {
- taosThreadMutexLock(&pCache->mutex);
- taosHashRemove(pCache->pEntryHash, &index, sizeof(index));
- --(pCache->currentCount);
- taosThreadMutexUnlock(&pCache->mutex);
- return 0;
-}
-
-int32_t raftCacheGetAndDel(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
- if (ppEntry == NULL) {
- return -1;
- }
- *ppEntry = NULL;
-
- taosThreadMutexLock(&pCache->mutex);
- void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
- if (pTmp != NULL) {
- SSyncRaftEntry* pEntry = pTmp;
- *ppEntry = taosMemoryMalloc(pEntry->bytes);
- memcpy(*ppEntry, pTmp, pEntry->bytes);
-
- sNTrace(pCache->pSyncNode, "raft cache get-and-del, type:%s,%d, type2:%s,%d, index:%" PRId64,
- TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
- (*ppEntry)->originalRpcType, (*ppEntry)->index);
-
- taosHashRemove(pCache->pEntryHash, &index, sizeof(index));
- --(pCache->currentCount);
-
- taosThreadMutexUnlock(&pCache->mutex);
- return 0;
- }
-
- taosThreadMutexUnlock(&pCache->mutex);
- terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
- return -1;
-}
-
-int32_t raftCacheClear(struct SRaftEntryHashCache* pCache) {
- taosThreadMutexLock(&pCache->mutex);
- taosHashClear(pCache->pEntryHash);
- pCache->currentCount = 0;
- taosThreadMutexUnlock(&pCache->mutex);
- return 0;
-}
-
-static char* keyFn(const void* pData) {
- SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pData;
- return (char*)(&(pEntry->index));
-}
-
-static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); }
-
-static void freeRaftEntry(void* param) {
- SSyncRaftEntry* pEntry = (SSyncRaftEntry*)param;
- syncEntryDestroy(pEntry);
-}
-
-SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
- SRaftEntryCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryCache));
- if (pCache == NULL) {
- sError("vgId:%d, raft cache create error", pSyncNode->vgId);
- return NULL;
- }
-
- pCache->pSkipList =
- tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
- if (pCache->pSkipList == NULL) {
- sError("vgId:%d, raft cache create hash error", pSyncNode->vgId);
- return NULL;
- }
-
- taosThreadMutexInit(&(pCache->mutex), NULL);
- pCache->refMgr = taosOpenRef(10, freeRaftEntry);
- pCache->maxCount = maxCount;
- pCache->currentCount = 0;
- pCache->pSyncNode = pSyncNode;
-
- return pCache;
-}
-
-void raftEntryCacheDestroy(SRaftEntryCache* pCache) {
- if (pCache != NULL) {
- taosThreadMutexLock(&pCache->mutex);
- tSkipListDestroy(pCache->pSkipList);
- if (pCache->refMgr != -1) {
- taosCloseRef(pCache->refMgr);
- pCache->refMgr = -1;
- }
- taosThreadMutexUnlock(&pCache->mutex);
- taosThreadMutexDestroy(&(pCache->mutex));
- taosMemoryFree(pCache);
- }
-}
-
-// success, return 1
-// max count, return 0
-// error, return -1
-int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry) {
- taosThreadMutexLock(&pCache->mutex);
-
- if (pCache->currentCount >= pCache->maxCount) {
- taosThreadMutexUnlock(&pCache->mutex);
- return 0;
- }
-
- SSkipListNode* pSkipListNode = tSkipListPut(pCache->pSkipList, pEntry);
- ASSERT(pSkipListNode != NULL);
- ++(pCache->currentCount);
-
- pEntry->rid = taosAddRef(pCache->refMgr, pEntry);
- ASSERT(pEntry->rid >= 0);
-
- sNTrace(pCache->pSyncNode, "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d",
- TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType,
- pEntry->index, pEntry->bytes);
- taosThreadMutexUnlock(&pCache->mutex);
- return 1;
-}
-
-// find one, return 1
-// not found, return 0
-// error, return -1
-int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
- ASSERT(ppEntry != NULL);
- SSyncRaftEntry* pEntry = NULL;
- int32_t code = raftEntryCacheGetEntryP(pCache, index, &pEntry);
- if (code == 1) {
- int32_t bytes = (int32_t)pEntry->bytes;
- *ppEntry = taosMemoryMalloc((int64_t)bytes);
- memcpy(*ppEntry, pEntry, pEntry->bytes);
- (*ppEntry)->rid = -1;
- } else {
- *ppEntry = NULL;
- }
- return code;
-}
-
-// find one, return 1
-// not found, return 0
-// error, return -1
-int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
- taosThreadMutexLock(&pCache->mutex);
-
- SyncIndex index2 = index;
- int32_t code = 0;
-
- SArray* entryPArray = tSkipListGet(pCache->pSkipList, (char*)(&index2));
- int32_t arraySize = taosArrayGetSize(entryPArray);
- if (arraySize == 1) {
- SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(entryPArray, 0);
- ASSERT(*ppNode != NULL);
- *ppEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode);
- taosAcquireRef(pCache->refMgr, (*ppEntry)->rid);
- code = 1;
-
- } else if (arraySize == 0) {
- code = 0;
-
- } else {
- ASSERT(0);
-
- code = -1;
- }
- taosArrayDestroy(entryPArray);
-
- taosThreadMutexUnlock(&pCache->mutex);
- return code;
-}
-
-// count = -1, clear all
-// count >= 0, clear count
-// return -1, error
-// return delete count
-int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) {
- taosThreadMutexLock(&pCache->mutex);
- int32_t returnCnt = 0;
-
- if (count == -1) {
- // clear all
- SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
- while (tSkipListIterNext(pIter)) {
- SSkipListNode* pNode = tSkipListIterGet(pIter);
- ASSERT(pNode != NULL);
- SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
- syncEntryDestroy(pEntry);
- ++returnCnt;
- }
- tSkipListDestroyIter(pIter);
-
- tSkipListDestroy(pCache->pSkipList);
- pCache->pSkipList =
- tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
- ASSERT(pCache->pSkipList != NULL);
-
- } else {
- // clear count
- int i = 0;
- SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
- SArray* delNodeArray = taosArrayInit(0, sizeof(SSkipListNode*));
-
- // free entry
- while (tSkipListIterNext(pIter)) {
- SSkipListNode* pNode = tSkipListIterGet(pIter);
- ASSERT(pNode != NULL);
- if (i++ >= count) {
- break;
- }
-
- // sDebug("push pNode:%p", pNode);
- taosArrayPush(delNodeArray, &pNode);
- ++returnCnt;
- SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
-
- // syncEntryDestroy(pEntry);
- taosRemoveRef(pCache->refMgr, pEntry->rid);
- }
- tSkipListDestroyIter(pIter);
-
- // delete skiplist node
- int32_t arraySize = taosArrayGetSize(delNodeArray);
- for (int32_t i = 0; i < arraySize; ++i) {
- SSkipListNode** ppNode = taosArrayGet(delNodeArray, i);
- // sDebug("get pNode:%p", *ppNode);
- tSkipListRemoveNode(pCache->pSkipList, *ppNode);
- }
- taosArrayDestroy(delNodeArray);
- }
-
- pCache->currentCount -= returnCnt;
- taosThreadMutexUnlock(&pCache->mutex);
- return returnCnt;
-}
diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c
index a83a19928e..381327d4d7 100644
--- a/source/libs/sync/src/syncSnapshot.c
+++ b/source/libs/sync/src/syncSnapshot.c
@@ -168,17 +168,19 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if (pSender->blockLen > 0) {
// has read data
- sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
+ sSDebug(pSender, "vgId:%d, snapshot sender continue to read, blockLen:%d seq:%d", pSender->pSyncNode->vgId,
+ pSender->blockLen, pSender->seq);
} else {
// read finish, update seq to end
pSender->seq = SYNC_SNAPSHOT_SEQ_END;
- sSInfo(pSender, "snapshot sender read to the end, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
+ sSInfo(pSender, "vgId:%d, snapshot sender read to the end, blockLen:%d seq:%d", pSender->pSyncNode->vgId,
+ pSender->blockLen, pSender->seq);
}
// build msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
- sSError(pSender, "snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
+ sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
return -1;
}
@@ -340,11 +342,13 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
taosMemoryFree(pReceiver);
}
-bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
+bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
+ return (pReceiver != NULL ? pReceiver->start : false);
+}
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
if (pReceiver->pWriter != NULL) {
- sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null");
+ sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null", pReceiver->pSyncNode->vgId);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
@@ -851,8 +855,8 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) {
- sSError(pSender, "prepare snapshot failed since beginIndex:%d larger than applyIndex:%d", pMsg->snapBeginIndex,
- snapshot.lastApplyIndex);
+ sSError(pSender, "prepare snapshot failed since beginIndex:%" PRId64 " larger than applyIndex:%" PRId64,
+ pMsg->snapBeginIndex, snapshot.lastApplyIndex);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
@@ -966,7 +970,8 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if (pSender->pReader == NULL || pSender->finish) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid");
- sSError(pSender, "snapshot sender invalid, pReader:%p finish:%d", pMsg->code, pSender->pReader, pSender->finish);
+ sSError(pSender, "snapshot sender invalid error:%s 0x%x, pReader:%p finish:%d", tstrerror(pMsg->code), pMsg->code,
+ pSender->pReader, pSender->finish);
terrno = pMsg->code;
goto _ERROR;
}
diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c
index ee56479a31..cda7e35b0f 100644
--- a/source/libs/wal/src/walMeta.c
+++ b/source/libs/wal/src/walMeta.c
@@ -913,7 +913,7 @@ int walLoadMeta(SWal* pWal) {
int64_t fileSize = 0;
taosStatFile(fnameStr, &fileSize, NULL);
if (fileSize == 0) {
- taosRemoveFile(fnameStr);
+ (void)taosRemoveFile(fnameStr);
wDebug("vgId:%d, wal find empty meta ver %d", pWal->cfg.vgId, metaVer);
return -1;
}
diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c
index 958e7dc23d..701d8da8c0 100644
--- a/source/libs/wal/src/walWrite.c
+++ b/source/libs/wal/src/walWrite.c
@@ -63,7 +63,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
wInfo("vgId:%d, restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
}
}
- walRemoveMeta(pWal);
+ (void)walRemoveMeta(pWal);
pWal->writeCur = -1;
pWal->totSize = 0;
diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c
index 6bcf4ad39b..d39e8599f1 100644
--- a/source/util/src/tpagedbuf.c
+++ b/source/util/src/tpagedbuf.c
@@ -391,7 +391,7 @@ _error:
return TSDB_CODE_OUT_OF_MEMORY;
}
-static char* doExtractPage(SDiskbasedBuf* pBuf) {
+static char* doExtractPage(SDiskbasedBuf* pBuf, bool* newPage) {
char* availablePage = NULL;
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
availablePage = evictBufPage(pBuf);
@@ -405,6 +405,7 @@ static char* doExtractPage(SDiskbasedBuf* pBuf) {
if (availablePage == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
}
+ *newPage = true;
}
return availablePage;
@@ -413,7 +414,8 @@ static char* doExtractPage(SDiskbasedBuf* pBuf) {
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
pBuf->statis.getPages += 1;
- char* availablePage = doExtractPage(pBuf);
+ bool newPage = false;
+ char* availablePage = doExtractPage(pBuf, &newPage);
if (availablePage == NULL) {
return NULL;
}
@@ -432,6 +434,9 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
// register page id info
pi = registerNewPageInfo(pBuf, *pageId);
if (pi == NULL) {
+ if (newPage) {
+ taosMemoryFree(availablePage);
+ }
return NULL;
}
@@ -492,7 +497,8 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
ASSERT((!BUF_PAGE_IN_MEM(*pi)) && (*pi)->pn == NULL &&
(((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
- (*pi)->pData = doExtractPage(pBuf);
+ bool newPage = false;
+ (*pi)->pData = doExtractPage(pBuf, &newPage);
// failed to evict buffer page, return with error code.
if ((*pi)->pData == NULL) {
@@ -509,6 +515,10 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
if (HAS_DATA_IN_DISK(*pi)) {
int32_t code = loadPageFromDisk(pBuf, *pi);
if (code != 0) {
+ if (newPage) {
+ taosMemoryFree((*pi)->pData);
+ }
+
terrno = code;
return NULL;
}
diff --git a/tests/script/tsim/stream/checkStreamSTable.sim b/tests/script/tsim/stream/checkStreamSTable.sim
index fda78af621..288dd35cfe 100644
--- a/tests/script/tsim/stream/checkStreamSTable.sim
+++ b/tests/script/tsim/stream/checkStreamSTable.sim
@@ -301,7 +301,7 @@ print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
-loop2:
+loop3:
sleep 300
@@ -317,47 +317,182 @@ if $rows != 2 then
print $data00, $data01, $data02, $data03
print $data10, $data11, $data12, $data13
print $data20, $data21, $data22, $data23
- goto loop2
+ goto loop3
endi
if $data01 != 10 then
print =====data01=$data01
- goto loop2
+ goto loop3
endi
if $data02 != 20 then
print =====data02=$data02
- goto loop2
+ goto loop3
endi
if $data03 != 1 then
print =====data03=$data03
- goto loop2
+ goto loop3
endi
if $data04 != NULL then
print =====data04=$data04
- goto loop2
+ goto loop3
endi
if $data11 != 40 then
print =====data11=$data11
- goto loop2
+ goto loop3
endi
if $data12 != 50 then
print =====data12=$data12
- goto loop2
+ goto loop3
endi
if $data13 != 1 then
print =====data13=$data13
- goto loop2
+ goto loop3
endi
if $data14 != NULL then
print =====data14=$data14
- goto loop2
+ goto loop3
+endi
+
+print ===== step7
+
+sql create database result5 vgroups 1;
+
+sql create database test5 vgroups 4;
+sql use test5;
+
+sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
+sql create table t1 using st tags(1,2,3);
+sql create table t2 using st tags(4,5,6);
+
+sql create stable result5.streamt5(ts timestamp,a int,b int,c int, d int) tags(tg1 int,tg2 int,tg3 int);
+
+sql create stream streams5 trigger at_once into result5.streamt5(ts,c,a,b) tags(tg2, tg3, tg1) subtable( concat("tbl-", cast(tg3 as varchar(10)) ) ) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, a as tg3 session(ts, 10s);
+
+sql insert into t1 values(1648791213000,NULL,NULL,NULL);
+
+$loop_count = 0
+
+print select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, a as tg3 session(ts, 10s);
+sql select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, a as tg3 session(ts, 10s);
+print $data00, $data01, $data02, $data03
+print $data10, $data11, $data12, $data13
+print $data20, $data21, $data22, $data23
+
+loop4:
+
+sleep 300
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+print sql select * from result5.streamt5 order by tg1;
+sql select * from result5.streamt5 order by tg1;
+print $data00, $data01, $data02, $data03 $data04 $data05 $data06 $data07
+print $data10, $data11, $data12, $data13
+print $data20, $data21, $data22, $data23
+
+if $rows != 1 then
+ print =====rows=$rows
+ goto loop4
+endi
+
+if $data01 != NULL then
+ print =====data01=$data01
+ goto loop4
+endi
+
+if $data02 != NULL then
+ print =====data02=$data02
+ goto loop4
+endi
+
+if $data03 != 1 then
+ print =====data03=$data03
+ goto loop4
+endi
+
+if $data04 != NULL then
+ print =====data04=$data04
+ goto loop4
+endi
+
+if $data05 != 2 then
+ print =====data05=$data05
+ goto loop4
+endi
+
+if $data06 != 2 then
+ print =====data06=$data06
+ goto loop4
+endi
+
+if $data07 != NULL then
+ print =====data07=$data07
+ goto loop4
+endi
+
+sql drop stream if exists streams4;
+sql drop stream if exists streams5;
+sql drop database if exists test4;
+sql drop database if exists test5;
+sql drop database if exists result4;
+sql drop database if exists result5;
+
+print ===== step8
+
+sql drop stream if exists streams8;
+sql drop database if exists test8;
+sql create database test8 vgroups 1;
+sql use test8;
+sql create table t1(ts timestamp, a int, b int , c int, d double);
+sql create stream streams8 trigger at_once into streamt8 as select _wstart as ts, count(*) c1, count(d) c2, count(c) c3 from t1 partition by tbname interval(10s) ;
+
+sql drop stream streams8;
+sql create stream streams71 trigger at_once into streamt8(ts, c2) tags(group_id)as select _wstart, count(*) from t1 partition by tbname as group_id interval(10s);
+
+sql insert into t1 values(1648791233000,1,2,3,1.0);
+
+loop8:
+
+sleep 300
+
+$loop_count = $loop_count + 1
+if $loop_count == 10 then
+ return -1
+endi
+
+sql select * from streamt8;
+print $data00, $data01, $data02, $data03
+print $data10, $data11, $data12, $data13
+print $data20, $data21, $data22, $data23
+
+if $rows != 1 then
+ print =====rows=$rows
+ goto loop8
+endi
+
+if $data01 != NULL then
+ print =====data01=$data01
+ goto loop8
+endi
+
+if $data02 != 1 then
+ print =====data02=$data02
+ goto loop8
+endi
+
+if $data03 != NULL then
+ print =====data03=$data03
+ goto loop8
endi
print ======over
diff --git a/tests/script/tsim/stream/udTableAndTag0.sim b/tests/script/tsim/stream/udTableAndTag0.sim
index bfc299df0f..8bf34dc54c 100644
--- a/tests/script/tsim/stream/udTableAndTag0.sim
+++ b/tests/script/tsim/stream/udTableAndTag0.sim
@@ -39,7 +39,10 @@ sql select table_name from information_schema.ins_tables where db_name="result"
if $rows != 2 then
print =====rows=$rows
- print $data00 $data10
+ print $data00
+ print $data10
+ print $data20
+ print $data30
goto loop0
endi
diff --git a/utils/tsim/src/simExe.c b/utils/tsim/src/simExe.c
index 258c611557..d82c948cf7 100644
--- a/utils/tsim/src/simExe.c
+++ b/utils/tsim/src/simExe.c
@@ -31,7 +31,7 @@ void simLogSql(char *sql, bool useSharp) {
taosFprintfFile(pFile, "%s;\n", sql);
}
- taosFsyncFile(pFile);
+ UNUSED(taosFsyncFile(pFile));
}
}