Merge branch '3.0' of https://github.com/taosdata/TDengine into mark/3.0
This commit is contained in:
commit
ca3a31440d
|
@ -49,6 +49,7 @@ window_clause: {
|
||||||
| STATE_WINDOW(col)
|
| STATE_WINDOW(col)
|
||||||
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [FILL(fill_mod_and_val)]
|
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [FILL(fill_mod_and_val)]
|
||||||
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
|
| EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition
|
||||||
|
| COUNT_WINDOW(count_val[, sliding_val])
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -180,6 +181,19 @@ select _wstart, _wend, count(*) from t event_window start with c1 > 0 end with c
|
||||||
|
|
||||||

|

|
||||||
|
|
||||||
|
### 计数窗口
|
||||||
|
|
||||||
|
计数窗口按固定的数据行数来划分窗口。默认将数据按时间戳排序,再按照count_val的值,将数据划分为多个窗口,然后做聚合计算。count_val表示每个count window包含的最大数据行数,总数据行数不能整除count_val时,最后一个窗口的行数会小于count_val。sliding_val是常量,表示窗口滑动的数量,类似于 interval的SLIDING。
|
||||||
|
|
||||||
|
以下面的 SQL 语句为例,计数窗口切分如图所示:
|
||||||
|
```sql
|
||||||
|
select _wstart, _wend, count(*) from t count_window(4);
|
||||||
|
```
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
### 时间戳伪列
|
### 时间戳伪列
|
||||||
|
|
||||||
窗口聚合查询结果中,如果 SQL 语句中没有指定输出查询结果中的时间戳列,那么最终结果中不会自动包含窗口的时间列信息。如果需要在结果中输出聚合结果所对应的时间窗口信息,需要在 SELECT 子句中使用时间戳相关的伪列: 时间窗口起始时间 (\_WSTART), 时间窗口结束时间 (\_WEND), 时间窗口持续时间 (\_WDURATION), 以及查询整体窗口相关的伪列: 查询窗口起始时间(\_QSTART) 和查询窗口结束时间(\_QEND)。需要注意的是时间窗口起始时间和结束时间均是闭区间,时间窗口持续时间是数据当前时间分辨率下的数值。例如,如果当前数据库的时间分辨率是毫秒,那么结果中 500 就表示当前时间窗口的持续时间是 500毫秒 (500 ms)。
|
窗口聚合查询结果中,如果 SQL 语句中没有指定输出查询结果中的时间戳列,那么最终结果中不会自动包含窗口的时间列信息。如果需要在结果中输出聚合结果所对应的时间窗口信息,需要在 SELECT 子句中使用时间戳相关的伪列: 时间窗口起始时间 (\_WSTART), 时间窗口结束时间 (\_WEND), 时间窗口持续时间 (\_WDURATION), 以及查询整体窗口相关的伪列: 查询窗口起始时间(\_QSTART) 和查询窗口结束时间(\_QEND)。需要注意的是时间窗口起始时间和结束时间均是闭区间,时间窗口持续时间是数据当前时间分辨率下的数值。例如,如果当前数据库的时间分辨率是毫秒,那么结果中 500 就表示当前时间窗口的持续时间是 500毫秒 (500 ms)。
|
||||||
|
|
|
@ -223,10 +223,10 @@ typedef struct SStoreTqReader {
|
||||||
bool (*tqReaderCurrentBlockConsumed)();
|
bool (*tqReaderCurrentBlockConsumed)();
|
||||||
|
|
||||||
struct SWalReader* (*tqReaderGetWalReader)(); // todo remove it
|
struct SWalReader* (*tqReaderGetWalReader)(); // todo remove it
|
||||||
int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it
|
// int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it
|
||||||
|
|
||||||
int32_t (*tqReaderSetSubmitMsg)(); // todo remove it
|
int32_t (*tqReaderSetSubmitMsg)(); // todo remove it
|
||||||
bool (*tqReaderNextBlockFilterOut)();
|
// bool (*tqReaderNextBlockFilterOut)();
|
||||||
} SStoreTqReader;
|
} SStoreTqReader;
|
||||||
|
|
||||||
typedef struct SStoreSnapshotFn {
|
typedef struct SStoreSnapshotFn {
|
||||||
|
|
|
@ -586,7 +586,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
|
|
||||||
tsNumOfSupportVnodes = tsNumOfCores * 2;
|
tsNumOfSupportVnodes = tsNumOfCores * 2;
|
||||||
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
|
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
|
||||||
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||||
|
|
||||||
if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0)
|
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0)
|
||||||
|
|
|
@ -111,7 +111,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const
|
||||||
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
|
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
|
||||||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
||||||
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
|
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
|
||||||
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
|
int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
|
||||||
|
|
||||||
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
|
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
|
||||||
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
|
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
|
||||||
|
|
|
@ -1409,24 +1409,6 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
||||||
if (strcasecmp(cfgReq.config, "resetlog") == 0) {
|
if (strcasecmp(cfgReq.config, "resetlog") == 0) {
|
||||||
strcpy(dcfgReq.config, "resetlog");
|
strcpy(dcfgReq.config, "resetlog");
|
||||||
#ifdef TD_ENTERPRISE
|
#ifdef TD_ENTERPRISE
|
||||||
} else if (strncasecmp(cfgReq.config, "supportvnodes", 13) == 0) {
|
|
||||||
int32_t optLen = strlen("supportvnodes");
|
|
||||||
int32_t flag = -1;
|
|
||||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
|
||||||
if (code < 0) return code;
|
|
||||||
|
|
||||||
if (flag < 0 || flag > 4096) {
|
|
||||||
mError("dnode:%d, failed to config supportVnodes since value:%d. Valid range: [0, 4096]", cfgReq.dnodeId, flag);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
|
||||||
goto _err_out;
|
|
||||||
}
|
|
||||||
if (flag == 0) {
|
|
||||||
flag = tsNumOfCores * 2;
|
|
||||||
}
|
|
||||||
flag = TMAX(flag, 2);
|
|
||||||
|
|
||||||
strcpy(dcfgReq.config, "supportvnodes");
|
|
||||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
|
||||||
} else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
|
} else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
|
||||||
int32_t optLen = strlen("s3blocksize");
|
int32_t optLen = strlen("s3blocksize");
|
||||||
int32_t flag = -1;
|
int32_t flag = -1;
|
||||||
|
|
|
@ -1813,7 +1813,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
||||||
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
|
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
|
||||||
pStream->name, pTrans->id);
|
pStream->name, pTrans->id);
|
||||||
|
|
||||||
int32_t code = mndStreamSetUpdateEpsetAction(pStream, pChangeInfo, pTrans);
|
int32_t code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);
|
||||||
|
|
||||||
// todo: not continue, drop all and retry again
|
// todo: not continue, drop all and retry again
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -462,14 +462,22 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetUpdateTaskAction(STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
|
static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
|
||||||
void *pBuf = NULL;
|
void *pBuf = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
|
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
|
||||||
|
|
||||||
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
|
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
|
||||||
|
|
||||||
int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0);
|
SEpSet epset = {0};
|
||||||
|
bool hasEpset = false;
|
||||||
|
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
||||||
|
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
|
||||||
|
terrno = code;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosMemoryFree(pBuf);
|
taosMemoryFree(pBuf);
|
||||||
}
|
}
|
||||||
|
@ -478,14 +486,14 @@ static int32_t doSetUpdateTaskAction(STrans *pTrans, SStreamTask *pTask, SVgroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// build trans to update the epset
|
// build trans to update the epset
|
||||||
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
|
int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
|
||||||
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
|
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
|
||||||
taosWLockLatch(&pStream->lock);
|
taosWLockLatch(&pStream->lock);
|
||||||
|
|
||||||
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
|
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
|
||||||
while (streamTaskIterNextTask(pIter)) {
|
while (streamTaskIterNextTask(pIter)) {
|
||||||
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
|
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
|
||||||
int32_t code = doSetUpdateTaskAction(pTrans, pTask, pInfo);
|
int32_t code = doSetUpdateTaskAction(pMnode, pTrans, pTask, pInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
destroyStreamTaskIter(pIter);
|
destroyStreamTaskIter(pIter);
|
||||||
taosWUnLockLatch(&pStream->lock);
|
taosWUnLockLatch(&pStream->lock);
|
||||||
|
|
|
@ -106,7 +106,7 @@ typedef struct SQueryNode SQueryNode;
|
||||||
#define VND_INFO_FNAME "vnode.json"
|
#define VND_INFO_FNAME "vnode.json"
|
||||||
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
|
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
|
||||||
|
|
||||||
#define VNODE_METRIC_SQL_COUNT "taos_sql_req:count"
|
#define VNODE_METRIC_SQL_COUNT "taosd_sql_req:count"
|
||||||
|
|
||||||
#define VNODE_METRIC_TAG_NAME_SQL_TYPE "sql_type"
|
#define VNODE_METRIC_TAG_NAME_SQL_TYPE "sql_type"
|
||||||
#define VNODE_METRIC_TAG_NAME_CLUSTER_ID "cluster_id"
|
#define VNODE_METRIC_TAG_NAME_CLUSTER_ID "cluster_id"
|
||||||
|
|
|
@ -368,24 +368,11 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo ignore the error in wal?
|
|
||||||
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
||||||
SWalReader* pWalReader = pReader->pWalReader;
|
SWalReader* pWalReader = pReader->pWalReader;
|
||||||
SSDataBlock* pDataBlock = NULL;
|
|
||||||
|
|
||||||
uint64_t st = taosGetTimestampMs();
|
uint64_t st = taosGetTimestampMs();
|
||||||
while (1) {
|
while (1) {
|
||||||
// try next message in wal file
|
|
||||||
if (walNextValidMsg(pWalReader) < 0) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
|
||||||
int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
|
||||||
int64_t ver = pWalReader->pHead->head.version;
|
|
||||||
|
|
||||||
tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver);
|
|
||||||
pReader->nextBlk = 0;
|
|
||||||
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
||||||
while (pReader->nextBlk < numOfBlocks) {
|
while (pReader->nextBlk < numOfBlocks) {
|
||||||
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
|
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
|
||||||
|
@ -400,33 +387,32 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
||||||
tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
|
tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
|
||||||
SSDataBlock* pRes = NULL;
|
SSDataBlock* pRes = NULL;
|
||||||
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
|
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
|
||||||
if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
if (pDataBlock == NULL) {
|
return true;
|
||||||
pDataBlock = createOneDataBlock(pRes, true);
|
|
||||||
} else {
|
|
||||||
blockDataMerge(pDataBlock, pRes);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pReader->nextBlk += 1;
|
pReader->nextBlk += 1;
|
||||||
tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
|
tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
|
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
|
||||||
pReader->msg.msgStr = NULL;
|
pReader->msg.msgStr = NULL;
|
||||||
|
|
||||||
if (pDataBlock != NULL) {
|
|
||||||
blockDataCleanup(pReader->pResBlock);
|
|
||||||
copyDataBlock(pReader->pResBlock, pDataBlock);
|
|
||||||
blockDataDestroy(pDataBlock);
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosGetTimestampMs() - st > 1000) {
|
if (taosGetTimestampMs() - st > 1000) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// try next message in wal file
|
||||||
|
if (walNextValidMsg(pWalReader) < 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
||||||
|
int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
||||||
|
int64_t ver = pWalReader->pHead->head.version;
|
||||||
|
tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver);
|
||||||
|
pReader->nextBlk = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -128,12 +128,12 @@ void initTqAPI(SStoreTqReader* pTq) {
|
||||||
pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed;
|
pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed;
|
||||||
|
|
||||||
pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it
|
pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it
|
||||||
pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it
|
// pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it
|
||||||
|
|
||||||
pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it
|
pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it
|
||||||
pTq->tqGetResultBlock = tqGetResultBlock;
|
pTq->tqGetResultBlock = tqGetResultBlock;
|
||||||
|
|
||||||
pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
|
// pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
|
||||||
pTq->tqGetResultBlockTime = tqGetResultBlockTime;
|
pTq->tqGetResultBlockTime = tqGetResultBlockTime;
|
||||||
|
|
||||||
pTq->tqGetStreamExecProgress = tqGetStreamExecInfo;
|
pTq->tqGetStreamExecProgress = tqGetStreamExecInfo;
|
||||||
|
|
|
@ -457,6 +457,7 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool initParam = pSrcParam0 ? true : false;
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
|
code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
|
||||||
pSrcParam0 = NULL;
|
pSrcParam0 = NULL;
|
||||||
|
@ -466,7 +467,7 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS
|
||||||
pSrcParam1 = NULL;
|
pSrcParam1 = NULL;
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildMergeJoinOperatorParam(ppParam, pSrcParam0 ? true : false, pGcParam0, pGcParam1);
|
code = buildMergeJoinOperatorParam(ppParam, initParam, pGcParam0, pGcParam1);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
if (pSrcParam0) {
|
if (pSrcParam0) {
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -390,10 +390,12 @@ void* doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
buf = taosDecodeFixedI32(buf, &mapSize);
|
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||||
for (int32_t i = 0; i < mapSize; i++) {
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
SResultWindowInfo winfo = {0};
|
SCountWindowInfo curWin = {0};
|
||||||
buf = decodeSSessionKey(buf, &key);
|
buf = decodeSSessionKey(buf, &key);
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
SBuffInfo buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};
|
||||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
setCountOutputBuf(&pInfo->streamAggSup, key.win.skey, key.groupId, &curWin, &buffInfo);
|
||||||
|
buf = decodeSResultWindowInfo(buf, &curWin.winInfo, pInfo->streamAggSup.resultRowSize);
|
||||||
|
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2.twAggSup
|
// 2.twAggSup
|
||||||
|
@ -694,6 +696,8 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
pInfo->recvGetAll = false;
|
pInfo->recvGetAll = false;
|
||||||
|
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
|
||||||
|
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, true,
|
||||||
|
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
// for stream
|
// for stream
|
||||||
void* buff = NULL;
|
void* buff = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
@ -704,8 +708,6 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
doStreamCountDecodeOpState(buff, len, pOperator, true);
|
doStreamCountDecodeOpState(buff, len, pOperator, true);
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
}
|
}
|
||||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, true,
|
|
||||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAgg, NULL, destroyStreamCountAggOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAgg, NULL, destroyStreamCountAggOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);
|
setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);
|
||||||
|
|
|
@ -406,6 +406,7 @@ void* doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
if (!pInfo) {
|
if (!pInfo) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
|
||||||
// 4.checksum
|
// 4.checksum
|
||||||
int32_t dataLen = len - sizeof(uint32_t);
|
int32_t dataLen = len - sizeof(uint32_t);
|
||||||
|
@ -423,6 +424,8 @@ void* doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
SResultWindowInfo winfo = {0};
|
SResultWindowInfo winfo = {0};
|
||||||
buf = decodeSSessionKey(buf, &key);
|
buf = decodeSSessionKey(buf, &key);
|
||||||
|
pAggSup->stateStore.streamStateSessionAddIfNotExist(pAggSup->pState, &winfo.sessionWin, pAggSup->gap,
|
||||||
|
(void**)&winfo.pStatePos, &pAggSup->resultRowSize);
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||||
}
|
}
|
||||||
|
@ -735,6 +738,8 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
pInfo->reCkBlock = false;
|
pInfo->reCkBlock = false;
|
||||||
pInfo->recvGetAll = false;
|
pInfo->recvGetAll = false;
|
||||||
|
|
||||||
|
setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,
|
||||||
|
pInfo, pTaskInfo);
|
||||||
// for stream
|
// for stream
|
||||||
void* buff = NULL;
|
void* buff = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
@ -746,8 +751,6 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
}
|
}
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,
|
|
||||||
pInfo, pTaskInfo);
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState);
|
setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState);
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -2533,7 +2533,6 @@ int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outL
|
||||||
|
|
||||||
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) {
|
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) {
|
||||||
buf = taosDecodeFixedBool(buf, &key->isOutput);
|
buf = taosDecodeFixedBool(buf, &key->isOutput);
|
||||||
key->pStatePos->pRowBuff = NULL;
|
|
||||||
buf = decodeSSessionKey(buf, &key->sessionWin);
|
buf = decodeSSessionKey(buf, &key->sessionWin);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
@ -2591,6 +2590,7 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
if (!pInfo) {
|
if (!pInfo) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
|
||||||
// 5.checksum
|
// 5.checksum
|
||||||
if (isParent) {
|
if (isParent) {
|
||||||
|
@ -2609,6 +2609,8 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
SResultWindowInfo winfo = {0};
|
SResultWindowInfo winfo = {0};
|
||||||
buf = decodeSSessionKey(buf, &key);
|
buf = decodeSSessionKey(buf, &key);
|
||||||
|
pAggSup->stateStore.streamStateSessionAddIfNotExist(pAggSup->pState, &winfo.sessionWin, pAggSup->gap,
|
||||||
|
(void**)&winfo.pStatePos, &pAggSup->resultRowSize);
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||||
}
|
}
|
||||||
|
@ -2992,6 +2994,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
pInfo->recvGetAll = false;
|
pInfo->recvGetAll = false;
|
||||||
|
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||||
|
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
||||||
|
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
// for stream
|
// for stream
|
||||||
void* buff = NULL;
|
void* buff = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
@ -3002,8 +3006,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
doStreamSessionDecodeOpState(buff, len, pOperator, true);
|
doStreamSessionDecodeOpState(buff, len, pOperator, true);
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
}
|
}
|
||||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
|
||||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
|
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
|
||||||
|
@ -3538,6 +3540,7 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
if (!pInfo) {
|
if (!pInfo) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
|
||||||
// 5.checksum
|
// 5.checksum
|
||||||
if (isParent) {
|
if (isParent) {
|
||||||
|
@ -3556,6 +3559,9 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
SResultWindowInfo winfo = {0};
|
SResultWindowInfo winfo = {0};
|
||||||
buf = decodeSSessionKey(buf, &key);
|
buf = decodeSSessionKey(buf, &key);
|
||||||
|
pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &winfo.sessionWin, NULL,
|
||||||
|
pAggSup->stateKeySize, compareStateKey,
|
||||||
|
(void**)&winfo.pStatePos, &pAggSup->resultRowSize);
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||||
}
|
}
|
||||||
|
@ -3873,6 +3879,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||||
pInfo->recvGetAll = false;
|
pInfo->recvGetAll = false;
|
||||||
|
|
||||||
|
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
||||||
|
pInfo, pTaskInfo);
|
||||||
// for stream
|
// for stream
|
||||||
void* buff = NULL;
|
void* buff = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
@ -3884,8 +3892,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
}
|
}
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
|
||||||
pInfo, pTaskInfo);
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState);
|
setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState);
|
||||||
|
|
|
@ -4547,7 +4547,7 @@ static int32_t translateWhere(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = getQueryTimeRange(pCxt, pSelect->pWhere, &pSelect->timeRange);
|
code = getQueryTimeRange(pCxt, pSelect->pWhere, &pSelect->timeRange);
|
||||||
}
|
}
|
||||||
if (pSelect->pWhere != NULL) {
|
if (pSelect->pWhere != NULL && pCxt->pParseCxt->topicQuery == false) {
|
||||||
setTableVgroupsFromEqualTbnameCond(pCxt, pSelect);
|
setTableVgroupsFromEqualTbnameCond(pCxt, pSelect);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
|
@ -8135,7 +8135,9 @@ static int32_t adjustTagsForCreateTable(STranslateContext* pCxt, SCreateStreamSt
|
||||||
SColumnDefNode* pDef = (SColumnDefNode*)pTagDef;
|
SColumnDefNode* pDef = (SColumnDefNode*)pTagDef;
|
||||||
if (!dataTypeEqual(&pDef->dataType, &((SExprNode*)pTagExpr)->resType)) {
|
if (!dataTypeEqual(&pDef->dataType, &((SExprNode*)pTagExpr)->resType)) {
|
||||||
SNode* pFunc = NULL;
|
SNode* pFunc = NULL;
|
||||||
int32_t code = createCastFunc(pCxt, pTagExpr, pDef->dataType, &pFunc);
|
SDataType defType = pDef->dataType;
|
||||||
|
defType.bytes = calcTypeBytes(defType);
|
||||||
|
int32_t code = createCastFunc(pCxt, pTagExpr, defType, &pFunc);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -940,6 +940,10 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
|
||||||
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo));
|
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo));
|
||||||
SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
|
SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
|
||||||
|
|
||||||
|
if (isHb && persistHandle && trans->pHandle == 0) {
|
||||||
|
trans->pHandle = rpcAllocHandle();
|
||||||
|
}
|
||||||
|
|
||||||
if (pJob && pTask) {
|
if (pJob && pTask) {
|
||||||
SCH_TASK_DLOG("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
|
SCH_TASK_DLOG("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
|
||||||
epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
|
epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
|
||||||
|
|
|
@ -906,6 +906,7 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef BUILD_NO_CALL
|
||||||
static int32_t chkpIdComp(const void* a, const void* b) {
|
static int32_t chkpIdComp(const void* a, const void* b) {
|
||||||
int64_t x = *(int64_t*)a;
|
int64_t x = *(int64_t*)a;
|
||||||
int64_t y = *(int64_t*)b;
|
int64_t y = *(int64_t*)b;
|
||||||
|
@ -964,6 +965,7 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
|
||||||
taosMemoryFree(chkpPath);
|
taosMemoryFree(chkpPath);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#ifdef BUILD_NO_CALL
|
#ifdef BUILD_NO_CALL
|
||||||
int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*** ppHandle, SArray* refs) {
|
int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*** ppHandle, SArray* refs) {
|
||||||
|
|
|
@ -353,7 +353,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
|
ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(status == TASK_STATUS__READY || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
|
ASSERT(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING ||
|
||||||
|
status == TASK_STATUS__STOP);
|
||||||
int32_t code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
|
int32_t code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", id,
|
stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", id,
|
||||||
|
|
|
@ -39,7 +39,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp
|
||||||
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
|
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
// check for the dispath info and the upstream task info
|
// check for the dispatch info and the upstream task info
|
||||||
int32_t level = pTask->info.taskLevel;
|
int32_t level = pTask->info.taskLevel;
|
||||||
if (level == TASK_LEVEL__SOURCE) {
|
if (level == TASK_LEVEL__SOURCE) {
|
||||||
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
|
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
|
||||||
|
@ -622,6 +622,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
|
||||||
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
|
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
|
||||||
char buf[512] = {0};
|
char buf[512] = {0};
|
||||||
EPSET_TO_STR(pEpSet, buf);
|
EPSET_TO_STR(pEpSet, buf);
|
||||||
|
int32_t id = pTask->id.taskId;
|
||||||
|
|
||||||
int8_t type = pTask->outputInfo.type;
|
int8_t type = pTask->outputInfo.type;
|
||||||
if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
@ -633,8 +634,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
|
||||||
|
|
||||||
if (pVgInfo->vgId == nodeId) {
|
if (pVgInfo->vgId == nodeId) {
|
||||||
epsetAssign(&pVgInfo->epSet, pEpSet);
|
epsetAssign(&pVgInfo->epSet, pEpSet);
|
||||||
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId,
|
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pVgInfo->taskId, nodeId,
|
||||||
pVgInfo->taskId, nodeId, buf);
|
buf);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -642,8 +643,8 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
|
||||||
STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
|
STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher;
|
||||||
if (pDispatcher->nodeId == nodeId) {
|
if (pDispatcher->nodeId == nodeId) {
|
||||||
epsetAssign(&pDispatcher->epSet, pEpSet);
|
epsetAssign(&pDispatcher->epSet, pEpSet);
|
||||||
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId,
|
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pDispatcher->taskId, nodeId,
|
||||||
pDispatcher->taskId, nodeId, buf);
|
buf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ class BackendEnv : public ::testing::Test {
|
||||||
|
|
||||||
void *backendCreate() {
|
void *backendCreate() {
|
||||||
const char *streamPath = "/tmp";
|
const char *streamPath = "/tmp";
|
||||||
void *p = NULL;
|
void * p = NULL;
|
||||||
|
|
||||||
// char *absPath = NULL;
|
// char *absPath = NULL;
|
||||||
// // SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(streamPath, -1, 2);
|
// // SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(streamPath, -1, 2);
|
||||||
|
@ -52,7 +52,7 @@ SStreamState *stateCreate(const char *path) {
|
||||||
}
|
}
|
||||||
void *backendOpen() {
|
void *backendOpen() {
|
||||||
streamMetaInit();
|
streamMetaInit();
|
||||||
const char *path = "/tmp/backend";
|
const char * path = "/tmp/backend";
|
||||||
SStreamState *p = stateCreate(path);
|
SStreamState *p = stateCreate(path);
|
||||||
ASSERT(p != NULL);
|
ASSERT(p != NULL);
|
||||||
|
|
||||||
|
@ -79,7 +79,7 @@ void *backendOpen() {
|
||||||
|
|
||||||
const char *val = "value data";
|
const char *val = "value data";
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
char *newVal = NULL;
|
char * newVal = NULL;
|
||||||
streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
|
streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
|
||||||
ASSERT(len == strlen(val));
|
ASSERT(len == strlen(val));
|
||||||
}
|
}
|
||||||
|
@ -100,7 +100,7 @@ void *backendOpen() {
|
||||||
|
|
||||||
const char *val = "value data";
|
const char *val = "value data";
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
char *newVal = NULL;
|
char * newVal = NULL;
|
||||||
int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
|
int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
|
||||||
ASSERT(code != 0);
|
ASSERT(code != 0);
|
||||||
}
|
}
|
||||||
|
@ -130,7 +130,7 @@ void *backendOpen() {
|
||||||
|
|
||||||
winkey.groupId = 0;
|
winkey.groupId = 0;
|
||||||
winkey.ts = tsArray[0];
|
winkey.ts = tsArray[0];
|
||||||
char *val = NULL;
|
char * val = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
|
||||||
pCurr = streamStateSeekKeyNext_rocksdb(p, &winkey);
|
pCurr = streamStateSeekKeyNext_rocksdb(p, &winkey);
|
||||||
|
@ -157,7 +157,7 @@ void *backendOpen() {
|
||||||
key.ts = tsArray[i];
|
key.ts = tsArray[i];
|
||||||
key.exprIdx = i;
|
key.exprIdx = i;
|
||||||
|
|
||||||
char *val = NULL;
|
char * val = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len);
|
streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len);
|
||||||
ASSERT(len == strlen("Value"));
|
ASSERT(len == strlen("Value"));
|
||||||
|
@ -168,7 +168,7 @@ void *backendOpen() {
|
||||||
key.ts = tsArray[i];
|
key.ts = tsArray[i];
|
||||||
key.exprIdx = i;
|
key.exprIdx = i;
|
||||||
|
|
||||||
char *val = NULL;
|
char * val = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
streamStateFuncDel_rocksdb(p, &key);
|
streamStateFuncDel_rocksdb(p, &key);
|
||||||
}
|
}
|
||||||
|
@ -213,7 +213,7 @@ void *backendOpen() {
|
||||||
{
|
{
|
||||||
SSessionKey key;
|
SSessionKey key;
|
||||||
memset(&key, 0, sizeof(key));
|
memset(&key, 0, sizeof(key));
|
||||||
char *val = NULL;
|
char * val = NULL;
|
||||||
int32_t vlen = 0;
|
int32_t vlen = 0;
|
||||||
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
|
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
|
@ -260,7 +260,7 @@ void *backendOpen() {
|
||||||
SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]};
|
SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]};
|
||||||
key.groupId = (uint64_t)(i);
|
key.groupId = (uint64_t)(i);
|
||||||
key.ts = tsArray[i];
|
key.ts = tsArray[i];
|
||||||
char *val = NULL;
|
char * val = NULL;
|
||||||
int32_t vlen = 0;
|
int32_t vlen = 0;
|
||||||
ASSERT(streamStateFillGet_rocksdb(p, &key, (void **)&val, &vlen) == 0);
|
ASSERT(streamStateFillGet_rocksdb(p, &key, (void **)&val, &vlen) == 0);
|
||||||
taosMemoryFreeClear(val);
|
taosMemoryFreeClear(val);
|
||||||
|
@ -272,7 +272,7 @@ void *backendOpen() {
|
||||||
SStreamStateCur *pCurr = streamStateFillGetCur_rocksdb(p, &key);
|
SStreamStateCur *pCurr = streamStateFillGetCur_rocksdb(p, &key);
|
||||||
ASSERT(pCurr != NULL);
|
ASSERT(pCurr != NULL);
|
||||||
|
|
||||||
char *val = NULL;
|
char * val = NULL;
|
||||||
int32_t vlen = 0;
|
int32_t vlen = 0;
|
||||||
ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen));
|
ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen));
|
||||||
ASSERT(vlen == strlen("Value"));
|
ASSERT(vlen == strlen("Value"));
|
||||||
|
@ -296,7 +296,7 @@ void *backendOpen() {
|
||||||
SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]};
|
SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]};
|
||||||
key.groupId = (uint64_t)(i);
|
key.groupId = (uint64_t)(i);
|
||||||
key.ts = tsArray[i];
|
key.ts = tsArray[i];
|
||||||
char *val = NULL;
|
char * val = NULL;
|
||||||
int32_t vlen = 0;
|
int32_t vlen = 0;
|
||||||
ASSERT(streamStateFillDel_rocksdb(p, &key) == 0);
|
ASSERT(streamStateFillDel_rocksdb(p, &key) == 0);
|
||||||
taosMemoryFreeClear(val);
|
taosMemoryFreeClear(val);
|
||||||
|
@ -338,7 +338,7 @@ void *backendOpen() {
|
||||||
char key[128] = {0};
|
char key[128] = {0};
|
||||||
sprintf(key, "tbname_%d", i);
|
sprintf(key, "tbname_%d", i);
|
||||||
|
|
||||||
char *val = NULL;
|
char * val = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
code = streamDefaultGet_rocksdb(p, key, (void **)&val, &len);
|
code = streamDefaultGet_rocksdb(p, key, (void **)&val, &len);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
|
@ -354,7 +354,7 @@ TEST_F(BackendEnv, checkOpen) {
|
||||||
SStreamState *p = (SStreamState *)backendOpen();
|
SStreamState *p = (SStreamState *)backendOpen();
|
||||||
int64_t tsStart = taosGetTimestampMs();
|
int64_t tsStart = taosGetTimestampMs();
|
||||||
{
|
{
|
||||||
void *pBatch = streamStateCreateBatch();
|
void * pBatch = streamStateCreateBatch();
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
for (int i = 0; i < size; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
char key[128] = {0};
|
char key[128] = {0};
|
||||||
|
@ -368,7 +368,7 @@ TEST_F(BackendEnv, checkOpen) {
|
||||||
streamStateDestroyBatch(pBatch);
|
streamStateDestroyBatch(pBatch);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
void *pBatch = streamStateCreateBatch();
|
void * pBatch = streamStateCreateBatch();
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
char valBuf[256] = {0};
|
char valBuf[256] = {0};
|
||||||
for (int i = 0; i < size; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
|
@ -385,7 +385,7 @@ TEST_F(BackendEnv, checkOpen) {
|
||||||
// do checkpoint 2
|
// do checkpoint 2
|
||||||
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2);
|
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2);
|
||||||
{
|
{
|
||||||
void *pBatch = streamStateCreateBatch();
|
void * pBatch = streamStateCreateBatch();
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
char valBuf[256] = {0};
|
char valBuf[256] = {0};
|
||||||
for (int i = 0; i < size; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
|
@ -407,12 +407,22 @@ TEST_F(BackendEnv, checkOpen) {
|
||||||
// taosMkDir(dump);
|
// taosMkDir(dump);
|
||||||
taosMulMkDir(dump);
|
taosMulMkDir(dump);
|
||||||
SBkdMgt *mgt = bkdMgtCreate((char *)path);
|
SBkdMgt *mgt = bkdMgtCreate((char *)path);
|
||||||
SArray *result = taosArrayInit(4, sizeof(void *));
|
SArray * result = taosArrayInit(4, sizeof(void *));
|
||||||
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump);
|
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump);
|
||||||
|
|
||||||
|
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4);
|
||||||
|
|
||||||
|
taosArrayClear(result);
|
||||||
|
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 4, result, (char *)dump);
|
||||||
bkdMgtDestroy(mgt);
|
bkdMgtDestroy(mgt);
|
||||||
streamStateClose((SStreamState *)p, true);
|
streamStateClose((SStreamState *)p, true);
|
||||||
|
// {
|
||||||
|
// taosRemoveDir("/tmp/backend");
|
||||||
|
// const char * path = "/tmp/backend";
|
||||||
|
// SStreamState *p = stateCreate(path);
|
||||||
|
// }
|
||||||
taosRemoveDir(path);
|
taosRemoveDir(path);
|
||||||
|
// streamStateClose((SStreamState *)p, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(BackendEnv, backendChkp) { const char *path = "/tmp"; }
|
TEST_F(BackendEnv, backendChkp) { const char *path = "/tmp"; }
|
||||||
|
@ -430,6 +440,20 @@ TEST_F(BackendEnv, backendUtil) {
|
||||||
ASSERT_EQ(nextPow2((uint32_t)(kvDict[i].k)), kvDict[i].v);
|
ASSERT_EQ(nextPow2((uint32_t)(kvDict[i].k)), kvDict[i].v);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
TEST_F(BackendEnv, oldBackendInit) {
|
||||||
|
const char *path = "/tmp/backend1";
|
||||||
|
taosMulMkDir(path);
|
||||||
|
{
|
||||||
|
SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(path, 10, 10);
|
||||||
|
streamBackendCleanup((void *)p);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(path, 10, 10);
|
||||||
|
streamBackendCleanup((void *)p);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosRemoveDir(path);
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char **argv) {
|
int main(int argc, char **argv) {
|
||||||
testing::InitGoogleTest(&argc, argv);
|
testing::InitGoogleTest(&argc, argv);
|
||||||
|
|
|
@ -584,8 +584,8 @@ void* destroyConnPool(SCliThrd* pThrd) {
|
||||||
|
|
||||||
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
||||||
void* pool = pThrd->pool;
|
void* pool = pThrd->pool;
|
||||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
|
|
||||||
STrans* pTranInst = pThrd->pTransInst;
|
STrans* pTranInst = pThrd->pTransInst;
|
||||||
|
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
|
||||||
if (plist == NULL) {
|
if (plist == NULL) {
|
||||||
SConnList list = {0};
|
SConnList list = {0};
|
||||||
taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
|
taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
|
||||||
|
@ -867,17 +867,18 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
|
||||||
QUEUE_INIT(&conn->q);
|
QUEUE_INIT(&conn->q);
|
||||||
|
|
||||||
conn->broken = true;
|
conn->broken = true;
|
||||||
|
if (conn->list == NULL) {
|
||||||
|
conn->list = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr));
|
||||||
|
}
|
||||||
|
|
||||||
if (conn->list != NULL) {
|
if (conn->list) {
|
||||||
SConnList* connList = conn->list;
|
SConnList* list = conn->list;
|
||||||
connList->list->numOfConn--;
|
list->list->numOfConn--;
|
||||||
connList->size--;
|
if (conn->status == ConnInPool) {
|
||||||
} else {
|
list->size--;
|
||||||
if (pThrd->pool) {
|
|
||||||
SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr) + 1);
|
|
||||||
if (connList != NULL) connList->list->numOfConn--;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
conn->list = NULL;
|
conn->list = NULL;
|
||||||
pThrd->newConnCount--;
|
pThrd->newConnCount--;
|
||||||
|
|
||||||
|
|
|
@ -527,6 +527,10 @@ void uvOnSendCb(uv_write_t* req, int status) {
|
||||||
if (!transQueueEmpty(&conn->srvMsgs)) {
|
if (!transQueueEmpty(&conn->srvMsgs)) {
|
||||||
msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
|
msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
|
||||||
if (msg->type == Register && conn->status == ConnAcquire) {
|
if (msg->type == Register && conn->status == ConnAcquire) {
|
||||||
|
if (conn->regArg.init) {
|
||||||
|
transFreeMsg(conn->regArg.msg.pCont);
|
||||||
|
conn->regArg.init = 0;
|
||||||
|
}
|
||||||
conn->regArg.notifyCount = 0;
|
conn->regArg.notifyCount = 0;
|
||||||
conn->regArg.init = 1;
|
conn->regArg.init = 1;
|
||||||
conn->regArg.msg = msg->msg;
|
conn->regArg.msg = msg->msg;
|
||||||
|
@ -1350,6 +1354,11 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
transQueuePop(&conn->srvMsgs);
|
transQueuePop(&conn->srvMsgs);
|
||||||
|
|
||||||
|
if (conn->regArg.init) {
|
||||||
|
transFreeMsg(conn->regArg.msg.pCont);
|
||||||
|
conn->regArg.init = 0;
|
||||||
|
}
|
||||||
conn->regArg.notifyCount = 0;
|
conn->regArg.notifyCount = 0;
|
||||||
conn->regArg.init = 1;
|
conn->regArg.init = 1;
|
||||||
conn->regArg.msg = msg->msg;
|
conn->regArg.msg = msg->msg;
|
||||||
|
|
|
@ -115,7 +115,7 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py -R
|
#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py -R
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py
|
||||||
|
@ -228,7 +228,7 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
|
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_taosx.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py
|
||||||
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
|
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
|
||||||
|
|
|
@ -21,6 +21,8 @@ print create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0
|
||||||
|
|
||||||
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
|
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
|
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
|
||||||
sql insert into t1 values(1648791213000,1,2,3,1.0,2);
|
sql insert into t1 values(1648791213000,1,2,3,1.0,2);
|
||||||
$loop_count = 0
|
$loop_count = 0
|
||||||
|
@ -457,6 +459,8 @@ print create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0
|
||||||
|
|
||||||
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
|
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
sql insert into t1 values(1648791212000,2,2,3,1.0,1);
|
sql insert into t1 values(1648791212000,2,2,3,1.0,1);
|
||||||
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
|
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
|
||||||
sql insert into t1 values(1648791213000,1,2,4,1.0,2);
|
sql insert into t1 values(1648791213000,1,2,4,1.0,2);
|
||||||
|
@ -504,6 +508,9 @@ sql create table t1(ts timestamp, a int, b int , c int, d double, id int);
|
||||||
print create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart, count(*) c1, sum(b) c3 from t1 state_window(a);
|
print create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart, count(*) c1, sum(b) c3 from t1 state_window(a);
|
||||||
|
|
||||||
sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart, count(*) c1, sum(b) c3 from t1 state_window(a);
|
sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart, count(*) c1, sum(b) c3 from t1 state_window(a);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
sql insert into t1 values(1648791212000,1,2,3,1.0,1);
|
sql insert into t1 values(1648791212000,1,2,3,1.0,1);
|
||||||
sql insert into t1 values(1648791213000,2,2,3,1.0,1);
|
sql insert into t1 values(1648791213000,2,2,3,1.0,1);
|
||||||
sql insert into t1 values(1648791214000,3,2,4,1.0,2);
|
sql insert into t1 values(1648791214000,3,2,4,1.0,2);
|
||||||
|
@ -557,6 +564,8 @@ print create stream if not exists streams4 trigger window_close IGNORE EXPIRED 0
|
||||||
|
|
||||||
sql create stream if not exists streams4 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _wstart AS startts, min(c1),count(c1) from t1 state_window(c1);
|
sql create stream if not exists streams4 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _wstart AS startts, min(c1),count(c1) from t1 state_window(c1);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
sql insert into t1 (ts, c1) values (1668073288209, 11);
|
sql insert into t1 (ts, c1) values (1668073288209, 11);
|
||||||
sql insert into t1 (ts, c1) values (1668073288210, 11);
|
sql insert into t1 (ts, c1) values (1668073288210, 11);
|
||||||
sql insert into t1 (ts, c1) values (1668073288211, 11);
|
sql insert into t1 (ts, c1) values (1668073288211, 11);
|
||||||
|
@ -745,6 +754,9 @@ sql create table b (c timestamp, d int, e int , f int, g double);
|
||||||
print create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart c1, count(*) c2, max(a) c3 from tb state_window(a);
|
print create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart c1, count(*) c2, max(a) c3 from tb state_window(a);
|
||||||
|
|
||||||
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart c1, count(*) c2, max(a) c3 from tb state_window(a);
|
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart c1, count(*) c2, max(a) c3 from tb state_window(a);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
sql insert into b values(1648791213000,NULL,NULL,NULL,NULL);
|
sql insert into b values(1648791213000,NULL,NULL,NULL,NULL);
|
||||||
sql select * from streamt order by c1, c2, c3;
|
sql select * from streamt order by c1, c2, c3;
|
||||||
|
|
||||||
|
|
|
@ -359,8 +359,74 @@ class TDTestCase:
|
||||||
finally:
|
finally:
|
||||||
consumer.close()
|
consumer.close()
|
||||||
|
|
||||||
|
def consume_TS_4540_Test(self):
|
||||||
|
tdSql.execute(f'create database if not exists test')
|
||||||
|
tdSql.execute(f'use test')
|
||||||
|
tdSql.execute(f'CREATE STABLE `test`.`b` ( `time` TIMESTAMP , `task_id` NCHAR(1000) ) TAGS( `key` NCHAR(1000))')
|
||||||
|
tdSql.execute(f"insert into `test`.b1 using `test`.`b`(key) tags('1') (time, task_id) values ('2024-03-04 12:50:01.000', '32') `test`.b2 using `test`.`b`(key) tags('2') (time, task_id) values ('2024-03-04 12:50:01.000', '43') `test`.b3 using `test`.`b`(key) tags('3') (time, task_id) values ('2024-03-04 12:50:01.000', '123456')")
|
||||||
|
|
||||||
|
tdSql.execute(f'create topic tt as select tbname,task_id,key from b')
|
||||||
|
|
||||||
|
consumer_dict = {
|
||||||
|
"group.id": "g1",
|
||||||
|
"td.connect.user": "root",
|
||||||
|
"td.connect.pass": "taosdata",
|
||||||
|
"auto.offset.reset": "earliest",
|
||||||
|
}
|
||||||
|
consumer = Consumer(consumer_dict)
|
||||||
|
|
||||||
|
try:
|
||||||
|
consumer.subscribe(["tt"])
|
||||||
|
except TmqError:
|
||||||
|
tdLog.exit(f"subscribe error")
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
res = consumer.poll(1)
|
||||||
|
if not res:
|
||||||
|
break
|
||||||
|
val = res.value()
|
||||||
|
if val is None:
|
||||||
|
continue
|
||||||
|
for block in val:
|
||||||
|
data = block.fetchall()
|
||||||
|
print(data)
|
||||||
|
if data != [('b1', '32', '1')] and data != [('b2', '43', '2')] and data != [('b3', '123456', '3')]:
|
||||||
|
tdLog.exit(f"index = 0 table b1 error")
|
||||||
|
|
||||||
|
finally:
|
||||||
|
consumer.close()
|
||||||
|
|
||||||
|
def consume_ts_4544(self):
|
||||||
|
tdSql.execute(f'create database if not exists d1')
|
||||||
|
tdSql.execute(f'use d1')
|
||||||
|
tdSql.execute(f'create table stt(ts timestamp, i int) tags(t int)')
|
||||||
|
tdSql.execute(f'insert into tt1 using stt tags(1) values(now, 1) (now+1s, 2)')
|
||||||
|
tdSql.execute(f'insert into tt2 using stt tags(2) values(now, 1) (now+1s, 2)')
|
||||||
|
tdSql.execute(f'insert into tt3 using stt tags(3) values(now, 1) (now+1s, 2)')
|
||||||
|
tdSql.execute(f'insert into tt1 using stt tags(1) values(now+5s, 11) (now+10s, 12)')
|
||||||
|
|
||||||
|
tdSql.execute(f'create topic topic_in as select * from stt where tbname in ("tt2")')
|
||||||
|
|
||||||
|
consumer_dict = {
|
||||||
|
"group.id": "g1",
|
||||||
|
"td.connect.user": "root",
|
||||||
|
"td.connect.pass": "taosdata",
|
||||||
|
"auto.offset.reset": "earliest",
|
||||||
|
}
|
||||||
|
consumer = Consumer(consumer_dict)
|
||||||
|
|
||||||
|
try:
|
||||||
|
consumer.subscribe(["topic_in"])
|
||||||
|
except TmqError:
|
||||||
|
tdLog.exit(f"subscribe error")
|
||||||
|
|
||||||
|
consumer.close()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.consumeTest()
|
self.consumeTest()
|
||||||
|
self.consume_ts_4544()
|
||||||
|
self.consume_TS_4540_Test()
|
||||||
|
|
||||||
tdSql.prepare()
|
tdSql.prepare()
|
||||||
self.checkWal1VgroupOnlyMeta()
|
self.checkWal1VgroupOnlyMeta()
|
||||||
|
|
Loading…
Reference in New Issue