Merge branch '3.0' into fix/TS-4421-3.0

This commit is contained in:
kailixu 2024-03-15 13:17:32 +08:00
commit a5f06114fd
18 changed files with 253 additions and 134 deletions

View File

@ -8,7 +8,7 @@ title: 流式计算
TDengine 3.0 的流式计算引擎提供了实时处理写入的数据流的能力,使用 SQL 定义实时流变换,当数据被写入流的源表后,数据会被以定义的方式自动处理,并根据定义的触发模式向目的表推送结果。它提供了替代复杂流处理系统的轻量级解决方案,并能够在高吞吐的数据写入的情况下,提供毫秒级的计算结果延迟。
流式计算可以包含数据过滤标量函数计算含UDF以及窗口聚合支持滑动窗口、会话窗口与状态窗口),可以以超级表、子表、普通表为源表,写入到目的超级表。在创建流时,目的超级表将被自动创建,随后新插入的数据会被流定义的方式处理并写入其中,通过 partition by 子句,可以以表名或标签划分 partition不同的 partition 将写入到目的超级表的不同子表。
流式计算可以包含数据过滤标量函数计算含UDF以及窗口聚合支持滑动窗口、会话窗口、状态窗口、事件窗口与计数窗口),可以以超级表、子表、普通表为源表,写入到目的超级表。在创建流时,目的超级表将被自动创建,随后新插入的数据会被流定义的方式处理并写入其中,通过 partition by 子句,可以以表名或标签划分 partition不同的 partition 将写入到目的超级表的不同子表。
TDengine 的流式计算能够支持分布在多个 vnode 中的超级表聚合;还能够处理乱序数据的写入:它提供了 watermark 机制以度量容忍数据乱序的程度,并提供了 ignore expired 配置项以决定乱序数据的处理策略——丢弃或者重新计算。

View File

@ -30,7 +30,7 @@ subquery: SELECT select_list
[window_clause]
```
支持会话窗口、状态窗口与滑动窗口,其中,会话窗口与状态窗口搭配超级表时必须与partition by tbname一起使用
支持会话窗口、状态窗口、滑动窗口、事件窗口和计数窗口,其中,状态窗口、事件窗口和计数窗口搭配超级表时必须与partition by tbname一起使用
stb_name 是保存计算结果的超级表的表名如果该超级表不存在会自动创建如果已存在则检查列的schema信息。详见 写入已存在的超级表
@ -60,7 +60,11 @@ COUNT_WINDOW 是计数窗口,按固定的数据行数来划分窗口。 count_va
窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished)
例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。
例如,如下语句创建流式计算。第一个流计算,自动创建名为 avg_vol 的超级表以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。
第二个流计算,自动创建名为 streamt0 的超级表,将数据按时间戳的顺序,以 voltage < 0 作为窗口的开始条件voltage > 9作为窗口的结束条件划分窗口做聚合运算并将来自 meters 表的数据的计算结果写入 streamt0 表,不同 partition 的数据会分别创建子表并写入不同子表。
第三个流计算,自动创建名为 streamt1 的超级表将数据按时间戳的顺序以10条数据为一组划分窗口做聚合运算并将来自 meters 表的数据的计算结果写入 streamt1 表,不同 partition 的数据会分别创建子表并写入不同子表。
```sql
CREATE STREAM avg_vol_s INTO avg_vol AS

View File

@ -268,7 +268,7 @@ bool tsDisableStream = false;
int64_t tsStreamBufferSize = 128 * 1024 * 1024;
bool tsFilterScalarMode = false;
int tsResolveFQDNRetryTime = 100; // seconds
int tsStreamAggCnt = 1000;
int tsStreamAggCnt = 100000;
char tsS3Endpoint[TSDB_FQDN_LEN] = "<endpoint>";
char tsS3AccessKey[TSDB_FQDN_LEN] = "<accesskey>";

View File

