From a1aa2c9e0c2f1a5b6fc749b0a256892b973a0ac6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 16 Oct 2023 18:47:01 +0800 Subject: [PATCH 01/13] fix(stream):add more check for test cases. --- source/dnode/vnode/src/tq/tq.c | 4 ++++ source/libs/stream/src/streamExec.c | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8f3661dffa..1c90812d95 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1196,7 +1196,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pStreamTask->status.keepTaskStatus = status; pStreamTask->status.taskStatus = TASK_STATUS__HALT; + // wal scan not start yet, reset it to be the start position nextProcessedVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); + if (nextProcessedVer == -1) { + nextProcessedVer = pStreamTask->dataRange.range.maxVer + 1; + } tqDebug("s-task:%s level:%d nextProcessedVer:%" PRId64 ", sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, nextProcessedVer, pStreamTask->status.schedStatus, diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 12b51e6c93..c49c647906 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -309,7 +309,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pStreamTask->id.idStr); } - ASSERT(pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true); + ASSERT(((pStreamTask->status.taskStatus == TASK_STATUS__STOP) || + (pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId)) && + pTask->status.appendTranstateBlock == true); STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; From 252b90bfab807a63537618818ffb78b76c0b7cbb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 16 Oct 2023 19:20:09 +0800 Subject: [PATCH 02/13] fix stream snap deadlock --- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 09fffa1f74..c6255be7cb 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -198,8 +198,6 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { taosWLockLatch(&pTq->pStreamMeta->lock); tqDebug("vgId:%d, vnode stream-task snapshot writer closed", TD_VID(pTq->pVnode)); - - taosWLockLatch(&pTq->pStreamMeta->lock); if (rollback) { tdbAbort(pTq->pStreamMeta->db, pTq->pStreamMeta->txn); } else { From 0adefd5e59e236737c62eaad7e4d93fe1afad826 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 16 Oct 2023 19:26:33 +0800 Subject: [PATCH 03/13] fix stream snap deadlock --- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index c6255be7cb..a406b8df34 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -211,8 +211,6 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { goto _err; } - taosWUnLockLatch(&pTq->pStreamMeta->lock); - if (tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { code = -1; taosMemoryFree(pWriter); From f1186be19afda5be6af2eef2d4c3727a67ffcb45 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 16 Oct 2023 19:38:55 +0800 Subject: [PATCH 04/13] fix stream snap deadlock --- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index a406b8df34..e122cf19d3 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -206,10 +206,6 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { code = tdbPostCommit(pTq->pStreamMeta->db, pTq->pStreamMeta->txn); if (code) goto _err; } - if (tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { - code = -1; - goto _err; - } if (tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { code = -1; From 990d36654db5f177008cde1cccdee0e759cef5cc Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 16 Oct 2023 19:49:45 +0800 Subject: [PATCH 05/13] release buff --- source/libs/stream/src/tstreamFileState.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index ac404893f0..b10db2a1af 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -297,6 +297,10 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin while ((pNode = tdListNext(&iter)) != NULL && i < max) { SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; if (pPos->beUsed == used) { + if (used && !pPos->pRowBuff) { + ASSERT(pPos->needFree == true); + continue; + } tdListAppend(pFlushList, &pPos); pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); pFileState->stateBuffRemoveByPosFn(pFileState, pPos); From b793c92fed719514aa0dcd98c5af2a3d94af8c5a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 16 Oct 2023 22:00:34 +0800 Subject: [PATCH 06/13] fix stream snap deadlock --- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index e122cf19d3..f22ecc3daf 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -212,9 +212,7 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { taosMemoryFree(pWriter); goto _err; } - taosWUnLockLatch(&pTq->pStreamMeta->lock); - taosMemoryFree(pWriter); return code; From fdb6ec1fa3be1f0c2b7f8c0176dcc4ab3137db31 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 16 Oct 2023 22:52:39 +0800 Subject: [PATCH 07/13] enh(stream): add parameter to limit the stream sink task. --- include/common/tglobal.h | 4 +--- include/libs/stream/tstream.h | 1 - source/common/src/tglobal.c | 2 ++ source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamQueue.c | 19 ++++++------------- source/libs/stream/src/streamTask.c | 2 +- 6 files changed, 11 insertions(+), 19 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 4644c38ec4..dc125f5371 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -194,6 +194,7 @@ extern int64_t tsWalFsyncDataSizeLimit; extern int32_t tsTransPullupInterval; extern int32_t tsMqRebalanceInterval; extern int32_t tsStreamCheckpointTickInterval; +extern double tsSinkDataRate; extern int32_t tsStreamNodeCheckInterval; extern int32_t tsTtlUnit; extern int32_t tsTtlPushIntervalSec; @@ -202,9 +203,6 @@ extern int32_t tsTrimVDbIntervalSec; extern int32_t tsGrantHBInterval; extern int32_t tsUptimeInterval; -extern int32_t tsRpcRetryLimit; -extern int32_t tsRpcRetryInterval; - extern bool tsDisableStream; extern int64_t tsStreamBufferSize; extern bool tsFilterScalarMode; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 629efa00b3..6cbe6ad7da 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -714,7 +714,6 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); -int32_t streamQueueGetAvailableSpace(const SStreamQueue* pQueue, int32_t* availNum, double* availSize); // common int32_t streamRestoreParam(SStreamTask* pTask); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3d7b38161a..77684552d4 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -245,6 +245,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; int32_t tsStreamCheckpointTickInterval = 300; +double tsSinkDataRate = 2.0; int32_t tsStreamNodeCheckInterval = 30; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; @@ -651,6 +652,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointTickInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddDouble(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 4cd8319a07..d98fa2f436 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -122,7 +122,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask); -int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t quotaRate); +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate); STaskId streamTaskExtractKey(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index ae285046ef..e396ac77b4 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -18,7 +18,7 @@ #define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_SMOOTH_BURST_RATIO 5 // 5 sec #define WAIT_FOR_DURATION 40 -#define SINK_TASK_IDLE_DURATION 200 // 200 ms +#define OUTPUT_QUEUE_FULL_WAIT_DURATION 500 // 500 ms // todo refactor: // read data from input queue @@ -119,14 +119,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) { return numOfItems1 + numOfItems2; } -int32_t streamQueueGetAvailableSpace(const SStreamQueue* pQueue, int32_t* availNum, double* availSize) { - int32_t num = streamQueueGetNumOfItems(pQueue); - *availNum = STREAM_TASK_QUEUE_CAPACITY - num; - - *availSize = STREAM_TASK_QUEUE_CAPACITY_IN_SIZE - taosQueueMemorySize(pQueue->pQueue); - return 0; -} - // todo: fix it: data in Qall is not included here int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) { return taosQueueMemorySize(pQueue->pQueue); @@ -362,9 +354,10 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); // let's wait for there are enough space to hold this result pBlock - stDebug("s-task:%s outputQ is full, wait for 500ms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, - total, size); - taosMsleep(500); + stDebug("s-task:%s outputQ is full, wait for %dms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, + OUTPUT_QUEUE_FULL_WAIT_DURATION, total, size); + + taosMsleep(OUTPUT_QUEUE_FULL_WAIT_DURATION); } int32_t code = taosWriteQitem(pQueue, pBlock); @@ -381,7 +374,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc return TSDB_CODE_SUCCESS; } -int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, int32_t quotaRate) { +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate) { if (numCap < 10 || numRate < 10 || pBucket == NULL) { stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate); return TSDB_CODE_INVALID_PARA; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 57103e5a96..fd9902406d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -427,7 +427,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i // 2MiB per second for sink task // 50 times sink operator per second - streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, 2); + streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate); TdThreadMutexAttr attr = {0}; int code = taosThreadMutexAttrInit(&attr); From 531f478ade0ce3453a529637961808f3d264eea0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 17 Oct 2023 08:17:49 +0800 Subject: [PATCH 08/13] refactor: do some internal refactor W.R.T. global configurations. --- include/common/tglobal.h | 2 +- source/common/src/tglobal.c | 40 ++++++++++++------------- source/dnode/mnode/impl/src/mndMain.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 2 +- 4 files changed, 22 insertions(+), 24 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index dc125f5371..6f35a7ab35 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -193,7 +193,7 @@ extern int64_t tsWalFsyncDataSizeLimit; // internal extern int32_t tsTransPullupInterval; extern int32_t tsMqRebalanceInterval; -extern int32_t tsStreamCheckpointTickInterval; +extern int32_t tsStreamCheckpointInterval; extern double tsSinkDataRate; extern int32_t tsStreamNodeCheckInterval; extern int32_t tsTtlUnit; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 77684552d4..d57bb235f9 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -244,7 +244,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointTickInterval = 300; +int32_t tsStreamCheckpointInterval = 300; double tsSinkDataRate = 2.0; int32_t tsStreamNodeCheckInterval = 30; int32_t tsTtlUnit = 86400; @@ -270,8 +270,6 @@ int8_t tsS3Enabled = false; int32_t tsS3BlockSize = 4096; // number of tsdb pages int32_t tsS3BlockCacheSize = 16; // number of blocks -int32_t tsCheckpointInterval = 300; - #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { SConfigItem *pItem = cfgGetItem(pCfg, "dataDir"); @@ -651,19 +649,18 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointTickInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt64(pCfg, "streamcheckpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddDouble(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) - return -1; + if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddString(pCfg, "LossyColumns", tsLossyColumns, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddFloat(pCfg, "FPrecision", tsFPrecision, 0.0f, 100000.0f, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddDouble(pCfg, "DPrecision", tsDPrecision, 0.0f, 1000000.0f, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "MaxRange", tsMaxRange, 0, 65536, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "CurRange", tsCurRange, 0, 65536, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddBool(pCfg, "IfAdtFse", tsIfAdtFse, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddString(pCfg, "Compressor", tsCompressor, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddString(pCfg, "lossyColumns", tsLossyColumns, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddFloat(pCfg, "fPrecision", tsFPrecision, 0.0f, 100000.0f, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddDouble(pCfg, "dPrecision", tsDPrecision, 0.0f, 1000000.0f, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "maxRange", tsMaxRange, 0, 65536, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "curRange", tsCurRange, 0, 65536, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddBool(pCfg, "ifAdtFse", tsIfAdtFse, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddString(pCfg, "compressor", tsCompressor, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, CFG_SCOPE_SERVER) != 0) return -1; @@ -1082,17 +1079,18 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsCacheLazyLoadThreshold = cfgGetItem(pCfg, "cacheLazyLoadThreshold")->i32; - tstrncpy(tsLossyColumns, cfgGetItem(pCfg, "LossyColumns")->str, sizeof(tsLossyColumns)); - tsFPrecision = cfgGetItem(pCfg, "FPrecision")->fval; - tsDPrecision = cfgGetItem(pCfg, "DPrecision")->dval; - tsMaxRange = cfgGetItem(pCfg, "MaxRange")->i32; - tsCurRange = cfgGetItem(pCfg, "CurRange")->i32; - tsIfAdtFse = cfgGetItem(pCfg, "IfAdtFse")->bval; - tstrncpy(tsCompressor, cfgGetItem(pCfg, "Compressor")->str, sizeof(tsCompressor)); - + tstrncpy(tsLossyColumns, cfgGetItem(pCfg, "lossyColumns")->str, sizeof(tsLossyColumns)); + tsFPrecision = cfgGetItem(pCfg, "fPrecision")->fval; + tsDPrecision = cfgGetItem(pCfg, "dPrecision")->dval; + tsMaxRange = cfgGetItem(pCfg, "maxRange")->i32; + tsCurRange = cfgGetItem(pCfg, "curRange")->i32; + tsIfAdtFse = cfgGetItem(pCfg, "ifAdtFse")->bval; + tstrncpy(tsCompressor, cfgGetItem(pCfg, "compressor")->str, sizeof(tsCompressor)); tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64; + tsStreamCheckpointInterval = cfgGetItem(pCfg, "streamcheckpointInterval")->i32; + tsSinkDataRate = cfgGetItem(pCfg, "streamSinkDataRate")->dval; tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval; tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 1c87cde78a..08ad6cc909 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -281,7 +281,7 @@ static void *mndThreadFp(void *param) { mndCalMqRebalance(pMnode); } - if (sec % tsStreamCheckpointTickInterval == 0) { + if (sec % tsStreamCheckpointInterval == 0) { mndStreamCheckpointTick(pMnode, sec); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index dff0b004ce..f23d596449 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -937,7 +937,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in } // static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) { // int64_t timestampMs = taosGetTimestampMs(); -// if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointTickInterval * 1000) { +// if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000) { // return -1; // } From 3f7737d7338b8bdcdd0f347902a7e60b615e7a11 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 17 Oct 2023 08:36:12 +0800 Subject: [PATCH 09/13] refactor: refactor the global parameters. --- source/common/src/tglobal.c | 16 ++++++++-------- source/util/src/tconfig.c | 26 +++++++++++++------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d57bb235f9..dd3bdf86b2 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -489,11 +489,11 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "enableCoreFile", 1, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddFloat(pCfg, "numOfCores", tsNumOfCores, 1, 100000, CFG_SCOPE_BOTH) != 0) return -1; - if (cfgAddBool(pCfg, "SSE42", tsSSE42Enable, CFG_SCOPE_BOTH) != 0) return -1; - if (cfgAddBool(pCfg, "AVX", tsAVXEnable, CFG_SCOPE_BOTH) != 0) return -1; - if (cfgAddBool(pCfg, "AVX2", tsAVX2Enable, CFG_SCOPE_BOTH) != 0) return -1; - if (cfgAddBool(pCfg, "FMA", tsFMAEnable, CFG_SCOPE_BOTH) != 0) return -1; - if (cfgAddBool(pCfg, "SIMD-builtins", tsSIMDBuiltins, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddBool(pCfg, "ssd42", tsSSE42Enable, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddBool(pCfg, "avx", tsAVXEnable, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddBool(pCfg, "avx2", tsAVX2Enable, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddBool(pCfg, "fma", tsFMAEnable, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddBool(pCfg, "simdEnable", tsSIMDBuiltins, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddBool(pCfg, "tagFilterCache", tsTagFilterCache, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, CFG_SCOPE_BOTH) != 0) return -1; @@ -649,7 +649,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddInt64(pCfg, "streamcheckpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddDouble(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1; @@ -1021,7 +1021,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; - tsSIMDBuiltins = (bool)cfgGetItem(pCfg, "SIMD-builtins")->bval; + tsSIMDBuiltins = (bool)cfgGetItem(pCfg, "simdEnable")->bval; tsTagFilterCache = (bool)cfgGetItem(pCfg, "tagFilterCache")->bval; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; @@ -1089,7 +1089,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64; - tsStreamCheckpointInterval = cfgGetItem(pCfg, "streamcheckpointInterval")->i32; + tsStreamCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i32; tsSinkDataRate = cfgGetItem(pCfg, "streamSinkDataRate")->dval; tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval; diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index 34c3222bff..50b01fb390 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -611,8 +611,7 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) { switch (pItem->dtype) { case CFG_DTYPE_BOOL: if (dump) { - printf("%s %s %u", src, name, pItem->bval); - printf("\n"); + printf("%s %s %u\n", src, name, pItem->bval); } else { uInfo("%s %s %u", src, name, pItem->bval); } @@ -620,29 +619,32 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) { break; case CFG_DTYPE_INT32: if (dump) { - printf("%s %s %d", src, name, pItem->i32); - printf("\n"); + printf("%s %s %d\n", src, name, pItem->i32); } else { uInfo("%s %s %d", src, name, pItem->i32); } break; case CFG_DTYPE_INT64: if (dump) { - printf("%s %s %" PRId64, src, name, pItem->i64); - printf("\n"); + printf("%s %s %" PRId64"\n", src, name, pItem->i64); } else { uInfo("%s %s %" PRId64, src, name, pItem->i64); } break; case CFG_DTYPE_FLOAT: - case CFG_DTYPE_DOUBLE: if (dump) { - printf("%s %s %.2f", src, name, pItem->fval); - printf("\n"); + printf("%s %s %.2f\n", src, name, pItem->fval); } else { uInfo("%s %s %.2f", src, name, pItem->fval); } break; + case CFG_DTYPE_DOUBLE: + if (dump) { + printf("%s %s %.2f\n", src, name, pItem->dval); + } else { + uInfo("%s %s %.2f", src, name, pItem->dval); + } + break; case CFG_DTYPE_STRING: case CFG_DTYPE_DIR: case CFG_DTYPE_LOCALE: @@ -650,8 +652,7 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) { case CFG_DTYPE_TIMEZONE: case CFG_DTYPE_NONE: if (dump) { - printf("%s %s %s", src, name, pItem->str); - printf("\n"); + printf("%s %s %s\n", src, name, pItem->str); } else { uInfo("%s %s %s", src, name, pItem->str); } @@ -660,8 +661,7 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) { } if (dump) { - printf("================================================================="); - printf("\n"); + printf("=================================================================\n"); } else { uInfo("================================================================="); } From e3da6ab36e3191683e6b476e11bca456a0c685e9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 17 Oct 2023 09:00:04 +0800 Subject: [PATCH 10/13] fix(config): remove the dval from struct SConfigItem. --- include/common/tglobal.h | 2 +- include/util/tconfig.h | 2 -- source/common/src/tglobal.c | 10 +++++----- source/util/src/tconfig.c | 18 +----------------- 4 files changed, 7 insertions(+), 25 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 6f35a7ab35..df46cc689b 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -194,7 +194,7 @@ extern int64_t tsWalFsyncDataSizeLimit; extern int32_t tsTransPullupInterval; extern int32_t tsMqRebalanceInterval; extern int32_t tsStreamCheckpointInterval; -extern double tsSinkDataRate; +extern float tsSinkDataRate; extern int32_t tsStreamNodeCheckInterval; extern int32_t tsTtlUnit; extern int32_t tsTtlPushIntervalSec; diff --git a/include/util/tconfig.h b/include/util/tconfig.h index aaad467737..34798f3816 100644 --- a/include/util/tconfig.h +++ b/include/util/tconfig.h @@ -65,7 +65,6 @@ typedef struct SConfigItem { union { bool bval; float fval; - double dval; int32_t i32; int64_t i64; char *str; @@ -104,7 +103,6 @@ int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scop int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope); int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope); int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float minval, float maxval, int8_t scope); -int32_t cfgAddDouble(SConfig *pCfg, const char *name, double defaultVal, double minval, double maxval, int8_t scope); int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope); int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope); int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index dd3bdf86b2..568808405e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -245,7 +245,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; int32_t tsStreamCheckpointInterval = 300; -double tsSinkDataRate = 2.0; +float tsSinkDataRate = 2.0; int32_t tsStreamNodeCheckInterval = 30; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; @@ -650,13 +650,13 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddDouble(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddFloat(pCfg, "streamSinkDataRate", tsSinkDataRate, 0.1, 5, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "lossyColumns", tsLossyColumns, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddFloat(pCfg, "fPrecision", tsFPrecision, 0.0f, 100000.0f, CFG_SCOPE_SERVER) != 0) return -1; - if (cfgAddDouble(pCfg, "dPrecision", tsDPrecision, 0.0f, 1000000.0f, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddFloat(pCfg, "dPrecision", tsDPrecision, 0.0f, 1000000.0f, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "maxRange", tsMaxRange, 0, 65536, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "curRange", tsCurRange, 0, 65536, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "ifAdtFse", tsIfAdtFse, CFG_SCOPE_SERVER) != 0) return -1; @@ -1081,7 +1081,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tstrncpy(tsLossyColumns, cfgGetItem(pCfg, "lossyColumns")->str, sizeof(tsLossyColumns)); tsFPrecision = cfgGetItem(pCfg, "fPrecision")->fval; - tsDPrecision = cfgGetItem(pCfg, "dPrecision")->dval; + tsDPrecision = cfgGetItem(pCfg, "dPrecision")->fval; tsMaxRange = cfgGetItem(pCfg, "maxRange")->i32; tsCurRange = cfgGetItem(pCfg, "curRange")->i32; tsIfAdtFse = cfgGetItem(pCfg, "ifAdtFse")->bval; @@ -1090,7 +1090,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64; tsStreamCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i32; - tsSinkDataRate = cfgGetItem(pCfg, "streamSinkDataRate")->dval; + tsSinkDataRate = cfgGetItem(pCfg, "streamSinkDataRate")->fval; tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval; tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32; diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index 50b01fb390..08bb8ecabf 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -416,16 +416,6 @@ int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float min return cfgAddItem(pCfg, &item, name); } -int32_t cfgAddDouble(SConfig *pCfg, const char *name, double defaultVal, double minval, double maxval, int8_t scope) { - if (defaultVal < minval || defaultVal > maxval) { - terrno = TSDB_CODE_OUT_OF_RANGE; - return -1; - } - - SConfigItem item = {.dtype = CFG_DTYPE_DOUBLE, .dval = defaultVal, .fmin = minval, .fmax = maxval, .scope = scope}; - return cfgAddItem(pCfg, &item, name); -} - int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) { SConfigItem item = {.dtype = CFG_DTYPE_STRING, .scope = scope}; item.str = taosStrdup(defaultVal); @@ -631,6 +621,7 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) { uInfo("%s %s %" PRId64, src, name, pItem->i64); } break; + case CFG_DTYPE_DOUBLE: case CFG_DTYPE_FLOAT: if (dump) { printf("%s %s %.2f\n", src, name, pItem->fval); @@ -638,13 +629,6 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) { uInfo("%s %s %.2f", src, name, pItem->fval); } break; - case CFG_DTYPE_DOUBLE: - if (dump) { - printf("%s %s %.2f\n", src, name, pItem->dval); - } else { - uInfo("%s %s %.2f", src, name, pItem->dval); - } - break; case CFG_DTYPE_STRING: case CFG_DTYPE_DIR: case CFG_DTYPE_LOCALE: From 14035c763cbc79e2befa44db32ab67d78f351122 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 17 Oct 2023 09:57:13 +0800 Subject: [PATCH 11/13] refactor: do some internal refactor. --- source/libs/stream/src/{streamRecover.c => streamStart.c} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename source/libs/stream/src/{streamRecover.c => streamStart.c} (100%) diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamStart.c similarity index 100% rename from source/libs/stream/src/streamRecover.c rename to source/libs/stream/src/streamStart.c From 5e0f9cb908b363f4b4cc7dcb02b50affdf67d6af Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 17 Oct 2023 14:32:09 +0800 Subject: [PATCH 12/13] double free --- source/libs/executor/src/streamtimewindowoperator.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 2eb6fb2d64..fd8415c148 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -373,8 +373,11 @@ void clearGroupResInfo(SGroupResInfo* pGroupResInfo) { if (pGroupResInfo->freeItem) { int32_t size = taosArrayGetSize(pGroupResInfo->pRows); for (int32_t i = pGroupResInfo->index; i < size; i++) { - void* pVal = taosArrayGetP(pGroupResInfo->pRows, i); - taosMemoryFree(pVal); + SRowBuffPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); + if (!pPos->needFree && !pPos->pRowBuff) { + taosMemoryFreeClear(pPos->pKey); + taosMemoryFree(pPos); + } } pGroupResInfo->freeItem = false; } From c0678e9842de4c0fa78b16da843d9a97c93e911e Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Tue, 17 Oct 2023 13:42:37 +0800 Subject: [PATCH 13/13] update script --- packaging/tools/install.sh | 140 +++++++++++++++---------------------- packaging/tools/makepkg.sh | 1 + packaging/tools/remove.sh | 33 ++------- 3 files changed, 63 insertions(+), 111 deletions(-) diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index e8798ed16b..f646009b21 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -377,13 +377,14 @@ function add_newHostname_to_hosts() { } function set_hostname() { - echo -e -n "${GREEN}Please enter one hostname(must not be 'localhost')${NC}:" + echo -e -n "${GREEN}Enter the public accessible IP address or fully qualified domain name TDengine will expose to users or applications (must not be 'localhost') :${NC}" read newHostname while true; do if [[ ! -z "$newHostname" && "$newHostname" != "localhost" ]]; then break else - read -p "Please enter one hostname(must not be 'localhost'):" newHostname + echo -e -n "${GREEN}Enter the public accessible IP address or fully qualified domain name TDengine will expose to users or applications (must not be 'localhost') :${NC}" + read newHostname fi done @@ -476,34 +477,7 @@ function local_fqdn_check() { echo echo -e -n "System hostname is: ${GREEN}$serverFqdn${NC}" echo - if [[ "$serverFqdn" == "" ]] || [[ "$serverFqdn" == "localhost" ]]; then - echo -e -n "${GREEN}It is strongly recommended to configure a hostname for this machine ${NC}" - echo - - while true; do - read -r -p "Set hostname now? [Y/n] " input - if [ ! -n "$input" ]; then - set_hostname - break - else - case $input in - [yY][eE][sS] | [yY]) - set_hostname - break - ;; - - [nN][oO] | [nN]) - set_ipAsFqdn - break - ;; - - *) - echo "Invalid input..." - ;; - esac - fi - done - fi + set_hostname } function install_adapter_config() { @@ -525,11 +499,19 @@ function install_adapter_config() { function install_config() { + local_fqdn_check + if [ ! -f "${cfg_install_dir}/${configFile2}" ]; then ${csudo}mkdir -p ${cfg_install_dir} - [ -f ${script_dir}/cfg/${configFile2} ] && ${csudo}cp ${script_dir}/cfg/${configFile2} ${cfg_install_dir} + if [ -f ${script_dir}/cfg/${configFile2} ]; then + ${csudo} echo "monitor 1" >> ${script_dir}/cfg/${configFile2} + ${csudo} echo "monitorFQDN ${serverFqdn}" >> ${script_dir}/cfg/${configFile2} + ${csudo}cp ${script_dir}/cfg/${configFile2} ${cfg_install_dir} + fi ${csudo}chmod 644 ${cfg_install_dir}/* else + ${csudo} echo "monitor 1" >> ${script_dir}/cfg/${configFile2} + ${csudo} echo "monitorFQDN ${serverFqdn}" >> ${script_dir}/cfg/${configFile2} ${csudo}cp -f ${script_dir}/cfg/${configFile2} ${cfg_install_dir}/${configFile2}.new fi @@ -537,15 +519,15 @@ function install_config() { [ ! -z $1 ] && return 0 || : # only install client - if ((${update_flag} == 1)); then - return 0 - fi + - if [ "$interactiveFqdn" == "no" ]; then - return 0 - fi + # if ((${update_flag} == 1)); then + # return 0 + # fi - local_fqdn_check + # if [ "$interactiveFqdn" == "no" ]; then + # return 0 + # fi echo echo -e -n "${GREEN}Enter FQDN:port (like h1.${emailName2}:6030) of an existing ${productName2} cluster node to join${NC}" @@ -629,7 +611,7 @@ function install_taosx() { if [ -f "${script_dir}/taosx/install_taosx.sh" ]; then cd ${script_dir}/taosx chmod a+x install_taosx.sh - bash install_taosx.sh + bash install_taosx.sh -e $serverFqdn fi } @@ -712,30 +694,7 @@ function clean_service_on_systemd() { ${csudo}systemctl stop tarbitratord &>/dev/null || echo &>/dev/null fi ${csudo}systemctl disable tarbitratord &>/dev/null || echo &>/dev/null - ${csudo}rm -f ${tarbitratord_service_config} - - if [ "$verMode" == "cluster" ] && [ "$clientName" != "$clientName2" ]; then - x_service_config="${service_config_dir}/${xName2}.service" - if [ -e "$x_service_config" ]; then - if systemctl is-active --quiet ${xName2}; then - echo "${productName2} ${xName2} is running, stopping it..." - ${csudo}systemctl stop ${xName2} &>/dev/null || echo &>/dev/null - fi - ${csudo}systemctl disable ${xName2} &>/dev/null || echo &>/dev/null - ${csudo}rm -f ${x_service_config} - fi - - explorer_service_config="${service_config_dir}/${explorerName2}.service" - if [ -e "$explorer_service_config" ]; then - if systemctl is-active --quiet ${explorerName2}; then - echo "${productName2} ${explorerName2} is running, stopping it..." - ${csudo}systemctl stop ${explorerName2} &>/dev/null || echo &>/dev/null - fi - ${csudo}systemctl disable ${explorerName2} &>/dev/null || echo &>/dev/null - ${csudo}rm -f ${explorer_service_config} - ${csudo}rm -f /etc/${clientName2}/explorer.toml - fi - fi + ${csudo}rm -f ${tarbitratord_service_config} } function install_service_on_systemd() { @@ -756,15 +715,27 @@ function install_service_on_systemd() { ${csudo}systemctl daemon-reload ${csudo}systemctl enable ${serverName2} - ${csudo}systemctl daemon-reload } function install_adapter_service() { if ((${service_mod} == 0)); then - [ -f ${script_dir}/cfg/${adapterName}.service ] && - ${csudo}cp ${script_dir}/cfg/${adapterName}.service \ + [ -f ${script_dir}/cfg/${adapterName2}.service ] && + ${csudo}cp ${script_dir}/cfg/${adapterName2}.service \ ${service_config_dir}/ || : + + ${csudo}systemctl enable ${adapterName2} + ${csudo}systemctl daemon-reload + fi +} + +function install_keeper_service() { + if ((${service_mod} == 0)); then + [ -f ${script_dir}/cfg/${clientName2}keeper.service ] && + ${csudo}cp ${script_dir}/cfg/${clientName2}keeper.service \ + ${service_config_dir}/ || : + + ${csudo}systemctl enable ${clientName2}keeper ${csudo}systemctl daemon-reload fi } @@ -901,6 +872,7 @@ function updateProduct() { install_log install_header install_lib + install_config if [ "$verMode" == "cluster" ]; then install_connector @@ -912,9 +884,9 @@ function updateProduct() { if [ -z $1 ]; then install_bin install_service - install_adapter_service - install_config + install_adapter_service install_adapter_config + install_keeper_service openresty_work=false @@ -957,19 +929,18 @@ function updateProduct() { # fi echo - echo -e "\033[44;32;1m${productName2} is updated successfully!${NC}" + echo "${productName2} is updated successfully!" echo if [ "$verMode" == "cluster" ];then - echo -e "\033[44;32;1mTo start all the components \t: sudo ./start-all.sh${NC}" + echo -e "\033[44;32;1mTo start all the components : sudo ./start-all.sh${NC}" fi - echo -e "\033[44;32;1mTo access ${productName2} \t\t: ${clientName2} -h $serverFqdn${NC}" + echo -e "\033[44;32;1mTo access ${productName2} : ${clientName2} -h $serverFqdn${NC}" if [ "$verMode" == "cluster" ];then - echo -e "\033[44;32;1mTo access the management system \t: http://$serverFqdn:6060${NC}" - echo -e "\033[44;32;1mTo read the user manual \t: http://$serverFqdn:6060/docs${NC}" + echo -e "\033[44;32;1mTo access the management system : http://$serverFqdn:6060${NC}" + echo -e "\033[44;32;1mTo read the user manual : http://$serverFqdn:6060/docs${NC}" fi else - install_bin - install_config + install_bin echo echo -e "\033[44;32;1m${productName2} client is updated successfully!${NC}" @@ -1001,6 +972,7 @@ function installProduct() { install_jemalloc #install_avro lib #install_avro lib64 + install_config if [ "$verMode" == "cluster" ]; then install_connector @@ -1014,10 +986,10 @@ function installProduct() { install_service install_adapter_service install_adapter_config + install_keeper_service openresty_work=false - install_config # Ask if to start the service echo @@ -1068,20 +1040,20 @@ function installProduct() { # echo # fi echo - echo -e "\033[44;32;1m${productName2} is installed successfully!${NC}" + echo "${productName2} is installed successfully!" echo if [ "$verMode" == "cluster" ];then - echo -e "\033[44;32;1mTo start all the components \t: sudo ./start-all.sh${NC}" + echo -e "\033[44;32;1mTo start all the components : sudo ./start-all.sh${NC}" fi - echo -e "\033[44;32;1mTo access ${productName2} \t\t: ${clientName2} -h $serverFqdn${NC}" + echo -e "\033[44;32;1mTo access ${productName2} : ${clientName2} -h $serverFqdn${NC}" if [ "$verMode" == "cluster" ];then - echo -e "\033[44;32;1mTo access the management system \t: http://$serverFqdn:6060${NC}" - echo -e "\033[44;32;1mTo read the user manual \t: http://$serverFqdn:6060/docs${NC}" + echo -e "\033[44;32;1mTo access the management system : http://$serverFqdn:6060${NC}" + echo -e "\033[44;32;1mTo read the user manual : http://$serverFqdn:6060/docs${NC}" fi echo else # Only install client install_bin - install_config + echo echo -e "\033[44;32;1m${productName2} client is installed successfully!${NC}" fi @@ -1117,4 +1089,6 @@ elif [ "$verType" == "client" ]; then fi else echo "please input correct verType" -fi \ No newline at end of file +fi + + diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh index d94e6f566d..1be8e23e56 100755 --- a/packaging/tools/makepkg.sh +++ b/packaging/tools/makepkg.sh @@ -374,6 +374,7 @@ if [ "$verMode" == "cluster" ]; then cp -r ${top_dir}/../enterprise/src/plugins/taosx/release/taosx ${install_dir} cp ${top_dir}/../enterprise/packaging/install_taosx.sh ${install_dir}/taosx cp ${top_dir}/../enterprise/src/plugins/taosx/packaging/uninstall.sh ${install_dir}/taosx + sed -i 's/target=\"\"/target=\"taosx\"/g' ${install_dir}/taosx/uninstall.sh else echo "taox package not found" exit 1 diff --git a/packaging/tools/remove.sh b/packaging/tools/remove.sh index 37cc357065..a32788bbd0 100755 --- a/packaging/tools/remove.sh +++ b/packaging/tools/remove.sh @@ -175,7 +175,7 @@ function clean_log() { function clean_service_on_systemd() { taosd_service_config="${service_config_dir}/${taos_service_name}.service" if systemctl is-active --quiet ${taos_service_name}; then - echo "${productName2} ${serverName2} is running, stopping it..." + echo "${taos_service_name} ${taos_service_name} is running, stopping it..." ${csudo}systemctl stop ${taos_service_name} &>/dev/null || echo &>/dev/null fi ${csudo}systemctl disable ${taos_service_name} &>/dev/null || echo &>/dev/null @@ -196,28 +196,6 @@ function clean_service_on_systemd() { fi ${csudo}systemctl disable ${tarbitrator_service_name} &>/dev/null || echo &>/dev/null - if [ "$verMode" == "cluster" ] && [ "$clientName" != "$clientName2" ]; then - x_service_config="${service_config_dir}/${xName2}.service" - if [ -e "$x_service_config" ]; then - if systemctl is-active --quiet ${xName2}; then - echo "${productName2} ${xName2} is running, stopping it..." - ${csudo}systemctl stop ${xName2} &>/dev/null || echo &>/dev/null - fi - ${csudo}systemctl disable ${xName2} &>/dev/null || echo &>/dev/null - ${csudo}rm -f ${x_service_config} - fi - - explorer_service_config="${service_config_dir}/${explorerName2}.service" - if [ -e "$explorer_service_config" ]; then - if systemctl is-active --quiet ${explorerName2}; then - echo "${productName2} ${explorerName2} is running, stopping it..." - ${csudo}systemctl stop ${explorerName2} &>/dev/null || echo &>/dev/null - fi - ${csudo}systemctl disable ${explorerName2} &>/dev/null || echo &>/dev/null - ${csudo}rm -f ${explorer_service_config} - ${csudo}rm -f /etc/${clientName2}/explorer.toml - fi - fi } function clean_service_on_sysvinit() { @@ -287,10 +265,7 @@ function clean_service() { function uninstall_taosx() { if [ -f /usr/local/taosx/uninstall.sh ]; then cd /usr/local/taosx - bash uninstall.sh > /dev/null - - echo -e "${GREEN}${xName2} is removed successfully!${NC}" - echo -e "${GREEN}${explorerName2} is removed successfully!${NC}" + bash uninstall.sh fi } @@ -332,8 +307,10 @@ if [ "$osType" = "Darwin" ]; then ${csudo}rm -rf /Applications/TDengine.app fi -echo -e "${GREEN}${productName2} is removed successfully!${NC}" if [ "$verMode" == "cluster" ]; then uninstall_taosx fi + +echo +echo "${productName2} is removed successfully!" echo \ No newline at end of file