diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 669340f9e5..9987dab166 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -211,6 +211,7 @@ typedef struct SStoreTqReader { bool (*tqNextBlockImpl)(); // todo remove it SSDataBlock* (*tqGetResultBlock)(); int64_t (*tqGetResultBlockTime)(); + int32_t (*tqGetStreamExecProgress)(); void (*tqReaderSetColIdList)(); int32_t (*tqReaderSetQueryTableList)(); @@ -266,16 +267,11 @@ typedef struct SStoreMeta { // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] int32_t (*getChildTableList)(void* pVnode, int64_t suid, SArray* list); int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); - void* storeGetVersionRange; - void* storeGetLastTimestamp; - - int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema + int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols); void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, int64_t* numOfNormalTables); - int64_t (*getNumOfRowsInMem)(void* pVnode); - SMCtbCursor* (*openCtbCursor)(void* pVnode, tb_uid_t uid, int lock); int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first); void (*pauseCtbCursor)(SMCtbCursor* pCtbCur); diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 24222677a4..c2f7c6de2f 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -13,6 +13,9 @@ * along with this program. If not, see . */ +#ifndef _STREAM_STATE_H_ +#define _STREAM_STATE_H_ + #include "tdatablock.h" #include "rocksdb/c.h" @@ -20,9 +23,6 @@ #include "tsimplehash.h" #include "tstreamFileState.h" -#ifndef _STREAM_STATE_H_ -#define _STREAM_STATE_H_ - #ifdef __cplusplus extern "C" { #endif diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9738be839d..2135bb706b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -13,6 +13,9 @@ * along with this program. If not, see . */ +#ifndef _STREAM_H_ +#define _STREAM_H_ + #include "os.h" #include "streamState.h" #include "tdatablock.h" @@ -26,9 +29,6 @@ extern "C" { #endif -#ifndef _STREAM_H_ -#define _STREAM_H_ - #define ONE_MiB_F (1048576.0) #define ONE_KiB_F (1024.0) #define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F) @@ -313,7 +313,7 @@ typedef struct SCheckpointInfo { int64_t failedId; // record the latest failed checkpoint id int64_t checkpointingId; int32_t downstreamAlignNum; - int32_t checkpointNotReadyTasks; + int32_t numOfNotReady; bool dispatchCheckpointTrigger; int64_t msgVer; int32_t transId; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index bedd72759d..554205decb 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -556,7 +556,7 @@ typedef struct { } SMqConsumerObj; SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]); -void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool delete); +void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool isDeleted); int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer); void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver); diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 372612274f..4d1125a340 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -124,6 +124,7 @@ SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); void destroyStreamTaskIter(SStreamTaskIter *pIter); bool streamTaskIterNextTask(SStreamTaskIter *pIter); SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter); +void mndInitExecInfo(); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 47b26fba24..0a78914011 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -62,8 +62,6 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); -static void freeCheckpointCandEntry(void *); -static void freeTaskList(void *param); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream); @@ -121,17 +119,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask); - taosThreadMutexInit(&execInfo.lock, NULL); - _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); - - execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId)); - execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK); - execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); - execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK); - execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK); - - taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry); - taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); + mndInitExecInfo(); if (sdbSetTable(pMnode->pSdb, table) != 0) { return -1; @@ -2117,16 +2105,6 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } -void freeCheckpointCandEntry(void *param) { - SCheckpointCandEntry *pEntry = param; - taosMemoryFreeClear(pEntry->pName); -} - -void freeTaskList(void* param) { - SArray** pList = (SArray **)param; - taosArrayDestroy(*pList); -} - static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) { int32_t num = taosArrayGetSize(pList); for(int32_t i = 0; i < num; ++i) { @@ -2202,4 +2180,4 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { taosThreadMutexUnlock(&execInfo.lock); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 005caea31b..97474fa851 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -316,16 +316,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // current checkpoint is failed, rollback from the checkpoint trans // kill the checkpoint trans and then set all tasks status to be normal if (taosArrayGetSize(pFailedTasks) > 0) { - bool allReady = true; - SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); - taosArrayDestroy(p); + bool allReady = true; + if (pMnode != NULL) { + SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); + taosArrayDestroy(p); + } else { + allReady = false; + } if (allReady || snodeChanged) { // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) { SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i); mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status", - pInfo->checkpointId, pInfo->transId); + pInfo->checkpointId, pInfo->transId); mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId); } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 235c604b27..3cabce2201 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -543,3 +543,27 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj * taosWUnLockLatch(&pStream->lock); return 0; } + +static void freeCheckpointCandEntry(void *param) { + SCheckpointCandEntry *pEntry = param; + taosMemoryFreeClear(pEntry->pName); +} + +static void freeTaskList(void* param) { + SArray** pList = (SArray **)param; + taosArrayDestroy(*pList); +} + +void mndInitExecInfo() { + taosThreadMutexInit(&execInfo.lock, NULL); + _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); + + execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId)); + execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK); + execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); + execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK); + execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK); + + taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry); + taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); +} diff --git a/source/dnode/mnode/impl/test/CMakeLists.txt b/source/dnode/mnode/impl/test/CMakeLists.txt index a002b20bde..bc5b5125f1 100644 --- a/source/dnode/mnode/impl/test/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/CMakeLists.txt @@ -4,7 +4,7 @@ add_subdirectory(acct) #add_subdirectory(db) #add_subdirectory(dnode) add_subdirectory(func) -#add_subdirectory(mnode) +add_subdirectory(stream) add_subdirectory(profile) add_subdirectory(qnode) add_subdirectory(sdb) diff --git a/source/dnode/mnode/impl/test/stream/CMakeLists.txt b/source/dnode/mnode/impl/test/stream/CMakeLists.txt new file mode 100644 index 0000000000..b1bb62735f --- /dev/null +++ b/source/dnode/mnode/impl/test/stream/CMakeLists.txt @@ -0,0 +1,13 @@ +SET(CMAKE_CXX_STANDARD 11) + +aux_source_directory(. MNODE_STREAM_TEST_SRC) +add_executable(streamTest ${MNODE_STREAM_TEST_SRC}) +target_link_libraries( + streamTest + PRIVATE dnode gtest +) + +add_test( + NAME streamTest + COMMAND streamTest +) diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp new file mode 100644 index 0000000000..a3babad80c --- /dev/null +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +#include +#include +#include "../../inc/mndStream.h" + +namespace { +SRpcMsg buildHbReq() { + SStreamHbMsg msg = {0}; + msg.vgId = 1; + msg.numOfTasks = 5; + msg.pTaskStatus = taosArrayInit(4, sizeof(STaskStatusEntry)); + + for (int32_t i = 0; i < 4; ++i) { + STaskStatusEntry entry = {0}; + entry.nodeId = i + 1; + entry.stage = 1; + entry.id.taskId = i + 1; + entry.id.streamId = 999; + + if (i == 0) { + entry.stage = 4; + } + + taosArrayPush(msg.pTaskStatus, &entry); + } + + // (p->checkpointId != 0) && p->checkpointFailed + // add failed checkpoint info + { + STaskStatusEntry entry = {0}; + entry.nodeId = 5; + entry.stage = 1; + + entry.id.taskId = 5; + entry.id.streamId = 999; + + entry.checkpointId = 1; + entry.checkpointFailed = true; + + taosArrayPush(msg.pTaskStatus, &entry); + } + + int32_t tlen = 0; + int32_t code = 0; + SEncoder encoder; + void* buf = NULL; + SRpcMsg msg1 = {0}; + msg1.info.noResp = 1; + + tEncodeSize(tEncodeStreamHbMsg, &msg, tlen, code); + if (code < 0) { + goto _end; + } + + buf = rpcMallocCont(tlen); + if (buf == NULL) { + goto _end; + } + + tEncoderInit(&encoder, (uint8_t*)buf, tlen); + if ((code = tEncodeStreamHbMsg(&encoder, &msg)) < 0) { + rpcFreeCont(buf); + goto _end; + } + tEncoderClear(&encoder); + + initRpcMsg(&msg1, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); + + taosArrayDestroy(msg.pTaskStatus); + return msg1; + +_end: + return msg1; +} + +void setTask(SStreamTask* pTask, int32_t nodeId, int64_t streamId, int32_t taskId) { + SStreamExecInfo* pExecNode = &execInfo; + + pTask->id.streamId = streamId; + pTask->id.taskId = taskId; + pTask->info.nodeId = nodeId; + + STaskId id; + id.streamId = pTask->id.streamId; + id.taskId = pTask->id.taskId; + + STaskStatusEntry entry; + streamTaskStatusInit(&entry, pTask); + + entry.stage = 1; + entry.status = TASK_STATUS__READY; + + taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); + taosArrayPush(pExecNode->pTaskList, &id); +} +void initStreamExecInfo() { + SStreamExecInfo* pExecNode = &execInfo; + + SStreamTask task = {0}; + setTask(&task, 1, 999, 1); + setTask(&task, 1, 999, 2); + setTask(&task, 1, 999, 3); + setTask(&task, 1, 999, 4); + setTask(&task, 2, 999, 5); +} + +void initNodeInfo() { + execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry)); + SNodeEntry entry = {0}; + entry.nodeId = 2; + entry.stageUpdated = true; + taosArrayPush(execInfo.pNodeList, &entry); +} +} // namespace + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +TEST(mndHbTest, handle_error_in_hb) { + mndInitExecInfo(); + initStreamExecInfo(); + initNodeInfo(); + + SRpcMsg msg = buildHbReq(); + int32_t code = mndProcessStreamHb(&msg); + + rpcFreeCont(msg.pCont); +} + +#pragma GCC diagnostic pop \ No newline at end of file diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 97cf0ffebc..3c334be2f2 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -90,6 +90,8 @@ int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num); int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num); int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num); +int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); + void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeGetLoadLite(SVnode *pVnode, SVnodeLoadLite *pLoad); @@ -180,7 +182,6 @@ int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); -int32_t tsdbGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); size_t tsdbCacheGetCapacity(SVnode *pVnode); @@ -233,6 +234,7 @@ int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, i bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr); int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); +int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished); // sma int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index cded4ddd7c..475a26aff5 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -97,7 +97,6 @@ typedef struct { struct STQ { SVnode* pVnode; char* path; - int64_t walLogLastVer; SRWLatch lock; SHashObj* pPushMgr; // subKey -> STqHandle SHashObj* pHandle; // subKey -> STqHandle @@ -153,14 +152,14 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tq util -int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type); +int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset); void tqUpdateNodeStage(STQ* pTq, bool isLeader); -int32_t setDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, - SSubmitTbData* pTableData, const char* id); +int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, + SSubmitTbData* pTableData, const char* id); int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id); SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 9d8d5013fa..cac3be9ee3 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -279,6 +279,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap); void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive); +int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbMerge.c ============================================================================================== typedef struct { @@ -970,8 +971,6 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { return pIter->pRow; } -int32_t tRowInfoCmprFn(const void *p1, const void *p2); - typedef struct { int64_t suid; int64_t uid; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 138bcbb133..621651507e 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1554,7 +1554,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA } _resume_delete: version = RSMA_EXEC_MSG_VER(msg); - if ((terrno = extractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version, + if ((terrno = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version, &packData.pDataBlock, 1))) { taosFreeQitem(msg); goto _err; diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index f537ede8c1..767ea47e21 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -203,7 +203,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * int32_t *index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId)); if (index == NULL) { // no data yet, append it - code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, ""); + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, ""); if (code != TSDB_CODE_SUCCESS) { continue; } @@ -213,7 +213,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1; taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); } else { - code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, ""); + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, ""); if (code != TSDB_CODE_SUCCESS) { continue; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8689c30a55..bde6889ecd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -66,7 +66,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->path = taosStrdup(path); pTq->pVnode = pVnode; - pTq->walLogLastVer = pVnode->pWal->vers.lastVer; pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle); @@ -1055,7 +1054,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); // let's continue scan data in the wal files - if(code == 0 && pReq->reqType >= 0){ + if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) { tqScanWalAsync(pTq, false); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 383a636f71..8392f4c479 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -344,7 +344,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead)); int32_t len = pCont->bodyLen - sizeof(SMsgHead); - code = extractDelDataBlock(pBody, len, ver, (void**)pItem, 0); + code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0); if (code == TSDB_CODE_SUCCESS) { if (*pItem == NULL) { tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 7fcb86d84a..7050870c57 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -746,7 +746,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat return TDB_CODE_SUCCESS; } -int32_t setDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, +int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, SSubmitTbData* pTableData, const char* id) { int32_t numOfRows = pDataBlock->info.rows; @@ -821,7 +821,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { continue; } - code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); if (code != TSDB_CODE_SUCCESS) { continue; } @@ -868,7 +868,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { continue; } - code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); if (code != TSDB_CODE_SUCCESS) { continue; } @@ -878,7 +878,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1; taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); } else { - code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); if (code != TSDB_CODE_SUCCESS) { continue; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index d18455d221..b9f578a74b 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -388,7 +388,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* return 0; } -int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type) { +int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type) { SDecoder* pCoder = &(SDecoder){0}; SDeleteRes* pRes = &(SDeleteRes){0}; @@ -449,3 +449,73 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** return TSDB_CODE_SUCCESS; } + +int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished) { + SStreamMeta* pMeta = pVnode->pTq->pStreamMeta; + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + int32_t code = TSDB_CODE_SUCCESS; + + if (pDelay != NULL) { + *pDelay = 0; + } + + *fhFinished = false; + + if (numOfTasks <= 0) { + return code; + } + + // extract the required source task for a given stream, identified by streamId + for (int32_t i = 0; i < numOfTasks; ++i) { + STaskId* pId = taosArrayGet(pMeta->pTaskList, i); + if (pId->streamId != streamId) { + continue; + } + + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); + if (ppTask == NULL) { + tqError("vgId:%d failed to acquire task:0x%" PRIx64 " in retrieving progress", pMeta->vgId, pId->taskId); + continue; + } + + if ((*ppTask)->info.taskLevel != TASK_LEVEL__SOURCE) { + continue; + } + + // here we get the required stream source task + SStreamTask* pTask = *ppTask; + *fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask); + + int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); + + SVersionRange verRange = {0}; + walReaderValidVersionRange(pTask->exec.pWalReader, &verRange.minVer, &verRange.maxVer); + + SWalReader* pReader = walOpenReader(pTask->exec.pWalReader->pWal, NULL, 0); + if (pReader == NULL) { + tqError("failed to open wal reader to extract exec progress, vgId:%d", pMeta->vgId); + continue; + } + + int64_t cur = 0; + int64_t latest = 0; + + code = walFetchHead(pReader, ver); + if (code != TSDB_CODE_SUCCESS) { + cur = pReader->pHead->head.ingestTs; + } + + code = walFetchHead(pReader, verRange.maxVer); + if (code != TSDB_CODE_SUCCESS) { + latest = pReader->pHead->head.ingestTs; + } + + if (pDelay != NULL) { // delay in ms + *pDelay = (latest - cur) / 1000; + } + + walCloseReader(pReader); + } + + return TSDB_CODE_SUCCESS; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 9d158668d2..86f58717e2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2628,6 +2628,58 @@ static bool moveToNextTableForPreFileSetMem(SReaderStatus* pStatus) { return (pStatus->pProcMemTableIter != NULL); } +static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo) { + SReaderStatus* pStatus = &pReader->status; + SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader; + SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; + + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + + SDataBlockInfo* pInfo = &pResBlock->info; + blockDataEnsureCapacity(pResBlock, pScanInfo->numOfRowsInStt); + + pInfo->rows = pScanInfo->numOfRowsInStt; + pInfo->id.uid = pScanInfo->uid; + pInfo->dataLoad = 1; + pInfo->window = pScanInfo->sttWindow; + + setComposedBlockFlag(pReader, true); + + pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1; + pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; + pScanInfo->lastProcKey = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey; + pScanInfo->sttBlockReturned = true; + + pSttBlockReader->mergeTree.pIter = NULL; + + tsdbDebug("%p uid:%" PRId64 " return clean stt block as one, brange:%" PRId64 "-%" PRId64 " rows:%" PRId64 " %s", + pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, + pResBlock->info.rows, pReader->idStr); +} + +static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, + SFileDataBlockInfo* pBlockInfo, int32_t blockIndex) { + // whole block is required, return it directly + SReaderStatus* pStatus = &pReader->status; + SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + + pInfo->rows = pBlockInfo->numRow; + pInfo->id.uid = pScanInfo->uid; + pInfo->dataLoad = 0; + pInfo->version = pReader->info.verRange.maxVer; + pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey}; + setComposedBlockFlag(pReader, false); + setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order); + + // update the last key for the corresponding table + pScanInfo->lastProcKey = asc ? pInfo->window.ekey : pInfo->window.skey; + tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, " + "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", + pReader, pScanInfo->uid, blockIndex, pBlockInfo->tbBlockIdx, pBlockInfo->numRow, pBlockInfo->firstKey, + pBlockInfo->lastKey, pReader->idStr); +} + static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader; @@ -2680,28 +2732,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { // if only require the total rows, no need to load data from stt file if it is clean stt blocks if (pReader->info.execMode == READER_EXEC_ROWS && pScanInfo->cleanSttBlocks) { - bool asc = ASCENDING_TRAVERSE(pReader->info.order); - - SDataBlockInfo* pInfo = &pResBlock->info; - blockDataEnsureCapacity(pResBlock, pScanInfo->numOfRowsInStt); - - pInfo->rows = pScanInfo->numOfRowsInStt; - pInfo->id.uid = pScanInfo->uid; - pInfo->dataLoad = 1; - pInfo->window = pScanInfo->sttWindow; - - setComposedBlockFlag(pReader, true); - - pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1; - pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; - pScanInfo->lastProcKey = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey; - pScanInfo->sttBlockReturned = true; - - pSttBlockReader->mergeTree.pIter = NULL; - - tsdbDebug("%p uid:%" PRId64 " return clean stt block as one, brange:%" PRId64 "-%" PRId64 " rows:%" PRId64 " %s", - pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, - pResBlock->info.rows, pReader->idStr); + buildCleanBlockFromSttFiles(pReader, pScanInfo); return TSDB_CODE_SUCCESS; } @@ -2741,10 +2772,11 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { } } -static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) { +// current active data block not overlap with the stt-files/stt-blocks +static bool notOverlapWithFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) { ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT); - if (pScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) { + if ((!hasDataInSttBlock(pScanInfo)) || (pScanInfo->cleanSttBlocks == true)) { return true; } else { int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey; @@ -2794,24 +2826,32 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pScanInfo, pReader->info.order); code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); } else { - if (notOverlapWithSttFiles(pBlockInfo, pScanInfo, asc)) { - // whole block is required, return it directly - SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; - pInfo->rows = pBlockInfo->numRow; - pInfo->id.uid = pScanInfo->uid; - pInfo->dataLoad = 0; - pInfo->version = pReader->info.verRange.maxVer; - pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey}; - setComposedBlockFlag(pReader, false); - setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order); + if (notOverlapWithFiles(pBlockInfo, pScanInfo, asc)) { + int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey; - // update the last key for the corresponding table - pScanInfo->lastProcKey = asc ? pInfo->window.ekey : pInfo->window.skey; - tsdbDebug("%p uid:%" PRIu64 - " clean file block retrieved from file, global index:%d, " - "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", - pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->numRow, - pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr); + if ((!hasDataInSttBlock(pScanInfo)) || (asc && pBlockInfo->lastKey < keyInStt) || + (!asc && pBlockInfo->firstKey > keyInStt)) { + if (pScanInfo->cleanSttBlocks && hasDataInSttBlock(pScanInfo)) { + if (asc) { // file block is located before the stt block + ASSERT(pScanInfo->sttWindow.skey > pBlockInfo->lastKey); + } else { // stt block is before the file block + ASSERT(pScanInfo->sttWindow.ekey < pBlockInfo->firstKey); + } + } + + buildCleanBlockFromDataFiles(pReader, pScanInfo, pBlockInfo, pBlockIter->index); + } else { // clean stt block + if (asc) { + ASSERT(pScanInfo->sttWindow.ekey < pBlockInfo->firstKey); + } else { + ASSERT(pScanInfo->sttWindow.skey > pBlockInfo->lastKey); + } + + // return the stt file block + ASSERT(pReader->info.execMode == READER_EXEC_ROWS && pSttBlockReader->mergeTree.pIter == NULL); + buildCleanBlockFromSttFiles(pReader, pScanInfo); + return TSDB_CODE_SUCCESS; + } } else { SBlockData* pBData = &pReader->status.fileBlockData; tBlockDataReset(pBData); @@ -2822,7 +2862,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { int64_t st = taosGetTimestampUs(); // let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files - pScanInfo->cleanSttBlocks = false; initSttBlockReader(pSttBlockReader, pScanInfo, pReader); // no data in stt block, no need to proceed. @@ -2840,8 +2879,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // data in stt now overlaps with current active file data block, need to composed with file data block. int64_t lastKeyInStt = getCurrentKeyInSttBlock(pSttBlockReader); - if ((lastKeyInStt >= pBlockInfo->firstKey && asc) || - (lastKeyInStt <= pBlockInfo->lastKey && (!asc))) { + if ((lastKeyInStt >= pBlockInfo->firstKey && asc) || (lastKeyInStt <= pBlockInfo->lastKey && (!asc))) { tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader, lastKeyInStt, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr); break; @@ -4995,9 +5033,9 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) { return rows; } -int32_t tsdbGetTableSchema(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) { +int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_t* suid) { SMetaReader mr = {0}; - metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0); + metaReaderDoInit(&mr, pMeta, 0); int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid); if (code != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; @@ -5027,7 +5065,7 @@ int32_t tsdbGetTableSchema(void* pVnode, int64_t uid, STSchema** pSchema, int64_ metaReaderClear(&mr); // get the newest table schema version - code = metaGetTbTSchemaEx(((SVnode*)pVnode)->pMeta, *suid, uid, -1, pSchema); + code = metaGetTbTSchemaEx(pMeta, *suid, uid, -1, pSchema); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index c323a81093..2392716bbf 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -91,7 +91,7 @@ void initMetadataAPI(SStoreMeta* pMeta) { pMeta->getTableTypeByName = metaGetTableTypeByName; pMeta->getTableNameByUid = metaGetTableNameByUid; - pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor + pMeta->getTableSchema = vnodeGetTableSchema; pMeta->storeGetTableList = vnodeGetTableList; pMeta->getCachedTableList = metaGetCachedTableUidList; @@ -135,7 +135,9 @@ void initTqAPI(SStoreTqReader* pTq) { pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut; pTq->tqGetResultBlockTime = tqGetResultBlockTime; -} + + pTq->tqGetStreamExecProgress = tqGetStreamExecInfo; + } void initStateStoreAPI(SStateStore* pStore) { pStore->streamFileStateInit = streamFileStateInit; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index b6a9360afd..4fc7a88494 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -14,6 +14,7 @@ */ #include "vnd.h" +#include "tsdb.h" #define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \ do { \ @@ -703,3 +704,7 @@ void *vnodeGetIvtIdx(void *pVnode) { } return metaGetIvtIdx(((SVnode *)pVnode)->pMeta); } + +int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid) { + return tsdbGetTableSchema(((SVnode*)pVnode)->pMeta, uid, pSchema, suid); +} diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 3cfd0ab582..2cba6e3241 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -220,7 +220,6 @@ static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWi (*pResult)->win = *win; - clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs); setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); return TSDB_CODE_SUCCESS; } @@ -262,6 +261,7 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p } else if (pInfo->groupId != gid) { // this is a new group, reset the info pInfo->inWindow = false; + pInfo->groupId = gid; } SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; @@ -319,6 +319,9 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p doKeepNewWindowStartInfo(pRowSup, tsList, rowIndex, gid); pInfo->inWindow = true; startIndex = rowIndex; + if (pInfo->pRow != NULL) { + clearResultRowInitFlag(pSup->pCtx, pSup->numOfExprs); + } break; } } diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 8aca76597b..4a2fe2416f 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -349,6 +349,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + curWin.winInfo.pStatePos->beUpdated = true; SSessionKey key = {0}; getSessionHashKey(&curWin.winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); @@ -725,7 +726,6 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys } if (pInfo->isHistoryOp) { - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pAllUpdated = tSimpleHashInit(64, hashFn); } else { pInfo->pAllUpdated = NULL; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 02f8b90864..f26ff7156b 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2083,6 +2083,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + winInfo.pStatePos->beUpdated = true; SSessionKey key = {0}; getSessionHashKey(&winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); @@ -2286,6 +2287,10 @@ int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) { int32_t iter = 0; while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { SResultWindowInfo* pWinInfo = pIte; + if (!pWinInfo->pStatePos->beUpdated) { + continue; + } + pWinInfo->pStatePos->beUpdated = false; saveResult(*pWinInfo, pStUpdated); } return TSDB_CODE_SUCCESS; @@ -3425,6 +3430,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + curWin.winInfo.pStatePos->beUpdated = true; SSessionKey key = {0}; getSessionHashKey(&curWin.winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index de667b2f20..960796d345 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -4446,7 +4446,7 @@ static int32_t findVgroupsFromEqualTbname(STranslateContext* pCxt, SEqCondTbName SName snameTb; char* tbName = taosArrayGetP(pInfo->aTbnames, j); toName(pCxt->pParseCxt->acctId, dbName, tbName, &snameTb); - SVgroupInfo vgInfo; + SVgroupInfo vgInfo = {0}; bool bExists; int32_t code = catalogGetCachedTableHashVgroup(pCxt->pParseCxt->pCatalog, &snameTb, &vgInfo, &bExists); if (code == TSDB_CODE_SUCCESS && bExists) { @@ -8137,27 +8137,27 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta tstrncpy(col->tableAlias, pTable, tListLen(col->tableAlias)); tstrncpy(col->colName, pMeta->schema[0].name, tListLen(col->colName)); - SNodeList* pParamterList = nodesMakeList(); - if (NULL == pParamterList) { + SNodeList* pParameterList = nodesMakeList(); + if (NULL == pParameterList) { nodesDestroyNode((SNode*)col); return TSDB_CODE_OUT_OF_MEMORY; } - int32_t code = nodesListStrictAppend(pParamterList, (SNode*)col); + int32_t code = nodesListStrictAppend(pParameterList, (SNode*)col); if (code) { - nodesDestroyList(pParamterList); + nodesDestroyList(pParameterList); return code; } - SNode* pFunc = (SNode*)createFunction("last", pParamterList); + SNode* pFunc = (SNode*)createFunction("last", pParameterList); if (NULL == pFunc) { - nodesDestroyList(pParamterList); + nodesDestroyList(pParameterList); return TSDB_CODE_OUT_OF_MEMORY; } SNodeList* pProjectionList = nodesMakeList(); if (NULL == pProjectionList) { - nodesDestroyList(pParamterList); + nodesDestroyNode(pFunc); return TSDB_CODE_OUT_OF_MEMORY; } @@ -8169,7 +8169,7 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta SFunctionNode* pFunc1 = createFunction("_vgid", NULL); if (NULL == pFunc1) { - nodesDestroyList(pParamterList); + nodesDestroyList(pProjectionList); return TSDB_CODE_OUT_OF_MEMORY; } @@ -8182,7 +8182,7 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta SFunctionNode* pFunc2 = createFunction("_vgver", NULL); if (NULL == pFunc2) { - nodesDestroyList(pParamterList); + nodesDestroyList(pProjectionList); return TSDB_CODE_OUT_OF_MEMORY; } @@ -8199,25 +8199,54 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta return code; } - // todo add the group by statement SSelectStmt** pSelect1 = (SSelectStmt**)pQuery; (*pSelect1)->pGroupByList = nodesMakeList(); + if (NULL == (*pSelect1)->pGroupByList) { + return TSDB_CODE_OUT_OF_MEMORY; + } SGroupingSetNode* pNode1 = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET); + if (NULL == pNode1) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pNode1->groupingSetType = GP_TYPE_NORMAL; pNode1->pParameterList = nodesMakeList(); - nodesListAppend(pNode1->pParameterList, (SNode*)pFunc1); + if (NULL == pNode1->pParameterList) { + nodesDestroyNode((SNode*)pNode1); + return TSDB_CODE_OUT_OF_MEMORY; + } - nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode1); + code = nodesListStrictAppend(pNode1->pParameterList, nodesCloneNode((SNode*)pFunc1)); + if (code) { + nodesDestroyNode((SNode*)pNode1); + return code; + } + + code = nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode1); + if (code) { + return code; + } SGroupingSetNode* pNode2 = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET); + if (NULL == pNode2) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pNode2->groupingSetType = GP_TYPE_NORMAL; pNode2->pParameterList = nodesMakeList(); - nodesListAppend(pNode2->pParameterList, (SNode*)pFunc2); + if (NULL == pNode2->pParameterList) { + nodesDestroyNode((SNode*)pNode2); + return TSDB_CODE_OUT_OF_MEMORY; + } - nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2); + code = nodesListStrictAppend(pNode2->pParameterList, nodesCloneNode((SNode*)pFunc2)); + if (code) { + nodesDestroyNode((SNode*)pNode2); + return code; + } - return code; + return nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2); } static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) { diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index f1b43c535d..1cff7ac87a 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -755,6 +755,7 @@ void MockCatalogService::destoryCatalogReq(SCatalogReq* pReq) { taosArrayDestroy(pReq->pUser); taosArrayDestroy(pReq->pTableIndex); taosArrayDestroy(pReq->pTableCfg); + taosArrayDestroyEx(pReq->pView, destoryTablesReq); delete pReq; } @@ -781,6 +782,7 @@ void MockCatalogService::destoryMetaData(SMetaData* pData) { taosArrayDestroyEx(pData->pQnodeList, destoryMetaRes); taosArrayDestroyEx(pData->pTableCfg, destoryMetaRes); taosArrayDestroyEx(pData->pDnodeList, destoryMetaArrayRes); + taosArrayDestroyEx(pData->pView, destoryMetaRes); taosMemoryFree(pData->pSvrVer); delete pData; } diff --git a/source/libs/parser/test/parAlterToBalanceTest.cpp b/source/libs/parser/test/parAlterToBalanceTest.cpp index d137029a14..1cf132a632 100644 --- a/source/libs/parser/test/parAlterToBalanceTest.cpp +++ b/source/libs/parser/test/parAlterToBalanceTest.cpp @@ -73,6 +73,7 @@ TEST_F(ParserInitialATest, alterDnode) { ASSERT_EQ(req.dnodeId, expect.dnodeId); ASSERT_EQ(std::string(req.config), std::string(expect.config)); ASSERT_EQ(std::string(req.value), std::string(expect.value)); + tFreeSMCfgDnodeReq(&req); }); setCfgDnodeReq(1, "resetLog"); @@ -183,6 +184,7 @@ TEST_F(ParserInitialATest, alterDatabase) { ASSERT_EQ(req.minRows, expect.minRows); ASSERT_EQ(req.walRetentionPeriod, expect.walRetentionPeriod); ASSERT_EQ(req.walRetentionSize, expect.walRetentionSize); + tFreeSAlterDbReq(&req); }); const int32_t MINUTE_PER_DAY = MILLISECOND_PER_DAY / MILLISECOND_PER_MINUTE; @@ -827,6 +829,7 @@ TEST_F(ParserInitialATest, alterUser) { ASSERT_EQ(std::string(req.user), std::string(expect.user)); ASSERT_EQ(std::string(req.pass), std::string(expect.pass)); ASSERT_EQ(std::string(req.objname), std::string(expect.objname)); + tFreeSAlterUserReq(&req); }); setAlterUserReq("wxy", TSDB_ALTER_USER_PASSWD, "123456"); @@ -853,6 +856,7 @@ TEST_F(ParserInitialATest, balanceVgroup) { ASSERT_EQ(pQuery->pCmdMsg->msgType, TDMT_MND_BALANCE_VGROUP); SBalanceVgroupReq req = {0}; ASSERT_EQ(tDeserializeSBalanceVgroupReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS); + tFreeSBalanceVgroupReq(&req); }); run("BALANCE VGROUP"); @@ -870,6 +874,7 @@ TEST_F(ParserInitialATest, balanceVgroupLeader) { SBalanceVgroupLeaderReq req = {0}; ASSERT_EQ(tDeserializeSBalanceVgroupLeaderReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS); + tFreeSBalanceVgroupLeaderReq(&req); }); run("BALANCE VGROUP LEADER"); diff --git a/source/libs/parser/test/parExplainToSyncdbTest.cpp b/source/libs/parser/test/parExplainToSyncdbTest.cpp index fbb9ce227c..2f3c7e6f92 100644 --- a/source/libs/parser/test/parExplainToSyncdbTest.cpp +++ b/source/libs/parser/test/parExplainToSyncdbTest.cpp @@ -52,6 +52,7 @@ TEST_F(ParserExplainToSyncdbTest, grant) { ASSERT_EQ(req.alterType, expect.alterType); ASSERT_EQ(string(req.user), string(expect.user)); ASSERT_EQ(string(req.objname), string(expect.objname)); + tFreeSAlterUserReq(&req); }); setAlterUserReq(TSDB_ALTER_USER_ADD_PRIVILEGES, PRIVILEGE_TYPE_ALL, "wxy", "0.*"); @@ -183,6 +184,7 @@ TEST_F(ParserExplainToSyncdbTest, redistributeVgroup) { ASSERT_EQ(req.dnodeId1, expect.dnodeId1); ASSERT_EQ(req.dnodeId2, expect.dnodeId2); ASSERT_EQ(req.dnodeId3, expect.dnodeId3); + tFreeSRedistributeVgroupReq(&req); }); setRedistributeVgroupReqFunc(3, 1); @@ -228,6 +230,7 @@ TEST_F(ParserExplainToSyncdbTest, restoreDnode) { ASSERT_EQ(tDeserializeSRestoreDnodeReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS); ASSERT_EQ(req.dnodeId, expect.dnodeId); ASSERT_EQ(req.restoreType, expect.restoreType); + tFreeSRestoreDnodeReq(&req); }); setRestoreDnodeReq(1, RESTORE_TYPE__ALL); @@ -272,6 +275,7 @@ TEST_F(ParserExplainToSyncdbTest, revoke) { ASSERT_EQ(req.alterType, expect.alterType); ASSERT_EQ(string(req.user), string(expect.user)); ASSERT_EQ(string(req.objname), string(expect.objname)); + tFreeSAlterUserReq(&req); }); setAlterUserReq(TSDB_ALTER_USER_DEL_PRIVILEGES, PRIVILEGE_TYPE_ALL, "wxy", "0.*"); diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp index 0f6aa22050..1a3559316c 100644 --- a/source/libs/parser/test/parInitialCTest.cpp +++ b/source/libs/parser/test/parInitialCTest.cpp @@ -43,6 +43,7 @@ TEST_F(ParserInitialCTest, compact) { ASSERT_EQ(std::string(req.db), std::string(expect.db)); ASSERT_EQ(req.timeRange.skey, expect.timeRange.skey); ASSERT_EQ(req.timeRange.ekey, expect.timeRange.ekey); + tFreeSCompactDbReq(&req); }); setCompactDbReq("test"); @@ -374,6 +375,7 @@ TEST_F(ParserInitialCTest, createDnode) { ASSERT_EQ(std::string(req.fqdn), std::string(expect.fqdn)); ASSERT_EQ(req.port, expect.port); + tFreeSCreateDnodeReq(&req); }); setCreateDnodeReq("abc1", 7030); @@ -599,6 +601,7 @@ TEST_F(ParserInitialCTest, createMnode) { tDeserializeSCreateDropMQSNodeReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req)); ASSERT_EQ(req.dnodeId, expect.dnodeId); + tFreeSMCreateQnodeReq(&req); }); setCreateMnodeReq(1); @@ -622,6 +625,7 @@ TEST_F(ParserInitialCTest, createQnode) { tDeserializeSCreateDropMQSNodeReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req)); ASSERT_EQ(req.dnodeId, expect.dnodeId); + tFreeSMCreateQnodeReq(&req); }); setCreateQnodeReq(1); @@ -1326,6 +1330,7 @@ TEST_F(ParserInitialCTest, createUser) { ASSERT_EQ(req.enable, expect.enable); ASSERT_EQ(std::string(req.user), std::string(expect.user)); ASSERT_EQ(std::string(req.pass), std::string(expect.pass)); + tFreeSCreateUserReq(&req); }); setCreateUserReq("wxy", "123456"); diff --git a/source/libs/parser/test/parInitialDTest.cpp b/source/libs/parser/test/parInitialDTest.cpp index 937f76176e..1a724e7b70 100644 --- a/source/libs/parser/test/parInitialDTest.cpp +++ b/source/libs/parser/test/parInitialDTest.cpp @@ -117,6 +117,7 @@ TEST_F(ParserInitialDTest, dropDnode) { ASSERT_EQ(req.port, expect.port); ASSERT_EQ(req.force, expect.force); ASSERT_EQ(req.unsafe, expect.unsafe); + tFreeSDropDnodeReq(&req); }); setDropDnodeReqById(1); @@ -208,6 +209,7 @@ TEST_F(ParserInitialDTest, dropQnode) { tDeserializeSCreateDropMQSNodeReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req)); ASSERT_EQ(req.dnodeId, expect.dnodeId); + tFreeSDDropQnodeReq(&req); }); setDropQnodeReq(1); @@ -245,6 +247,7 @@ TEST_F(ParserInitialDTest, dropStream) { ASSERT_EQ(std::string(req.name), std::string(expect.name)); ASSERT_EQ(req.igNotExists, expect.igNotExists); + tFreeMDropStreamReq(&req); }); setDropStreamReq("s1"); @@ -285,6 +288,7 @@ TEST_F(ParserInitialDTest, dropUser) { ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSDropUserReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req)); ASSERT_EQ(std::string(req.user), std::string(expect.user)); + tFreeSDropUserReq(&req); }); setDropUserReq("wxy"); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index cda1614612..9cb67dc968 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2977,6 +2977,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic } nodesClearList(cxt.pLastCols); } + nodesClearList(cxt.pOtherCols); pAgg->hasLastRow = false; pAgg->hasLast = false; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 5246eab115..594bb205d8 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -737,6 +737,13 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(code); } +#if 0 + SReadHandle* pReadHandle = qwMsg->node; + int64_t delay = 0; + bool fhFinish = false; + pReadHandle->api.tqReaderFn.tqGetStreamExecProgress(pReadHandle->vnode, 0, &delay, &fhFinish); +#endif + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH); sql = NULL; if (code) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f45904f036..b1783fb640 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -158,7 +158,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo pTask->chkInfo.transId = pReq->transId; pTask->chkInfo.checkpointingId = pReq->checkpointId; - pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); + pTask->chkInfo.numOfNotReady = streamTaskGetNumOfDownstream(pTask); pTask->chkInfo.startTs = taosGetTimestampMs(); pTask->execInfo.checkpoint += 1; @@ -214,7 +214,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); continueDispatchCheckpointBlock(pBlock, pTask); } else { // only one task exists, no need to dispatch downstream info - atomic_add_fetch_32(&pTask->chkInfo.checkpointNotReadyTasks, 1); + atomic_add_fetch_32(&pTask->chkInfo.numOfNotReady, 1); streamProcessCheckpointReadyMsg(pTask); streamFreeQitem((SStreamQueueItem*)pBlock); } @@ -249,7 +249,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task // can start local checkpoint procedure - pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); + pTask->chkInfo.numOfNotReady = streamTaskGetNumOfDownstream(pTask); // Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // already. And then, dispatch check point msg to all downstream tasks @@ -268,7 +268,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG); // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task - int32_t notReady = atomic_sub_fetch_32(&pTask->chkInfo.checkpointNotReadyTasks, 1); + int32_t notReady = atomic_sub_fetch_32(&pTask->chkInfo.numOfNotReady, 1); ASSERT(notReady >= 0); if (notReady == 0) { @@ -287,7 +287,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { pTask->chkInfo.checkpointingId = 0; // clear the checkpoint id pTask->chkInfo.failedId = 0; pTask->chkInfo.startTs = 0; // clear the recorded start time - pTask->chkInfo.checkpointNotReadyTasks = 0; + pTask->chkInfo.numOfNotReady = 0; pTask->chkInfo.transId = 0; pTask->chkInfo.dispatchCheckpointTrigger = false; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 78929c365e..0936d410bf 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -155,14 +155,14 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu *blockSize = 0; // no available token in bucket for sink task, let's wait for a little bit - if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { + if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) { stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id); return TSDB_CODE_SUCCESS; } while (1) { if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) { - stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); + stDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 3854e90901..d491b00e73 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -70,10 +70,9 @@ int32_t walNextValidMsg(SWalReader *pReader) { int64_t committedVer = walGetCommittedVer(pReader->pWal); int64_t appliedVer = walGetAppliedVer(pReader->pWal); - wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 - ", applied index:%" PRId64, + wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64, pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer); - if (fetchVer > appliedVer){ + if (fetchVer > appliedVer) { terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } @@ -86,10 +85,8 @@ int32_t walNextValidMsg(SWalReader *pReader) { int32_t type = pReader->pHead->head.msgType; if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) || (IS_META_MSG(type) && pReader->cond.scanMeta)) { - if (walFetchBody(pReader) < 0) { - return -1; - } - return 0; + int32_t code = walFetchBody(pReader); + return (code == TSDB_CODE_SUCCESS)? 0:-1; } else { if (walSkipFetchBody(pReader) < 0) { return -1; diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 341d989f8f..9783705bad 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -498,7 +498,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy pWal->writeHead.head.version = index; pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.msgType = msgType; - pWal->writeHead.head.ingestTs = 0; + pWal->writeHead.head.ingestTs = taosGetTimestampUs(); // sync info for sync module pWal->writeHead.head.syncMeta = syncMeta; diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index dda4b14901..7d1cc746ff 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -56,6 +56,8 @@ int32_t taosGetAppName(char* name, int32_t* len) { char* end = strrchr(filepath, TD_DIRSEP[0]); if (end == NULL) { end = filepath; + } else { + end += 1; } tstrncpy(name, end, TSDB_APP_NAME_LEN); diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 3cc00ddc7f..656e2706f2 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -977,6 +977,7 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co } #endif +#ifdef BUILD_NO_CALL /************************************************************************* * STREAM COMPRESSION *************************************************************************/ @@ -2120,7 +2121,7 @@ int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppOut, int32_t *nOut int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData) { return DATA_TYPE_INFO[pCmprsor->type].cmprFn(pCmprsor, pData, nData); } - +#endif /************************************************************************* * REGULAR COMPRESSION *************************************************************************/ diff --git a/tests/army/community/cluster/snapshot.json b/tests/army/community/cluster/snapshot.json index d4f6f00d37..4855c23260 100644 --- a/tests/army/community/cluster/snapshot.json +++ b/tests/army/community/cluster/snapshot.json @@ -35,8 +35,8 @@ "start_timestamp":"now-12d", "columns": [ { "type": "bool", "name": "bc"}, - { "type": "float", "name": "fc" }, - { "type": "double", "name": "dc"}, + { "type": "float", "name": "fc", "min": 100, "max": 100}, + { "type": "double", "name": "dc", "min": 200, "max": 200}, { "type": "tinyint", "name": "ti"}, { "type": "smallint", "name": "si" }, { "type": "int", "name": "ic" }, diff --git a/tests/army/community/cluster/snapshot.py b/tests/army/community/cluster/snapshot.py index 5b5457be75..b4c4d3c4c8 100644 --- a/tests/army/community/cluster/snapshot.py +++ b/tests/army/community/cluster/snapshot.py @@ -29,7 +29,11 @@ from frame import * class TDTestCase(TBase): updatecfgDict = { - "countAlwaysReturnValue" : "0" + "countAlwaysReturnValue" : "0", + "lossyColumns" : "float,double", + "fPrecision" : "0.000000001", + "dPrecision" : "0.00000000000000001", + "ifAdtFse" : "1" } def insertData(self): @@ -48,6 +52,18 @@ class TDTestCase(TBase): sql = f"create table {self.db}.ta(ts timestamp, age int) tags(area int)" tdSql.execute(sql) + def checkFloatDouble(self): + sql = f"select * from {self.db}.{self.stb} where fc!=100" + tdSql.query(sql) + tdSql.checkRows(0) + sql = f"select count(*) from {self.db}.{self.stb} where dc!=200" + tdSql.query(sql) + tdSql.checkRows(0) + sql = f"select avg(fc) from {self.db}.{self.stb}" + tdSql.checkFirstValue(sql, 100) + sql = f"select avg(dc) from {self.db}.{self.stb}" + tdSql.checkFirstValue(sql, 200) + def doAction(self): tdLog.info(f"do action.") self.flushDb() @@ -85,6 +101,9 @@ class TDTestCase(TBase): # check insert data correct self.checkInsertCorrect() + # check float double value ok + self.checkFloatDouble() + # save self.snapshotAgg() @@ -97,6 +116,10 @@ class TDTestCase(TBase): # check insert correct again self.checkInsertCorrect() + # check float double value ok + self.checkFloatDouble() + + tdLog.success(f"{__file__} successfully executed") diff --git a/tests/army/community/storage/oneStageComp.json b/tests/army/community/storage/oneStageComp.json new file mode 100644 index 0000000000..f64fda3824 --- /dev/null +++ b/tests/army/community/storage/oneStageComp.json @@ -0,0 +1,66 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "connection_pool_size": 8, + "num_of_records_per_req": 3000, + "prepared_rand": 3000, + "thread_count": 2, + "create_table_thread_count": 1, + "confirm_parameter_prompt": "no", + "databases": [ + { + "dbinfo": { + "name": "db", + "drop": "yes", + "vgroups": 2, + "replica": 3, + "wal_retention_period": 10, + "wal_retention_size": 100, + "keep": "60d,120d,365d", + "stt_trigger": 1, + "wal_level": 2, + "WAL_FSYNC_PERIOD": 3300, + "cachemodel": "'last_value'", + "TABLE_PREFIX":1, + "comp": 1 + }, + "super_tables": [ + { + "name": "stb", + "child_table_exists": "no", + "childtable_count": 10, + "insert_rows": 100000, + "childtable_prefix": "d", + "insert_mode": "taosc", + "timestamp_step": 1000, + "start_timestamp":"now-360d", + "columns": [ + { "type": "bool", "name": "bc","max": 1,"min": 1}, + { "type": "float", "name": "fc" ,"max": 101,"min": 101}, + { "type": "double", "name": "dc" ,"max": 102,"min": 102}, + { "type": "tinyint", "name": "ti" ,"max": 103,"min": 103}, + { "type": "smallint", "name": "si" ,"max": 104,"min": 104}, + { "type": "int", "name": "ic" ,"max": 105,"min": 105}, + { "type": "bigint", "name": "bi" ,"max": 106,"min": 106}, + { "type": "utinyint", "name": "uti","max": 107,"min": 107}, + { "type": "usmallint", "name": "usi","max": 108,"min": 108}, + { "type": "uint", "name": "ui" ,"max": 109,"min": 109}, + { "type": "ubigint", "name": "ubi","max": 110,"min": 110}, + { "type": "binary", "name": "bin", "len": 16}, + { "type": "nchar", "name": "nch", "len": 32} + ], + "tags": [ + {"type": "tinyint", "name": "groupid","max": 100,"min": 100}, + {"name": "location","type": "binary", "len": 16, "values": + ["San Francisco", "Los Angles", "San Diego", "San Jose", "Palo Alto", "Campbell", "Mountain View","Sunnyvale", "Santa Clara", "Cupertino"] + } + ] + } + ] + } + ] +} diff --git a/tests/army/community/storage/oneStageComp.py b/tests/army/community/storage/oneStageComp.py new file mode 100644 index 0000000000..9a2c7cfcd6 --- /dev/null +++ b/tests/army/community/storage/oneStageComp.py @@ -0,0 +1,140 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import time +import random + +import taos +import frame +import frame.etool + + +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame import * + + +class TDTestCase(TBase): + updatecfgDict = { + "compressMsgSize" : "100", + } + + def insertData(self): + tdLog.info(f"insert data.") + # taosBenchmark run + jfile = etool.curFile(__file__, "oneStageComp.json") + etool.benchMark(json=jfile) + + tdSql.execute(f"use {self.db}") + # set insert data information + self.childtable_count = 10 + self.insert_rows = 100000 + self.timestamp_step = 1000 + + + + def checkColValueCorrect(self): + tdLog.info(f"do action.") + self.flushDb() + + # check all columns correct + cnt = self.insert_rows * self.childtable_count + sql = "select * from stb where bc!=1" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where fc=101" + tdSql.query(sql) + tdSql.checkRows(cnt) + sql = "select * from stb where dc!=102" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where ti!=103" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where si!=104" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where ic!=105" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where bi!=106" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where uti!=107" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where usi!=108" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where ui!=109" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where ubi!=110" + tdSql.query(sql) + tdSql.checkRows(0) + + def insertNull(self): + # insert 6 lines + sql = "insert into d0(ts) values(now) (now + 1s) (now + 2s) (now + 3s) (now + 4s) (now + 5s)" + tdSql.execute(sql) + + self.flushDb() + self.trimDb() + + # check all columns correct + cnt = self.insert_rows * self.childtable_count + sql = "select * from stb where bc!=1" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where bc is null" + tdSql.query(sql) + tdSql.checkRows(6) + sql = "select * from stb where bc=1" + tdSql.query(sql) + tdSql.checkRows(cnt) + sql = "select * from stb where usi is null" + tdSql.query(sql) + tdSql.checkRows(6) + + # run + def run(self): + tdLog.debug(f"start to excute {__file__}") + + # insert data + self.insertData() + + # check insert data correct + self.checkInsertCorrect() + + # save + self.snapshotAgg() + + # do action + self.checkColValueCorrect() + + # check save agg result correct + self.checkAggCorrect() + + # insert null + self.insertNull() + + + tdLog.success(f"{__file__} successfully executed") + + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 91a0ac46e5..103e67be46 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -23,7 +23,7 @@ fi ,,y,army,./pytest.sh python3 ./test.py -f community/query/query_basic.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f community/cluster/splitVgroupByLearner.py -N 3 ,,n,army,python3 ./test.py -f community/cmdline/fullopt.py - +,,y,army,./pytest.sh python3 ./test.py -f community/storage/oneStageComp.py -N 3 -L 3 -D 1 # @@ -350,6 +350,7 @@ fi ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/ts-4272.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4295.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td27388.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4479.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_timestamp.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_tag_index.py @@ -567,7 +568,7 @@ fi ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/systable_func.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_ts4382.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_ts4403.py - +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_td28163.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py diff --git a/tests/script/sh/checkAsan.sh b/tests/script/sh/checkAsan.sh index 9f67d437e2..d2f1e13e8f 100755 --- a/tests/script/sh/checkAsan.sh +++ b/tests/script/sh/checkAsan.sh @@ -72,7 +72,7 @@ python_error=$(cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l) #0 0x7f2d64f5a808 in __interceptor_malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cc:144 #1 0x7f2d63fcf459 in strerror /build/glibc-SzIz7B/glibc-2.31/string/strerror.c:38 -runtime_error=$(cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type" | grep -v "signed integer overflow" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cc" | grep -v "strerror.c" | wc -l) +runtime_error=$(cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type" | grep -v "signed integer overflow" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cc" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cpp" | grep -v "sclvector.c" | wc -l) echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m" echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m" diff --git a/tests/script/tsim/stream/windowClose.sim b/tests/script/tsim/stream/windowClose.sim index 67678963ea..ce5c57572e 100644 --- a/tests/script/tsim/stream/windowClose.sim +++ b/tests/script/tsim/stream/windowClose.sim @@ -290,6 +290,213 @@ if $data32 != $now32 then return -1 endi +print step 2 max delay 2s +sql create database test15 vgroups 4; +sql use test15; +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql create stream stream15 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 session(ts, 10s); + +sleep 1000 + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791233001,2,2,3,1.1); + +$loop_count = 0 + +loop4: + +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamt13; + +if $rows != 2 then + print ======rows=$rows + goto loop4 +endi + +$now02 = $data02 +$now12 = $data12 + + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print session max delay over + +print step 3 max delay 2s +sql create database test16 vgroups 4; +sql use test16; +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql create stream stream16 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 state_window(a); + +sleep 1000 + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791233001,2,2,3,1.1); + +$loop_count = 0 + +loop5: + +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamt13; + +if $rows != 2 then + print ======rows=$rows + goto loop5 +endi + +$now02 = $data02 +$now12 = $data12 + + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print state max delay over + +print step 4 max delay 2s +sql create database test17 vgroups 4; +sql use test17; +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql create stream stream17 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 event_window start with a = 1 end with a = 9; + +sleep 1000 + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791213001,9,2,3,1.0); + +sql insert into t1 values(1648791233001,1,2,3,1.1); +sql insert into t1 values(1648791233009,9,2,3,1.1); + +$loop_count = 0 + +loop6: + +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamt13; + +if $rows != 2 then + print ======rows=$rows + goto loop6 +endi + +$now02 = $data02 +$now12 = $data12 + + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print event max delay over + print ======over system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/1-insert/test_ts4479.py b/tests/system-test/1-insert/test_ts4479.py new file mode 100644 index 0000000000..be9789b5fc --- /dev/null +++ b/tests/system-test/1-insert/test_ts4479.py @@ -0,0 +1,75 @@ +import os +import sys +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import tdDnodes +from math import inf +import taos + + +class TDTestCase: + """Verify inserting varbinary type data of ts-4479 + """ + def init(self, conn, logSql, replicaVer=1): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + self.conn = conn + self.db_name = "db" + self.stable_name = "st" + + def run(self): + tdSql.execute("create database if not exists %s" % self.db_name) + tdSql.execute("use %s" % self.db_name) + # create super table + tdSql.execute("create table %s (ts timestamp, c1 varbinary(65517)) tags (t1 varbinary(16382))" % self.stable_name) + + # varbinary tag length is more than 16382 + tag = os.urandom(16383).hex() + tdSql.error("create table ct using st tags(%s);" % ('\\x' + tag)) + + # create child table with max column and tag length + child_table_list = [] + for i in range(2): + child_table_name = "ct_" + str(i+1) + child_table_list.append(child_table_name) + tag = os.urandom(16382).hex() + tdSql.execute("create table %s using st tags('%s');" % (child_table_name, '\\x' + tag)) + tdLog.info("create table %s successfully" % child_table_name) + + # varbinary column length is more than 65517 + value = os.urandom(65518).hex() + tdSql.error("insert into ct_1 values(now, '\\x%s');" % value) + + # insert data + for i in range(10): + sql = "insert into table_name values" + for j in range(5): + value = os.urandom(65517).hex() + sql += "(now+%ss, '%s')," % (str(j+1), '\\x' + value) + for child_table in child_table_list: + tdSql.execute(sql.replace("table_name", child_table)) + tdLog.info("Insert data into %s successfully" % child_table) + tdLog.info("Insert data round %s successfully" % str(i+1)) + tdSql.execute("flush database %s" % self.db_name) + + # insert \\x to varbinary column + tdSql.execute("insert into ct_1 values(now, '\\x');") + tdSql.query("select * from ct_1 where c1 = '\\x';") + tdSql.checkRows(1) + tdSql.checkData(0, 1, b'') + + # insert \\x to varbinary tag + tdSql.execute("create table ct_3 using st tags('\\x');") + tdSql.execute("insert into ct_3 values(now, '\\x45');") + tdSql.query("select * from st where t1='';") + tdSql.checkRows(1) + tdSql.checkData(0, 2, b'') + + def stop(self): + tdSql.execute("drop database if exists %s" % self.db_name) + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/system-test/2-query/mergeFileSttQuery.py b/tests/system-test/2-query/mergeFileSttQuery.py index 7d2695a760..4f15fe60f1 100644 --- a/tests/system-test/2-query/mergeFileSttQuery.py +++ b/tests/system-test/2-query/mergeFileSttQuery.py @@ -42,18 +42,38 @@ class TDTestCase: def run(self): binPath = self.getPath() - tdLog.debug("insert full data block and flush db") + tdLog.debug("insert full data block that has first time '2021-10-02 00:00:00.001' and flush db") os.system(f"{binPath} -f ./2-query/megeFileSttQuery.json") tdSql.execute("flush database db;") - tdLog.debug("insert disorder data and flush db") + + tdLog.debug("insert only a piece of data that is behind the time that already exists and flush db") + tdSql.execute("insert into db.d0 values ('2021-10-01 23:59:59.990',12.793,208,0.84) ;") + tdSql.execute("flush database db;") + tdLog.debug("check data") + sleep(1) + tdSql.query("select count(*) from db.d0;") + tdSql.checkData(0,0,10001) + tdSql.execute("drop database db;") + + + + + + tdLog.debug("insert full data block that has first time '2021-10-02 00:00:00.001' and flush db") + os.system(f"{binPath} -f ./2-query/megeFileSttQuery.json") + tdSql.execute("flush database db;") + + tdLog.debug("insert four pieces of disorder data, and the time range covers the data file that was previously placed on disk and flush db") os.system(f"{binPath} -f ./2-query/megeFileSttQueryUpdate.json") tdSql.execute("flush database db;") tdLog.debug("check data") + tdSql.query("select count(*) from db.d0;",queryTimes=3) + tdSql.checkData(0,0,10004) tdSql.query("select ts from db.d0 limit 5;") tdSql.checkData(0, 0, '2021-10-02 00:00:00.001') tdSql.checkData(1, 0, '2021-10-02 00:01:00.000') - tdLog.debug("update disorder data and flush db") + tdLog.debug("update the same disorder data and flush db") os.system(f"{binPath} -f ./2-query/megeFileSttQueryUpdate.json") tdSql.query("select ts from db.d0 limit 5;") tdSql.checkData(0, 0, '2021-10-02 00:00:00.001') diff --git a/tests/system-test/2-query/test_td28163.py b/tests/system-test/2-query/test_td28163.py new file mode 100644 index 0000000000..df727f6c5a --- /dev/null +++ b/tests/system-test/2-query/test_td28163.py @@ -0,0 +1,265 @@ +import random +import itertools +from util.log import * +from util.cases import * +from util.sql import * +from util.sqlset import * +from util import constant +from util.common import * + + +class TDTestCase: + """Verify the jira TD-28163 + """ + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + def prepareData(self): + # db + tdSql.execute("create database if not exists db") + tdSql.execute("use db") + + # super table + tdSql.execute("create stable st(ts timestamp, c_ts_empty timestamp, c_int int, c_int_empty int, c_unsigned_int int unsigned, \ + c_unsigned_int_empty int unsigned, c_bigint bigint, c_bigint_empty bigint, c_unsigned_bigint bigint unsigned, \ + c_unsigned_bigint_empty bigint unsigned, c_float float, c_float_empty float, c_double double, c_double_empty double, \ + c_binary binary(16), c_binary_empty binary(16), c_smallint smallint, c_smallint_empty smallint, \ + c_smallint_unsigned smallint unsigned, c_smallint_unsigned_empty smallint unsigned, c_tinyint tinyint, \ + c_tinyint_empty tinyint, c_tinyint_unsigned tinyint unsigned, c_tinyint_unsigned_empty tinyint unsigned, \ + c_bool bool, c_bool_empty bool, c_nchar nchar(16), c_nchar_empty nchar(16), c_varchar varchar(16), \ + c_varchar_empty varchar(16), c_varbinary varbinary(16), c_varbinary_empty varbinary(16)) \ + tags(t_timestamp timestamp, t_timestamp_empty timestamp, t_int int, t_int_empty int, \ + t_unsigned_int int unsigned, t_unsigned_int_empty int unsigned, t_bigint bigint, t_bigint_empty bigint, \ + t_unsigned_bigint bigint unsigned, t_unsigned_bigint_empty bigint unsigned, t_float float, t_float_empty float, \ + t_double double, t_double_empty double, t_binary binary(16), t_binary_empty binary(16), t_smallint smallint, \ + t_smallint_empty smallint, t_smallint_unsigned smallint unsigned, t_smallint_unsigned_empty smallint unsigned, \ + t_tinyint tinyint, t_tinyint_empty tinyint, t_tinyint_unsigned tinyint unsigned, t_tinyint_unsigned_empty tinyint unsigned, \ + t_bool bool, t_bool_empty bool, t_nchar nchar(16), t_nchar_empty nchar(16), t_varchar varchar(16), \ + t_varchar_empty varchar(16), t_varbinary varbinary(16), t_varbinary_empty varbinary(16));") + + # child tables + start_ts = 1704085200000 + tags = [ + "'2024-01-01 13:00:01', null, 1, null, 1, null, 1111111111111111, null, 1111111111111111, null, 1.1, null, 1.11, null, 'aaaaaaaa', '', 1, null, 1, null, 1, null, 1, null, True, null, 'ncharaa', null, 'varcharaa', null, '0x7661726331', null", + "'2024-01-01 13:00:02', null, 2, null, 2, null, 2222222222222222, null, 2222222222222222, null, 2.2, null, 2.22, null, 'bbbbbbbb', '', 2, null, 2, null, 2, null, 2, null, False, null, 'ncharbb', null, 'varcharbb', null, '0x7661726332', null", + "'2024-01-01 13:00:03', null, 3, null, 3, null, 3333333333333333, null, 3333333333333333, null, 3.3, null, 3.33, null, 'cccccccc', '', 3, null, 3, null, 3, null, 3, null, True, null, 'ncharcc', null, 'varcharcc', null, '0x7661726333', null", + "'2024-01-01 13:00:04', null, 4, null, 4, null, 4444444444444444, null, 4444444444444444, null, 4.4, null, 4.44, null, 'dddddddd', '', 4, null, 4, null, 4, null, 4, null, False, null, 'nchardd', null, 'varchardd', null, '0x7661726334', null", + "'2024-01-01 13:00:05', null, 5, null, 5, null, 5555555555555555, null, 5555555555555555, null, 5.5, null, 5.55, null, 'eeeeeeee', '', 5, null, 5, null, 5, null, 5, null, True, null, 'ncharee', null, 'varcharee', null, '0x7661726335', null", + ] + for i in range(5): + tdSql.execute(f"create table ct{i+1} using st tags({tags[i]});") + + # insert data + data = "null, 1, null, 1, null, 1111111111111111, null, 1111111111111111, null, 1.1, null, 1.11, null, 'aaaaaaaa', null, 1, null, 1, null, 1, null, 1, null, True, null, 'ncharaa', null, 'varcharaa', null, '0x7661726331', null" + for round in range(100): + sql = f"insert into ct{i+1} values" + for j in range(100): + sql += f"({start_ts + (round * 100 + j + 1) * 1000}, {data})" + sql += ";" + tdSql.execute(sql) + tdLog.debug("Prepare data successfully") + + def test_query_with_filter(self): + # total row number + tdSql.query("select count(*) from st;") + total_rows = tdSql.queryResult[0][0] + tdLog.debug("Total row number is %s" % total_rows) + + # start_ts and end_ts + tdSql.query("select first(ts), last(ts) from st;") + start_ts = tdSql.queryResult[0][0] + end_ts = tdSql.queryResult[0][1] + tdLog.debug("start_ts is %s, end_ts is %s" % (start_ts, end_ts)) + + filter_dic = { + "all_filter_list": ["ts <= now", "t_timestamp <= now", f"ts between '{start_ts}' and '{end_ts}'", + f"t_timestamp between '{start_ts}' and '{end_ts}'", "c_ts_empty is null", + "t_timestamp_empty is null", "ts > '1970-01-01 00:00:00'", "t_int in (1, 2, 3, 4, 5)", + "c_int=1", "c_int_empty is null", "c_unsigned_int=1", "c_unsigned_int_empty is null", + "c_unsigned_int in (1, 2, 3, 4, 5)", "c_unsigned_int_empty is null", "c_bigint=1111111111111111", + "c_bigint_empty is null", "c_unsigned_bigint in (1111111111111111)", "c_unsigned_bigint_empty is null", + "c_float=1.1", "c_float_empty is null", "c_double=1.11", "c_double_empty is null", "c_binary='aaaaaaaa'", + "c_binary_empty is null", "c_smallint=1", "c_smallint_empty is null", "c_smallint_unsigned=1", + "c_smallint_unsigned_empty is null", "c_tinyint=1", "c_tinyint_empty is null", "c_tinyint_unsigned=1", + "c_tinyint_unsigned_empty is null", "c_bool=True", "c_bool_empty is null", "c_nchar='ncharaa'", + "c_nchar_empty is null", "c_varchar='varcharaa'", "c_varchar_empty is null", "c_varbinary='0x7661726331'", + "c_varbinary_empty is null"], + "empty_filter_list": ["ts > now", "t_timestamp > now", "c_ts_empty is not null","t_timestamp_empty is not null", + "ts <= '1970-01-01 00:00:00'", "c_ts_empty < '1970-01-01 00:00:00'", "c_int <> 1", "c_int_empty is not null", + "t_int in (10, 11)", "t_int_empty is not null"] + } + for filter in filter_dic["all_filter_list"]: + tdLog.debug("Execute query with filter '%s'" % filter) + tdSql.query(f"select * from st where {filter};") + tdSql.checkRows(total_rows) + + for filter in filter_dic["empty_filter_list"]: + tdLog.debug("Execute query with filter '%s'" % filter) + tdSql.query(f"select * from st where {filter};") + tdSql.checkRows(0) + + def test_query_with_groupby(self): + tdSql.query("select count(*) from st group by tbname;") + tdSql.checkRows(5) + tdSql.checkData(0, 0, 10000) + + tdSql.query("select count(c_unsigned_int_empty + c_int_empty * c_float_empty - c_double_empty + c_smallint_empty / c_tinyint_empty) from st where c_int_empty is null group by tbname;") + tdSql.checkRows(5) + tdSql.checkData(0, 0, 0) + + tdSql.query("select sum(t_unsigned_int_empty + t_int_empty * t_float_empty - t_double_empty + t_smallint_empty / t_tinyint_empty) from st where t_int_empty is null group by tbname;") + tdSql.checkRows(5) + tdSql.checkData(0, 0, None) + + tdSql.query("select max(c_bigint_empty) from st group by tbname, t_bigint_empty, t_float_empty, t_double_empty;") + tdSql.checkRows(5) + tdSql.checkData(0, 0, None) + + tdSql.query("select min(t_double) as v from st where c_nchar like '%aa%' and t_double is not null group by tbname, t_bigint_empty, t_float_empty, t_double_empty order by v limit 1;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 1.11) + + tdSql.query("select top(c_float, 1) as v from st where c_nchar like '%aa%' group by tbname order by v desc slimit 1 limit 1;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 1.1) + + tdSql.query("select first(ts) from st where c_varchar is not null partition by tbname order by ts slimit 1;") + tdSql.checkRows(5) + tdSql.checkData(0, 0, '2024-01-01 13:00:01.000') + + tdSql.query("select first(c_nchar_empty) from st group by tbname;") + tdSql.checkRows(0) + + tdSql.query("select first(ts), first(c_nchar_empty) from st group by tbname, ts order by ts slimit 1 limit 1;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, '2024-01-01 13:00:01.000') + tdSql.checkData(0, 1, None) + + tdSql.query("select first(c_nchar_empty) from st group by t_timestamp_empty order by t_timestamp;") + tdSql.checkRows(0) + + tdSql.query("select last(ts), last(c_nchar_empty) from st group by tbname, ts order by ts slimit 1 limit 1;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, '2024-01-01 13:00:01.000') + tdSql.checkData(0, 1, None) + + tdSql.query("select elapsed(ts, 1s) t from st where c_int = 1 and c_nchar like '%aa%' group by tbname order by t desc slimit 1 limit 1;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 9999) + + tdSql.query("select elapsed(ts, 1s) t from st where c_int_empty is not null and c_nchar like '%aa%' group by tbname order by t desc slimit 1 limit 1;") + tdSql.checkRows(0) + + def test_query_with_join(self): + tdSql.query("select count(*) from st as t1 join st as t2 on t1.ts = t2.ts and t1.c_float_empty is not null;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 0) + + tdSql.query("select count(t1.c_ts_empty) as v from st as t1 join st as t2 on t1.ts = t2.ts and t1.c_float_empty is null order by v desc;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 0) + + tdSql.query("select avg(t1.c_tinyint), sum(t2.c_bigint) from st t1, st t2 where t1.ts=t2.ts and t1.c_int > t2.c_int;") + tdSql.checkRows(0) + + tdSql.query("select avg(t1.c_tinyint), sum(t2.c_bigint) from st t1, st t2 where t1.ts=t2.ts and t1.c_int <= t2.c_int;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 1) + tdSql.checkData(0, 1, 1076616672134475760) + + tdSql.query("select count(t1.c_float_empty) from st t1, st t2 where t1.ts=t2.ts and t1.c_int = t2.c_int and t1.t_int_empty=t2.t_int_empty;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 0) + + def test_query_with_window(self): + # time window + tdSql.query("select sum(c_int_empty) from st where ts > '2024-01-01 00:00:00.000' and ts <= '2024-01-01 14:00:00.000' interval(5m) sliding(1m) fill(value, 10);") + tdSql.checkRows(841) + tdSql.checkData(0, 0, 10) + + tdSql.query("select _wstart, _wend, sum(c_int) from st where ts > '2024-01-01 00:00:00.000' and ts <= '2024-01-01 14:00:00.000' interval(5m) sliding(1m);") + tdSql.checkRows(65) + + # status window + tdSql.error("select _wstart, count(*) from st state_window(t_bool);") + tdSql.query("select _wstart, count(*) from st partition by tbname state_window(c_bool);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, 10000) + + # session window + tdSql.query("select _wstart, count(*) from st partition by tbname, t_int session(ts, 1m);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, 10000) + + # event window + tdSql.query("select _wstart, _wend, count(*) from (select * from st order by ts, tbname) event_window start with t_bool=true end with t_bool=false;") + tdSql.checkRows(20000) + + def test_query_with_union(self): + tdSql.query("select count(ts) from (select * from ct1 union select * from ct2 union select * from ct3);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 10000) + + tdSql.query("select count(ts) from (select * from ct1 union all select * from ct2 union all select * from ct3);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 30000) + + tdSql.query("select count(*) from (select * from ct1 union select * from ct2 union select * from ct3);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 10000) + + tdSql.query("select count(c_ts_empty) from (select * from ct1 union select * from ct2 union select * from ct3);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 0) + + tdSql.query("select count(*) from (select ts from st union select c_ts_empty from st);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 10001) + + tdSql.query("select count(*) from (select ts from st union all select c_ts_empty from st);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 100000) + + tdSql.query("select count(ts) from (select ts from st union select c_ts_empty from st);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 10000) + + tdSql.query("select count(ts) from (select ts from st union all select c_ts_empty from st);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 50000) + + def test_nested_query(self): + tdSql.query("select elapsed(ts, 1s) from (select * from (select * from st where c_int = 1) where c_int_empty is null);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 9999) + + tdSql.query("select first(ts) as t, avg(c_int) as v from (select * from (select * from st where c_int = 1) where c_int_empty is null) group by t_timestamp order by t_timestamp desc slimit 1 limit 1;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, '2024-01-01 13:00:01.000') + tdSql.checkData(0, 1, 1) + + tdSql.query("select max(c_tinyint) from (select c_tinyint, tbname from st where c_float_empty is null or t_int_empty is null) group by tbname order by c_tinyint desc slimit 1 limit 1;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 1) + + tdSql.query("select top(c_int, 3) from (select c_int, tbname from st where t_int in (2, 3)) group by tbname slimit 3;") + tdSql.checkRows(6) + tdSql.checkData(0, 0, 1) + + def run(self): + self.prepareData() + self.test_query_with_filter() + self.test_query_with_groupby() + self.test_query_with_join() + self.test_query_with_window() + self.test_query_with_union() + self.test_nested_query() + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/system-test/8-stream/max_delay_session.py b/tests/system-test/8-stream/max_delay_session.py index 7e9d365fc8..934fbbcac2 100644 --- a/tests/system-test/8-stream/max_delay_session.py +++ b/tests/system-test/8-stream/max_delay_session.py @@ -58,8 +58,8 @@ class TDTestCase: tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}') else: tdSql.query(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}') - if not fill_history_value: - tdSql.checkEqual(tdSql.queryRows, init_num) + # if not fill_history_value: + # tdSql.checkEqual(tdSql.queryRows, init_num) self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts) self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts) diff --git a/utils/TSZ/sz/src/sz_double.c b/utils/TSZ/sz/src/sz_double.c index 1adfdf3b56..0510fc612d 100644 --- a/utils/TSZ/sz/src/sz_double.c +++ b/utils/TSZ/sz/src/sz_double.c @@ -385,9 +385,11 @@ unsigned int optimize_intervals_double_1D_opt(double *oriData, size_t dataLength totalSampleSize++; pred_value = data_pos[-1]; pred_err = fabs(pred_value - *data_pos); - radiusIndex = (unsigned long)((pred_err/realPrecision+1)/2); - if(radiusIndex>=confparams_cpr->maxRangeRadius) - radiusIndex = confparams_cpr->maxRangeRadius - 1; + double dbri = (unsigned long)((pred_err/realPrecision+1)/2); + if(dbri >= (double)confparams_cpr->maxRangeRadius) + radiusIndex = confparams_cpr->maxRangeRadius - 1; + else + radiusIndex = dbri; intervals[radiusIndex]++; data_pos += confparams_cpr->sampleDistance;