@ -860,6 +860,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam, nextProcessVer);
ASSERT(pChkInfo->checkpointVer <= pChkInfo->nextProcessVer);
}
return 0;
@ -1203,8 +1205,15 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamProcessCheckpointSourceReq(pTask, &req);
taosThreadMutexUnlock(&pTask->lock);
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d", pTask->id.idStr,
vgId, pTask->info.taskLevel, req.checkpointId, req.transId);
if (req.mndTrigger) {
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ", pTask->id.idStr,
vgId, pTask->info.taskLevel, req.checkpointId, req.transId);
} else {
const char* pPrevStatus = streamTaskGetStatusStr(streamTaskGetPrevStatus(pTask));
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64
", transId:%d after transfer-state, prev status:%s",
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus);
}
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -658,11 +658,12 @@ static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int
colDataSetNULL(pColInfoData, rowIndex);
} else {
varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
if (pColVal->value.nData > pColInfoData->info.bytes) {
if ((pColVal->value.nData + VARSTR_HEADER_SIZE) > pColInfoData->info.bytes) {
tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData,
pColInfoData->info.bytes);
return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
}
if (pColVal->value.nData > 0) { // pData may be null, if nData is 0
memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
}

View File

@ -602,6 +602,8 @@ typedef struct SStreamIntervalOperatorInfo {
bool recvPullover;
SSDataBlock* pMidPulloverRes;
bool clearState;
SArray* pMidPullDatas;
int32_t midDelIndex;
} SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo {

View File

@ -99,6 +99,11 @@ int32_t getEndCondIndex(bool* pEnd, int32_t start, int32_t rows) {
return -1;
}
int32_t reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
pAPI->streamStateReleaseBuf(pState, pPos, true);
return TSDB_CODE_SUCCESS;
}
void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd, int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t size = pAggSup->resultRowSize;
@ -143,6 +148,7 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI
pCurWin->winInfo.isOutput = false;
_end:
reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore);
pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
pNextWinKey->groupId = groupId;
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, pNextWinKey, NULL, 0);
@ -341,6 +347,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
}
if (isWindowIncomplete(&curWin)) {
releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAggSup->stateStore);
continue;
}

View File

