feat(stream): optimize client logic of creating stream if with history (#30059)

* feat: [TS-5617] use last_row cache mode if creating stream in fill_history

* feat(stream): optimize client logic of creating stream if with history

* fix: heap use after free

* feat: add log

* fix: ci case error

* fix: compile error in windows

* fix: ci case error

* fix: heap use after free

* fix: memory leak

* fix: ci case error

---------

Co-authored-by: yihaoDeng <yhdeng@taosdata.com>
This commit is contained in:
WANG MINGMING 2025-03-14 13:55:52 +08:00 committed by GitHub
parent 13f9bddf3d
commit 3d053e2c9b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 561 additions and 86 deletions


@ -108,6 +108,13 @@ Under normal circumstances, stream computation tasks will not process data that
By enabling the fill_history option, the created stream computation task will be capable of processing data written before, during, and after the creation of the stream. This means that data written either before or after the creation of the stream will be included in the scope of stream computation, thus ensuring data integrity and consistency. This setting provides users with greater flexibility, allowing them to flexibly handle historical and new data according to actual needs.
Tips:
- When enabling fill_history, creating a stream requires finding the boundary point of historical data. If there is a lot of historical data, the create-stream task may take a long time. In this case, the parameter streamRunHistoryAsync (supported since version 3.3.6.0) can be set to 1 (default is 0) so that stream creation is processed in the background; the create-stream statement then returns immediately without blocking subsequent operations.
- Show streams can be used to view the progress of background stream creation: the ready status indicates success, init indicates that stream creation is in progress, and failed indicates that stream creation has failed, in which case the message column shows the reason. A failed stream can be dropped and recreated.
- In addition, do not create multiple streams asynchronously at the same time, as transaction conflicts may cause the streams created later to fail.
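As a quick illustration of the tip above, the background creation progress can be polled from any client (a sketch; the exact column set of the result depends on the server version):

```sql
-- Poll the progress of streams created in the background.
-- status: init = still creating, ready = succeeded, failed = creation failed
-- message: failure reason, populated when status is failed
SHOW STREAMS;
```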
For example, create a stream that counts the number of rows generated by all smart meters every 10s while also computing historical data. The SQL is as follows:
```sql
create stream if not exists count_history_s fill_history 1 into count_history as select count(*) from power.meters interval(10s);
```


@ -72,6 +72,12 @@ The TDengine client driver provides all the APIs needed for application programm
| tempDir | |Supported, effective immediately | Specifies the directory for generating temporary files during operation, default on Linux platform is /tmp |
| minimalTmpDirGB | |Supported, effective immediately | Minimum space required to be reserved in the directory specified by tempDir, in GB, default value: 1 |
### Stream Related
| Parameter Name |Supported Version|Dynamic Modification| Description |
|-----------------------|----------|--------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| streamRunHistoryAsync | 3.3.6.0 |Supported, effective immediately | Whether the create-stream statement is executed asynchronously when a stream is created with the fill_history parameter. Boolean: true for asynchronous, false for synchronous; default is false |
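A minimal sketch of turning this option on from a client session, assuming the standard `ALTER LOCAL` syntax for dynamically modifiable client parameters:

```sql
-- Enable asynchronous handling of create-stream statements that use
-- fill_history on this client; effective immediately, no restart required.
ALTER LOCAL 'streamRunHistoryAsync' '1';
```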
### Log Related
|Parameter Name|Supported Version|Dynamic Modification|Description|


@ -101,6 +101,13 @@ In the PARTITION clause, an alias tname is defined for tbname; in the PARTITION
By enabling the fill_history option, the created stream computation task will be capable of processing data written before, during, and after the creation of the stream. This means that data written either before or after the creation of the stream will be included in the scope of stream computation, ensuring data integrity and consistency. This setting provides users with greater flexibility, allowing them to handle historical and new data according to actual needs.
Note:
- When fill_history is enabled, creating a stream requires finding the boundary point of historical data. If there is a lot of historical data, the create-stream task may take a long time. In this case, the parameter streamRunHistoryAsync (supported since version 3.3.6.0) can be set to 1 (default is 0) so that stream creation is processed in the background; the create-stream statement then returns immediately without blocking subsequent operations.
- show streams can be used to view the progress of background stream creation: the ready status indicates success, init indicates that stream creation is in progress, and failed indicates that stream creation has failed, in which case the message column shows the reason. A failed stream can be dropped and recreated.
- In addition, do not create multiple streams asynchronously at the same time, as transaction conflicts may cause the streams created later to fail.
For example, create a stream that counts the number of rows generated by all smart meters every 10s while also computing historical data. The SQL is as follows:
```sql
create stream if not exists count_history_s fill_history 1 into count_history as select count(*) from power.meters interval(10s)


@ -305,6 +305,15 @@ The TDengine client driver provides all the APIs needed for application programming, and
- Dynamic modification: not supported
- Supported version: introduced since v3.1.0.0
### Stream Related
#### streamRunHistoryAsync
- Description: whether the create-stream statement is executed asynchronously when the stream is created with the fill_history parameter
- Type: boolean; false means synchronous, true means asynchronous
- Default value: false
- Dynamic modification: supported via SQL, effective immediately
- Supported version: introduced since v3.3.6.0
### Log Related
#### logDir


@ -291,11 +291,13 @@ extern int32_t tsUptimeInterval;
extern bool tsUpdateCacheBatch;
extern bool tsDisableStream;
extern int64_t tsStreamBufferSize;
extern int64_t tsStreamFailedTimeout;
extern int tsStreamAggCnt;
extern bool tsFilterScalarMode;
extern int32_t tsMaxStreamBackendCache;
extern int32_t tsPQSortMemThreshold;
extern bool tsStreamCoverage;
extern bool tsStreamRunHistoryAsync;
extern int8_t tsS3EpNum;
extern int32_t tsStreamNotifyMessageSize;
extern int32_t tsStreamNotifyFrameSize;


@ -130,6 +130,8 @@
TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_ANODE, "update-anode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_ANODE, "drop-anode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_ANAL_ALGO, "retrieve-anal-algo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_FAILED_STREAM, "create-stream-failed", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CHECK_STREAM_TIMER, "check-stream-status", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_DND_MSG)
TD_NEW_MSG_SEG(TDMT_MND_MSG) // 1<<8


@ -103,6 +103,7 @@ typedef struct SParseContext {
setQueryFn setQueryFp;
timezone_t timezone;
void *charsetCxt;
bool streamRunHistory;
} SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);


@ -353,6 +353,8 @@ typedef enum ELogicConditionType {
#define TSDB_DNODE_CONFIG_LEN 128
#define TSDB_DNODE_VALUE_LEN 256
#define TSDB_RESERVE_VALUE_LEN 256
#define TSDB_CLUSTER_VALUE_LEN 1000
#define TSDB_GRANT_LOG_COL_LEN 15600


@ -301,6 +301,7 @@ typedef struct SRequestObj {
SMetaData parseMeta;
char* effectiveUser;
int8_t source;
bool streamRunHistory;
} SRequestObj;
typedef struct SSyncQueryParam {


@ -1126,13 +1126,13 @@ static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock*
}
}
tscDebug("lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1], *(int64_t*)pRow[2]);
tscInfo("[create stream with history] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1], *(int64_t*)pRow[2]);
}
(*pBlock)->info.window.ekey = lastTs;
(*pBlock)->info.rows = numOfRows;
tscDebug("lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
tscInfo("[create stream with history] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
return TSDB_CODE_SUCCESS;
}
@ -3116,7 +3116,9 @@ void doRequestCallback(SRequestObj* pRequest, int32_t code) {
code = TSDB_CODE_SUCCESS;
pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
}
pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
}
SRequestObj* pReq = acquireRequest(this);
if (pReq != NULL) {
pReq->inCallback = false;


@ -1427,7 +1427,8 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt, SS
.parseSqlParam = pWrapper,
.setQueryFp = setQueryRequest,
.timezone = pTscObj->optionInfo.timezone,
.charsetCxt = pTscObj->optionInfo.charsetCxt};
.charsetCxt = pTscObj->optionInfo.charsetCxt,
.streamRunHistory = pRequest->streamRunHistory};
int8_t biMode = atomic_load_8(&((STscObj *)pTscObj)->biMode);
(*pCxt)->biMode = biMode;
return TSDB_CODE_SUCCESS;


@ -858,6 +858,119 @@ int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
return code;
}
static int32_t setCreateStreamFailedRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg) {
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData);
}
if (code != 0){
tscError("setCreateStreamFailedRsp since %s", tstrerror(code));
} else{
tscInfo("setCreateStreamFailedRsp success");
}
return code;
}
void sendCreateStreamFailedMsg(SRequestObj* pRequest, char* streamName){
int32_t code = 0;
tscInfo("send failed stream name to mgmt: %s", streamName);
int32_t size = INT_BYTES + strlen(streamName);
void *buf = taosMemoryMalloc(size);
if (buf == NULL) {
tscError("failed to malloc stream name buffer: %s", terrstr());
return;
}
*(int32_t*)buf = pRequest->code;
memcpy(POINTER_SHIFT(buf, INT_BYTES), streamName, strlen(streamName));
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(buf);
tscError("failed to calloc msgSendInfo: %s", terrstr());
return;
}
sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = size, .handle = NULL};
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->msgType = TDMT_MND_FAILED_STREAM;
sendInfo->fp = setCreateStreamFailedRsp;
SEpSet epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
code = asyncSendMsgToServer(pRequest->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
if (code != 0) {
tscError("failed to send failed stream name to mgmt since %s", tstrerror(code));
}
}
static void processCreateStreamSecondPhaseRsp(void* param, void* res, int32_t code) {
SRequestObj* pRequest = res;
if (code != 0 && param != NULL){
sendCreateStreamFailedMsg(pRequest, param);
}
taosMemoryFree(param);
destroyRequest(pRequest);
}
static char* getStreamName(SRequestObj* pRequest){
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)(pRequest->pQuery->pRoot);
SName name;
int32_t code = tNameSetDbName(&name, pRequest->pTscObj->acctId, pStmt->streamName, strlen(pStmt->streamName));
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to set db name for stream since %s", tstrerror(code));
return NULL;
} else{
char *streamName = taosMemoryCalloc(1, TSDB_STREAM_FNAME_LEN);
(void)tNameGetFullDbName(&name, streamName);
return streamName;
}
}
void processCreateStreamSecondPhase(SRequestObj* pRequest){
tscInfo("[create stream with history] create in second phase");
char *streamName = getStreamName(pRequest);
size_t sqlLen = strlen(pRequest->sqlstr);
SRequestObj* pRequestNew = NULL;
int32_t code = buildRequest(pRequest->pTscObj->id, pRequest->sqlstr, sqlLen, streamName, false, &pRequestNew, 0);
if (code != TSDB_CODE_SUCCESS) {
tscError("[create stream with history] create in second phase, build request failed since %s", tstrerror(code));
return;
}
pRequestNew->source = pRequest->source;
pRequestNew->body.queryFp = processCreateStreamSecondPhaseRsp;
pRequestNew->streamRunHistory = true;
doAsyncQuery(pRequestNew, false);
}
int32_t processCreateStreamFirstPhaseRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
}
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
if (removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type)) != 0) {
tscError("failed to remove meta data for table");
}
}
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if (code == 0 && !pRequest->streamRunHistory && tsStreamRunHistoryAsync){
processCreateStreamSecondPhase(pRequest);
}
if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
} else {
if (tsem_post(&pRequest->body.rspSem) != 0) {
tscError("failed to post semaphore");
}
}
return code;
}
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
switch (msgType) {
case TDMT_MND_CONNECT:
@ -874,9 +987,12 @@ __async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
return processAlterStbRsp;
case TDMT_MND_SHOW_VARIABLES:
return processShowVariablesRsp;
case TDMT_MND_CREATE_STREAM:
return processCreateStreamFirstPhaseRsp;
case TDMT_MND_COMPACT_DB:
return processCompactDbRsp;
default:
return genericRspCallback;
}
}


@ -182,6 +182,7 @@ static const SSysDbTableSchema streamSchema[] = {
{.name = "checkpoint_interval", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "checkpoint_backup", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "message", .bytes = TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema streamTaskSchema[] = {


@ -98,7 +98,7 @@ static int32_t getDataLen(int32_t type, const char* pData) {
}
static int32_t colDataSetValHelp(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
if (isNull || pData == NULL) {
if (isNull || pData == NULL) {
// There is a placehold for each NULL value of binary or nchar type.
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
pColumnInfoData->varmeta.offset[rowIndex] = -1; // it is a null value of VAR type.


@ -347,9 +347,11 @@ bool tsDisableStream = false;
bool tsDisableStream = true;
#endif
int64_t tsStreamBufferSize = 128 * 1024 * 1024;
int64_t tsStreamFailedTimeout = 30 * 60 * 1000;
bool tsFilterScalarMode = false;
int tsStreamAggCnt = 100000;
bool tsStreamCoverage = false;
bool tsStreamRunHistoryAsync = false;
bool tsUpdateCacheBatch = true;
@ -774,6 +776,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "compareAsStrInGreatest", tsCompareAsStrInGreatest, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(
cfgAddBool(pCfg, "streamRunHistoryAsync", tsStreamRunHistoryAsync, CFG_DYN_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL));
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -964,6 +969,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER_LAZY,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "streamFailedTimeout", tsStreamFailedTimeout, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "streamAggCnt", tsStreamAggCnt, 2, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1800, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL));
@ -1507,6 +1513,9 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamCoverage");
tsStreamCoverage = pItem->bval;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamRunHistoryAsync");
tsStreamRunHistoryAsync = pItem->bval;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "compareAsStrInGreatest");
tsCompareAsStrInGreatest = pItem->bval;
@ -1837,6 +1846,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamBufferSize");
tsStreamBufferSize = pItem->i64;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamFailedTimeout");
tsStreamFailedTimeout = pItem->i64;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamAggCnt");
tsStreamAggCnt = pItem->i32;
@ -2825,6 +2837,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
{"numOfRpcSessions", &tsNumOfRpcSessions},
{"bypassFlag", &tsBypassFlag},
{"safetyCheckLevel", &tsSafetyCheckLevel},
{"streamRunHistoryAsync", &tsStreamRunHistoryAsync},
{"streamCoverage", &tsStreamCoverage},
{"compareAsStrInGreatest", &tsCompareAsStrInGreatest}};


@ -174,6 +174,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_FAILED_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_PAUSE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RESUME_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;


@ -833,7 +833,7 @@ typedef struct {
int32_t indexForMultiAggBalance;
int8_t subTableWithoutMd5;
char reserve[256];
char reserve[TSDB_RESERVE_VALUE_LEN];
} SStreamObj;


@ -199,6 +199,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
}
void *freeStreamTasks(SArray *pTaskLevel) {
if (pTaskLevel == NULL) return NULL;
int32_t numOfLevel = taosArrayGetSize(pTaskLevel);
for (int32_t i = 0; i < numOfLevel; i++) {


@ -204,6 +204,18 @@ static void mndStreamCheckNode(SMnode *pMnode) {
}
}
static void mndStreamCheckStatus(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_CHECK_STREAM_TIMER, .pCont = pReq, .contLen = contLen};
// TODO check return value
if (tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg) < 0) {
mError("failed to put into write-queue since %s, line:%d", terrstr(), __LINE__);
}
}
}
static void mndStreamConsensusChkpt(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
@ -410,6 +422,10 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) {
mndStreamCheckNode(pMnode);
}
if (sec % (tsStreamFailedTimeout/1000) == 0) {
mndStreamCheckStatus(pMnode);
}
if (sec % 5 == 0) {
mndStreamConsensusChkpt(pMnode);
}
@ -906,7 +922,7 @@ _OVER:
pMsg->msgType == TDMT_MND_COMPACT_TIMER || pMsg->msgType == TDMT_MND_NODECHECK_TIMER ||
pMsg->msgType == TDMT_MND_GRANT_HB_TIMER || pMsg->msgType == TDMT_MND_STREAM_REQ_CHKPT ||
pMsg->msgType == TDMT_MND_S3MIGRATE_DB_TIMER || pMsg->msgType == TDMT_MND_ARB_HEARTBEAT_TIMER ||
pMsg->msgType == TDMT_MND_ARB_CHECK_SYNC_TIMER) {
pMsg->msgType == TDMT_MND_ARB_CHECK_SYNC_TIMER || pMsg->msgType == TDMT_MND_CHECK_STREAM_TIMER) {
mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
pMnode->stopped, state.restored, syncStr(state.state));
TAOS_RETURN(code);


@ -40,6 +40,8 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
static int32_t mndProcessFailedStreamReq(SRpcMsg *pReq);
static int32_t mndProcessCheckStreamStatusReq(SRpcMsg *pReq);
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq);
@ -95,6 +97,8 @@ int32_t mndInitStream(SMnode *pMnode) {
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_FAILED_STREAM, mndProcessFailedStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_CHECK_STREAM_TIMER, mndProcessCheckStreamStatusReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);
@ -229,7 +233,7 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
}
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
mTrace("stream:%s, perform delete action", pStream->name);
mInfo("stream:%s, perform delete action", pStream->name);
taosWLockLatch(&pStream->lock);
tFreeStreamObj(pStream);
taosWUnLockLatch(&pStream->lock);
@ -246,7 +250,14 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream
pOldStream->updateTime = pNewStream->updateTime;
pOldStream->checkpointId = pNewStream->checkpointId;
pOldStream->checkpointFreq = pNewStream->checkpointFreq;
if (pOldStream->tasks == NULL){
pOldStream->tasks = pNewStream->tasks;
pNewStream->tasks = NULL;
}
if (pOldStream->pHTasksList == NULL){
pOldStream->pHTasksList = pNewStream->pHTasksList;
pNewStream->pHTasksList = NULL;
}
taosWUnLockLatch(&pOldStream->lock);
return 0;
}
@ -346,7 +357,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");
pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name));
pObj->status = 0;
pObj->status = STREAM_STATUS__NORMAL;
pObj->conf.igExpired = pCreate->igExpired;
pObj->conf.trigger = pCreate->triggerType;
@ -732,11 +743,11 @@ static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
++numOfStream;
}
sdbRelease(pMnode->pSdb, pStream);
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
pStreamObj->name);
sdbRelease(pMnode->pSdb, pStream);
sdbCancelFetch(pMnode->pSdb, pIter);
return TSDB_CODE_MND_TOO_MANY_STREAMS;
}
@ -744,9 +755,11 @@ static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
if (pStream->targetStbUid == pStreamObj->targetStbUid) {
mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
pStreamObj->name);
sdbRelease(pMnode->pSdb, pStream);
sdbCancelFetch(pMnode->pSdb, pIter);
return TSDB_CODE_MND_INVALID_TARGET_TABLE;
}
sdbRelease(pMnode->pSdb, pStream);
}
return TSDB_CODE_SUCCESS;
@ -823,6 +836,58 @@ _end:
return code;
}
static int32_t mndProcessCheckStreamStatusReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
void *pIter = NULL;
while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
taosWLockLatch(&pStream->lock);
if (pStream->status == STREAM_STATUS__INIT && (taosGetTimestampMs() - pStream->createTime > tsStreamFailedTimeout ||
taosGetTimestampMs() - pStream->createTime < 0)){
pStream->status = STREAM_STATUS__FAILED;
tstrncpy(pStream->reserve, "timeout", sizeof(pStream->reserve));
mInfo("stream:%s, set status to failed because of timeout", pStream->name);
}
taosWUnLockLatch(&pStream->lock);
sdbRelease(pMnode->pSdb, pStream);
}
return 0;
}
static int32_t mndProcessFailedStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
int32_t code = TSDB_CODE_SUCCESS;
int32_t errCode = *(int32_t*)pReq->pCont;
char streamName[TSDB_STREAM_FNAME_LEN] = {0};
memcpy(streamName, POINTER_SHIFT(pReq->pCont,INT_BYTES), TMIN(pReq->contLen - INT_BYTES, TSDB_STREAM_FNAME_LEN - 1));
#ifdef WINDOWS
code = TSDB_CODE_MND_INVALID_PLATFORM;
return code;
#endif
mInfo("stream:%s, start to set stream failed", streamName);
code = mndAcquireStream(pMnode, streamName, &pStream);
if (pStream == NULL) {
mError("stream:%s, failed to acquire stream when marking it failed since %s", streamName, tstrerror(code));
return code;
}
taosWLockLatch(&pStream->lock);
pStream->status = STREAM_STATUS__FAILED;
tstrncpy(pStream->reserve, tstrerror(errCode), sizeof(pStream->reserve));
taosWUnLockLatch(&pStream->lock);
mndReleaseStream(pMnode, pStream);
mInfo("stream:%s, set stream to failed successfully", streamName);
return code;
}
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
@ -851,14 +916,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
code = mndAcquireStream(pMnode, createReq.name, &pStream);
if (pStream != NULL && code == 0) {
if (createReq.igExists) {
mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
mndReleaseStream(pMnode, pStream);
tFreeSCMCreateStreamReq(&createReq);
return code;
} else {
code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
goto _OVER;
if (pStream->tasks != NULL){
if (createReq.igExists) {
mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
mndReleaseStream(pMnode, pStream);
tFreeSCMCreateStreamReq(&createReq);
return code;
} else {
code = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
goto _OVER;
}
}
} else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) {
goto _OVER;
@ -900,88 +967,82 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
bool buildEmptyStream = false;
if (createReq.lastTs == 0 && createReq.fillHistory != STREAM_FILL_HISTORY_OFF){
streamObj.status = STREAM_STATUS__INIT;
buildEmptyStream = true;
}
if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
goto _OVER;
}
if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
goto _OVER;
}
code = doStreamCheck(pMnode, &streamObj);
TSDB_CHECK_CODE(code, lino, _OVER);
// schedule stream task for stream obj
if (!buildEmptyStream) {
code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
goto _OVER;
}
// add notify info into all stream tasks
code = addStreamNotifyInfo(&createReq, &streamObj);
if (code != TSDB_CODE_SUCCESS) {
mError("stream:%s failed to add stream notify info since %s", createReq.name, tstrerror(code));
goto _OVER;
}
// add into buffer firstly
// to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
streamMutexLock(&execInfo.lock);
mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
streamMutexUnlock(&execInfo.lock);
}
code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
if (pTrans == NULL || code) {
goto _OVER;
}
// create stb for stream
if (createReq.createStb == STREAM_CREATE_STABLE_TRUE) {
if (createReq.createStb == STREAM_CREATE_STABLE_TRUE && !buildEmptyStream) {
if ((code = mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user)) < 0) {
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createReq.name, tstrerror(code));
mndTransDrop(pTrans);
goto _OVER;
}
} else {
mDebug("stream:%s no need create stable", createReq.name);
}
// schedule stream task for stream obj
code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to schedule since %s", createReq.name, tstrerror(code));
mndTransDrop(pTrans);
goto _OVER;
}
// add notify info into all stream tasks
code = addStreamNotifyInfo(&createReq, &streamObj);
if (code != TSDB_CODE_SUCCESS) {
mError("stream:%s failed to add stream notify info since %s", createReq.name, tstrerror(code));
mndTransDrop(pTrans);
goto _OVER;
}
// add stream to trans
code = mndPersistStream(pTrans, &streamObj);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to persist since %s", createReq.name, tstrerror(code));
mndTransDrop(pTrans);
goto _OVER;
}
if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb)) != 0) {
mndTransDrop(pTrans);
goto _OVER;
}
if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb)) != 0) {
mndTransDrop(pTrans);
goto _OVER;
}
// add into buffer firstly
// to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
streamMutexLock(&execInfo.lock);
mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
streamMutexUnlock(&execInfo.lock);
// execute creation
code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
mndTransDrop(pTrans);
goto _OVER;
}
mndTransDrop(pTrans);
SName dbname = {0};
code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (code) {
if (tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE) != 0) {
mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(code));
goto _OVER;
}
SName name = {0};
code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE);
if (code) {
if (tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE) != 0) {
mError("invalid stream name:%s in create stream, code:%s", createReq.name, tstrerror(code));
goto _OVER;
}
// reuse this function for stream
@ -1001,6 +1062,7 @@ _OVER:
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
mndTransDrop(pTrans);
mndReleaseStream(pMnode, pStream);
tFreeSCMCreateStreamReq(&createReq);
tFreeStreamObj(&streamObj);
@ -2352,6 +2414,23 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
static int32_t mndProcessStatusCheck(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
return 0;
}
int32_t size = sizeof(SMStreamNodeCheckMsg);
SMStreamNodeCheckMsg *pMsg = rpcMallocCont(size);
if (pMsg == NULL) {
return terrno;
}
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = size};
return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
SStreamTaskIter *pIter = NULL;
int32_t code = createStreamTaskIter(pStream, &pIter);


@ -995,6 +995,8 @@ static void mndShowStreamStatus(char *dst, int8_t status) {
tstrncpy(dst, "recover", MND_STREAM_TRIGGER_NAME_SIZE);
} else if (status == STREAM_STATUS__PAUSE) {
tstrncpy(dst, "paused", MND_STREAM_TRIGGER_NAME_SIZE);
} else if (status == STREAM_STATUS__INIT) {
tstrncpy(dst, "init", MND_STREAM_TRIGGER_NAME_SIZE);
}
}
@ -1108,7 +1110,7 @@ int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_
TSDB_CHECK_CODE(code, lino, _end);
int8_t streamStatus = atomic_load_8(&pStream->status);
if (isPaused) {
if (isPaused && pStream->tasks != NULL) {
streamStatus = STREAM_STATUS__PAUSE;
}
mndShowStreamStatus(status2, streamStatus);
@ -1207,6 +1209,17 @@ int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_
TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
TSDB_CHECK_CODE(code, lino, _end);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
if (streamStatus == STREAM_STATUS__FAILED){
STR_TO_VARSTR(msg, pStream->reserve)
} else {
STR_TO_VARSTR(msg, " ")
}
code = colDataSetVal(pColInfo, numOfRows, (const char *)msg, false);
_end:
if (code) {


@ -275,7 +275,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessPollPush(STQ* pTq);
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg);


@@ -366,7 +366,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
return 0;
}
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessPollPush(STQ* pTq) {
if (pTq == NULL) {
return TSDB_CODE_INVALID_PARA;
}


@@ -888,7 +888,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_VND_TMQ_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_CONSUME_PUSH:
return tqProcessPollPush(pVnode->pTq, pMsg);
return tqProcessPollPush(pVnode->pTq);
default:
vError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;


@@ -76,7 +76,7 @@ typedef struct SSysTableShowAdapter {
const char* pDbName;
const char* pTableName;
int32_t numOfShowCols;
const char* pShowCols[2];
const char* pShowCols[3];
} SSysTableShowAdapter;
typedef struct SCollectJoinCondsContext {
@@ -187,8 +187,8 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = {
.showType = QUERY_NODE_SHOW_STREAMS_STMT,
.pDbName = TSDB_INFORMATION_SCHEMA_DB,
.pTableName = TSDB_INS_TABLE_STREAMS,
.numOfShowCols = 1,
.pShowCols = {"stream_name"}
.numOfShowCols = 3,
.pShowCols = {"stream_name","status","message"}
},
{
.showType = QUERY_NODE_SHOW_TABLES_STMT,
@@ -12634,6 +12634,7 @@ static int32_t createLastTsSelectStmt(char* pDb, const char* pTable, const char*
return code;
}
parserInfo("[create stream with history] create select last ts query, db:%s, table:%s", pDb, pTable);
tstrncpy(col->tableAlias, pTable, tListLen(col->tableAlias));
tstrncpy(col->colName, pkColName, tListLen(col->colName));
SNodeList* pParameterList = NULL;
@@ -12838,7 +12839,7 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
}
if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->fillHistory) {
if (TSDB_CODE_SUCCESS == code && pStmt->pOptions->fillHistory && (pCxt->pParseCxt->streamRunHistory || !tsStreamRunHistoryAsync)) {
SRealTableNode* pTable = (SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable);
code = createLastTsSelectStmt(pTable->table.dbName, pTable->table.tableName, pTable->pMeta->schema[0].name,
&pStmt->pPrevQuery);
@@ -13060,6 +13061,7 @@ int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, SSDa
code = buildCmdMsg(&cxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, pStmt->pReq);
}
qInfo("[create stream with history] post create stream, lastTs:%" PRId64, pStmt->pReq->lastTs);
if (TSDB_CODE_SUCCESS == code) {
code = setQuery(&cxt, pQuery);
}


@@ -121,8 +121,10 @@ void generatePerformanceSchema(MockCatalogService* mcs) {
.addColumn("id", TSDB_DATA_TYPE_INT)
.addColumn("create_time", TSDB_DATA_TYPE_TIMESTAMP)
.done();
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_STREAMS, TSDB_SYSTEM_TABLE, 2)
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_STREAMS, TSDB_SYSTEM_TABLE, 4)
.addColumn("stream_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
.addColumn("status", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
.addColumn("message", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
.addColumn("create_time", TSDB_DATA_TYPE_TIMESTAMP)
.done();
mcs->createTableBuilder(TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_CONSUMERS, TSDB_SYSTEM_TABLE, 2)


@@ -326,8 +326,6 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
streamTmrStop(pInfo->checkRspTmr);
pInfo->checkRspTmr = NULL;
}
streamMutexDestroy(&pInfo->checkInfoLock);
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////


@@ -151,10 +151,6 @@ int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
code = taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
if (code) {
return code;
}
if (fillHistory && !hasFillhistory) {
stError("s-task:0x%x create task failed, due to inconsistent fill-history flag", pTask->id.taskId);
@@ -226,13 +222,13 @@ void tFreeStreamTask(void* pParam) {
STaskExecStatisInfo* pStatis = &pTask->execInfo;
ETaskStatus status1 = TASK_STATUS__UNINIT;
streamMutexLock(&pTask->lock);
if (pTask->status.pSM != NULL) {
streamMutexLock(&pTask->lock);
SStreamTaskState status = streamTaskGetStatus(pTask);
p = status.name;
status1 = status.state;
streamMutexUnlock(&pTask->lock);
}
streamMutexUnlock(&pTask->lock);
stDebug("start to free s-task:0x%x %p, state:%s, refId:%" PRId64, taskId, pTask, p, pTask->id.refId);
@@ -304,17 +300,21 @@ void tFreeStreamTask(void* pParam) {
tSimpleHashCleanup(pTask->pNameMap);
}
if (pTask->status.pSM != NULL) {
streamMutexDestroy(&pTask->lock);
streamMutexDestroy(&pTask->msgInfo.lock);
streamMutexDestroy(&pTask->taskCheckInfo.checkInfoLock);
}
streamDestroyStateMachine(pTask->status.pSM);
pTask->status.pSM = NULL;
streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
taosMemoryFree(pTask->outputInfo.pTokenBucket);
streamMutexDestroy(&pTask->lock);
taosArrayDestroy(pTask->msgInfo.pSendInfo);
pTask->msgInfo.pSendInfo = NULL;
streamMutexDestroy(&pTask->msgInfo.lock);
taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
pTask->outputInfo.pNodeEpsetUpdateList = NULL;
@@ -529,6 +529,11 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
return terrno;
}
code = taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
if (code) {
return code;
}
if (pTask->chkInfo.pActiveInfo == NULL) {
code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
if (code) {


@@ -216,6 +216,7 @@
#
# system test
#
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/ts-5617.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/stream_multi_agg.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/stream_basic.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/scalar_function.py


@@ -230,7 +230,7 @@ endi
sql_error show create stable t0;
sql show variables;
if $rows != 88 then
if $rows != 89 then
return -1
endi


@@ -120,7 +120,7 @@ if $rows != 3 then
endi
sql show variables;
if $rows != 88 then
if $rows != 89 then
return -1
endi


@@ -222,7 +222,7 @@ class TDTestCase:
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdLog.info(len(tdSql.queryResult))
tdSql.checkEqual(True, len(tdSql.queryResult) in range(320, 321))
tdSql.checkEqual(len(tdSql.queryResult), 321)
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(61, len(tdSql.queryResult))


@@ -0,0 +1,187 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
insertJson = '''{
    "filetype": "insert",
    "cfgdir": "/etc/taos",
    "host": "localhost",
    "port": 6030,
    "user": "root",
    "password": "taosdata",
    "connection_pool_size": 10,
    "thread_count": 10,
    "create_table_thread_count": 10,
    "result_file": "./insert-2-2-1.txt",
    "confirm_parameter_prompt": "no",
    "num_of_records_per_req": 3600,
    "prepared_rand": 3600,
    "chinese": "no",
    "escape_character": "yes",
    "continue_if_fail": "no",
    "databases": [
        {
            "dbinfo": {
                "name": "ts5617",
                "drop": "yes",
                "vgroups": 10,
                "precision": "ms",
                "buffer": 512,
                "cachemodel": "'both'",
                "stt_trigger": 1
            },
            "super_tables": [
                {
                    "name": "stb_2_2_1",
                    "child_table_exists": "no",
                    "childtable_count": 10000,
                    "childtable_prefix": "d_",
                    "auto_create_table": "yes",
                    "batch_create_tbl_num": 10,
                    "data_source": "csv",
                    "insert_mode": "stmt",
                    "non_stop_mode": "no",
                    "line_protocol": "line",
                    "insert_rows": 10000,
                    "childtable_limit": 0,
                    "childtable_offset": 0,
                    "interlace_rows": 0,
                    "insert_interval": 0,
                    "partial_col_num": 0,
                    "timestamp_step": 1000,
                    "start_timestamp": "2024-11-01 00:00:00.000",
                    "sample_format": "csv",
                    "sample_file": "./td_double10000_juchi.csv",
                    "use_sample_ts": "no",
                    "tags_file": "",
                    "columns": [
                        {"type": "DOUBLE", "name": "val"},
                        {"type": "INT", "name": "quality"}
                    ],
                    "tags": [
                        {"type": "INT", "name": "id", "max": 100, "min": 1}
                    ]
                }
            ]
        }
    ]
}'''
class TDTestCase:
    updatecfgDict = {'debugFlag': 135, 'asynclog': 0, 'streamFailedTimeout': 10000}
    clientCfgDict = {'debugFlag': 135, 'asynclog': 0, 'streamRunHistoryAsync': 1}
    updatecfgDict["clientCfg"] = clientCfgDict

    def init(self, conn, logSql, replicaVar=1):
        self.replicaVar = int(replicaVar)
        tdLog.debug(f"start to execute {__file__}")
        tdSql.init(conn.cursor())
        #tdSql.init(conn.cursor(), logSql) # output sql.txt file

    def run(self):
        with open('ts-5617.json', 'w') as file:
            file.write(insertJson)
        tdLog.info("start to insert data: taosBenchmark -f ts-5617.json")
        if os.system("taosBenchmark -f ts-5617.json") != 0:
            tdLog.exit("taosBenchmark -f ts-5617.json")

        tdLog.info("test creating stream with history in the normal case ......")
        start_time = time.time()
        tdSql.execute(f'create stream s21 fill_history 1 into ts5617.st21 tags(tname varchar(20)) subtable(tname) as select last(val), last(quality) from ts5617.stb_2_2_1 partition by tbname tname interval(1800s);')
        end_time = time.time()
        if end_time - start_time > 1:
            tdLog.exit("create stream with history took too long; expected async creation")
        tdSql.query("show streams")
        tdSql.checkRows(1)
        tdSql.checkData(0, 1, "init")
        while 1:
            tdSql.query("show streams")
            tdLog.info("stream is being created ...")
            if tdSql.getData(0, 1) == "ready":
                break
            else:
                time.sleep(5)
        tdSql.execute(f'drop stream s21')
        tdSql.execute(f'drop table if exists ts5617.st21')

        tdLog.info("test creating stream with history when taosd reports an error ......")
        tdSql.execute(f'create stream s211 fill_history 1 into ts5617.st211 tags(tname varchar(20)) subtable(tname) as select last(val), last(quality) from ts5617.stb_2_2_1 partition by tbname tname interval(1800s);')
        tdSql.execute(f'create stable ts5617.st211(ts timestamp, i int) tags(tname varchar(20))')
        tdSql.query("show streams")
        tdSql.checkRows(1)
        tdSql.checkData(0, 1, "init")
        while 1:
            tdSql.query("show streams")
            tdLog.info("stream is being created ...")
            tdLog.info(tdSql.queryResult)
            if tdSql.getData(0, 1) == "failed" and tdSql.getData(0, 2) == "STable already exists":
                break
            else:
                time.sleep(5)
        time.sleep(10)
        tdSql.execute(f'drop stream s211')
        tdSql.execute(f'drop table if exists ts5617.st211')

        tdLog.info("test creating streams with history under conflicting transactions ......")
        tdSql.execute(f'create stream s21 fill_history 1 into ts5617.st21 as select last(val), last(quality) from ts5617.d_0 interval(1800s);')
        tdSql.execute(f'create stream s211 fill_history 1 into ts5617.st211 as select last(val), last(quality) from ts5617.d_0 interval(1800s);')
        while 1:
            tdSql.query("show streams")
            tdLog.info(tdSql.queryResult)
            tdLog.info("stream is being created ...")
            if "failed" in [tdSql.getData(0, 1), tdSql.getData(1, 1)] and "Conflict transaction not completed" in [tdSql.getData(0, 2), tdSql.getData(1, 2)]:
                break
            else:
                time.sleep(5)
        tdSql.execute(f'drop stream s21')
        tdSql.execute(f'drop stream s211')
        tdSql.execute(f'drop table if exists ts5617.st21')
        tdSql.execute(f'drop table if exists ts5617.st211')

        tdLog.info("test creating stream with history across a taosd restart ......")
        tdSql.execute(f'create stream s21 fill_history 1 into ts5617.st21 tags(tname varchar(20)) subtable(tname) as select last(val), last(quality) from ts5617.stb_2_2_1 partition by tbname tname interval(1800s);')
        tdSql.query("show streams")
        tdSql.checkRows(1)
        tdSql.checkData(0, 1, "init")
        tdLog.debug("restart taosd")
        tdDnodes.forcestop(1)
        time.sleep(20)
        tdDnodes.start(1)
        while 1:
            tdSql.query("show streams")
            tdLog.info("stream is being created ...")
            tdLog.info(tdSql.queryResult)
            if tdSql.getData(0, 1) == "failed" and tdSql.getData(0, 2) == "timeout":
                break
            else:
                time.sleep(5)
        return

    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")


tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())