diff --git a/docs/zh/07-develop/06-stream.md b/docs/zh/07-develop/06-stream.md index 4a5ebb96e8..a6c8b02772 100644 --- a/docs/zh/07-develop/06-stream.md +++ b/docs/zh/07-develop/06-stream.md @@ -8,7 +8,7 @@ title: 流式计算 TDengine 3.0 的流式计算引擎提供了实时处理写入的数据流的能力,使用 SQL 定义实时流变换,当数据被写入流的源表后,数据会被以定义的方式自动处理,并根据定义的触发模式向目的表推送结果。它提供了替代复杂流处理系统的轻量级解决方案,并能够在高吞吐的数据写入的情况下,提供毫秒级的计算结果延迟。 -流式计算可以包含数据过滤,标量函数计算(含UDF),以及窗口聚合(支持滑动窗口、会话窗口与状态窗口),可以以超级表、子表、普通表为源表,写入到目的超级表。在创建流时,目的超级表将被自动创建,随后新插入的数据会被流定义的方式处理并写入其中,通过 partition by 子句,可以以表名或标签划分 partition,不同的 partition 将写入到目的超级表的不同子表。 +流式计算可以包含数据过滤,标量函数计算(含UDF),以及窗口聚合(支持滑动窗口、会话窗口、状态窗口、事件窗口与计数窗口),可以以超级表、子表、普通表为源表,写入到目的超级表。在创建流时,目的超级表将被自动创建,随后新插入的数据会被流定义的方式处理并写入其中,通过 partition by 子句,可以以表名或标签划分 partition,不同的 partition 将写入到目的超级表的不同子表。 TDengine 的流式计算能够支持分布在多个 vnode 中的超级表聚合;还能够处理乱序数据的写入:它提供了 watermark 机制以度量容忍数据乱序的程度,并提供了 ignore expired 配置项以决定乱序数据的处理策略——丢弃或者重新计算。 diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index 8868b728f8..2ed3c9afae 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -30,7 +30,7 @@ subquery: SELECT select_list [window_clause] ``` -支持会话窗口、状态窗口与滑动窗口,其中,会话窗口与状态窗口搭配超级表时必须与partition by tbname一起使用 +支持会话窗口、状态窗口、滑动窗口、事件窗口和计数窗口,其中,状态窗口、事件窗口和计数窗口搭配超级表时必须与partition by tbname一起使用 stb_name 是保存计算结果的超级表的表名,如果该超级表不存在,会自动创建;如果已存在,则检查列的schema信息。详见 写入已存在的超级表 @@ -60,7 +60,11 @@ COUNT_WINDOW 是计数窗口,按固定的数据行数来划分窗口。 count_va 窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished) -例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。 +例如,如下语句创建流式计算。第一个流计算,自动创建名为 avg_vol 的超级表,以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。 + +第二个流计算,自动创建名为 streamt0 的超级表,将数据按时间戳的顺序,以 voltage < 0 作为窗口的开始条件,voltage > 9作为窗口的结束条件,划分窗口做聚合运算,并将来自 meters 表的数据的计算结果写入 streamt0 表,不同 partition 的数据会分别创建子表并写入不同子表。 + +第三个流计算,自动创建名为 streamt1 的超级表,将数据按时间戳的顺序,以10条数据为一组,划分窗口做聚合运算,并将来自 meters 表的数据的计算结果写入 streamt1 表,不同 partition 的数据会分别创建子表并写入不同子表。 ```sql CREATE STREAM avg_vol_s INTO avg_vol AS diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c0a2060907..7a5a7d1afc 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -268,7 +268,7 @@ bool tsDisableStream = false; int64_t tsStreamBufferSize = 128 * 1024 * 1024; bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds -int tsStreamAggCnt = 1000; +int tsStreamAggCnt = 100000; char tsS3Endpoint[TSDB_FQDN_LEN] = ""; char tsS3AccessKey[TSDB_FQDN_LEN] = ""; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 47900d540c..ffebd783ac 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -860,6 +860,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam, nextProcessVer); + + ASSERT(pChkInfo->checkpointVer <= pChkInfo->nextProcessVer); } return 0; @@ -1203,8 +1205,15 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamProcessCheckpointSourceReq(pTask, &req); taosThreadMutexUnlock(&pTask->lock); - qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d", pTask->id.idStr, - vgId, pTask->info.taskLevel, req.checkpointId, req.transId); + if (req.mndTrigger) { + qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ", pTask->id.idStr, + vgId, pTask->info.taskLevel, req.checkpointId, req.transId); + } else { + const char* pPrevStatus = streamTaskGetStatusStr(streamTaskGetPrevStatus(pTask)); + qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 + ", transId:%d after transfer-state, prev status:%s", + pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus); + } code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 90a26d17dc..d740f9491c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -658,11 +658,12 @@ static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int colDataSetNULL(pColInfoData, rowIndex); } else { varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData); - if (pColVal->value.nData > pColInfoData->info.bytes) { + if ((pColVal->value.nData + VARSTR_HEADER_SIZE) > pColInfoData->info.bytes) { tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData, pColInfoData->info.bytes); return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER; } + if (pColVal->value.nData > 0) { // pData may be null, if nData is 0 memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData); } diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 98e96bb250..5263546765 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -602,6 +602,8 @@ typedef struct SStreamIntervalOperatorInfo { bool recvPullover; SSDataBlock* pMidPulloverRes; bool clearState; + SArray* pMidPullDatas; + int32_t midDelIndex; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 11d8d1487a..ef5c2572d9 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -99,6 +99,11 @@ int32_t getEndCondIndex(bool* pEnd, int32_t start, int32_t rows) { return -1; } +int32_t reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { + pAPI->streamStateReleaseBuf(pState, pPos, true); + return TSDB_CODE_SUCCESS; +} + void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd, int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) { int32_t code = TSDB_CODE_SUCCESS; int32_t size = pAggSup->resultRowSize; @@ -143,6 +148,7 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI pCurWin->winInfo.isOutput = false; _end: + reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore); pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur); pNextWinKey->groupId = groupId; code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, pNextWinKey, NULL, 0); @@ -341,6 +347,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } if (isWindowIncomplete(&curWin)) { + releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAggSup->stateStore); continue; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 60f577ce12..51071e2b4a 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -221,7 +221,7 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins, - SSHashObj* pUpdatedMap) { + SSHashObj* pUpdatedMap, SHashObj* pInvalidWins) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData; @@ -255,10 +255,15 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); if (chIds) { int32_t childId = getChildIndex(pBlock); + if (pInvalidWins) { + qDebug("===stream===save mid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId); + taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0); + } + SArray* chArray = *(void**)chIds; int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ); if (index != -1) { - qDebug("===stream===try push delete window%" PRId64 "chId:%d ,continue", win.skey, childId); + qDebug("===stream===try push delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d ,continue", win.skey, winGpId, childId); getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC); continue; } @@ -413,6 +418,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { blockDataDestroy(pInfo->pMidRetriveRes); blockDataDestroy(pInfo->pMidPulloverRes); pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); + taosArrayDestroy(pInfo->pMidPullDatas); if (pInfo->pState->dump == 1) { taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner); @@ -642,9 +648,12 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request - if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) { + qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh); + if (IS_MID_INTERVAL_OP(pOperator)) { + SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info; + taosArrayPush(pInfo->pMidPullDatas, &winRes); + } else if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) { addPullWindow(pMap, &winRes, numOfCh); - qDebug("===stream===prepare final retrive for delete %" PRId64 ", size:%d", winRes.ts, numOfCh); } } } @@ -1191,11 +1200,6 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) { return pInfo->binfo.pRes; } - if (pInfo->recvPullover) { - pInfo->recvPullover = false; - printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pMidPulloverRes; - } return NULL; } @@ -1293,7 +1297,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL); if (IS_FINAL_INTERVAL_OP(pOperator)) { int32_t chId = getChildIndex(pBlock); addRetriveWindow(delWins, pInfo, chId); @@ -1329,7 +1333,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->recvRetrive = true; copyDataBlock(pInfo->pMidRetriveRes, pBlock); pInfo->pMidRetriveRes->info.type = STREAM_MID_RETRIEVE; - doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap, NULL); break; } continue; @@ -1557,8 +1561,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->recvRetrive = false; pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); + pInfo->recvPullover = false; pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pInfo->clearState = false; + pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey)); pOperator->operatorType = pPhyNode->type; if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { @@ -1837,11 +1843,6 @@ int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { return TSDB_CODE_SUCCESS; } -int32_t reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { - pAPI->streamStateReleaseBuf(pState, pPos, true); - return TSDB_CODE_SUCCESS; -} - void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) { SSessionKey key = {0}; getSessionHashKey(pKey, &key); @@ -2487,7 +2488,7 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { return; } SResultWindowInfo* pWinInfo = taosArrayGet(pAllWins, size - 1); - SSessionKey* pSeKey = pWinInfo->pStatePos->pKey; + SSessionKey* pSeKey = &pWinInfo->sessionWin; taosArrayPush(pMaxWins, pSeKey); if (pSeKey->groupId == 0) { return; @@ -2495,7 +2496,7 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { uint64_t preGpId = pSeKey->groupId; for (int32_t i = size - 2; i >= 0; i--) { pWinInfo = taosArrayGet(pAllWins, i); - pSeKey = pWinInfo->pStatePos->pKey; + pSeKey = &pWinInfo->sessionWin; if (preGpId != pSeKey->groupId) { taosArrayPush(pMaxWins, pSeKey); preGpId = pSeKey->groupId; @@ -3971,7 +3972,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { - doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap, NULL); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { pInfo->recvGetAll = true; @@ -4264,6 +4265,34 @@ static void addMidRetriveWindow(SArray* wins, SHashObj* pMidPullMap, int32_t num } } +static SSDataBlock* buildMidIntervalResult(SOperatorInfo* pOperator) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + uint16_t opType = pOperator->operatorType; + + if (pInfo->recvPullover) { + pInfo->recvPullover = false; + printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pMidPulloverRes; + } + + qDebug("===stream=== build mid interval result"); + doBuildDeleteResult(pInfo, pInfo->pMidPullDatas, &pInfo->midDelIndex, pInfo->pDelRes); + if (pInfo->pDelRes->info.rows != 0) { + // process the rest of the data + printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); + return pInfo->pDelRes; + } + + if (pInfo->recvRetrive) { + pInfo->recvRetrive = false; + printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pMidRetriveRes; + } + + return NULL; +} + static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4292,10 +4321,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { return resBlock; } - if (pInfo->recvRetrive) { - pInfo->recvRetrive = false; - printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pMidRetriveRes; + resBlock = buildMidIntervalResult(pOperator); + if (resBlock != NULL) { + return resBlock; } if (pInfo->clearState) { @@ -4335,7 +4363,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, pInfo->pFinalPullDataMap); removeResults(delWins, pInfo->pUpdatedMap); taosArrayAddAll(pInfo->pDelWins, delWins); taosArrayDestroy(delWins); @@ -4371,7 +4399,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_MID_RETRIEVE) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL); addMidRetriveWindow(delWins, pInfo->pPullDataMap, pInfo->numOfChild); taosArrayDestroy(delWins); pInfo->recvRetrive = true; @@ -4416,10 +4444,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { return resBlock; } - if (pInfo->recvRetrive) { - pInfo->recvRetrive = false; - printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pMidRetriveRes; + resBlock = buildMidIntervalResult(pOperator); + if (resBlock != NULL) { + return resBlock; } if (pInfo->clearState) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index d0055d5400..ff56ae76b0 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -131,7 +131,7 @@ int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); const char* streamQueueItemGetTypeStr(int32_t type); SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); -int32_t streamTransferStateToStreamTask(SStreamTask* pTask); +int32_t streamTransferStatePrepare(SStreamTask* pTask); SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 607e31bfe6..f58c72eded 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -300,6 +300,8 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { taosThreadMutexLock(&p->lock); SStreamTaskState* pStatus = streamTaskGetStatus(p); + ETaskStatus prevStatus = pStatus->state; + if (pStatus->state == TASK_STATUS__CK) { ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId && pCKInfo->checkpointVer <= pCKInfo->processedVer); @@ -325,8 +327,9 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { } stDebug("vgId:%d s-task:%s level:%d open upstream inputQ, save status after checkpoint, checkpointId:%" PRId64 - ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: normal, prev:%s", - vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, pStatus->name); + ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: ready, prev:%s", + vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, + streamTaskGetStatusStr(prevStatus)); // save the task if not sink task if (p->info.taskLevel <= TASK_LEVEL__SINK) { @@ -437,9 +440,11 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { if (type == UPLOAD_DISABLE) { return 0; } + if (pTask == NULL || pTask->pBackend == NULL) { return 0; } + SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg)); arg->type = type; arg->taskId = taosStrdup(taskId); @@ -448,16 +453,19 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL); } -int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { - int32_t code = TSDB_CODE_SUCCESS; - int64_t startTs = pTask->chkInfo.startTs; - int64_t ckId = pTask->chkInfo.checkpointingId; - const char* id = pTask->id.idStr; - bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); - // sink task do not need to save the status, and generated the checkpoint +int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { + int32_t code = TSDB_CODE_SUCCESS; + int64_t startTs = pTask->chkInfo.startTs; + int64_t ckId = pTask->chkInfo.checkpointingId; + const char* id = pTask->id.idStr; + bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); + SStreamMeta* pMeta = pTask->pMeta; + + // sink task does not need to save the status, and generated the checkpoint if (pTask->info.taskLevel != TASK_LEVEL__SINK) { stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId); + code = streamBackendDoCheckpoint(pTask->pBackend, ckId); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno)); @@ -500,10 +508,11 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { SStreamTaskId hTaskId = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; stDebug("s-task:%s fill-history finish checkpoint done, drop related fill-history task:0x%x", id, hTaskId.taskId); - streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &hTaskId, 1); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &hTaskId, 1); } else { stWarn("s-task:%s related fill-history task:0x%x is erased", id, (int32_t)pTask->hTaskInfo.id.taskId); } + taosThreadMutexUnlock(&pTask->lock); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index dc790b5b2d..0af664f1e1 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1094,10 +1094,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) { - stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId); + stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to prepare transfer state", id, msgId); ASSERT(pTask->info.fillHistory == 1); - code = streamTransferStateToStreamTask(pTask); + code = streamTransferStatePrepare(pTask); if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index cb51d90fba..24d1bf7603 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -21,7 +21,7 @@ #define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data #define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms -static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); +static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); bool streamTaskShouldStop(const SStreamTask* pTask) { SStreamTaskState* pState = streamTaskGetStatus(pTask); @@ -316,7 +316,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { } } -int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { +int32_t streamTransferStateDoPrepare(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; const char* id = pTask->id.idStr; @@ -340,9 +340,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { } else { double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.; stDebug( - "s-task:%s fill-history task end, scan wal elapsed time:%.2fSec,update related stream task:%s info, transfer " - "exec state", - id, el, pStreamTask->id.idStr); + "s-task:%s fill-history task end, status:%s, scan wal elapsed time:%.2fSec, update related stream task:%s " + "info, prepare transfer exec state", + id, streamTaskGetStatus(pTask)->name, el, pStreamTask->id.idStr); } ETaskStatus status = streamTaskGetStatus(pStreamTask)->state; @@ -366,9 +366,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { } } - // wait for the stream task to handle all in the inputQ, and to be idle - waitForTaskIdle(pTask, pStreamTask); - // In case of sink tasks, no need to halt them. // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to // start the task state transfer procedure. @@ -394,17 +391,14 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr); } - // 2. transfer the ownership of executor state - streamTaskReleaseState(pTask); - streamTaskReloadState(pStreamTask); - - // 3. send msg to mnode to launch a checkpoint to keep the state for current stream + // NOTE: transfer the ownership of executor state before handle the checkpoint block during stream exec + // 2. send msg to mnode to launch a checkpoint to keep the state for current stream streamTaskSendCheckpointReq(pStreamTask); - // 4. assign the status to the value that will be kept in disk + // 3. assign the status to the value that will be kept in disk pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state; - // 5. open the inputQ for all upstream tasks + // 4. open the inputQ for all upstream tasks streamTaskOpenAllUpstreamInput(pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask); @@ -417,7 +411,7 @@ static int32_t haltCallback(SStreamTask* pTask, void* param) { return TSDB_CODE_SUCCESS; } -int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { +int32_t streamTransferStatePrepare(SStreamTask* pTask) { int32_t code = TSDB_CODE_SUCCESS; SStreamMeta* pMeta = pTask->pMeta; @@ -425,7 +419,7 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. - code = streamDoTransferStateToStreamTask(pTask); + code = streamTransferStateDoPrepare(pTask); } else { // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task. SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); @@ -541,7 +535,7 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, level); ASSERT(pTask->info.fillHistory == 1); - code = streamTransferStateToStreamTask(pTask); + code = streamTransferStatePrepare(pTask); if (code != TSDB_CODE_SUCCESS) { /*int8_t status = */ streamTaskSetSchedStatusInactive(pTask); } @@ -622,10 +616,31 @@ int32_t doStreamExecTask(SStreamTask* pTask) { } } - int64_t st = taosGetTimestampMs(); + if (type == STREAM_INPUT__CHECKPOINT) { + // transfer the state from fill-history to related stream task before generating the checkpoint. + bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); + if (dropRelHTask) { + ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); - const SStreamQueueItem* pItem = pInput; - stDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type); + STaskId* pHTaskId = &pTask->hTaskInfo.id; + SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId); + if (pHTask != NULL) { + // 2. transfer the ownership of executor state + streamTaskReleaseState(pHTask); + streamTaskReloadState(pTask); + stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, + streamTaskGetStatus(pHTask)->name); + + streamMetaReleaseTask(pTask->pMeta, pHTask); + } else { + stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id, + (int32_t)pHTaskId->taskId); + } + } + } + + int64_t st = taosGetTimestampMs(); + stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks, streamQueueItemGetTypeStr(type)); int64_t ver = pTask->chkInfo.processedVer; doSetStreamInputBlock(pTask, pInput, &ver, id); diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 723f04c499..295132a4f5 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -310,7 +310,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream int32_t size = taosArrayGetSize(pWinStates); if (pCur->buffIndex >= 0) { if (pCur->buffIndex >= size) { - pNewPos = insertNewSessionWindow(pFileState, pWinStates, pWinKey, size); + pNewPos = addNewSessionWindow(pFileState, pWinStates, pWinKey); goto _end; } pNewPos = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex); @@ -327,12 +327,12 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream } } pNewPos = getNewRowPosForWrite(pFileState); + memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey)); pNewPos->needFree = true; pNewPos->beFlushed = true; } _end: - memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey)); (*ppVal) = pNewPos; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 45a9a68d3d..4ca784a32f 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -456,12 +456,49 @@ void tFreeStreamTask(SStreamTask* pTask) { stDebug("s-task:0x%x free task completed", taskId); } +static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) { + SCheckpointInfo* pChkInfo = &pTask->chkInfo; + SDataRange* pRange = &pTask->dataRange; + + // only set the version info for stream tasks without fill-history task + if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) { + pChkInfo->checkpointVer = ver - 1; // only update when generating checkpoint + pChkInfo->processedVer = ver - 1; // already processed version + pChkInfo->nextProcessVer = ver; // next processed version + + pRange->range.maxVer = ver; + pRange->range.minVer = ver; + } else { + // the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task + // is set at the mnode. + if (pTask->info.fillHistory == 1) { + pChkInfo->checkpointVer = pRange->range.maxVer; + pChkInfo->processedVer = pRange->range.maxVer; + pChkInfo->nextProcessVer = pRange->range.maxVer + 1; + } else { + pChkInfo->checkpointVer = pRange->range.minVer - 1; + pChkInfo->processedVer = pRange->range.minVer - 1; + pChkInfo->nextProcessVer = pRange->range.minVer; + + { // for compatible purpose, remove it later + if (pRange->range.minVer == 0) { + pChkInfo->checkpointVer = 0; + pChkInfo->processedVer = 0; + pChkInfo->nextProcessVer = 1; + stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr); + } + } + } + } +} + int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) { pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->refCnt = 1; pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; + pTask->inputq.queue = streamQueueOpen(512 << 10); pTask->outputq.queue = streamQueueOpen(512 << 10); if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) { @@ -479,41 +516,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i } pTask->execInfo.created = taosGetTimestampMs(); - SCheckpointInfo* pChkInfo = &pTask->chkInfo; - SDataRange* pRange = &pTask->dataRange; - - // only set the version info for stream tasks without fill-history task - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) { - pChkInfo->checkpointVer = ver - 1; // only update when generating checkpoint - pChkInfo->processedVer = ver - 1; // already processed version - pChkInfo->nextProcessVer = ver; // next processed version - - pRange->range.maxVer = ver; - pRange->range.minVer = ver; - } else { - // the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task - // is set at the mnode. - if (pTask->info.fillHistory == 1) { - pChkInfo->checkpointVer = pRange->range.maxVer; - pChkInfo->processedVer = pRange->range.maxVer; - pChkInfo->nextProcessVer = pRange->range.maxVer + 1; - } else { - pChkInfo->checkpointVer = pRange->range.minVer - 1; - pChkInfo->processedVer = pRange->range.minVer - 1; - pChkInfo->nextProcessVer = pRange->range.minVer; - - { // for compatible purpose, remove it later - if (pRange->range.minVer == 0) { - pChkInfo->checkpointVer = 0; - pChkInfo->processedVer = 0; - pChkInfo->nextProcessVer = 1; - stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr); - } - } - } - } - } + setInitialVersionInfo(pTask, ver); pTask->pMeta = pMeta; pTask->pMsgCb = pMsgCb; @@ -779,8 +782,10 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) { CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask)); if (resetRelHalt) { + stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s", + sTaskId.taskId, streamTaskGetStatusStr((*ppStreamTask)->status.taskStatus), + streamTaskGetStatus(*ppStreamTask)->name); (*ppStreamTask)->status.taskStatus = TASK_STATUS__READY; - stDebug("s-task:0x%" PRIx64 " set the status to be ready", sTaskId.taskId); } streamMetaSaveTask(pMeta, *ppStreamTask); diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index be89d8e235..9a05e645c2 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -274,7 +274,7 @@ sql insert into ts4 values(1648791213001,1,12,3,1.0); $loop_count = 0 -loop3: +loop4: $loop_count = $loop_count + 1 if $loop_count == 20 then @@ -291,7 +291,7 @@ if $rows != 1 then print $data00 $data01 $data02 print $data10 $data11 $data12 print $data20 $data21 $data22 - goto loop3 + goto loop4 endi print 2 select * from streamt5; @@ -302,7 +302,7 @@ if $rows != 1 then print $data00 $data01 $data02 print $data10 $data11 $data12 print $data20 $data21 $data22 - goto loop3 + goto loop4 endi print 3 select * from streamt3; diff --git a/tools/shell/inc/shellAuto.h b/tools/shell/inc/shellAuto.h index 6a317fe5c9..bcf500fefc 100644 --- a/tools/shell/inc/shellAuto.h +++ b/tools/shell/inc/shellAuto.h @@ -39,7 +39,10 @@ void shellAutoExit(); void callbackAutoTab(char* sqlstr, TAOS* pSql, bool usedb); // introduction -void printfIntroduction(); +void printfIntroduction(bool community); + +// show enterprise AD at start or end +void showAD(bool end); // show all commands help void showHelp(); diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index 847bbcf4be..f39fbfbdf5 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -400,27 +400,41 @@ SMatch* lastMatch = NULL; // save last match result int cntDel = 0; // delete byte count after next press tab // show auto tab introduction -void printfIntroduction() { - printf(" ******************************** Tab Completion ************************************\n"); +void printfIntroduction(bool community) { + printf(" ********************************* Tab Completion *************************************\n"); char secondLine[160] = "\0"; sprintf(secondLine, " * The %s CLI supports tab completion for a variety of items, ", shell.info.cusName); printf("%s", secondLine); int secondLineLen = strlen(secondLine); - while (87 - (secondLineLen++) > 0) { + while (89 - (secondLineLen++) > 0) { printf(" "); } printf("*\n"); - printf(" * including database names, table names, function names and keywords. *\n"); - printf(" * The full list of shortcut keys is as follows: *\n"); - printf(" * [ TAB ] ...... complete the current word *\n"); - printf(" * ...... if used on a blank line, display all supported commands *\n"); - printf(" * [ Ctrl + A ] ...... move cursor to the st[A]rt of the line *\n"); - printf(" * [ Ctrl + E ] ...... move cursor to the [E]nd of the line *\n"); - printf(" * [ Ctrl + W ] ...... move cursor to the middle of the line *\n"); - printf(" * [ Ctrl + L ] ...... clear the entire screen *\n"); - printf(" * [ Ctrl + K ] ...... clear the screen after the cursor *\n"); - printf(" * [ Ctrl + U ] ...... clear the screen before the cursor *\n"); - printf(" **************************************************************************************\n\n"); + printf(" * including database names, table names, function names and keywords. *\n"); + printf(" * The full list of shortcut keys is as follows: *\n"); + printf(" * [ TAB ] ...... complete the current word *\n"); + printf(" * ...... if used on a blank line, display all supported commands *\n"); + printf(" * [ Ctrl + A ] ...... move cursor to the st[A]rt of the line *\n"); + printf(" * [ Ctrl + E ] ...... move cursor to the [E]nd of the line *\n"); + printf(" * [ Ctrl + W ] ...... move cursor to the middle of the line *\n"); + printf(" * [ Ctrl + L ] ...... clear the entire screen *\n"); + printf(" * [ Ctrl + K ] ...... clear the screen after the cursor *\n"); + printf(" * [ Ctrl + U ] ...... clear the screen before the cursor *\n"); + if(community) { + printf(" * ------------------------------------------------------------------------------------ *\n"); + printf(" * You are using TDengine OSS. To experience advanced features, like backup/restore, *\n"); + printf(" * privilege control and more, or receive 7x24 technical support, try TDengine *\n"); + printf(" * Enterprise or Free Cloud Trial. Learn more at https://tdengine.com *\n"); + } + printf(" ****************************************************************************************\n\n"); +} + +// show enterprise AD +void showAD(bool end) { + printf(" You are using TDengine OSS. To experience advanced features, like backup/restore, \n"); + printf(" privilege control and more, or receive 7x24 technical support, try TDengine Enterprise \n"); + printf(" or Free Cloud Trial. Learn more at https://tdengine.com \n"); + printf(" \n"); } void showHelp() { diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 995d3d04ec..5c71396ea9 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -56,7 +56,7 @@ static void shellWriteHistory(); static void shellPrintError(TAOS_RES *tres, int64_t st); static bool shellIsCommentLine(char *line); static void shellSourceFile(const char *file); -static void shellGetGrantInfo(); +static bool shellGetGrantInfo(); static void shellCleanup(void *arg); static void *shellCancelHandler(void *arg); @@ -1150,7 +1150,8 @@ void shellSourceFile(const char *file) { taosCloseFile(&pFile); } -void shellGetGrantInfo() { +bool shellGetGrantInfo(char* buf) { + bool community = true; char sinfo[1024] = {0}; tstrncpy(sinfo, taos_get_server_info(shell.conn), sizeof(sinfo)); strtok(sinfo, "\r\n"); @@ -1165,7 +1166,7 @@ void shellGetGrantInfo() { code != TSDB_CODE_PAR_PERMISSION_DENIED) { fprintf(stderr, "Failed to check Server Edition, Reason:0x%04x:%s\r\n\r\n", code, taos_errstr(tres)); } - return; + return community; } int32_t num_fields = taos_field_count(tres); @@ -1194,11 +1195,13 @@ void shellGetGrantInfo() { memcpy(expired, row[2], fields[2].bytes); if (strcmp(serverVersion, "community") == 0) { - fprintf(stdout, "Server is Community Edition.\r\n"); + community = true; } else if (strcmp(expiretime, "unlimited") == 0) { - fprintf(stdout, "Server is Enterprise %s Edition, %s and will never expire.\r\n", serverVersion, sinfo); + community = false; + sprintf(buf, "Server is Enterprise %s Edition, %s and will never expire.\r\n", serverVersion, sinfo); } else { - fprintf(stdout, "Server is Enterprise %s Edition, %s and will expire at %s.\r\n", serverVersion, sinfo, + community = false; + sprintf(buf, "Server is Enterprise %s Edition, %s and will expire at %s.\r\n", serverVersion, sinfo, expiretime); } @@ -1206,6 +1209,7 @@ void shellGetGrantInfo() { } fprintf(stdout, "\r\n"); + return community; } #ifdef WINDOWS @@ -1364,10 +1368,22 @@ int32_t shellExecute() { #ifdef WEBSOCKET if (!shell.args.restful && !shell.args.cloud) { #endif +char buf[512] = ""; +bool community = shellGetGrantInfo(buf); #ifndef WINDOWS - printfIntroduction(); + printfIntroduction(community); +#else +#ifndef WEBSOCKET + if(community) { + showAD(false); + } +#endif #endif - shellGetGrantInfo(); +// printf version +if(!community) { + printf("%s\n", buf); +} + #ifdef WEBSOCKET } #endif @@ -1380,6 +1396,13 @@ int32_t shellExecute() { break; } } +#ifndef WEBSOCKET + // commnuity + if (community) { + showAD(true); + } +#endif + taosThreadJoin(spid, NULL); shellCleanupHistory();