@ -221,7 +221,7 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId)
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
SSHashObj* pUpdatedMap) {
SSHashObj* pUpdatedMap, SHashObj* pInvalidWins) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
@ -255,10 +255,15 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
if (chIds) {
int32_t childId = getChildIndex(pBlock);
if (pInvalidWins) {
qDebug("===stream===save mid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId);
taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0);
}
SArray* chArray = *(void**)chIds;
int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ);
if (index != -1) {
qDebug("===stream===try push delete window%" PRId64 "chId:%d ,continue", win.skey, childId);
qDebug("===stream===try push delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d ,continue", win.skey, winGpId, childId);
getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
continue;
}
@ -413,6 +418,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
blockDataDestroy(pInfo->pMidRetriveRes);
blockDataDestroy(pInfo->pMidPulloverRes);
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
taosArrayDestroy(pInfo->pMidPullDatas);
if (pInfo->pState->dump == 1) {
taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner);
@ -642,9 +648,12 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina
.calWin.skey = nextWin.skey,
.calWin.ekey = nextWin.skey};
// add pull data request
if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) {
qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh);
if (IS_MID_INTERVAL_OP(pOperator)) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
taosArrayPush(pInfo->pMidPullDatas, &winRes);
} else if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) {
addPullWindow(pMap, &winRes, numOfCh);
qDebug("===stream===prepare final retrive for delete %" PRId64 ", size:%d", winRes.ts, numOfCh);
}
}
}
@ -1191,11 +1200,6 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes;
}
if (pInfo->recvPullover) {
pInfo->recvPullover = false;
printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidPulloverRes;
}
return NULL;
}
@ -1293,7 +1297,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL);
if (IS_FINAL_INTERVAL_OP(pOperator)) {
int32_t chId = getChildIndex(pBlock);
addRetriveWindow(delWins, pInfo, chId);
@ -1329,7 +1333,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->recvRetrive = true;
copyDataBlock(pInfo->pMidRetriveRes, pBlock);
pInfo->pMidRetriveRes->info.type = STREAM_MID_RETRIEVE;
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap, NULL);
break;
}
continue;
@ -1557,8 +1561,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->recvRetrive = false;
pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
pInfo->recvPullover = false;
pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
pInfo->clearState = false;
pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey));
pOperator->operatorType = pPhyNode->type;
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
@ -1837,11 +1843,6 @@ int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
return TSDB_CODE_SUCCESS;
}
int32_t reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
pAPI->streamStateReleaseBuf(pState, pPos, true);
return TSDB_CODE_SUCCESS;
}
void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) {
SSessionKey key = {0};
getSessionHashKey(pKey, &key);
@ -2487,7 +2488,7 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) {
return;
}
SResultWindowInfo* pWinInfo = taosArrayGet(pAllWins, size - 1);
SSessionKey* pSeKey = pWinInfo->pStatePos->pKey;
SSessionKey* pSeKey = &pWinInfo->sessionWin;
taosArrayPush(pMaxWins, pSeKey);
if (pSeKey->groupId == 0) {
return;
@ -2495,7 +2496,7 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) {
uint64_t preGpId = pSeKey->groupId;
for (int32_t i = size - 2; i >= 0; i--) {
pWinInfo = taosArrayGet(pAllWins, i);
pSeKey = pWinInfo->pStatePos->pKey;
pSeKey = &pWinInfo->sessionWin;
if (preGpId != pSeKey->groupId) {
taosArrayPush(pMaxWins, pSeKey);
preGpId = pSeKey->groupId;
@ -3971,7 +3972,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap, NULL);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
@ -4264,6 +4265,34 @@ static void addMidRetriveWindow(SArray* wins, SHashObj* pMidPullMap, int32_t num
}
}
static SSDataBlock* buildMidIntervalResult(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
uint16_t opType = pOperator->operatorType;
if (pInfo->recvPullover) {
pInfo->recvPullover = false;
printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidPulloverRes;
}
qDebug("===stream=== build mid interval result");
doBuildDeleteResult(pInfo, pInfo->pMidPullDatas, &pInfo->midDelIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data
printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
if (pInfo->recvRetrive) {
pInfo->recvRetrive = false;
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidRetriveRes;
}
return NULL;
}
static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -4292,10 +4321,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
return resBlock;
}
if (pInfo->recvRetrive) {
pInfo->recvRetrive = false;
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidRetriveRes;
resBlock = buildMidIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
}
if (pInfo->clearState) {
@ -4335,7 +4363,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, pInfo->pFinalPullDataMap);
removeResults(delWins, pInfo->pUpdatedMap);
taosArrayAddAll(pInfo->pDelWins, delWins);
taosArrayDestroy(delWins);
@ -4371,7 +4399,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
continue;
} else if (pBlock->info.type == STREAM_MID_RETRIEVE) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL);
addMidRetriveWindow(delWins, pInfo->pPullDataMap, pInfo->numOfChild);
taosArrayDestroy(delWins);
pInfo->recvRetrive = true;
@ -4416,10 +4444,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
return resBlock;
}
if (pInfo->recvRetrive) {
pInfo->recvRetrive = false;
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidRetriveRes;
resBlock = buildMidIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
}
if (pInfo->clearState) {

View File

@ -131,7 +131,7 @@ int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
const char* streamQueueItemGetTypeStr(int32_t type);
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
int32_t streamTransferStatePrepare(SStreamTask* pTask);
SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);

View File

