diff --git a/include/common/ttime.h b/include/common/ttime.h index 2f4129f979..8f4f4f15d8 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -73,7 +73,6 @@ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) { } int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision); -int64_t taosTimeSub(int64_t t, int64_t duration, char unit, int32_t precision); int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision); int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 00a36391fa..ad89e51a24 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -33,16 +33,16 @@ extern "C" { #define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", DEBUG_TRACE, wDebugFlag, __VA_ARGS__); }} // clang-format on -#define WAL_PROTO_VER 0 -#define WAL_NOSUFFIX_LEN 20 -#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) -#define WAL_LOG_SUFFIX "log" -#define WAL_INDEX_SUFFIX "idx" -#define WAL_REFRESH_MS 1000 -#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalCkHead)) -#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) -#define WAL_FILE_LEN (WAL_PATH_LEN + 32) -#define WAL_MAGIC 0xFAFBFCFDULL +#define WAL_PROTO_VER 0 +#define WAL_NOSUFFIX_LEN 20 +#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) +#define WAL_LOG_SUFFIX "log" +#define WAL_INDEX_SUFFIX "idx" +#define WAL_REFRESH_MS 1000 +#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) +#define WAL_FILE_LEN (WAL_PATH_LEN + 32) +#define WAL_MAGIC 0xFAFBFCFDULL +#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) typedef enum { TAOS_WAL_WRITE = 1, diff --git a/include/util/tdef.h b/include/util/tdef.h index 3b31398063..688fe5fe85 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -421,7 +421,7 @@ typedef enum ELogicConditionType { #define TSDB_DEFAULT_STABLES_HASH_SIZE 100 #define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000 -#define TSDB_MAX_WAL_SIZE (1024 * 1024 * 3) +#define TSDB_MAX_MSG_SIZE (1024 * 1024 * 10) #define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P diff --git a/include/util/tutil.h b/include/util/tutil.h index 2e96c5b88e..6a1a40f14c 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -45,7 +45,6 @@ void taosIp2String(uint32_t ip, char *str); void taosIpPort2String(uint32_t ip, uint16_t port, char *str); void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen); -char *strDupUnquo(const char *src); static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { T_MD5_CTX context; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index fcc27e440c..db8afba409 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -40,11 +40,11 @@ bool tsPrintAuth = false; // multi process int32_t tsMultiProcess = 0; -int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2 + 1024; -int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 1024; -int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024; -int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024; -int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024; +int32_t tsMnodeShmSize = TSDB_MAX_MSG_SIZE * 2 + 1024; +int32_t tsVnodeShmSize = TSDB_MAX_MSG_SIZE * 10 + 1024; +int32_t tsQnodeShmSize = TSDB_MAX_MSG_SIZE * 4 + 1024; +int32_t tsSnodeShmSize = TSDB_MAX_MSG_SIZE * 4 + 1024; +int32_t tsBnodeShmSize = TSDB_MAX_MSG_SIZE * 4 + 1024; int32_t tsNumOfShmThreads = 1; // queue & threads @@ -387,11 +387,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1; if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1; tsNumOfRpcThreads = tsNumOfCores / 2; @@ -447,8 +447,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1; tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; - tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_WAL_SIZE * 10L, TSDB_MAX_WAL_SIZE * 10000L); - if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_WAL_SIZE * 10L, INT64_MAX, 0) != 0) + tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, TSDB_MAX_MSG_SIZE * 10000L); + if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 43003e7fab..8611278550 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5349,6 +5349,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) { if (tEncodeCStr(pEncoder, pReq->tbName) < 0) return -1; if (tEncodeI8(pEncoder, pReq->action) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->colId) < 0) return -1; switch (pReq->action) { case TSDB_ALTER_TABLE_ADD_COLUMN: if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; @@ -5399,6 +5400,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) { if (tDecodeCStr(pDecoder, &pReq->tbName) < 0) return -1; if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->colId) < 0) return -1; switch (pReq->action) { case TSDB_ALTER_TABLE_ADD_COLUMN: if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 944ee6a731..b1e4321053 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -700,6 +700,8 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { numOfMonth *= 12; } + int64_t fraction = t % TSDB_TICK_PER_SECOND(precision); + struct tm tm; time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision)); taosLocalTime(&tt, &tm); @@ -707,35 +709,9 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) { tm.tm_year = mon / 12; tm.tm_mon = mon % 12; - return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision)); + return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision) + fraction); } -int64_t taosTimeSub(int64_t t, int64_t duration, char unit, int32_t precision) { - if (duration == 0) { - return t; - } - - if (unit != 'n' && unit != 'y') { - return t - duration; - } - - // The following code handles the y/n time duration - int64_t numOfMonth = duration; - if (unit == 'y') { - numOfMonth *= 12; - } - - struct tm tm; - time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision)); - taosLocalTime(&tt, &tm); - int32_t mon = tm.tm_year * 12 + tm.tm_mon - (int32_t)numOfMonth; - tm.tm_year = mon / 12; - tm.tm_mon = mon % 12; - - return (int64_t)(taosMktime(&tm) * TSDB_TICK_PER_SECOND(precision)); -} - - int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision) { if (ekey < skey) { int64_t tmp = ekey; @@ -844,11 +820,14 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio } else { // try to move current window to the left-hande-side, due to the offset effect. int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1; - ASSERT(end >= t); - end = taosTimeAdd(end, -pInterval->sliding, pInterval->slidingUnit, precision); - if (end >= t) { - start = taosTimeAdd(start, -pInterval->sliding, pInterval->slidingUnit, precision); + + int64_t newEnd = end; + while(newEnd >= t) { + end = newEnd; + newEnd = taosTimeAdd(newEnd, -pInterval->sliding, pInterval->slidingUnit, precision); } + + start = taosTimeAdd(end, -pInterval->interval, pInterval->intervalUnit, precision) + 1; } } diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 302f0c5fbb..00659939e9 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -231,7 +231,7 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); mDebug("start to read sdb file:%s", file); - SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100); + SSdbRaw *pRaw = taosMemoryMalloc(TSDB_MAX_MSG_SIZE + 100); if (pRaw == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed read sdb file since %s", terrstr()); @@ -556,8 +556,9 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *ter if (term != NULL) *term = commitTerm; if (config != NULL) *config = commitConfig; - mDebug("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", - pIter, commitIndex, commitTerm, commitConfig, pIter->name); + mDebug("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 + " file:%s", + pIter, commitIndex, commitTerm, commitConfig, pIter->name); return 0; } @@ -669,4 +670,4 @@ int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) { pIter->total += writelen; mDebug("sdbiter:%p, write:%d bytes to snapshot, total:%" PRId64, pIter, writelen, pIter->total); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f6862621f9..208b5d3fa0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -506,7 +506,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { .initTqReader = true, .version = ver, }; - pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols); + pHandle->execHandle.execCol.task[i] = + qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols); ASSERT(pHandle->execHandle.execCol.task[i]); void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner); @@ -679,9 +680,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // SStreamTaskRunReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (pTask) { - streamProcessRunReq(pTask); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + streamProcessRunReq(*ppTask); return 0; } else { return -1; @@ -696,14 +697,14 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { SDecoder decoder; tDecoderInit(&decoder, msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); - int32_t taskId = req.taskId; - SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (pTask) { + int32_t taskId = req.taskId; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (ppTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0, }; - streamProcessDispatchReq(pTask, &req, &rsp); + streamProcessDispatchReq(*ppTask, &req, &rsp); return 0; } else { return -1; @@ -713,9 +714,9 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (pTask) { - streamProcessRecoverReq(pTask, pReq, pMsg); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + streamProcessRecoverReq(*ppTask, pReq, pMsg); return 0; } else { return -1; @@ -725,9 +726,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = pRsp->taskId; - SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (pTask) { - streamProcessDispatchRsp(pTask, pRsp); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + streamProcessDispatchRsp(*ppTask, pRsp); return 0; } else { return -1; @@ -737,9 +738,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverRsp* pRsp = pMsg->pCont; int32_t taskId = pRsp->taskId; - SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (pTask) { - streamProcessRecoverRsp(pTask, pRsp); + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + streamProcessRecoverRsp(*ppTask, pRsp); return 0; } else { return -1; @@ -749,10 +750,10 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; - SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); - if (pTask) { + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); + if (ppTask) { taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); - atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); + atomic_store_8(&(*ppTask)->taskStatus, TASK_STATUS__DROPPING); } // todo // clear queue @@ -780,16 +781,17 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { SDecoder decoder; tDecoderInit(&decoder, msgBody, msgLen); tDecodeStreamRetrieveReq(&decoder, &req); - int32_t taskId = req.dstTaskId; - SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { - return 0; + int32_t taskId = req.dstTaskId; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + SRpcMsg rsp = { + .info = pMsg->info, + .code = 0, + }; + streamProcessRetrieveReq(*ppTask, &req, &rsp); + } else { + return -1; } - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, - }; - streamProcessRetrieveReq(pTask, &req, &rsp); return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 96839d1767..9afd132b8e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2572,14 +2572,41 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc return TSDB_CODE_SUCCESS; } +static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb* pTsdb) { + tb_uid_t suid = 0; + + SMetaReader mr = {0}; + metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0); + if (metaGetTableEntryByUid(&mr, uid) < 0) { + metaReaderClear(&mr); // table not esist + return 0; + } + + if (mr.me.type == TSDB_CHILD_TABLE) { + suid = mr.me.ctbEntry.suid; + } else if (mr.me.type == TSDB_NORMAL_TABLE) { + suid = 0; + } else { + suid = 0; + } + + metaReaderClear(&mr); + + return suid; +} + void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) { int32_t sversion = TSDBROW_SVERSION(pRow); + tb_uid_t suid = getTableSuidByUid(uid, pReader->pTsdb); + if (pReader->pSchema == NULL) { - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, sversion); + // pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, sversion); + metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, suid, uid, sversion, &pReader->pSchema); } else if (pReader->pSchema->version != sversion) { taosMemoryFreeClear(pReader->pSchema); - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, sversion); + // pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, sversion); + metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, suid, uid, sversion, &pReader->pSchema); } } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d7c283c70d..2cc4058b3b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -744,8 +744,8 @@ typedef struct SSortOperatorInfo { int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. - - SNode* pCondition; + SLimitInfo limitInfo; + SNode* pCondition; } SSortOperatorInfo; typedef struct STagFilterOperatorInfo { @@ -785,7 +785,7 @@ int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); void cleanupExprSupp(SExprSupp* pSup); int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey); -void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); +void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows); void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); int32_t handleLimitOffset(SOperatorInfo *pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); @@ -797,7 +797,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWin int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList); -void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win); +STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key); STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index c46485a332..56a5e253d8 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -50,7 +50,7 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead STableListInfo* pTableList = &pTaskInfo->tableqinfoList; - initResultSizeInfo(pOperator, 1024); + initResultSizeInfo(&pOperator->resultInfo, 1024); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 978bef1607..60799e0528 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -824,10 +824,10 @@ int32_t convertFillType(int32_t mode) { static void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) { if (ascQuery) { - getAlignQueryTimeWindow(pInterval, pInterval->precision, ts, w); + *w = getAlignQueryTimeWindow(pInterval, pInterval->precision, ts); } else { // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp - getAlignQueryTimeWindow(pInterval, pInterval->precision, ts, w); + *w = getAlignQueryTimeWindow(pInterval, pInterval->precision, ts); int64_t key = w->skey; while (key < ts) { // moving towards end diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7ff9e90c75..1bbf05294d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -834,18 +834,20 @@ bool isTaskKilled(SExecTaskInfo* pTaskInfo) { void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; } ///////////////////////////////////////////////////////////////////////////////////////////// -// todo refactor : return window -void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win) { - win->skey = taosTimeTruncate(key, pInterval, precision); +STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) { + STimeWindow win = {0}; + win.skey = taosTimeTruncate(key, pInterval, precision); /* * if the realSkey > INT64_MAX - pInterval->interval, the query duration between * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges. */ - win->ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; - if (win->ekey < win->skey) { - win->ekey = INT64_MAX; + win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; + if (win.ekey < win.skey) { + win.ekey = INT64_MAX; } + + return win; } #if 0 @@ -3603,13 +3605,13 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf return TSDB_CODE_SUCCESS; } -void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) { +void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows) { ASSERT(numOfRows != 0); - pOperator->resultInfo.capacity = numOfRows; - pOperator->resultInfo.threshold = numOfRows * 0.75; + pResultInfo->capacity = numOfRows; + pResultInfo->threshold = numOfRows * 0.75; - if (pOperator->resultInfo.threshold == 0) { - pOperator->resultInfo.threshold = numOfRows; + if (pResultInfo->threshold == 0) { + pResultInfo->threshold = numOfRows; } } @@ -3672,7 +3674,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* int32_t numOfRows = 1024; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, numOfRows); + initResultSizeInfo(&pOperator->resultInfo, numOfRows); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -3827,7 +3829,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys if (numOfRows * pResBlock->info.rowSize > TWOMB) { numOfRows = TWOMB / pResBlock->info.rowSize; } - initResultSizeInfo(pOperator, numOfRows); + initResultSizeInfo(&pOperator->resultInfo, numOfRows); initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); @@ -4005,7 +4007,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy numOfRows = TWOMB / pResBlock->info.rowSize; } - initResultSizeInfo(pOperator, numOfRows); + initResultSizeInfo(&pOperator->resultInfo, numOfRows); initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); @@ -4044,8 +4046,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) { SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode); - STimeWindow w = TSWINDOW_INITIALIZER; - getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey, &w); + STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey); w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC); int32_t order = TSDB_ORDER_ASC; @@ -4082,7 +4083,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* int32_t type = convertFillType(pPhyFillNode->mode); SResultInfo* pResultInfo = &pOperator->resultInfo; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId; int32_t numOfOutputCols = 0; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 2964948e70..20630fd6ff 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -406,7 +406,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx goto _error; } - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResultBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index b864fae47f..497de8347c 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -41,7 +41,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pRes = pResBlock; pOperator->name = "MergeJoinOperator"; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2e78b61b8c..849e92785c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -125,7 +125,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn } if (order == TSDB_ORDER_ASC) { - getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey, &w); + w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey); assert(w.ekey >= pBlockInfo->window.skey); if (w.ekey < pBlockInfo->window.ekey) { @@ -144,7 +144,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn } } } else { - getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey, &w); + w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey); assert(w.skey <= pBlockInfo->window.ekey); if (w.skey > pBlockInfo->window.skey) { @@ -2324,7 +2324,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pInfo->pCondition = pScanNode->node.pConditions; pInfo->scanCols = colList; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); tNameAssign(&pInfo->name, &pScanNode->tableName); const char* name = tNameGetTableName(&pInfo->name); @@ -2554,7 +2554,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); pOperator->fpSet = @@ -3099,7 +3099,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pOperator->info = pInfo; pOperator->exprSupp.numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; - initResultSizeInfo(pOperator, 1024); + initResultSizeInfo(&pOperator->resultInfo, 1024); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL, diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index a3b79d9597..f019ee94b8 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -26,7 +26,7 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) { SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) { + if (pInfo == NULL || pOperator == NULL) { goto _error; } @@ -41,13 +41,15 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); + + initResultSizeInfo(&pOperator->resultInfo, 1024); + pInfo->binfo.pRes = pResBlock; - - initResultSizeInfo(pOperator, 1024); - - pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); + pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); pInfo->pCondition = pSortNode->node.pConditions; pInfo->pColMatchInfo = pColMatchColInfo; + initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); + pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->blocking = true; @@ -208,26 +210,44 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { SSDataBlock* pBlock = NULL; while (1) { pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, - pInfo->pColMatchInfo, pInfo); - if (pBlock != NULL) { - doFilter(pInfo->pCondition, pBlock); - } - + pInfo->pColMatchInfo, pInfo); if (pBlock == NULL) { doSetOperatorCompleted(pOperator); - break; + return NULL; } - if (blockDataGetNumOfRows(pBlock) > 0) { + doFilter(pInfo->pCondition, pBlock); + if (blockDataGetNumOfRows(pBlock) == 0) { + continue; + } + + // todo add the limit/offset info + if (pInfo->limitInfo.remainOffset > 0) { + if (pInfo->limitInfo.remainOffset >= blockDataGetNumOfRows(pBlock)) { + pInfo->limitInfo.remainOffset -= pBlock->info.rows; + continue; + } + + blockDataTrimFirstNRows(pBlock, pInfo->limitInfo.remainOffset); + pInfo->limitInfo.remainOffset = 0; + } + + if (pInfo->limitInfo.limit.limit > 0 && + pInfo->limitInfo.limit.limit <= pInfo->limitInfo.numOfOutputRows + blockDataGetNumOfRows(pBlock)) { + int32_t remain = pInfo->limitInfo.limit.limit - pInfo->limitInfo.numOfOutputRows; + blockDataKeepFirstNRows(pBlock, remain); + } + + size_t numOfRows = blockDataGetNumOfRows(pBlock); + pInfo->limitInfo.numOfOutputRows += numOfRows; + pOperator->resultInfo.totalRows += numOfRows; + + if (numOfRows > 0) { break; } } - if (pBlock != NULL) { - pOperator->resultInfo.totalRows += pBlock->info.rows; - } - - return pBlock; + return blockDataGetNumOfRows(pBlock) > 0? pBlock:NULL; } void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { @@ -479,7 +499,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); pInfo->binfo.pRes = pResBlock; - initResultSizeInfo(pOperator, 1024); + initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); ; @@ -711,7 +731,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); - initResultSizeInfo(pOperator, 1024); + initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->groupSort = pMergePhyNode->groupSort; pInfo->binfo.pRes = pResBlock; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d206f56ec3..b5966fc463 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -652,7 +652,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num void printDataBlock(SSDataBlock* pBlock, const char* flag) { if (!pBlock || pBlock->info.rows == 0) { - qDebug("======printDataBlock: Block is Null or Empty"); + qDebug("===stream===printDataBlock: Block is Null or Empty"); return; } char* pBuf = NULL; @@ -772,12 +772,6 @@ int32_t binarySearch(void* keyList, int num, TSKEY key, int order, __get_value_f return midPos; } -int64_t getReskey(void* data, int32_t index) { - SArray* res = (SArray*)data; - SResKeyPos* pos = taosArrayGetP(res, index); - return *(int64_t*)pos->key; -} - int32_t compareResKey(void* pKey, void* data, int32_t index) { SArray* res = (SArray*)data; SResKeyPos* pos = taosArrayGetP(res, index); @@ -1537,8 +1531,10 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pInfo->binfo.pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; + qDebug("===stream===single interval is done"); freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); } + printDataBlock(pInfo->binfo.pRes, "single interval"); return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } @@ -1772,7 +1768,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* SExprSupp* pSup = &pOperator->exprSupp; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); @@ -1853,7 +1849,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, numOfRows); + initResultSizeInfo(&pOperator->resultInfo, numOfRows); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); @@ -2313,7 +2309,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries); pInfo->fillType = convertFillType(pInterpPhyNode->fillMode); - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues); pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); @@ -2361,7 +2357,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); @@ -2409,7 +2405,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo } size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { @@ -2985,7 +2981,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY); pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); if (pIntervalPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); @@ -3161,7 +3157,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh goto _error; } - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); if (pSessionNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar); @@ -4425,7 +4421,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols); pInfo->stateCol = extractColumnFromColumnNode(pColNode); - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); if (pStateNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar); @@ -4675,7 +4671,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, iaInfo->primaryTsIndex = primaryTsSlotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); @@ -4981,7 +4977,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI SExprSupp* pExprSupp = &pOperator->exprSupp; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(pOperator, 4096); + initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(pExprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&iaInfo->binfo, pResBlock); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 9c004bf1c4..ec8e6b038e 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -958,6 +958,7 @@ static bool validateHistogramBinDesc(char* binDescStr, int8_t binType, char* err return false; } + cJSON_Delete(binDesc); taosMemoryFree(intervals); return true; } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index b754c52bbd..050f77bb21 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -2750,6 +2750,7 @@ static bool getHistogramBinDesc(SHistoFuncBin** bins, int32_t* binNum, char* bin (*bins)[i].count = 0; } + cJSON_Delete(binDesc); taosMemoryFree(intervals); return true; } diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 76de4da4fd..254c99f05d 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -1195,7 +1195,7 @@ static void vectorMathTsSubHelper(SColumnInfoData* pLeftCol, SColumnInfoData* pR colDataAppendNULL(pOutputCol, i); continue; // TODO set null or ignore } - *output = taosTimeSub(getVectorBigintValueFnLeft(pLeftCol->pData, i), getVectorBigintValueFnRight(pRightCol->pData, 0), + *output = taosTimeAdd(getVectorBigintValueFnLeft(pLeftCol->pData, i), -getVectorBigintValueFnRight(pRightCol->pData, 0), pRightCol->info.scale, pRightCol->info.precision); } diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index edc811fe82..a5fd3fca35 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -40,7 +40,6 @@ static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { } static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { - ASSERT(pWal->fileInfoSet != NULL); int32_t sz = taosArrayGetSize(pWal->fileInfoSet); ASSERT(sz > 0); #if 0 @@ -55,7 +54,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { int64_t fileSize = 0; taosStatFile(fnameStr, &fileSize, NULL); - int readSize = TMIN(WAL_MAX_SIZE + 2, fileSize); + int32_t readSize = TMIN(WAL_SCAN_BUF_SIZE, fileSize); pLastFileInfo->fileSize = fileSize; TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ); @@ -73,7 +72,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { return -1; } - taosLSeekFile(pFile, -readSize, SEEK_END); + int64_t offset; + offset = taosLSeekFile(pFile, -readSize, SEEK_END); if (readSize != taosReadFile(pFile, buf, readSize)) { taosMemoryFree(buf); taosCloseFile(&pFile); @@ -81,31 +81,56 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { return -1; } - char* haystack = buf; char* found = NULL; - char* candidate; - while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) { - // read and validate - SWalCkHead* logContent = (SWalCkHead*)candidate; - if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) { - found = candidate; + while (1) { + char* haystack = buf; + char* candidate; + while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) { + // read and validate + SWalCkHead* logContent = (SWalCkHead*)candidate; + if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) { + found = candidate; + } + haystack = candidate + 1; } - haystack = candidate + 1; - } - if (found == buf) { - SWalCkHead* logContent = (SWalCkHead*)found; - if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) { - // file has to be deleted + if (found || offset == 0) break; + offset = TMIN(0, offset - readSize + sizeof(uint64_t)); + int64_t offset2 = taosLSeekFile(pFile, offset, SEEK_SET); + ASSERT(offset == offset2); + if (readSize != taosReadFile(pFile, buf, readSize)) { taosMemoryFree(buf); taosCloseFile(&pFile); - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } +#if 0 + if (found == buf) { + SWalCkHead* logContent = (SWalCkHead*)found; + if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) { + // file has to be deleted + taosMemoryFree(buf); + taosCloseFile(&pFile); + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + return -1; + } + } +#endif } - taosCloseFile(&pFile); - SWalCkHead* lastEntry = (SWalCkHead*)found; + // TODO truncate file - return lastEntry->head.version; + if (found == NULL) { + // file corrupted, no complete log + // TODO delete and search in previous files + ASSERT(0); + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + return -1; + } + SWalCkHead* lastEntry = (SWalCkHead*)found; + int64_t retVer = lastEntry->head.version; + taosCloseFile(&pFile); + taosMemoryFree(buf); + + return retVer; } int walCheckAndRepairMeta(SWal* pWal) { diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 4fc135a1cf..d6348cc5dd 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -436,11 +436,6 @@ END: } int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen) { - if (bodyLen > TSDB_MAX_WAL_SIZE) { - terrno = TSDB_CODE_WAL_SIZE_LIMIT; - return -1; - } - taosThreadMutexLock(&pWal->mutex); int64_t index = pWal->vers.lastVer + 1; @@ -472,10 +467,6 @@ int32_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SWalSync int32_t bodyLen) { int32_t code = 0; - if (bodyLen > TSDB_MAX_WAL_SIZE) { - terrno = TSDB_CODE_WAL_SIZE_LIMIT; - return -1; - } taosThreadMutexLock(&pWal->mutex); // concurrency control: diff --git a/source/util/src/tutil.c b/source/util/src/tutil.c index 7f3728e2ad..addb9f55ba 100644 --- a/source/util/src/tutil.c +++ b/source/util/src/tutil.c @@ -64,20 +64,6 @@ int32_t strdequote(char *z) { return j + 1; // only one quote, do nothing } -char *strDupUnquo(const char *src) { - if (src == NULL) return NULL; - if (src[0] != '`') return strdup(src); - int32_t len = (int32_t)strlen(src); - if (src[len - 1] != '`') return NULL; - char *ret = taosMemoryMalloc(len); - if (ret == NULL) return NULL; - for (int32_t i = 0; i < len - 1; i++) { - ret[i] = src[i + 1]; - } - ret[len - 1] = 0; - return ret; -} - size_t strtrim(char *z) { int32_t i = 0; int32_t j = 0; diff --git a/tests/script/tsim/query/interval-offset.sim b/tests/script/tsim/query/interval-offset.sim index b6dffb5fe3..1399be7b53 100644 --- a/tests/script/tsim/query/interval-offset.sim +++ b/tests/script/tsim/query/interval-offset.sim @@ -185,6 +185,7 @@ print ===> rows1: $data10 $data11 $data12 $data13 $data14 print ===> rows2: $data20 $data21 $data22 $data23 $data24 print ===> rows3: $data30 $data31 $data32 $data33 $data34 if $rows != 4 then + print expect 4, actual: $rows return -1 endi if $data00 != @21-12-08 00:00:00.000@ then diff --git a/tests/script/tsim/scalar/scalar.sim b/tests/script/tsim/scalar/scalar.sim index 32224e33ba..29cc67ec24 100644 --- a/tests/script/tsim/scalar/scalar.sim +++ b/tests/script/tsim/scalar/scalar.sim @@ -43,7 +43,8 @@ sql select cast(1 as timestamp)+1n; if $rows != 1 then return -1 endi -if $data00 != @70-02-01 08:00:00.000@ then +if $data00 != @70-02-01 08:00:00.001@ then + print expect 70-02-01 08:00:00.001, actual: $data00 return -1 endi @@ -52,11 +53,13 @@ if $rows != 1 then return -1 endi -sql select cast(1 as timestamp)-1y; +# there is an *bug* in print timestamp that smaller than 0, so let's try value that is greater than 0. +sql select cast(1 as timestamp)+1y; if $rows != 1 then return -1 endi -if $data00 != @69-01-01 08:00:00.000@ then +if $data00 != @71-01-01 08:00:00.001@ then + print expect 71-01-01 08:00:00.001 , actual: $data00 return -1 endi diff --git a/tests/system-test/2-query/tsbsQuery.py b/tests/system-test/2-query/tsbsQuery.py index 1863f67c8e..a82c7bfe1a 100644 --- a/tests/system-test/2-query/tsbsQuery.py +++ b/tests/system-test/2-query/tsbsQuery.py @@ -14,6 +14,7 @@ class TDTestCase: clientCfgDict["debugFlag"] = 131 updatecfgDict = {'clientCfg': {}} updatecfgDict = {'debugFlag': 131} + updatecfgDict = {'keepColumnName': 1} updatecfgDict["clientCfg"] = clientCfgDict def init(self, conn, logSql): @@ -42,7 +43,7 @@ class TDTestCase: tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}',NULL ,'v2.3')") else: tdSql.execute(f"create table dct{i} using diagnostics (name,fleet,driver,model,device_version) tags ('truck_{i}','South{i}','Trish{i}','H-{i}','v2.3')") - for j in range(10): + for j in range(10): for i in range(100): tdSql.execute( f"insert into rct{j} values ( {ts+i*60000}, {80+i}, {90+i}, {85+i}, {30+i*10}, {1.2*i}, {221+i*2}, {20+i*0.2}, {1500+i*20}, {150+i*2},{5+i} )" @@ -92,23 +93,23 @@ class TDTestCase: # test partition interval limit (PRcore-TD-17410) - # tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings partition BY name,driver,fleet interval (10m) limit 1);") - # tdSql.checkRows(10) + tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings partition BY name,driver,fleet interval (10m) limit 1);") + tdSql.checkRows(10) # test partition interval Pseudo time-column tdSql.query("SELECT count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;") # 1 high-load: - # tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name desc, ts DESC;") + tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name desc, ts DESC;") - # tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name ;") + tdSql.query("SELECT ts,name,driver,current_load,load_capacity FROM (SELECT last(ts) as ts,name,driver, current_load,load_capacity FROM diagnostics WHERE fleet = 'South' partition by name,driver) WHERE current_load>= (0.9 * load_capacity) partition by name ORDER BY name ;") # 2 stationary-trucks tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1)") tdSql.query("select name,driver from (SELECT name,driver,fleet ,avg(velocity) as mean_velocity FROM readings WHERE ts > '2016-01-01T15:07:21Z' AND ts <= '2016-01-01T16:17:21Z' partition BY name,driver,fleet interval(10m) LIMIT 1) WHERE fleet = 'West' AND mean_velocity < 1000 partition BY name") # 3 long-driving-sessions - # tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT _wstart as ts,name,driver,avg(velocity) as mean_velocity FROM readings where ts > '2016-01-01T00:00:34Z' AND ts <= '2016-01-01T04:00:34Z' partition BY name,driver interval(10m)) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 22 ;") + tdSql.query("SELECT name,driver FROM(SELECT name,driver,count(*) AS ten_min FROM(SELECT _wstart as ts,name,driver,avg(velocity) as mean_velocity FROM readings where ts > '2016-01-01T00:00:34Z' AND ts <= '2016-01-01T04:00:34Z' partition BY name,driver interval(10m)) WHERE mean_velocity > 1 GROUP BY name,driver) WHERE ten_min > 22 ;") #4 long-daily-sessions @@ -130,15 +131,18 @@ class TDTestCase: # 8. daily-activity tdSql.query(" SELECT model,ms1 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;") + + tdSql.query(" SELECT model,ms1 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) ) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1;") + tdSql.query("SELECT _wstart,model,fleet,count(ms1)/144 FROM (SELECT _wstart as ts1,model, fleet,avg(status) AS ms1 FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition by model, fleet interval(10m) fill(value,0)) WHERE ts1 >= '2016-01-01T00:00:00Z' AND ts1 < '2016-01-05T00:00:01Z' AND ms1<1 partition by model, fleet interval(1d) ;") # 9. breakdown-frequency # NULL ---count(NULL)=0 expect count(NULL)= 100 - tdSql.query("select tbname,count(model),model from readings partition by tbname,model;") - # model=NULL count(other) is 0 - tdSql.query("select tbname,count(name),model from readings where model=NULL partition by tbname,model;") + tdSql.query("SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where model is null partition BY model,state_changed ") + tdSql.query(" SELECT model,state_changed,count(state_changed) FROM (SELECT model,diff(broken_down) AS state_changed FROM (SELECT _wstart,model,cast(cast(floor(2*(sum(nzs)/count(nzs))) as bool) as int) AS broken_down FROM (SELECT ts,model, cast(cast(status as bool) as int) AS nzs FROM diagnostics WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' ) WHERE ts >= '2016-01-01T00:00:00Z' AND ts < '2016-01-05T00:00:01Z' partition BY model interval(10m)) partition BY model) where state_changed =1 partition BY model,state_changed ;") + #it's already supported: # last-loc tdSql.query("SELECT last_row(ts),latitude,longitude,name,driver FROM readings WHERE fleet='South' and name IS NOT NULL partition BY name,driver order by name ;") diff --git a/tools/taos-tools b/tools/taos-tools index 2b75339b8b..f84cb6e515 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit 2b75339b8b5c239619d1f09970d03075c58140dd +Subproject commit f84cb6e51556d8030585128c2b252aa2a6453328