diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx
index 542edcb064..2212cded7f 100644
--- a/docs/en/07-develop/07-tmq.mdx
+++ b/docs/en/07-develop/07-tmq.mdx
@@ -352,7 +352,7 @@ You configure the following parameters when creating a consumer:
| `auto.offset.reset` | enum | Initial offset for the consumer group | `earliest`: subscribe from the earliest data, the default behavior before version 3.2.0.0; `latest`: subscribe from the latest data, the default behavior since version 3.2.0.0; or `none`: can't subscribe without a committed offset |
| `enable.auto.commit` | boolean | Commit automatically; true: user application doesn't need to explicitly commit; false: user application needs to handle commits by itself | Default value is true |
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds | Default value is 5000 |
-| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages | default value: false
+| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages. Not applicable to column subscriptions (for column subscriptions, tbname can be written as a column in the subquery statement). Deprecated since version 3.2.0.0; always treated as true. | default value: false
The method of specifying these parameters depends on the language used:
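As a reference for the table above, here is a minimal C sketch of setting these parameters, based on the `tmq_conf_*` API that appears later in this patch; the group id and the omitted error handling are illustrative only, not part of this change:

```c
/* Hedged sketch: configure a TMQ consumer with the parameters documented
 * above. "my_group" is an illustrative group id; errors are not checked. */
tmq_conf_t *conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "my_group");
tmq_conf_set(conf, "auto.offset.reset", "earliest");   /* default before 3.2.0.0 */
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "5000");
tmq_conf_set(conf, "msg.with.table.name", "true");     /* deprecated since 3.2.0.0, always true */
tmq_t *tmq = tmq_consumer_new(conf, NULL, 0);
```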
diff --git a/docs/zh/05-get-started/index.md b/docs/zh/05-get-started/index.md
index 16172277b5..0b7ca02b9f 100644
--- a/docs/zh/05-get-started/index.md
+++ b/docs/zh/05-get-started/index.md
description: 'Quickly set up a TDengine environment and experience its efficient ingestion and queries'
---
import xiaot from './xiaot.webp'
-import xiaot_new from './xiaot-03.webp'
+import xiaot_new from './xiaot-20231007.png'
import channel from './channel.webp'
import official_account from './official-account.webp'
diff --git a/docs/zh/05-get-started/xiaot-20231007.png b/docs/zh/05-get-started/xiaot-20231007.png
new file mode 100644
index 0000000000..553bcbd090
Binary files /dev/null and b/docs/zh/05-get-started/xiaot-20231007.png differ
diff --git a/docs/zh/07-develop/07-tmq.md b/docs/zh/07-develop/07-tmq.md
index 2572c93c52..127b5d2755 100644
--- a/docs/zh/07-develop/07-tmq.md
+++ b/docs/zh/07-develop/07-tmq.md
@@ -60,17 +60,17 @@ import CDemo from "./_sub_c.mdx";
typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param));
typedef enum tmq_conf_res_t {
- TMQ_CONF_UNKNOWN = -2,
- TMQ_CONF_INVALID = -1,
- TMQ_CONF_OK = 0,
-} tmq_conf_res_t;
+ TMQ_CONF_UNKNOWN = -2,
+ TMQ_CONF_INVALID = -1,
+ TMQ_CONF_OK = 0,
+ } tmq_conf_res_t;
typedef struct tmq_topic_assignment {
- int32_t vgId;
- int64_t currentOffset;
- int64_t begin;
- int64_t end;
-} tmq_topic_assignment;
+ int32_t vgId;
+ int64_t currentOffset;
+ int64_t begin;
+ int64_t end;
+ } tmq_topic_assignment;
DLL_EXPORT tmq_conf_t *tmq_conf_new();
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
@@ -103,7 +103,7 @@ import CDemo from "./_sub_c.mdx";
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
- DLL_EXPORT const char *tmq_err2str(int32_t code);DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
+ DLL_EXPORT const char *tmq_err2str(int32_t code);
```
The following describes their specific usage (for the supertable and subtable structures, see the "Data Modeling" section); the complete sample code is given in the C example below.
@@ -351,7 +351,7 @@ CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
| `auto.offset.reset` | enum | Initial offset for the consumer group | `earliest`: the default before version 3.2.0.0, subscribe from the earliest data; `latest`: the default since version 3.2.0.0, subscribe only from the latest data; `none`: cannot subscribe without a committed offset |
| `enable.auto.commit` | boolean | Whether to enable automatic offset commits; true: commit automatically, the client application does not need to commit; false: the client application must commit by itself | Default value is true |
| `auto.commit.interval.ms` | integer | Interval for automatically committing offsets, in milliseconds | Default value is 5000 |
-| `msg.with.table.name` | boolean | Whether table names may be parsed from messages; not applicable to column subscriptions (for column subscriptions, tbname can be written as a column in the subquery statement) | Disabled by default |
+| `msg.with.table.name` | boolean | Whether table names may be parsed from messages; not applicable to column subscriptions (for column subscriptions, tbname can be written as a column in the subquery statement). Deprecated since version 3.2.0.0; always treated as true. | Disabled by default |
The method of specifying these parameters depends on the language used:
diff --git a/packaging/cfg/taosd.service b/packaging/cfg/taosd.service
index 52c4b1d1e2..bfa330f6cb 100644
--- a/packaging/cfg/taosd.service
+++ b/packaging/cfg/taosd.service
@@ -8,7 +8,7 @@ Type=simple
ExecStart=/usr/bin/taosd
ExecStartPre=/usr/local/taos/bin/startPre.sh
TimeoutStopSec=1000000s
-LimitNOFILE=infinity
+LimitNOFILE=1048576
LimitNPROC=infinity
LimitCORE=infinity
TimeoutStartSec=0
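A note on this change: `LimitNOFILE=infinity` resolves to a host-dependent maximum, while `1048576` pins the descriptor limit to a known value. A minimal sketch (plain POSIX, not TDengine code, assumed standalone) for checking the limit a process actually runs with:

```c
/* Hedged sketch: print the RLIMIT_NOFILE values that systemd's LimitNOFILE
 * setting maps to for the current process. */
#include <stdio.h>
#include <sys/resource.h>

int main(void) {
  struct rlimit rl;
  if (getrlimit(RLIMIT_NOFILE, &rl) != 0) return 1;
  printf("soft=%llu hard=%llu\n",
         (unsigned long long)rl.rlim_cur, (unsigned long long)rl.rlim_max);
  return 0;
}
```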
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index ffc71bf647..d7d3dd002c 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -1668,6 +1668,13 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
return;
}
+ if (strcasecmp(option, "asynclog") == 0) {
+ int32_t newAsynclog = atoi(value);
+ uInfo("asynclog set from %d to %d", tsAsyncLog, newAsynclog);
+ tsAsyncLog = newAsynclog;
+ return;
+ }
+
const char *options[] = {
"dDebugFlag", "vDebugFlag", "mDebugFlag", "wDebugFlag", "sDebugFlag", "tsdbDebugFlag", "tqDebugFlag",
"fsDebugFlag", "udfDebugFlag", "smaDebugFlag", "idxDebugFlag", "tdbDebugFlag", "tmrDebugFlag", "uDebugFlag",
diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c
index 4871d88a57..96d5a6a964 100644
--- a/source/dnode/mnode/impl/src/mndDnode.c
+++ b/source/dnode/mnode/impl/src/mndDnode.c
@@ -1174,6 +1174,21 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
strcpy(dcfgReq.config, "ttlbatchdropnum");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
+ } else if (strncasecmp(cfgReq.config, "asynclog", 8) == 0) {
+ int32_t optLen = strlen("asynclog");
+ int32_t flag = -1;
+ int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
+ if (code < 0) return code;
+
+ if (flag < 0 || flag > 1) {
+ mError("dnode:%d, failed to config asynclog since value:%d. Valid range: [0, 1]", cfgReq.dnodeId, flag);
+ terrno = TSDB_CODE_INVALID_CFG;
+ tFreeSMCfgDnodeReq(&cfgReq);
+ return -1;
+ }
+
+ strcpy(dcfgReq.config, "asynclog");
+ snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
#ifdef TD_ENTERPRISE
} else if (strncasecmp(cfgReq.config, "supportvnodes", 13) == 0) {
int32_t optLen = strlen("supportvnodes");
diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c
index b8d5c35dfb..41392ba27b 100644
--- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c
+++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c
@@ -105,6 +105,7 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
pHdr->size = len;
memcpy(pHdr->data, rowData, len);
tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode));
+ taosMemoryFree(rowData);
return code;
_err:
diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
index caf88f55fc..e9e848f1b0 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
@@ -415,6 +415,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
_end:
tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true);
+ if (pr->pCurFileSet) {
+ pr->pCurFileSet = NULL;
+ }
taosThreadMutexUnlock(&pr->readerMutex);
diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c
index c55e5f92ea..79964c5636 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c
@@ -562,6 +562,8 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
} else {
SCommitter2 committer[1];
+ tsdbFSCheckCommit(tsdb->pFS);
+
code = tsdbOpenCommitter(tsdb, info, committer);
TSDB_CHECK_CODE(code, lino, _exit);
diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c
index 5e5348e9b5..afc9b7db3a 100644
--- a/source/dnode/vnode/src/tsdb/tsdbFS2.c
+++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c
@@ -18,6 +18,8 @@
#include "vnd.h"
#include "vndCos.h"
+#define BLOCK_COMMIT_FACTOR 3
+
extern int vnodeScheduleTask(int (*execute)(void *), void *arg);
extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg);
extern void remove_file(const char *fname);
@@ -65,11 +67,17 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
fs[0]->bgTaskQueue->next = fs[0]->bgTaskQueue;
fs[0]->bgTaskQueue->prev = fs[0]->bgTaskQueue;
+ taosThreadMutexInit(&fs[0]->commitMutex, NULL);
+ taosThreadCondInit(&fs[0]->canCommit, NULL);
+ fs[0]->blockCommit = false;
+
return 0;
}
static int32_t destroy_fs(STFileSystem **fs) {
if (fs[0] == NULL) return 0;
+ taosThreadMutexDestroy(&fs[0]->commitMutex);
+ taosThreadCondDestroy(&fs[0]->canCommit);
taosThreadMutexDestroy(fs[0]->mutex);
ASSERT(fs[0]->bgTaskNum == 0);
@@ -829,6 +837,27 @@ _exit:
return code;
}
+static int32_t tsdbFSSetBlockCommit(STFileSystem *fs, bool block) {
+ taosThreadMutexLock(&fs->commitMutex);
+ if (block) {
+ fs->blockCommit = true;
+ } else {
+ fs->blockCommit = false;
+ taosThreadCondSignal(&fs->canCommit);
+ }
+ taosThreadMutexUnlock(&fs->commitMutex);
+ return 0;
+}
+
+int32_t tsdbFSCheckCommit(STFileSystem *fs) {
+ taosThreadMutexLock(&fs->commitMutex);
+ while (fs->blockCommit) {
+ taosThreadCondWait(&fs->canCommit, &fs->commitMutex);
+ }
+ taosThreadMutexUnlock(&fs->commitMutex);
+ return 0;
+}
+
int32_t tsdbFSEditCommit(STFileSystem *fs) {
int32_t code = 0;
int32_t lino = 0;
@@ -838,19 +867,36 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
TSDB_CHECK_CODE(code, lino, _exit);
// schedule merge
- if (fs->tsdb->pVnode->config.sttTrigger != 1) {
+ if (fs->tsdb->pVnode->config.sttTrigger > 1) {
STFileSet *fset;
+ int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger;
+ bool schedMerge = false;
+ bool blockCommit = false;
+
TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
if (TARRAY2_SIZE(fset->lvlArr) == 0) continue;
SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
- if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < fs->tsdb->pVnode->config.sttTrigger) continue;
+ if (lvl->level != 0) continue;
+ int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
+ if (numFile >= sttTrigger) {
+ schedMerge = true;
+ }
+
+ if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) {
+ blockCommit = true;
+ }
+
+ if (schedMerge && blockCommit) break;
+ }
+
+ if (schedMerge) {
code = tsdbFSScheduleBgTask(fs, TSDB_BG_TASK_MERGER, tsdbMerge, NULL, fs->tsdb, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
-
- break;
}
+
+ tsdbFSSetBlockCommit(fs, blockCommit);
}
_exit:
@@ -1104,4 +1150,4 @@ int32_t tsdbFSEnableBgTask(STFileSystem *fs) {
fs->stop = false;
taosThreadMutexUnlock(fs->mutex);
return 0;
-}
+}
\ No newline at end of file
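The two functions added above (`tsdbFSSetBlockCommit` / `tsdbFSCheckCommit`) form a classic condition-variable gate: `tsdbFSEditCommit` raises `blockCommit` when level-0 stt files pile up past `sttTrigger * BLOCK_COMMIT_FACTOR`, and `tsdbCommitBegin` waits on the gate until merging catches up. A standalone sketch of the same pattern, assuming plain pthreads in place of the taosThread wrappers:

```c
/* Hedged sketch of the commit gate above, using plain pthreads. */
#include <pthread.h>
#include <stdbool.h>

typedef struct {
  pthread_mutex_t mutex;
  pthread_cond_t  canCommit;
  bool            blockCommit;
} CommitGate;

/* Merge-scheduler side: raise the gate when merging falls behind,
 * clear it and wake a waiting committer otherwise. */
static void gateSetBlock(CommitGate *g, bool block) {
  pthread_mutex_lock(&g->mutex);
  g->blockCommit = block;
  if (!block) pthread_cond_signal(&g->canCommit);
  pthread_mutex_unlock(&g->mutex);
}

/* Committer side: block at commit begin until the gate is clear. */
static void gateCheckCommit(CommitGate *g) {
  pthread_mutex_lock(&g->mutex);
  while (g->blockCommit) pthread_cond_wait(&g->canCommit, &g->mutex);
  pthread_mutex_unlock(&g->mutex);
}
```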
diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h
index e814ab2fff..b0f42a0c48 100644
--- a/source/dnode/vnode/src/tsdb/tsdbFS2.h
+++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h
@@ -67,6 +67,7 @@ int32_t tsdbFSDisableBgTask(STFileSystem *fs);
int32_t tsdbFSEnableBgTask(STFileSystem *fs);
// other
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
+int32_t tsdbFSCheckCommit(STFileSystem *fs);
struct STFSBgTask {
EFSBgTaskT type;
@@ -103,6 +104,11 @@ struct STFileSystem {
int32_t bgTaskNum;
STFSBgTask bgTaskQueue[1];
STFSBgTask *bgTaskRunning;
+
+ // block commit variables
+ TdThreadMutex commitMutex;
+ TdThreadCond canCommit;
+ bool blockCommit;
};
#ifdef __cplusplus
diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c
index a1643acc50..42a8b5bb3f 100644
--- a/source/dnode/vnode/src/tsdb/tsdbMerge.c
+++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c
@@ -15,6 +15,8 @@
#include "tsdbMerge.h"
+#define TSDB_MAX_LEVEL 6  // levels range over 0..6, i.e. 7 levels in total
+
typedef struct {
STsdb *tsdb;
TFileSetArray *fsetArr;
@@ -67,18 +69,6 @@ static int32_t tsdbMergerClose(SMerger *merger) {
int32_t lino = 0;
SVnode *pVnode = merger->tsdb->pVnode;
- // edit file system
- code = tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE);
- TSDB_CHECK_CODE(code, lino, _exit);
-
- taosThreadRwlockWrlock(&merger->tsdb->rwLock);
- code = tsdbFSEditCommit(merger->tsdb->pFS);
- if (code) {
- taosThreadRwlockUnlock(&merger->tsdb->rwLock);
- TSDB_CHECK_CODE(code, lino, _exit);
- }
- taosThreadRwlockUnlock(&merger->tsdb->rwLock);
-
ASSERT(merger->writer == NULL);
ASSERT(merger->dataIterMerger == NULL);
ASSERT(merger->tombIterMerger == NULL);
@@ -100,90 +90,142 @@ _exit:
}
static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
- int32_t code = 0;
- int32_t lino = 0;
-
- merger->ctx->toData = true;
- merger->ctx->level = 0;
-
- // find the highest level that can be merged to
- for (int32_t i = 0, numCarry = 0;;) {
- int32_t numFile = numCarry;
- if (i < TARRAY2_SIZE(merger->ctx->fset->lvlArr) &&
- merger->ctx->level == TARRAY2_GET(merger->ctx->fset->lvlArr, i)->level) {
- numFile += TARRAY2_SIZE(TARRAY2_GET(merger->ctx->fset->lvlArr, i)->fobjArr);
- i++;
- }
-
- numCarry = numFile / merger->sttTrigger;
- if (numCarry == 0) {
- break;
- } else {
- merger->ctx->level++;
- }
- }
-
- ASSERT(merger->ctx->level > 0);
-
+ int32_t code = 0;
+ int32_t lino = 0;
SSttLvl *lvl;
+
+ bool hasLevelLargerThanMax = false;
TARRAY2_FOREACH_REVERSE(merger->ctx->fset->lvlArr, lvl) {
- if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
- continue;
- }
-
- if (lvl->level <= merger->ctx->level) {
- merger->ctx->toData = false;
- }
- break;
- }
-
- // get number of level-0 files to merge
- int32_t numFile = pow(merger->sttTrigger, merger->ctx->level);
- TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) {
- if (lvl->level == 0) continue;
- if (lvl->level >= merger->ctx->level) break;
-
- numFile = numFile - TARRAY2_SIZE(lvl->fobjArr) * pow(merger->sttTrigger, lvl->level);
- }
-
- ASSERT(numFile >= 0);
-
- // get file system operations
- TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) {
- if (lvl->level >= merger->ctx->level) {
+ if (lvl->level <= TSDB_MAX_LEVEL) {
+ break;
+ } else if (TARRAY2_SIZE(lvl->fobjArr) > 0) {
+ hasLevelLargerThanMax = true;
break;
}
+ }
- int32_t numMergeFile;
- if (lvl->level == 0) {
- numMergeFile = numFile;
- } else {
- numMergeFile = TARRAY2_SIZE(lvl->fobjArr);
+ if (hasLevelLargerThanMax) {
+ // merge all stt files
+ merger->ctx->toData = true;
+ merger->ctx->level = TSDB_MAX_LEVEL;
+
+ TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) {
+ int32_t numMergeFile = TARRAY2_SIZE(lvl->fobjArr);
+
+ for (int32_t i = 0; i < numMergeFile; ++i) {
+ STFileObj *fobj = TARRAY2_GET(lvl->fobjArr, i);
+
+ STFileOp op = {
+ .optype = TSDB_FOP_REMOVE,
+ .fid = merger->ctx->fset->fid,
+ .of = fobj->f[0],
+ };
+ code = TARRAY2_APPEND(merger->fopArr, op);
+ TSDB_CHECK_CODE(code, lino, _exit);
+
+ SSttFileReader *reader;
+ SSttFileReaderConfig config = {
+ .tsdb = merger->tsdb,
+ .szPage = merger->szPage,
+ .file[0] = fobj->f[0],
+ };
+
+ code = tsdbSttFileReaderOpen(fobj->fname, &config, &reader);
+ TSDB_CHECK_CODE(code, lino, _exit);
+
+ code = TARRAY2_APPEND(merger->sttReaderArr, reader);
+ TSDB_CHECK_CODE(code, lino, _exit);
+ }
+ }
+ } else {
+ // do regular merge
+ merger->ctx->toData = true;
+ merger->ctx->level = 0;
+
+ // find the highest level that can be merged to
+ for (int32_t i = 0, numCarry = 0;;) {
+ int32_t numFile = numCarry;
+ if (i < TARRAY2_SIZE(merger->ctx->fset->lvlArr) &&
+ merger->ctx->level == TARRAY2_GET(merger->ctx->fset->lvlArr, i)->level) {
+ numFile += TARRAY2_SIZE(TARRAY2_GET(merger->ctx->fset->lvlArr, i)->fobjArr);
+ i++;
+ }
+
+ numCarry = numFile / merger->sttTrigger;
+ if (numCarry == 0) {
+ break;
+ } else {
+ merger->ctx->level++;
+ }
}
- for (int32_t i = 0; i < numMergeFile; ++i) {
- STFileObj *fobj = TARRAY2_GET(lvl->fobjArr, i);
+ ASSERT(merger->ctx->level > 0);
- STFileOp op = {
- .optype = TSDB_FOP_REMOVE,
- .fid = merger->ctx->fset->fid,
- .of = fobj->f[0],
- };
- code = TARRAY2_APPEND(merger->fopArr, op);
- TSDB_CHECK_CODE(code, lino, _exit);
+ if (merger->ctx->level <= TSDB_MAX_LEVEL) {
+ TARRAY2_FOREACH_REVERSE(merger->ctx->fset->lvlArr, lvl) {
+ if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
+ continue;
+ }
- SSttFileReader *reader;
- SSttFileReaderConfig config = {
- .tsdb = merger->tsdb,
- .szPage = merger->szPage,
- .file[0] = fobj->f[0],
- };
+ if (lvl->level >= merger->ctx->level) {
+ merger->ctx->toData = false;
+ }
+ break;
+ }
+ }
- code = tsdbSttFileReaderOpen(fobj->fname, &config, &reader);
- TSDB_CHECK_CODE(code, lino, _exit);
+ // get number of level-0 files to merge
+ int32_t numFile = pow(merger->sttTrigger, merger->ctx->level);
+ TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) {
+ if (lvl->level == 0) continue;
+ if (lvl->level >= merger->ctx->level) break;
- code = TARRAY2_APPEND(merger->sttReaderArr, reader);
- TSDB_CHECK_CODE(code, lino, _exit);
+ numFile = numFile - TARRAY2_SIZE(lvl->fobjArr) * pow(merger->sttTrigger, lvl->level);
+ }
+
+ ASSERT(numFile >= 0);
+
+ // get file system operations
+ TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) {
+ if (lvl->level >= merger->ctx->level) {
+ break;
+ }
+
+ int32_t numMergeFile;
+ if (lvl->level == 0) {
+ numMergeFile = numFile;
+ } else {
+ numMergeFile = TARRAY2_SIZE(lvl->fobjArr);
+ }
+
+ for (int32_t i = 0; i < numMergeFile; ++i) {
+ STFileObj *fobj = TARRAY2_GET(lvl->fobjArr, i);
+
+ STFileOp op = {
+ .optype = TSDB_FOP_REMOVE,
+ .fid = merger->ctx->fset->fid,
+ .of = fobj->f[0],
+ };
+ code = TARRAY2_APPEND(merger->fopArr, op);
+ TSDB_CHECK_CODE(code, lino, _exit);
+
+ SSttFileReader *reader;
+ SSttFileReaderConfig config = {
+ .tsdb = merger->tsdb,
+ .szPage = merger->szPage,
+ .file[0] = fobj->f[0],
+ };
+
+ code = tsdbSttFileReaderOpen(fobj->fname, &config, &reader);
+ TSDB_CHECK_CODE(code, lino, _exit);
+
+ code = TARRAY2_APPEND(merger->sttReaderArr, reader);
+ TSDB_CHECK_CODE(code, lino, _exit);
+ }
+ }
+
+ if (merger->ctx->level > TSDB_MAX_LEVEL) {
+ merger->ctx->level = TSDB_MAX_LEVEL;
}
}
@@ -296,6 +338,8 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
ASSERT(merger->dataIterMerger == NULL);
ASSERT(merger->writer == NULL);
+ TARRAY2_CLEAR(merger->fopArr, NULL);
+
merger->ctx->tbid->suid = 0;
merger->ctx->tbid->uid = 0;
@@ -348,6 +392,18 @@ static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
code = tsdbMergeFileSetEndCloseReader(merger);
TSDB_CHECK_CODE(code, lino, _exit);
+ // edit file system
+ code = tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE);
+ TSDB_CHECK_CODE(code, lino, _exit);
+
+ taosThreadRwlockWrlock(&merger->tsdb->rwLock);
+ code = tsdbFSEditCommit(merger->tsdb->pFS);
+ if (code) {
+ taosThreadRwlockUnlock(&merger->tsdb->rwLock);
+ TSDB_CHECK_CODE(code, lino, _exit);
+ }
+ taosThreadRwlockUnlock(&merger->tsdb->rwLock);
+
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
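The regular-merge branch above picks its target level by counting stt files in base `sttTrigger`: every `sttTrigger` files at one level carry one notional file into the level above, and the target is the highest level that still receives a carry. A standalone sketch of that rule, assuming dense per-level counts in place of the sparse `lvlArr`:

```c
/* Hedged sketch: base-sttTrigger carry rule for the target merge level,
 * with counts[l] = number of stt files at level l. */
#include <stdio.h>

static int mergeTargetLevel(const int *counts, int nLvl, int sttTrigger) {
  int level = 0;
  for (int numCarry = 0;;) {
    int numFile = numCarry + (level < nLvl ? counts[level] : 0);
    numCarry = numFile / sttTrigger;
    if (numCarry == 0) break;
    level++;
  }
  return level;
}

int main(void) {
  int counts[] = {8, 2, 0};  /* eight level-0 files, two level-1 files */
  /* 8/2 carries 4; (4+2)/2 carries 3; (3+0)/2 carries 1; 1/2 carries 0 -> level 3 */
  printf("target level = %d\n", mergeTargetLevel(counts, 3, 2));
  return 0;
}
```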
diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h
index 996891c77a..7d3e25b3a9 100644
--- a/source/libs/command/inc/commandInt.h
+++ b/source/libs/command/inc/commandInt.h
@@ -111,6 +111,7 @@ extern "C" {
#define COMMAND_CATALOG_DEBUG "catalogDebug"
#define COMMAND_ENABLE_MEM_DEBUG "enableMemDebug"
#define COMMAND_DISABLE_MEM_DEBUG "disableMemDebug"
+#define COMMAND_ASYNCLOG "asynclog"
typedef struct SExplainGroup {
int32_t nodeNum;
@@ -167,7 +168,7 @@ typedef struct SExplainCtx {
} \
tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - VARSTR_HEADER_SIZE - tlen, __VA_ARGS__); \
} while (0)
-
+
#define EXPLAIN_ROW_APPEND(...) tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - VARSTR_HEADER_SIZE - tlen, __VA_ARGS__)
#define EXPLAIN_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; isVerboseLine = true; } while (0)
diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c
index 8b868ffde4..691f29501c 100644
--- a/source/libs/command/src/command.c
+++ b/source/libs/command/src/command.c
@@ -747,6 +747,16 @@ static int32_t execAlterCmd(char* cmd, char* value, bool* processed) {
return code;
}
qInfo("memory dbg disabled");
+ } else if (0 == strcasecmp(cmd, COMMAND_ASYNCLOG)) {
+ int newAsyncLogValue = (strlen(value) == 0) ? 1 : atoi(value);
+ if (newAsyncLogValue != 0 && newAsyncLogValue != 1) {
+ code = TSDB_CODE_INVALID_CFG_VALUE;
+ qError("failed to alter asynclog, error:%s", tstrerror(code));
+ goto _return;
+ }
+
+ code = TSDB_CODE_SUCCESS;
+ tsAsyncLog = newAsyncLogValue;
} else {
goto _return;
}
@@ -946,7 +956,7 @@ int32_t qExecCommand(int64_t* pConnId, bool sysInfoUser, SNode* pStmt, SRetrieve
return execSelectWithoutFrom((SSelectStmt*)pStmt, pRsp);
case QUERY_NODE_SHOW_DB_ALIVE_STMT:
case QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT:
- return execShowAliveStatus(pConnId, (SShowAliveStmt*)pStmt, pRsp);
+ return execShowAliveStatus(pConnId, (SShowAliveStmt*)pStmt, pRsp);
default:
break;
}
diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c
index a419a59996..815f8b0728 100644
--- a/source/libs/stream/src/streamBackendRocksdb.c
+++ b/source/libs/stream/src/streamBackendRocksdb.c
@@ -459,7 +459,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
} else {
stError("failed to start stream backend at %s, reason: %s, restart from default state dir:%s", chkp,
- tstrerror(TAOS_SYSTEM_ERROR(errno)), state);
+ tstrerror(TAOS_SYSTEM_ERROR(errno)), state);
taosMkDir(state);
}
taosMemoryFree(chkp);
@@ -813,6 +813,10 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
}
int32_t nCf = taosArrayGetSize(pHandle);
+ if (nCf == 0) {
+ taosArrayDestroy(pHandle);
+ return nCf;
+ }
rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
for (int i = 0; i < nCf; i++) {
@@ -845,6 +849,9 @@ _ERROR:
return code;
}
int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) {
+ if (nCf == 0) {
+ return 0;
+ }
int code = 0;
char* err = NULL;
@@ -910,7 +917,7 @@ int32_t streamBackendTriggerChkp(void* arg, char* dst) {
stError("stream backend:%p failed to do checkpoint at:%s", pHandle, dst);
} else {
stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, dst,
- taosGetTimestampMs() - st);
+ taosGetTimestampMs() - st);
}
} else {
stError("stream backend:%p failed to flush db at:%s", pHandle, dst);
@@ -985,9 +992,9 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
stError("stream backend:%p failed to do checkpoint at:%s", pHandle, pChkpIdDir);
} else {
stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, pChkpIdDir,
- taosGetTimestampMs() - st);
+ taosGetTimestampMs() - st);
}
- } else {
+ } else {
stError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir);
}
// release all ref to cfWrapper;
@@ -1711,7 +1718,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
char* status[] = {"close", "drop"};
stInfo("start to %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper,
- wrapper->idstr);
+ wrapper->idstr);
wrapper->remove |= remove; // update by other pState
taosReleaseRef(streamBackendCfWrapperId, pState->pTdbState->backendCfWrapperId);
}
@@ -1783,35 +1790,36 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
((rocksdb_column_family_handle_t**)wrapper->pHandle)[idx]);
}
-#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
- do { \
- code = 0; \
- char buf[128] = {0}; \
- char* err = NULL; \
- int i = streamStateGetCfIdx(pState, funcname); \
- if (i < 0) { \
- stWarn("streamState failed to get cf name: %s", funcname); \
- code = -1; \
- break; \
- } \
- SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \
- char toString[128] = {0}; \
- if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
- int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
- rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
- rocksdb_t* db = wrapper->rocksdb; \
- rocksdb_writeoptions_t* opts = wrapper->writeOpts; \
- char* ttlV = NULL; \
- int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
- rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
- if (err != NULL) { \
- stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
- taosMemoryFree(err); \
- code = -1; \
- } else { \
- stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \
- } \
- taosMemoryFree(ttlV); \
+#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
+ do { \
+ code = 0; \
+ char buf[128] = {0}; \
+ char* err = NULL; \
+ int i = streamStateGetCfIdx(pState, funcname); \
+ if (i < 0) { \
+ stWarn("streamState failed to get cf name: %s", funcname); \
+ code = -1; \
+ break; \
+ } \
+ SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \
+ char toString[128] = {0}; \
+ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
+ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
+ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
+ rocksdb_t* db = wrapper->rocksdb; \
+ rocksdb_writeoptions_t* opts = wrapper->writeOpts; \
+ char* ttlV = NULL; \
+ int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
+ rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
+ if (err != NULL) { \
+ stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
+ taosMemoryFree(err); \
+ code = -1; \
+ } else { \
+ stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, \
+ ttlVLen); \
+ } \
+ taosMemoryFree(ttlV); \
} while (0);
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
@@ -1821,7 +1829,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
- stWarn("streamState failed to get cf name: %s", funcname); \
+ stWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
@@ -1836,9 +1844,9 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL || len == 0) { \
if (err == NULL) { \
- stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
+ stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
} else { \
- stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
+ stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
taosMemoryFreeClear(err); \
} \
code = -1; \
@@ -1846,11 +1854,11 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
char* p = NULL; \
int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
if (tlen <= 0) { \
- stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \
- funcname); \
+ stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \
+ funcname); \
code = -1; \
} else { \
- stTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \
+ stTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \
} \
taosMemoryFree(val); \
if (vLen != NULL) *vLen = tlen; \
@@ -1864,7 +1872,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
- stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
+ stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
code = -1; \
break; \
} \
@@ -1877,11 +1885,11 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
rocksdb_writeoptions_t* opts = wrapper->writeOpts; \
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \
if (err != NULL) { \
- stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
+ stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
taosMemoryFree(err); \
code = -1; \
} else { \
- stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \
+ stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \
} \
} while (0);
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index 10bbc9a516..f7b0cdb0f1 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -150,7 +150,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pMeta->startInfo.pReadyTaskSet == NULL) {
-
}
pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
@@ -209,7 +208,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->numOfPausedTasks = 0;
pMeta->numOfStreamTasks = 0;
stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
- stage);
+ stage);
return pMeta;
_err:
@@ -249,7 +248,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
stError("vgId:%d failed to rename file, from %s to %s, code:%s", pMeta->vgId, newPath, defaultPath,
- tstrerror(terrno));
+ tstrerror(terrno));
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
@@ -269,6 +268,8 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
streamBackendLoadCheckpointInfo(pMeta);
+ taosMemoryFree(defaultPath);
+ taosMemoryFree(newPath);
return 0;
}
@@ -380,10 +381,10 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) {
int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};
int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn);
if (code != 0) {
- stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t) pTaskId->taskId,
- tstrerror(terrno));
+ stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pTaskId->taskId,
+ tstrerror(terrno));
} else {
- stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t) pTaskId->taskId);
+ stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pTaskId->taskId);
}
return code;
@@ -440,7 +441,7 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) {
int32_t num = 0;
size_t size = taosArrayGetSize(pMeta->pTaskList);
for (int32_t i = 0; i < size; ++i) {
- STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
+ STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask** p = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
if (p == NULL) {
continue;
@@ -457,7 +458,7 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) {
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
taosRLockLatch(&pMeta->lock);
- STaskId id = {.streamId = streamId, .taskId = taskId};
+ STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
if (!streamTaskShouldStop(&(*ppTask)->status)) {
@@ -501,7 +502,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
// pre-delete operation
taosWLockLatch(&pMeta->lock);
- STaskId id = {.streamId = streamId, .taskId = taskId};
+ STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
pTask = *ppTask;
@@ -518,7 +519,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
taosWUnLockLatch(&pMeta->lock);
stDebug("s-task:0x%x set task status:%s and start to unregister it", taskId,
- streamGetTaskStatusStr(TASK_STATUS__DROPPING));
+ streamGetTaskStatusStr(TASK_STATUS__DROPPING));
while (1) {
taosRLockLatch(&pMeta->lock);
@@ -656,7 +657,7 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
}
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
- TBC* pCur = NULL;
+ TBC* pCur = NULL;
int32_t vgId = pMeta->vgId;
stInfo("vgId:%d load stream tasks from meta files", vgId);
@@ -689,8 +690,10 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask);
stError(
- "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
- "manually", vgId, tsDataDir);
+ "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
+ "stream "
+ "manually",
+ vgId, tsDataDir);
return -1;
}
tDecoderClear(&decoder);
@@ -762,7 +765,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks);
stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
- pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
+ pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
taosArrayDestroy(pRecycleList);
return 0;
}
@@ -827,7 +830,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
}
static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
- if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
+ if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
pInfo->tickCounter = 0;
return true;
}
@@ -963,7 +966,7 @@ void metaHbToMnode(void* param, void* tmrId) {
pMeta->pHbInfo->hbCount += 1;
stDebug("vgId:%d, build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
- pMeta->pHbInfo->hbCount);
+ pMeta->pHbInfo->hbCount);
tmsgSendReq(&epset, &msg);
} else {
stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
@@ -1000,7 +1003,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId,
- (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
+ (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
taosWLockLatch(&pMeta->lock);
diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c
index d988d242c8..f66edb2b34 100644
--- a/source/libs/stream/src/streamSnapshot.c
+++ b/source/libs/stream/src/streamSnapshot.c
@@ -17,8 +17,8 @@
#include "query.h"
#include "rocksdb/c.h"
#include "streamBackendRocksdb.h"
-#include "tcommon.h"
#include "streamInt.h"
+#include "tcommon.h"
enum SBackendFileType {
ROCKSDB_OPTIONS_TYPE = 1,
@@ -126,7 +126,8 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
stInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir);
streamBackendAddInUseChkp(pMeta, chkpId);
} else {
- stWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir);
+ stWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER,
+ tdir);
}
}
@@ -271,7 +272,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
if (handle->checkpointId == 0) {
// del tmp dir
- if (taosIsDir(pFile->path)) {
+ if (pFile && taosIsDir(pFile->path)) {
taosRemoveDir(pFile->path);
}
} else {
@@ -335,24 +336,24 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
} else {
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
stDebug("%s open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
- item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
+ item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
}
}
stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
- item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
+ item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
if (nread == -1) {
taosMemoryFree(buf);
code = TAOS_SYSTEM_ERROR(terrno);
stError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name,
- item->type, tstrerror(code));
+ item->type, tstrerror(code));
return -1;
} else if (nread > 0 && nread <= kBlockSize) {
// left bytes less than kBlockSize
stDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
- item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
+ item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
pHandle->offset += nread;
if (pHandle->offset >= item->size || nread < kBlockSize) {
taosCloseFile(&pHandle->fd);
@@ -361,7 +362,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
}
} else {
stDebug("%s no data read, close file no.%d, move to next file, open and read", STREAM_STATE_TRANSFER,
- pHandle->currFileIdx);
+ pHandle->currFileIdx);
taosCloseFile(&pHandle->fd);
pHandle->offset = 0;
pHandle->currFileIdx += 1;
@@ -379,7 +380,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
pHandle->offset += nread;
stDebug("%s open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d",
- STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
+ STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
}
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
@@ -434,8 +435,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno);
- stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name,
- tstrerror(code));
+ stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP,
+ pHdr->name, tstrerror(code));
}
}
@@ -461,8 +462,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno);
- stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name,
- tstrerror(code));
+ stError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP,
+ pHdr->name, tstrerror(code));
}
taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task
index b08fa491f4..1627b3e5b4 100644
--- a/tests/parallel_test/cases.task
+++ b/tests/parallel_test/cases.task
@@ -195,7 +195,8 @@
,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py
,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py
,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py
-,,n,system-test,python3 ./test.py -f 0-others/splitVGroup.py -N 5
+,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/splitVGroupRep1.py -N 3
+,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/splitVGroupRep3.py -N 3
,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_replica.py -N 3
diff --git a/tests/system-test/0-others/splitVGroupRep1.py b/tests/system-test/0-others/splitVGroupRep1.py
new file mode 100644
index 0000000000..4db5201011
--- /dev/null
+++ b/tests/system-test/0-others/splitVGroupRep1.py
@@ -0,0 +1,439 @@
+###################################################################
+# Copyright (c) 2016 by TAOS Technologies, Inc.
+# All rights reserved.
+#
+# This file is proprietary and confidential to TAOS Technologies.
+# No part of this file may be reproduced, stored, transmitted,
+# disclosed or used in any form or by any means other than as
+# expressly provided by the written permission from Jianhui Tao
+#
+###################################################################
+
+# -*- coding: utf-8 -*-
+
+import sys
+import random
+import time
+import copy
+import string
+
+import taos
+from util.log import *
+from util.cases import *
+from util.sql import *
+
+class TDTestCase:
+
+ # random string
+ def random_string(self, count):
+ letters = string.ascii_letters
+ return ''.join(random.choice(letters) for i in range(count))
+
+ # get col value and total max min ...
+ def getColsValue(self, i, j):
+ # c1 value
+ if random.randint(1, 10) == 5:
+ c1 = None
+ else:
+ c1 = 1
+
+ # c2 value
+ if j % 3200 == 0:
+ c2 = 8764231
+ elif random.randint(1, 10) == 5:
+ c2 = None
+ else:
+ c2 = random.randint(-87654297, 98765321)
+
+
+ value = f"({self.ts}, "
+
+ # c1
+ if c1 is None:
+ value += "null,"
+ else:
+ self.c1Cnt += 1
+ value += f"{c1},"
+ # c2
+ if c2 is None:
+ value += "null,"
+ else:
+ value += f"{c2},"
+ # total count
+ self.c2Cnt += 1
+ # max
+ if self.c2Max is None:
+ self.c2Max = c2
+ else:
+ if c2 > self.c2Max:
+ self.c2Max = c2
+ # min
+ if self.c2Min is None:
+ self.c2Min = c2
+ else:
+ if c2 < self.c2Min:
+ self.c2Min = c2
+ # sum
+ if self.c2Sum is None:
+ self.c2Sum = c2
+ else:
+ self.c2Sum += c2
+
+ # c3 same with ts
+ value += f"{self.ts})"
+
+ # move next
+ self.ts += 1
+
+ return value
+
+ # insert data
+ def insertData(self):
+ tdLog.info("insert data ....")
+ sqls = ""
+ for i in range(self.childCnt):
+ # insert child table
+ values = ""
+ pre_insert = f"insert into @db_name.t{i} values "
+ for j in range(self.childRow):
+ if values == "":
+ values = self.getColsValue(i, j)
+ else:
+ values += "," + self.getColsValue(i, j)
+
+ # batch insert
+ if j % self.batchSize == 0 and values != "":
+ sql = pre_insert + values
+ self.exeDouble(sql)
+ values = ""
+ # append last
+ if values != "":
+ sql = pre_insert + values
+ self.exeDouble(sql)
+ values = ""
+
+ # insert normal table
+ for i in range(20):
+ self.ts += 1000
+ name = self.random_string(20)
+ sql = f"insert into @db_name.ta values({self.ts}, {i}, {self.ts%100000}, '{name}', false)"
+ self.exeDouble(sql)
+
+ # insert finished
+ tdLog.info(f"insert data successfully.\n"
+ f" inserted child table = {self.childCnt}\n"
+ f" inserted child rows = {self.childRow}\n"
+ f" total inserted rows = {self.childCnt*self.childRow}\n")
+ return
+
+ def exeDouble(self, sql):
+ # dbname replace
+ sql1 = sql.replace("@db_name", self.db1)
+
+ if len(sql1) > 100:
+ tdLog.info(sql1[:100])
+ else:
+ tdLog.info(sql1)
+ tdSql.execute(sql1)
+
+ sql2 = sql.replace("@db_name", self.db2)
+ if len(sql2) > 100:
+ tdLog.info(sql2[:100])
+ else:
+ tdLog.info(sql2)
+ tdSql.execute(sql2)
+
+
+ # prepareEnv
+ def prepareEnv(self):
+ # init
+ self.ts = 1680000000000
+ self.childCnt = 4
+ self.childRow = 10000
+ self.batchSize = 50000
+ self.vgroups1 = 1
+ self.vgroups2 = 1
+ self.db1 = "db1"
+ self.db2 = "db2"
+
+ # total
+ self.c1Cnt = 0
+ self.c2Cnt = 0
+ self.c2Max = None
+ self.c2Min = None
+ self.c2Sum = None
+
+ # create database db
+ sql = f"create database @db_name vgroups {self.vgroups1} replica 1"
+ self.exeDouble(sql)
+
+ # create super table st
+ sql = f"create table @db_name.st(ts timestamp, c1 int, c2 bigint, ts1 timestamp) tags(area int)"
+ self.exeDouble(sql)
+
+ # create child table
+ for i in range(self.childCnt):
+ sql = f"create table @db_name.t{i} using @db_name.st tags({i}) "
+ self.exeDouble(sql)
+
+ # create normal table
+ sql = f"create table @db_name.ta(ts timestamp, c1 int, c2 bigint, c3 binary(32), c4 bool)"
+ self.exeDouble(sql)
+
+ # insert data
+ self.insertData()
+
+ # update
+ self.ts = 1680000000000 + 20000
+ self.childRow = 1000
+
+
+ # delete data
+ sql = "delete from @db_name.st where ts > 1680000019000 and ts < 1680000062000"
+ self.exeDouble(sql)
+ sql = "delete from @db_name.st where ts > 1680000099000 and ts < 1680000170000"
+ self.exeDouble(sql)
+
+ # check data correct
+ def checkExpect(self, sql, expectVal):
+ tdSql.query(sql)
+ rowCnt = tdSql.getRows()
+ for i in range(rowCnt):
+ val = tdSql.getData(i,0)
+ if val != expectVal:
+ tdLog.exit(f"Not expect . query={val} expect={expectVal} i={i} sql={sql}")
+ return False
+
+ tdLog.info(f"check expect ok. sql={sql} expect ={expectVal} rowCnt={rowCnt}")
+ return True
+
+ # init
+ def init(self, conn, logSql, replicaVar=1):
+ seed = time.time() % 10000
+ random.seed(seed)
+ self.replicaVar = int(replicaVar)
+ tdLog.debug(f"start to excute {__file__}")
+ tdSql.init(conn.cursor(), True)
+
+ # check query result same
+ def queryDouble(self, sql):
+ # sql
+ sql1 = sql.replace('@db_name', self.db1)
+ tdLog.info(sql1)
+ start1 = time.time()
+ rows1 = tdSql.query(sql1)
+ spend1 = time.time() - start1
+ res1 = copy.copy(tdSql.queryResult)
+
+ sql2 = sql.replace('@db_name', self.db2)
+ tdLog.info(sql2)
+ start2 = time.time()
+ tdSql.query(sql2)
+ spend2 = time.time() - start2
+ res2 = tdSql.queryResult
+
+ rowlen1 = len(res1)
+ rowlen2 = len(res2)
+
+ if rowlen1 != rowlen2:
+ tdLog.exit(f"both row count not equal. rowlen1={rowlen1} rowlen2={rowlen2} ")
+ return False
+
+ for i in range(rowlen1):
+ row1 = res1[i]
+ row2 = res2[i]
+ collen1 = len(row1)
+ collen2 = len(row2)
+ if collen1 != collen2:
+ tdLog.exit(f"both col count not equal. collen1={collen1} collen2={collen2}")
+ return False
+ for j in range(collen1):
+ if row1[j] != row2[j]:
+ tdLog.exit(f"both col not equal. row={i} col={j} col1={row1[j]} col2={row2[j]} .")
+ return False
+
+ # warning performance
+ diff = (spend2 - spend1)*100/spend1
+ tdLog.info("spend1=%.6fs spend2=%.6fs diff=%.1f%%"%(spend1, spend2, diff))
+ if spend2 > spend1 and diff > 20:
+ tdLog.info("warning: the diff for performance after spliting is over 20%")
+
+ return True
+
+
+ # check result
+ def checkResult(self):
+ # check vgroupid
+ sql = f"select vgroup_id from information_schema.ins_vgroups where db_name='{self.db2}'"
+ tdSql.query(sql)
+ tdSql.checkRows(self.vgroups2)
+
+ # check child table count same
+ sql = "select table_name from information_schema.ins_tables where db_name='@db_name' order by table_name"
+ self.queryDouble(sql)
+
+ # check row value is ok
+ sql = "select * from @db_name.st order by ts"
+ self.queryDouble(sql)
+
+ # where
+ sql = "select *,tbname from @db_name.st where c1 < 1000 order by ts"
+ self.queryDouble(sql)
+
+ # max
+ sql = "select max(c1) from @db_name.st"
+ self.queryDouble(sql)
+
+ # min
+ sql = "select min(c2) from @db_name.st"
+ self.queryDouble(sql)
+
+ # sum
+ sql = "select sum(c1) from @db_name.st"
+ self.queryDouble(sql)
+
+ # normal table
+
+ # count
+ sql = "select count(*) from @db_name.ta"
+ self.queryDouble(sql)
+
+ # all rows
+ sql = "select * from @db_name.ta"
+ self.queryDouble(sql)
+
+ # sum
+ sql = "select sum(c1) from @db_name.ta"
+ self.queryDouble(sql)
+
+
+ # get vgroup list
+ def getVGroup(self, db_name):
+ vgidList = []
+ sql = f"select vgroup_id from information_schema.ins_vgroups where db_name='{db_name}'"
+ res = tdSql.getResult(sql)
+ rows = len(res)
+ for i in range(rows):
+ vgidList.append(res[i][0])
+
+ return vgidList
+
+ # split vgroup on db2
+ def splitVGroup(self, db_name):
+ vgids = self.getVGroup(db_name)
+ selid = random.choice(vgids)
+ sql = f"split vgroup {selid}"
+ tdLog.info(sql)
+ tdSql.execute(sql)
+
+ # wait end
+ seconds = 300
+ for i in range(seconds):
+ sql ="show transactions;"
+ rows = tdSql.query(sql)
+ if rows == 0:
+ tdLog.info("split vgroup finished.")
+ return True
+ #tdLog.info(f"i={i} wait split vgroup ...")
+ time.sleep(1)
+
+ tdLog.exit(f"split vgroup transaction is not finished after executing {seconds}s")
+ return False
+
+ # split error
+ def expectSplitError(self, dbName):
+ vgids = self.getVGroup(dbName)
+ selid = random.choice(vgids)
+ sql = f"split vgroup {selid}"
+ tdLog.info(sql)
+ tdSql.error(sql)
+
+ # expect split ok
+ def expectSplitOk(self, dbName):
+ # split vgroup
+ vgList1 = self.getVGroup(dbName)
+ self.splitVGroup(dbName)
+ vgList2 = self.getVGroup(dbName)
+ vgNum1 = len(vgList1) + 1
+ vgNum2 = len(vgList2)
+ if vgNum1 != vgNum2:
+ tdLog.exit(f" vglist len={vgNum1} is not same for expect {vgNum2}")
+ return
+
+ # split empty database
+ def splitEmptyDB(self):
+ dbName = "emptydb"
+ vgNum = 2
+ # create database
+ sql = f"create database {dbName} vgroups {vgNum} replica 1"
+ tdLog.info(sql)
+ tdSql.execute(sql)
+
+ # split vgroup
+ self.expectSplitOk(dbName)
+
+
+ # forbid
+ def checkForbid(self):
+ # stream
+ tdLog.info("check forbid split having stream...")
+ tdSql.execute("create database streamdb;")
+ tdSql.execute("use streamdb;")
+ tdSql.execute("create table ta(ts timestamp, age int);")
+ tdSql.execute("create stream ma into sta as select count(*) from ta interval(1s);")
+ self.expectSplitError("streamdb")
+ tdSql.execute("drop stream ma;")
+ self.expectSplitOk("streamdb")
+
+ # topic
+ tdLog.info("check forbid split having topic...")
+ tdSql.execute("create database topicdb wal_retention_period 10;")
+ tdSql.execute("use topicdb;")
+ tdSql.execute("create table ta(ts timestamp, age int);")
+ tdSql.execute("create topic toa as select * from ta;")
+ self.expectSplitError("topicdb")
+ tdSql.execute("drop topic toa;")
+ self.expectSplitOk("topicdb")
+
+ # compact and check db2
+ def compactAndCheck(self):
+ tdLog.info("compact db2 and check result ...")
+ # compact
+ tdSql.execute(f"compact database {self.db2};")
+ # check result
+ self.checkResult()
+
+ # run
+ def run(self):
+ # prepare env
+ self.prepareEnv()
+
+ for i in range(3):
+ # split vgroup on db2
+ start = time.time()
+ self.splitVGroup(self.db2)
+ end = time.time()
+ self.vgroups2 += 1
+
+ # check two db query result same
+ self.checkResult()
+ spend = "%.3f"%(end-start)
+ tdLog.info(f"split vgroup i={i} passed. spend = {spend}s")
+
+ # split empty db
+ self.splitEmptyDB()
+
+ # check that split is forbidden with topics and streams
+ self.checkForbid()
+
+ # compact database
+ self.compactAndCheck()
+
+ # stop
+ def stop(self):
+ tdSql.close()
+ tdLog.success(f"{__file__} successfully executed")
+
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())
diff --git a/tests/system-test/0-others/splitVGroup.py b/tests/system-test/0-others/splitVGroupRep3.py
similarity index 93%
rename from tests/system-test/0-others/splitVGroup.py
rename to tests/system-test/0-others/splitVGroupRep3.py
index e9ec34f829..d32a9747b5 100644
--- a/tests/system-test/0-others/splitVGroup.py
+++ b/tests/system-test/0-others/splitVGroupRep3.py
@@ -137,10 +137,10 @@ class TDTestCase:
tdSql.execute(sql1)
sql2 = sql.replace("@db_name", self.db2)
- if len(sql1) > 100:
- tdLog.info(sql1[:100])
+ if len(sql2) > 100:
+ tdLog.info(sql2[:100])
else:
- tdLog.info(sql1)
+ tdLog.info(sql2)
tdSql.execute(sql2)
@@ -151,8 +151,8 @@ class TDTestCase:
self.childCnt = 10
self.childRow = 10000
self.batchSize = 5000
- self.vgroups1 = 4
- self.vgroups2 = 4
+ self.vgroups1 = 2
+ self.vgroups2 = 2
self.db1 = "db1"
self.db2 = "db2"
@@ -183,6 +183,16 @@ class TDTestCase:
# insert data
self.insertData()
+ # update
+ self.ts = 1680000000000 + 10000
+ self.childRow = 2000
+
+ # delete data
+ sql = "delete from @db_name.st where ts > 1680000001900 and ts < 1680000012000"
+ self.exeDouble(sql)
+ sql = "delete from @db_name.st where ts > 1680000029000 and ts < 1680000048000"
+ self.exeDouble(sql)
+
# check data correct
def checkExpect(self, sql, expectVal):
tdSql.query(sql)
@@ -225,7 +235,7 @@ class TDTestCase:
rowlen2 = len(res2)
if rowlen1 != rowlen2:
- tdLog.exit(f"rowlen1={rowlen1} rowlen2={rowlen2} both not equal.")
+ tdLog.exit(f"both row count not equal. rowlen1={rowlen1} rowlen2={rowlen2} ")
return False
for i in range(rowlen1):
@@ -234,11 +244,11 @@ class TDTestCase:
collen1 = len(row1)
collen2 = len(row2)
if collen1 != collen2:
- tdLog.exit(f"collen1={collen1} collen2={collen2} both not equal.")
+ tdLog.exit(f"both col count not equal. collen1={collen1} collen2={collen2}")
return False
for j in range(collen1):
if row1[j] != row2[j]:
- tdLog.exit(f"col={j} col1={row1[j]} col2={row2[j]} both col not equal.")
+ tdLog.exit(f"both col not equal. row={i} col={j} col1={row1[j]} col2={row2[j]} .")
return False
# warning performance
@@ -354,7 +364,7 @@ class TDTestCase:
dbName = "emptydb"
vgNum = 2
# create database
- sql = f"create database {dbName} vgroups {vgNum}"
+ sql = f"create database {dbName} vgroups {vgNum} replica 3"
tdLog.info(sql)
tdSql.execute(sql)
@@ -397,7 +407,7 @@ class TDTestCase:
# prepare env
self.prepareEnv()
- for i in range(3):
+ for i in range(2):
# split vgroup on db2
start = time.time()
self.splitVGroup(self.db2)
diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c
index 41cdb0f928..60d6388faa 100644
--- a/tools/shell/src/shellAuto.c
+++ b/tools/shell/src/shellAuto.c
@@ -66,6 +66,8 @@ SWords shellCommands[] = {
{"alter dnode \"debugFlag\" \"141\";", 0, 0, NULL},
{"alter dnode \"monitor\" \"0\";", 0, 0, NULL},
{"alter dnode \"monitor\" \"1\";", 0, 0, NULL},
+ {"alter dnode \"asynclog\" \"0\";", 0, 0, NULL},
+ {"alter dnode \"asynclog\" \"1\";", 0, 0, NULL},
{"alter all dnodes \"resetlog\";", 0, 0, NULL},
{"alter all dnodes \"debugFlag\" \"141\";", 0, 0, NULL},
{"alter all dnodes \"monitor\" \"0\";", 0, 0, NULL},
@@ -77,6 +79,8 @@ SWords shellCommands[] = {
{"alter local \"uDebugFlag\" \"143\";", 0, 0, NULL},
{"alter local \"rpcDebugFlag\" \"143\";", 0, 0, NULL},
{"alter local \"tmrDebugFlag\" \"143\";", 0, 0, NULL},
+ {"alter local \"asynclog\" \"0\";", 0, 0, NULL},
+ {"alter local \"asynclog\" \"1\";", 0, 0, NULL},
{"alter topic", 0, 0, NULL},
{"alter user ;", 0, 0, NULL},
// 20
@@ -184,7 +188,7 @@ SWords shellCommands[] = {
{"show grants;", 0, 0, NULL},
#ifdef TD_ENTERPRISE
{"split vgroup ", 0, 0, NULL},
-#endif
+#endif
{"insert into values(", 0, 0, NULL},
{"insert into using tags(", 0, 0, NULL},
{"insert into using values(", 0, 0, NULL},
@@ -391,13 +395,19 @@ void showHelp() {
alter dnode 'monitor' '0';\n\
alter dnode 'monitor' \"1\";\n\
alter dnode \"debugflag\" \"143\";\n\
+ alter dnode 'asynclog' '0';\n\
+ alter dnode 'asynclog' \"1\";\n\
alter all dnodes \"monitor\" \"0\";\n\
alter all dnodes \"monitor\" \"1\";\n\
alter all dnodes \"resetlog\";\n\
alter all dnodes \"debugFlag\" \n\
+ alter all dnodes \"asynclog\" \"0\";\n\
+ alter all dnodes \"asynclog\" \"1\";\n\
alter table ;\n\
alter local \"resetlog\";\n\
alter local \"DebugFlag\" \"143\";\n\
+ alter local \"asynclog\" \"0\";\n\
+ alter local \"asynclog\" \"1\";\n\
alter topic\n\
alter user ...\n\
----- C ----- \n\