@ -300,6 +300,8 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
taosThreadMutexLock(&p->lock);
SStreamTaskState* pStatus = streamTaskGetStatus(p);
ETaskStatus prevStatus = pStatus->state;
if (pStatus->state == TASK_STATUS__CK) {
ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId &&
pCKInfo->checkpointVer <= pCKInfo->processedVer);
@ -325,8 +327,9 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
}
stDebug("vgId:%d s-task:%s level:%d open upstream inputQ, save status after checkpoint, checkpointId:%" PRId64
", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: normal, prev:%s",
vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, pStatus->name);
", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: ready, prev:%s",
vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer,
streamTaskGetStatusStr(prevStatus));
// save the task if not sink task
if (p->info.taskLevel <= TASK_LEVEL__SINK) {
@ -437,9 +440,11 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
if (type == UPLOAD_DISABLE) {
return 0;
}
if (pTask == NULL || pTask->pBackend == NULL) {
return 0;
}
SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg));
arg->type = type;
arg->taskId = taosStrdup(taskId);
@ -448,16 +453,19 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL);
}
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t startTs = pTask->chkInfo.startTs;
int64_t ckId = pTask->chkInfo.checkpointingId;
const char* id = pTask->id.idStr;
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
// sink task do not need to save the status, and generated the checkpoint
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t startTs = pTask->chkInfo.startTs;
int64_t ckId = pTask->chkInfo.checkpointingId;
const char* id = pTask->id.idStr;
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
SStreamMeta* pMeta = pTask->pMeta;
// sink task does not need to save the status, and generated the checkpoint
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);
code = streamBackendDoCheckpoint(pTask->pBackend, ckId);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno));
@ -500,10 +508,11 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
SStreamTaskId hTaskId = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
stDebug("s-task:%s fill-history finish checkpoint done, drop related fill-history task:0x%x", id, hTaskId.taskId);
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &hTaskId, 1);
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &hTaskId, 1);
} else {
stWarn("s-task:%s related fill-history task:0x%x is erased", id, (int32_t)pTask->hTaskInfo.id.taskId);
}
taosThreadMutexUnlock(&pTask->lock);
}

View File

@ -1094,10 +1094,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId);
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to prepare transfer state", id, msgId);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
code = streamTransferStatePrepare(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
}

View File

@ -21,7 +21,7 @@
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data
#define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
bool streamTaskShouldStop(const SStreamTask* pTask) {
SStreamTaskState* pState = streamTaskGetStatus(pTask);
@ -316,7 +316,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
}
}
int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
const char* id = pTask->id.idStr;
@ -340,9 +340,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
} else {
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.;
stDebug(
"s-task:%s fill-history task end, scan wal elapsed time:%.2fSec,update related stream task:%s info, transfer "
"exec state",
id, el, pStreamTask->id.idStr);
"s-task:%s fill-history task end, status:%s, scan wal elapsed time:%.2fSec, update related stream task:%s "
"info, prepare transfer exec state",
id, streamTaskGetStatus(pTask)->name, el, pStreamTask->id.idStr);
}
ETaskStatus status = streamTaskGetStatus(pStreamTask)->state;
@ -366,9 +366,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
}
}
// wait for the stream task to handle all in the inputQ, and to be idle
waitForTaskIdle(pTask, pStreamTask);
// In case of sink tasks, no need to halt them.
// In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
// start the task state transfer procedure.
@ -394,17 +391,14 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
}
// 2. transfer the ownership of executor state
streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask);
// 3. send msg to mnode to launch a checkpoint to keep the state for current stream
// NOTE: transfer the ownership of executor state before handle the checkpoint block during stream exec
// 2. send msg to mnode to launch a checkpoint to keep the state for current stream
streamTaskSendCheckpointReq(pStreamTask);
// 4. assign the status to the value that will be kept in disk
// 3. assign the status to the value that will be kept in disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state;
// 5. open the inputQ for all upstream tasks
// 4. open the inputQ for all upstream tasks
streamTaskOpenAllUpstreamInput(pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask);
@ -417,7 +411,7 @@ static int32_t haltCallback(SStreamTask* pTask, void* param) {
return TSDB_CODE_SUCCESS;
}
int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t streamTransferStatePrepare(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
SStreamMeta* pMeta = pTask->pMeta;
@ -425,7 +419,7 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask);
code = streamTransferStateDoPrepare(pTask);
} else {
// no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
@ -541,7 +535,7 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, level);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
code = streamTransferStatePrepare(pTask);
if (code != TSDB_CODE_SUCCESS) {
/*int8_t status = */ streamTaskSetSchedStatusInactive(pTask);
}
@ -622,10 +616,31 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
}
}
int64_t st = taosGetTimestampMs();
if (type == STREAM_INPUT__CHECKPOINT) {
// transfer the state from fill-history to related stream task before generating the checkpoint.
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
if (dropRelHTask) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
const SStreamQueueItem* pItem = pInput;
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type);
STaskId* pHTaskId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId);
if (pHTask != NULL) {
// 2. transfer the ownership of executor state
streamTaskReleaseState(pHTask);
streamTaskReloadState(pTask);
stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
streamTaskGetStatus(pHTask)->name);
streamMetaReleaseTask(pTask->pMeta, pHTask);
} else {
stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
(int32_t)pHTaskId->taskId);
}
}
}
int64_t st = taosGetTimestampMs();
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks, streamQueueItemGetTypeStr(type));
int64_t ver = pTask->chkInfo.processedVer;
doSetStreamInputBlock(pTask, pInput, &ver, id);

