diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 7e2d09dd63..00a36391fa 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -64,6 +64,7 @@ typedef struct { int64_t verInSnapshotting; int64_t snapshotVer; int64_t commitVer; + int64_t appliedVer; int64_t lastVer; } SWalVer; @@ -172,6 +173,9 @@ int32_t walRollback(SWal *, int64_t ver); int32_t walBeginSnapshot(SWal *, int64_t ver); int32_t walEndSnapshot(SWal *); int32_t walRestoreFromSnapshot(SWal *, int64_t ver); +// for tq +int32_t walApplyVer(SWal *, int64_t ver); + // int32_t walDataCorrupted(SWal*); // read @@ -186,7 +190,6 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead); int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead); int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); - typedef struct { int64_t refId; int64_t ver; @@ -206,6 +209,7 @@ int64_t walGetFirstVer(SWal *); int64_t walGetSnapshotVer(SWal *); int64_t walGetLastVer(SWal *); int64_t walGetCommittedVer(SWal *); +int64_t walGetAppliedVer(SWal *); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 315b7c3afc..f7387f7e88 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -265,6 +265,10 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { int64_t consumerId = be64toh(pReq->consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); + if (pConsumer == NULL) { + terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; + return -1; + } atomic_store_32(&pConsumer->hbStatus, 0); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index c929c84203..4c0d416ad1 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -237,6 +237,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ #endif int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { + walApplyVer(pTq->pVnode->pWal, ver); + if (msgType == TDMT_VND_SUBMIT) { if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0; @@ -253,4 +255,3 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) return 0; } - diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 6c3d0648e0..c1778ed5ca 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1455,7 +1455,6 @@ static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVer (pBlock->minVersion <= pVerRange->maxVer); } - static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) { size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline); @@ -1507,7 +1506,7 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBl return doCheckforDatablockOverlap(pBlockScanInfo, pBlock); } else { int32_t index = pBlockScanInfo->fileDelIndex; - while(1) { + while (1) { TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index); if (p->ts > pBlock->minKey.ts && index > 0) { index -= 1; @@ -2808,7 +2807,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl if (pCond->suid != 0) { (*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, (*ppReader)->suid, -1); - ASSERT((*ppReader)->pSchema); + // ASSERT((*ppReader)->pSchema); } else if (taosArrayGetSize(pTableList) > 0) { STableKeyInfo* pKey = taosArrayGet(pTableList, 0); (*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, pKey->uid, -1); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 8e59d97286..e6d116dfef 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -878,6 +878,8 @@ _exit: tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT); } + vDebug("successful submit in vg %d version %ld", pVnode->config.vgId, version); + return 0; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b0a74c3002..d206f56ec3 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -660,6 +660,62 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag) { taosMemoryFree(pBuf); } +typedef int32_t (*__compare_fn_t)(void* pKey, void* data, int32_t index); + +int32_t binarySearchCom(void* keyList, int num, void* pKey, int order, __compare_fn_t comparefn) { + int firstPos = 0, lastPos = num - 1, midPos = -1; + int numOfRows = 0; + + if (num <= 0) return -1; + if (order == TSDB_ORDER_DESC) { + // find the first position which is smaller or equal than the key + while (1) { + if (comparefn(pKey, keyList, lastPos) >= 0) return lastPos; + if (comparefn(pKey, keyList, firstPos) == 0) return firstPos; + if (comparefn(pKey, keyList, firstPos) < 0) return firstPos - 1; + + numOfRows = lastPos - firstPos + 1; + midPos = (numOfRows >> 1) + firstPos; + + if (comparefn(pKey, keyList, midPos) < 0) { + lastPos = midPos - 1; + } else if (comparefn(pKey, keyList, midPos) > 0) { + firstPos = midPos + 1; + } else { + break; + } + } + + } else { + // find the first position which is bigger or equal than the key + while (1) { + if (comparefn(pKey, keyList, firstPos) <= 0) return firstPos; + if (comparefn(pKey, keyList, lastPos) == 0) return lastPos; + + if (comparefn(pKey, keyList, lastPos) > 0) { + lastPos = lastPos + 1; + if (lastPos >= num) + return -1; + else + return lastPos; + } + + numOfRows = lastPos - firstPos + 1; + midPos = (numOfRows >> 1) + firstPos; + + if (comparefn(pKey, keyList, midPos) < 0) { + lastPos = midPos - 1; + } else if (comparefn(pKey, keyList, midPos) > 0) { + firstPos = midPos + 1; + } else { + break; + } + } + } + + return midPos; +} + typedef int64_t (*__get_value_fn_t)(void* data, int32_t index); int32_t binarySearch(void* keyList, int num, TSKEY key, int order, __get_value_fn_t getValuefn) { @@ -722,14 +778,31 @@ int64_t getReskey(void* data, int32_t index) { return *(int64_t*)pos->key; } +int32_t compareResKey(void* pKey, void* data, int32_t index) { + SArray* res = (SArray*)data; + SResKeyPos* pos = taosArrayGetP(res, index); + SWinRes* pData = (SWinRes*) pKey; + if (pData->ts == *(int64_t*)pos->key) { + if (pData->groupId > pos->groupId) { + return 1; + } else if (pData->groupId < pos->groupId) { + return -1; + } + return 0; + } else if (pData->ts > *(int64_t*)pos->key) { + return 1; + } + return -1; +} + static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SArray* pUpdated) { int32_t size = taosArrayGetSize(pUpdated); - int32_t index = binarySearch(pUpdated, size, ts, TSDB_ORDER_DESC, getReskey); + SWinRes data = {.ts = ts, .groupId = groupId}; + int32_t index = binarySearchCom(pUpdated, size, &data, TSDB_ORDER_DESC, compareResKey); if (index == -1) { index = 0; } else { - TSKEY resTs = getReskey(pUpdated, index); - if (resTs < ts) { + if (compareResKey(&data, pUpdated, index) > 0) { index++; } else { return TSDB_CODE_SUCCESS; @@ -753,10 +826,10 @@ static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpda return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated); } -static void removeResult(SArray* pUpdated, TSKEY key) { +static void removeResult(SArray* pUpdated, SWinRes* pKey) { int32_t size = taosArrayGetSize(pUpdated); - int32_t index = binarySearch(pUpdated, size, key, TSDB_ORDER_DESC, getReskey); - if (index >= 0 && key == getReskey(pUpdated, index)) { + int32_t index = binarySearchCom(pUpdated, size, pKey, TSDB_ORDER_DESC, compareResKey); + if (index >= 0 && 0 == compareResKey(pKey, pUpdated, index)) { taosArrayRemove(pUpdated, index); } } @@ -765,7 +838,7 @@ static void removeResults(SArray* pWins, SArray* pUpdated) { int32_t size = taosArrayGetSize(pWins); for (int32_t i = 0; i < size; i++) { SWinRes* pW = taosArrayGet(pWins, i); - removeResult(pUpdated, pW->ts); + removeResult(pUpdated, pW); } } @@ -775,14 +848,30 @@ int64_t getWinReskey(void* data, int32_t index) { return pos->ts; } +int32_t compareWinRes(void* pKey, void* data, int32_t index) { + SArray* res = (SArray*)data; + SWinRes* pos = taosArrayGetP(res, index); + SResKeyPos* pData = (SResKeyPos*) pKey; + if (*(int64_t*)pData->key == pos->ts) { + if (pData->groupId > pos->groupId) { + return 1; + } else if (pData->groupId < pos->groupId) { + return -1; + } + return 0; + } else if (*(int64_t*)pData->key > pos->ts) { + return 1; + } + return -1; +} + static void removeDeleteResults(SArray* pUpdated, SArray* pDelWins) { int32_t upSize = taosArrayGetSize(pUpdated); int32_t delSize = taosArrayGetSize(pDelWins); for (int32_t i = 0; i < upSize; i++) { SResKeyPos* pResKey = taosArrayGetP(pUpdated, i); - int64_t key = *(int64_t*)pResKey->key; - int32_t index = binarySearch(pDelWins, delSize, key, TSDB_ORDER_DESC, getWinReskey); - if (index >= 0 && key == getWinReskey(pDelWins, index)) { + int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes); + if (index >= 0 && 0 == compareWinRes(pResKey, pDelWins, index)) { taosArrayRemove(pDelWins, index); } } @@ -924,11 +1013,17 @@ SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SRe int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) { TSKEY* tsCols = NULL; + if (pBlock->pDataBlock != NULL) { SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); tsCols = (int64_t*)pColDataInfo->pData; - if (tsCols != NULL) { + // no data in primary ts + if (tsCols[0] == 0 && tsCols[pBlock->info.rows - 1] == 0) { + return NULL; + } + + if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) { blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); } } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index e03af279ae..9c004bf1c4 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1425,6 +1425,17 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len) } static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + // forbid null as first/last input, since first(c0, null, 1) may have different number of input + int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); + + for (int32_t i = 0; i < numOfParams; ++i) { + uint8_t nodeType = nodeType(nodesListGetNode(pFunc->pParameterList, i)); + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type; + if (IS_NULL_TYPE(paraType) && QUERY_NODE_VALUE == nodeType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + } + pFunc->node.resType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType; return TSDB_CODE_SUCCESS; } @@ -1435,6 +1446,15 @@ static int32_t translateFirstLastImpl(SFunctionNode* pFunc, char* pErrBuf, int32 uint8_t paraType = ((SExprNode*)pPara)->resType.type; int32_t paraBytes = ((SExprNode*)pPara)->resType.bytes; if (isPartial) { + int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); + for (int32_t i = 0; i < numOfParams; ++i) { + uint8_t nodeType = nodeType(nodesListGetNode(pFunc->pParameterList, i)); + uint8_t pType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type; + if (IS_NULL_TYPE(pType) && QUERY_NODE_VALUE == nodeType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + } + pFunc->node.resType = (SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; } else { diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 991b50f7c0..edc811fe82 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -33,6 +33,8 @@ int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; } int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVer; } +int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) { return pWal->vers.appliedVer; } + static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 8b4225c80c..5bc9cdafa2 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -66,9 +66,15 @@ void walCloseReader(SWalReader *pRead) { } int32_t walNextValidMsg(SWalReader *pRead) { - wDebug("vgId:%d wal start to fetch", pRead->pWal->cfg.vgId); int64_t fetchVer = pRead->curVersion; - int64_t endVer = pRead->cond.scanUncommited ? walGetLastVer(pRead->pWal) : walGetCommittedVer(pRead->pWal); + int64_t lastVer = walGetLastVer(pRead->pWal); + int64_t committedVer = walGetCommittedVer(pRead->pWal); + int64_t appliedVer = walGetAppliedVer(pRead->pWal); + int64_t endVer = pRead->cond.scanUncommited ? lastVer : committedVer; + endVer = TMIN(appliedVer, endVer); + + wDebug("vgId:%d wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld", + pRead->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); while (fetchVer <= endVer) { if (walFetchHeadNew(pRead, fetchVer) < 0) { return -1; diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 26dc3cdffb..4fc135a1cf 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -64,6 +64,12 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { return 0; } +int32_t walApplyVer(SWal *pWal, int64_t ver) { + // TODO: error check + pWal->vers.appliedVer = ver; + return 0; +} + int32_t walCommit(SWal *pWal, int64_t ver) { ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer); ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer); diff --git a/tests/script/tsim/stream/basic1.sim b/tests/script/tsim/stream/basic1.sim index 2a6d64bcaf..a6f9860831 100644 --- a/tests/script/tsim/stream/basic1.sim +++ b/tests/script/tsim/stream/basic1.sim @@ -462,6 +462,113 @@ if $data25 != 3 then return -1 endi +sql create database test2 vgroups 1 +sql show databases +sql use test2 +sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(2,2,2); +sql create table t4 using st tags(2,2,2); +sql create table t5 using st tags(2,2,2); +sql create stream streams2 trigger at_once into streamt as select _wstart, count(*) c1, sum(a) c3,max(b) c4 from st partition by tbname interval(10s) + +sql insert into t1 values(1648791213000,1,1,1,1.0) t2 values(1648791213000,2,2,2,2.0) t3 values(1648791213000,3,3,3,3.0) t4 values(1648791213000,4,4,4,4.0); + +$loop_count = 0 + +loop0: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt; + +if $rows != 4 then + print =====rows=$rows + goto loop0 +endi + +sql insert into t1 values(1648791213000,5,5,5,5.0) t2 values(1648791213000,6,6,6,6.0) t5 values(1648791213000,7,7,7,7.0); + + +$loop_count = 0 + +loop1: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt order by c4 desc; + +if $rows != 5 then + print =====rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != 1 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 7 then + print =====data02=$data02 + goto loop1 +endi + +# row 1 +if $data11 != 1 then + print =====data11=$data11 + goto loop1 +endi + +if $data12 != 6 then + print =====data12=$data12 + goto loop1 +endi + +# row 2 +if $data21 != 1 then + print =====data21=$data21 + goto loop1 +endi + +if $data22 != 5 then + print =====data22=$data22 + goto loop1 +endi + +sql insert into t1 values(1648791213000,8,8,8,8.0); + +$loop_count = 0 + +loop2: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt order by c4 desc; + +# row 0 +if $data01 != 1 then + print =====data01=$data01 + goto loop2 +endi + +if $data02 != 8 then + print =====data02=$data02 + goto loop2 +endi system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim index f34a50de9d..4364b56d44 100644 --- a/tests/script/tsim/stream/sliding.sim +++ b/tests/script/tsim/stream/sliding.sim @@ -366,18 +366,21 @@ if $data32 != 8 then goto loop1 endi +#$loop_all = 0 +#looptest: + sql drop database IF EXISTS test2; sql drop stream IF EXISTS streams21; sql drop stream IF EXISTS streams22; -sql create database test2 vgroups 2; +sql create database test2 vgroups 6; sql use test2; sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); -sql create stream streams21 trigger at_once into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s); -sql create stream streams22 trigger at_once into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s); +sql create stream streams21 trigger at_once into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s); +sql create stream streams22 trigger at_once into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s); sql insert into t1 values(1648791213000,1,1,1,1.0); sql insert into t1 values(1648791223001,2,2,2,1.1); @@ -394,7 +397,7 @@ sql insert into t2 values(1648791213004,4,10,10,4.1); $loop_count = 0 loop2: -sleep 300 +sleep 100 $loop_count = $loop_count + 1 if $loop_count == 10 then @@ -452,7 +455,7 @@ print step 6 $loop_count = 0 loop3: -sleep 300 +# sleep 300 $loop_count = $loop_count + 1 if $loop_count == 10 then @@ -464,7 +467,7 @@ sql select * from streamt2; # row 0 if $data01 != 4 then print =====data01=$data01 - # goto loop3 + goto loop3 endi if $data02 != 10 then @@ -505,4 +508,9 @@ if $data32 != 8 then goto loop3 endi +$loop_all = $loop_all + 1 +print ============loop_all=$loop_all + +#goto looptest + system sh/stop_dnodes.sh \ No newline at end of file diff --git a/tests/system-test/2-query/last_row.py b/tests/system-test/2-query/last_row.py index cbe83b5a30..cdb26f7589 100644 --- a/tests/system-test/2-query/last_row.py +++ b/tests/system-test/2-query/last_row.py @@ -221,7 +221,7 @@ class TDTestCase: tdSql.execute("use testdb") # bug need fix - tdSql.query("select last_row(c1 ,NULL) from testdb.t1") + tdSql.error("select last_row(c1 ,NULL) from testdb.t1") error_sql_lists = [ "select last_row from testdb.t1", diff --git a/tests/system-test/7-tmq/tmqAutoCreateTbl.py b/tests/system-test/7-tmq/tmqAutoCreateTbl.py index 1622ad7621..8fcb57aea6 100644 --- a/tests/system-test/7-tmq/tmqAutoCreateTbl.py +++ b/tests/system-test/7-tmq/tmqAutoCreateTbl.py @@ -225,7 +225,7 @@ class TDTestCase: tdSql.prepare() self.prepareTestEnv() self.tmqCase1() - # self.tmqCase2() TD-17267 + # self.tmqCase2() # TD-17267 def stop(self):