View File

@ -310,7 +310,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream
int32_t size = taosArrayGetSize(pWinStates);
if (pCur->buffIndex >= 0) {
if (pCur->buffIndex >= size) {
pNewPos = insertNewSessionWindow(pFileState, pWinStates, pWinKey, size);
pNewPos = addNewSessionWindow(pFileState, pWinStates, pWinKey);
goto _end;
}
pNewPos = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex);
@ -327,12 +327,12 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream
}
}
pNewPos = getNewRowPosForWrite(pFileState);
memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
pNewPos->needFree = true;
pNewPos->beFlushed = true;
}
_end:
memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
(*ppVal) = pNewPos;
return TSDB_CODE_SUCCESS;
}

View File

@ -456,12 +456,49 @@ void tFreeStreamTask(SStreamTask* pTask) {
stDebug("s-task:0x%x free task completed", taskId);
}
static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
SDataRange* pRange = &pTask->dataRange;
// only set the version info for stream tasks without fill-history task
if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
pChkInfo->checkpointVer = ver - 1; // only update when generating checkpoint
pChkInfo->processedVer = ver - 1; // already processed version
pChkInfo->nextProcessVer = ver; // next processed version
pRange->range.maxVer = ver;
pRange->range.minVer = ver;
} else {
// the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
// is set at the mnode.
if (pTask->info.fillHistory == 1) {
pChkInfo->checkpointVer = pRange->range.maxVer;
pChkInfo->processedVer = pRange->range.maxVer;
pChkInfo->nextProcessVer = pRange->range.maxVer + 1;
} else {
pChkInfo->checkpointVer = pRange->range.minVer - 1;
pChkInfo->processedVer = pRange->range.minVer - 1;
pChkInfo->nextProcessVer = pRange->range.minVer;
{ // for compatible purpose, remove it later
if (pRange->range.minVer == 0) {
pChkInfo->checkpointVer = 0;
pChkInfo->processedVer = 0;
pChkInfo->nextProcessVer = 1;
stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
}
}
}
}
}
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
pTask->refCnt = 1;
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->inputq.queue = streamQueueOpen(512 << 10);
pTask->outputq.queue = streamQueueOpen(512 << 10);
if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) {
@ -479,41 +516,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
}
pTask->execInfo.created = taosGetTimestampMs();
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
SDataRange* pRange = &pTask->dataRange;
// only set the version info for stream tasks without fill-history task
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
pChkInfo->checkpointVer = ver - 1; // only update when generating checkpoint
pChkInfo->processedVer = ver - 1; // already processed version
pChkInfo->nextProcessVer = ver; // next processed version
pRange->range.maxVer = ver;
pRange->range.minVer = ver;
} else {
// the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
// is set at the mnode.
if (pTask->info.fillHistory == 1) {
pChkInfo->checkpointVer = pRange->range.maxVer;
pChkInfo->processedVer = pRange->range.maxVer;
pChkInfo->nextProcessVer = pRange->range.maxVer + 1;
} else {
pChkInfo->checkpointVer = pRange->range.minVer - 1;
pChkInfo->processedVer = pRange->range.minVer - 1;
pChkInfo->nextProcessVer = pRange->range.minVer;
{ // for compatible purpose, remove it later
if (pRange->range.minVer == 0) {
pChkInfo->checkpointVer = 0;
pChkInfo->processedVer = 0;
pChkInfo->nextProcessVer = 1;
stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
}
}
}
}
}
setInitialVersionInfo(pTask, ver);
pTask->pMeta = pMeta;
pTask->pMsgCb = pMsgCb;
@ -779,8 +782,10 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask));
if (resetRelHalt) {
stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s",
sTaskId.taskId, streamTaskGetStatusStr((*ppStreamTask)->status.taskStatus),
streamTaskGetStatus(*ppStreamTask)->name);
(*ppStreamTask)->status.taskStatus = TASK_STATUS__READY;
stDebug("s-task:0x%" PRIx64 " set the status to be ready", sTaskId.taskId);
}
streamMetaSaveTask(pMeta, *ppStreamTask);

View File

@ -274,7 +274,7 @@ sql insert into ts4 values(1648791213001,1,12,3,1.0);
$loop_count = 0
loop3:
loop4:
$loop_count = $loop_count + 1
if $loop_count == 20 then
@ -291,7 +291,7 @@ if $rows != 1 then
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
goto loop3
goto loop4
endi
print 2 select * from streamt5;
@ -302,7 +302,7 @@ if $rows != 1 then
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
goto loop3
goto loop4
endi
print 3 select * from streamt3;

View File

@ -39,7 +39,10 @@ void shellAutoExit();
void callbackAutoTab(char* sqlstr, TAOS* pSql, bool usedb);
// introduction
void printfIntroduction();
void printfIntroduction(bool community);
// show enterprise AD at start or end
void showAD(bool end);
// show all commands help
void showHelp();

View File

@ -400,27 +400,41 @@ SMatch* lastMatch = NULL; // save last match result
int cntDel = 0; // delete byte count after next press tab
// show auto tab introduction
void printfIntroduction() {
printf(" ******************************** Tab Completion ************************************\n");
void printfIntroduction(bool community) {
printf(" ********************************* Tab Completion *************************************\n");
char secondLine[160] = "\0";
sprintf(secondLine, " * The %s CLI supports tab completion for a variety of items, ", shell.info.cusName);
printf("%s", secondLine);
int secondLineLen = strlen(secondLine);
while (87 - (secondLineLen++) > 0) {
while (89 - (secondLineLen++) > 0) {
printf(" ");
}
printf("*\n");
printf(" * including database names, table names, function names and keywords. *\n");
printf(" * The full list of shortcut keys is as follows: *\n");
printf(" * [ TAB ] ...... complete the current word *\n");
printf(" * ...... if used on a blank line, display all supported commands *\n");
printf(" * [ Ctrl + A ] ...... move cursor to the st[A]rt of the line *\n");
printf(" * [ Ctrl + E ] ...... move cursor to the [E]nd of the line *\n");
printf(" * [ Ctrl + W ] ...... move cursor to the middle of the line *\n");
printf(" * [ Ctrl + L ] ...... clear the entire screen *\n");
printf(" * [ Ctrl + K ] ...... clear the screen after the cursor *\n");
printf(" * [ Ctrl + U ] ...... clear the screen before the cursor *\n");
printf(" **************************************************************************************\n\n");
printf(" * including database names, table names, function names and keywords. *\n");
printf(" * The full list of shortcut keys is as follows: *\n");
printf(" * [ TAB ] ...... complete the current word *\n");
printf(" * ...... if used on a blank line, display all supported commands *\n");
printf(" * [ Ctrl + A ] ...... move cursor to the st[A]rt of the line *\n");
printf(" * [ Ctrl + E ] ...... move cursor to the [E]nd of the line *\n");
printf(" * [ Ctrl + W ] ...... move cursor to the middle of the line *\n");
printf(" * [ Ctrl + L ] ...... clear the entire screen *\n");
printf(" * [ Ctrl + K ] ...... clear the screen after the cursor *\n");
printf(" * [ Ctrl + U ] ...... clear the screen before the cursor *\n");
if(community) {
printf(" * ------------------------------------------------------------------------------------ *\n");
printf(" * You are using TDengine OSS. To experience advanced features, like backup/restore, *\n");
printf(" * privilege control and more, or receive 7x24 technical support, try TDengine *\n");
printf(" * Enterprise or Free Cloud Trial. Learn more at https://tdengine.com *\n");
}
printf(" ****************************************************************************************\n\n");
}
// show enterprise AD
void showAD(bool end) {
printf(" You are using TDengine OSS. To experience advanced features, like backup/restore, \n");
printf(" privilege control and more, or receive 7x24 technical support, try TDengine Enterprise \n");
printf(" or Free Cloud Trial. Learn more at https://tdengine.com \n");
printf(" \n");
}
void showHelp() {

View File

@ -56,7 +56,7 @@ static void shellWriteHistory();
static void shellPrintError(TAOS_RES *tres, int64_t st);
static bool shellIsCommentLine(char *line);
static void shellSourceFile(const char *file);
static void shellGetGrantInfo();
static bool shellGetGrantInfo();
static void shellCleanup(void *arg);
static void *shellCancelHandler(void *arg);
@ -1150,7 +1150,8 @@ void shellSourceFile(const char *file) {
taosCloseFile(&pFile);
}
void shellGetGrantInfo() {
bool shellGetGrantInfo(char* buf) {
bool community = true;
char sinfo[1024] = {0};
tstrncpy(sinfo, taos_get_server_info(shell.conn), sizeof(sinfo));
strtok(sinfo, "\r\n");
@ -1165,7 +1166,7 @@ void shellGetGrantInfo() {
code != TSDB_CODE_PAR_PERMISSION_DENIED) {
fprintf(stderr, "Failed to check Server Edition, Reason:0x%04x:%s\r\n\r\n", code, taos_errstr(tres));
}
return;
return community;
}
int32_t num_fields = taos_field_count(tres);
@ -1194,11 +1195,13 @@ void shellGetGrantInfo() {
memcpy(expired, row[2], fields[2].bytes);
if (strcmp(serverVersion, "community") == 0) {
fprintf(stdout, "Server is Community Edition.\r\n");
community = true;
} else if (strcmp(expiretime, "unlimited") == 0) {
fprintf(stdout, "Server is Enterprise %s Edition, %s and will never expire.\r\n", serverVersion, sinfo);
community = false;
sprintf(buf, "Server is Enterprise %s Edition, %s and will never expire.\r\n", serverVersion, sinfo);
} else {
fprintf(stdout, "Server is Enterprise %s Edition, %s and will expire at %s.\r\n", serverVersion, sinfo,
community = false;
sprintf(buf, "Server is Enterprise %s Edition, %s and will expire at %s.\r\n", serverVersion, sinfo,
expiretime);
}
@ -1206,6 +1209,7 @@ void shellGetGrantInfo() {
}
fprintf(stdout, "\r\n");
return community;
}
#ifdef WINDOWS
@ -1364,10 +1368,22 @@ int32_t shellExecute() {
#ifdef WEBSOCKET
if (!shell.args.restful && !shell.args.cloud) {
#endif
char buf[512] = "";
bool community = shellGetGrantInfo(buf);
#ifndef WINDOWS
printfIntroduction();
printfIntroduction(community);
#else
#ifndef WEBSOCKET
if(community) {
showAD(false);
}
#endif
#endif
shellGetGrantInfo();
// printf version
if(!community) {
printf("%s\n", buf);
}
#ifdef WEBSOCKET
}
#endif
@ -1380,6 +1396,13 @@ int32_t shellExecute() {
break;
}
}
#ifndef WEBSOCKET
// commnuity
if (community) {
showAD(true);
}
#endif
taosThreadJoin(spid, NULL);
shellCleanupHistory();