diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 2de4ffdc17..66bae5ad3b 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -120,6 +120,7 @@ extern SDiskCfg tsDiskCfg[]; // udf extern bool tsStartUdfd; +extern char tsUdfdResFuncs[]; // schemaless extern char tsSmlChildTableName[]; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 01d42d6950..840e7309fe 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -619,6 +619,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156) #define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157) #define TSDB_CODE_RSMA_REGEX_MATCH TAOS_DEF_ERROR_CODE(0, 0x3158) +#define TSDB_CODE_RSMA_STREAM_STATE_OPEN TAOS_DEF_ERROR_CODE(0, 0x3159) +#define TSDB_CODE_RSMA_STREAM_STATE_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3160) //index #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ddda8f8c9a..a6e30ae0b5 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -163,6 +163,7 @@ int32_t tsTtlUnit = 86400; int32_t tsTtlPushInterval = 86400; int32_t tsGrantHBInterval = 60; int32_t tsUptimeInterval = 300; // seconds +char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { @@ -385,9 +386,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4); if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; -// tsNumOfQnodeFetchThreads = tsNumOfCores / 2; -// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); -// if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; + // tsNumOfQnodeFetchThreads = tsNumOfCores / 2; + // tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); + // if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; tsNumOfSnodeSharedThreads = tsNumOfCores / 4; tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4); @@ -421,6 +422,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1; if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; + if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; GRANT_CFG_ADD; return 0; } @@ -527,15 +529,15 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } -/* - pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads"); - if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { - tsNumOfQnodeFetchThreads = numOfCores / 2; - tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); - pItem->i32 = tsNumOfQnodeFetchThreads; - pItem->stype = stype; - } -*/ + /* + pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads"); + if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { + tsNumOfQnodeFetchThreads = numOfCores / 2; + tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); + pItem->i32 = tsNumOfQnodeFetchThreads; + pItem->stype = stype; + } + */ pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { @@ -693,7 +695,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; -// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; + // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; @@ -717,6 +719,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32; tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; + tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; @@ -941,10 +944,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; } else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) { tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; -/* - } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) { - tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; -*/ + /* + } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) { + tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; + */ } else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) { tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; } else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) { diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 092c861766..d304f553ef 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -51,7 +51,6 @@ target_sources( "src/tsdb/tsdbCacheRead.c" "src/tsdb/tsdbRetention2.c" "src/tsdb/tsdbDiskData.c" - "src/tsdb/tsdbCompress.c" "src/tsdb/tsdbCompact.c" "src/tsdb/tsdbMergeTree.c" diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 9931462e5f..dade85b12d 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -146,6 +146,7 @@ struct SRSmaInfoItem { uint16_t nScanned; int32_t maxDelay; // ms tmr_h tmrId; + void *pStreamState; }; struct SRSmaInfo { @@ -224,8 +225,10 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer); -void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName); -void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName); +void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName); +void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName); +void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName); +void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName); static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { int32_t ref = T_REF_INC(pRSmaInfo); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index ec8fcb2932..412def646f 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -92,6 +92,18 @@ void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); } +void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) { + tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); + int32_t rsmaLen = strlen(outputName); + snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level); +} + +void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) { + tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); + int32_t rsmaLen = strlen(outputName); + snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level); +} + static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { return lenWithHead - RSMA_QTASKINFO_HEAD_LEN; } @@ -130,6 +142,10 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { taosTmrStopA(&pItem->tmrId); } + if (isDeepFree && pItem->pStreamState) { + streamStateClose(pItem->pStreamState); + } + if (isDeepFree && pInfo->taskInfo[i]) { tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); } else { @@ -290,12 +306,33 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat SRetention *pRetention = SMA_RETENTION(pSma); STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); SVnode *pVnode = pSma->pVnode; + char taskInfDir[TSDB_FILENAME_LEN] = {0}; + void *pStreamState = NULL; + + // set the backend of stream state + tdRSmaQTaskInfoGetFullPathEx(TD_VID(pVnode), pRSmaInfo->suid, idx + 1, tfsGetPrimaryPath(pVnode->pTfs), taskInfDir); + if (!taosCheckExistFile(taskInfDir)) { + char *s = strdup(taskInfDir); + if (taosMulMkDir(taosDirName(s)) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + taosMemoryFree(s); + return TSDB_CODE_FAILED; + } + taosMemoryFree(s); + } + pStreamState = streamStateOpen(taskInfDir, NULL, true); + if (!pStreamState) { + terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; + return TSDB_CODE_FAILED; + } + + SReadHandle handle = { .meta = pVnode->pMeta, .vnode = pVnode, .initTqReader = 1, + .pStateBackend = pStreamState, }; - pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); if (!pRSmaInfo->taskInfo[idx]) { terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; @@ -303,6 +340,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat } SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot + pItem->pStreamState = pStreamState; if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { int64_t msInterval = convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); @@ -322,7 +360,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pItem->fetchLevel = pItem->level; taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); - smaInfo("vgId:%d, item:%p table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 ", finally maxdelay:%" PRIi32, @@ -1226,16 +1263,17 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) { goto _err; } - if (nTables <= 0) { smaDebug("vgId:%d, no need to restore rsma task %" PRIi8 " since no tables", SMA_VID(pSma), type); return TSDB_CODE_SUCCESS; } +#if 0 // step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore if (tdRSmaRestoreQTaskInfoReload(pSma, type, qtaskFileVer) < 0) { goto _err; } +#endif // step 3: reload ts data from checkpoint if (tdRSmaRestoreTSDataReload(pSma) < 0) { @@ -1440,6 +1478,50 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIte return TSDB_CODE_SUCCESS; } +int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { + SSma *pSma = pRSmaStat->pSma; + SVnode *pVnode = pSma->pVnode; + int32_t vid = SMA_VID(pSma); + + if (taosHashGetSize(pInfoHash) <= 0) { + return TSDB_CODE_SUCCESS; + } + + int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat); + if (pRSmaStat->commitAppliedVer <= fsMaxVer) { + smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid, + pRSmaStat->commitAppliedVer, fsMaxVer); + return TSDB_CODE_SUCCESS; + } + + void *infoHash = NULL; + while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; + + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + continue; + } + + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i); + if (pItem && pItem->pStreamState) { + if (streamStateCommit(pItem->pStreamState) < 0) { + terrno = TSDB_CODE_RSMA_STREAM_STATE_COMMIT; + goto _err; + } + smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 " level %d", vid, pRSmaInfo->suid, + i + 1); + } + } + } + + return TSDB_CODE_SUCCESS; +_err: + smaError("vgId:%d, rsma persist failed since %s", vid, terrstr()); + return TSDB_CODE_FAILED; +} + +#if 0 int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { SSma *pSma = pRSmaStat->pSma; SVnode *pVnode = pSma->pVnode; @@ -1459,7 +1541,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat); if (pRSmaStat->commitAppliedVer <= fsMaxVer) { smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid, - pRSmaStat->commitAppliedVer, fsMaxVer); + pRSmaStat->commitAppliedVer, fsMaxVer); return TSDB_CODE_SUCCESS; } @@ -1579,6 +1661,8 @@ _err: return TSDB_CODE_FAILED; } +#endif + /** * @brief trigger to get rsma result in async mode * @@ -1926,7 +2010,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag, - atomic_load_64(&pRSmaStat->nBufItems)); + atomic_load_64(&pRSmaStat->nBufItems)); break; } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index db3076e1bd..cbe916d0f7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -141,11 +141,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); - if (pRsp->withSchema) { - ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum); - } else { - ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); - } + ASSERT(!pRsp->withSchema); + ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { if (pRsp->blockNum > 0) { diff --git a/source/dnode/vnode/src/tsdb/tsdbCompress.c b/source/dnode/vnode/src/tsdb/tsdbCompress.c deleted file mode 100644 index 76be7c1070..0000000000 --- a/source/dnode/vnode/src/tsdb/tsdbCompress.c +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "tsdb.h" - -// Integer ===================================================== -typedef struct { - int8_t rawCopy; - int64_t prevVal; - int32_t nVal; - int32_t nBuf; - uint8_t *pBuf; -} SIntCompressor; - -#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a))) -#define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL) - -static int32_t tsdbCmprI64(SIntCompressor *pCompressor, int64_t val) { - int32_t code = 0; - - // raw copy - if (pCompressor->rawCopy) { - memcpy(pCompressor->pBuf + pCompressor->nBuf, &val, sizeof(val)); - pCompressor->nBuf += sizeof(val); - pCompressor->nVal++; - goto _exit; - } - - if (!I64_SAFE_ADD(val, pCompressor->prevVal)) { - pCompressor->rawCopy = 1; - // TODO: decompress and copy - pCompressor->nVal++; - goto _exit; - } - - int64_t diff = val - pCompressor->prevVal; - uint8_t zigzag = ZIGZAGE(int64_t, diff); - - if (zigzag >= SIMPLE8B_MAX) { - pCompressor->rawCopy = 1; - // TODO: decompress and copy - pCompressor->nVal++; - goto _exit; - } - -_exit: - return code; -} - -// Timestamp ===================================================== - -// Float ===================================================== \ No newline at end of file diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 18f6d3ad2c..a516a9d11f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -462,6 +462,7 @@ typedef struct SPartitionDataInfo { typedef struct STimeWindowAggSupp { int8_t calTrigger; int64_t waterMark; + int64_t deleteMark; TSKEY maxTs; TSKEY minTs; SColumnInfoData timeWindowData; // query time window info for scalar function execution. @@ -1090,7 +1091,7 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo); int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult); -int32_t saveOutput(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize); +int32_t saveOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e0d417093c..f6bb1ffe42 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4166,9 +4166,8 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI }; char* value = NULL; int32_t size = pAggSup->resultRowSize; - /*if (streamStateGet(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) {*/ - /*value = taosMemoryCalloc(1, size);*/ - /*}*/ + + tSimpleHashPut(pAggSup->pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0); if (streamStateAddIfNotExist(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } @@ -4186,7 +4185,7 @@ int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pR return TSDB_CODE_SUCCESS; } -int32_t saveOutput(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize) { +int32_t saveOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize) { streamStatePut(pTaskInfo->streamInfo.pState, pKey, pResult, resSize); return TSDB_CODE_SUCCESS; } @@ -4259,8 +4258,9 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock } } } - releaseOutputBuf(pTaskInfo, &key, pRow); + pBlock->info.rows += pRow->numOfRows; + releaseOutputBuf(pTaskInfo, &key, pRow); } blockDataUpdateTsWindow(pBlock, 0); return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c221aaf4fd..81c0f9cd0c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -867,6 +867,10 @@ static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap); } +static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SHashObj* pUpdatedMap) { + return saveWinResult(ts, -1, -1, groupId, pUpdatedMap); +} + static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) { return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated); } @@ -918,12 +922,16 @@ static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) { } } -bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup) { - ASSERT(pSup->maxTs == INT64_MIN || pSup->maxTs > 0); - return pSup->maxTs != INT64_MIN && ts < pSup->maxTs - pSup->waterMark; +bool isOverdue(TSKEY ekey, STimeWindowAggSupp* pTwSup) { + ASSERT(pTwSup->maxTs == INT64_MIN || pTwSup->maxTs > 0); + return pTwSup->maxTs != INT64_MIN && ekey < pTwSup->maxTs - pTwSup->waterMark; } -bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) { return isOverdue(pWin->ekey, pSup); } +bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) { return isOverdue(pWin->ekey, pTwSup); } + +bool needDeleteWindowBuf(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) { + return pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark; +} static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, int32_t scanFlag) { @@ -1374,6 +1382,41 @@ static bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, return true; } +static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, int32_t numOfOutput) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SWinKey key = {.ts = ts, .groupId = groupId}; + tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey)); + streamStateDel(pOperator->pTaskInfo->streamInfo.pState, &key); + return true; +} + +static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int32_t numOfOutput, SSDataBlock* pBlock, + SArray* pUpWins, SHashObj* pUpdatedMap) { + SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData; + SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + TSKEY* endTsCols = (TSKEY*)pEndTsCol->pData; + SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + uint64_t* pGpDatas = (uint64_t*)pGpCol->pData; + for (int32_t i = 0; i < pBlock->info.rows; i++) { + SResultRowInfo dumyInfo; + dumyInfo.cur.pageId = -1; + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC); + while (win.ekey <= endTsCols[i]) { + uint64_t winGpId = pGpDatas[i]; + bool res = doDeleteWindow(pOperator, win.skey, winGpId, numOfOutput); + SWinKey winRes = {.ts = win.skey, .groupId = winGpId}; + if (pUpWins && res) { + taosArrayPush(pUpWins, &winRes); + } + if (pUpdatedMap) { + taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey)); + } + getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); + } + } +} + bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) { size_t bytes = sizeof(TSKEY); SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId); @@ -1383,8 +1426,6 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) // window has been closed return false; } - // SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, p1->pageId); - // dBufSetBufPageRecycled(pAggSup->pResultBuf, bufPage); tSimpleHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); return true; } @@ -1512,6 +1553,49 @@ static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup return TSDB_CODE_SUCCESS; } +static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval, + SHashObj* pPullDataMap, SHashObj* closeWins, SOperatorInfo* pOperator) { + qDebug("===stream===close interval window"); + void* pIte = NULL; + size_t keyLen = 0; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { + void* key = tSimpleHashGetKey(pIte, &keyLen); + SWinKey* pWinKey = (SWinKey*)key; + void* chIds = taosHashGet(pPullDataMap, pWinKey, sizeof(SWinKey)); + STimeWindow win = { + .skey = pWinKey->ts, + .ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1, + }; + if (isCloseWindow(&win, pTwSup)) { + if (chIds && pPullDataMap) { + SArray* chAy = *(SArray**)chIds; + int32_t size = taosArrayGetSize(chAy); + qDebug("===stream===window %" PRId64 " wait child size:%d", pWinKey->ts, size); + for (int32_t i = 0; i < size; i++) { + qDebug("===stream===window %" PRId64 " wait child id:%d", pWinKey->ts, *(int32_t*)taosArrayGet(chAy, i)); + } + continue; + } else if (pPullDataMap) { + qDebug("===stream===close window %" PRId64, pWinKey->ts); + } + + if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + int32_t code = saveWinResultInfo(pWinKey->ts, pWinKey->groupId, closeWins); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter); + + if (needDeleteWindowBuf(&win, pTwSup)) { + streamStateDel(pOperator->pTaskInfo->streamInfo.pState, pWinKey); + } + } + } + return TSDB_CODE_SUCCESS; +} + static void closeChildIntervalWindow(SArray* pChildren, TSKEY maxTs) { int32_t size = taosArrayGetSize(pChildren); for (int32_t i = 0; i < size; i++) { @@ -4918,8 +5002,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR SExprSupp* pSup = &pOperatorInfo->exprSupp; SInterval* pInterval = &iaInfo->interval; - int32_t startPos = 0; - int64_t* tsCols = extractTsCol(pBlock, iaInfo); + int32_t startPos = 0; + int64_t* tsCols = extractTsCol(pBlock, iaInfo); TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); @@ -4938,7 +5022,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR win.skey = miaInfo->curTs; win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; - int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup); + int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup); if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) { T_LONG_JMP(pTaskInfo->env, ret); } @@ -4963,7 +5047,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; startPos = currPos; - ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup); + ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup); if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) { T_LONG_JMP(pTaskInfo->env, ret); } @@ -5032,7 +5116,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { pMiaInfo->prefetchedBlock = pBlock; cleanupAfterGroupResultGen(pMiaInfo, pRes); break; - } else { + } else { // continue } } @@ -5197,7 +5281,7 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet( iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); ASSERT(p1 != NULL); -// finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo); + // finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo); tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); return TSDB_CODE_SUCCESS; } @@ -5222,7 +5306,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t STimeWindow* prevWin = &prevGrpWin->window; if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) { -// finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock); + // finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock); tdListPopNode(miaInfo->groupIntervals, listNode); } } @@ -5382,7 +5466,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { if (listNode != NULL) { SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data); -// finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes); + // finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes); pRes->info.groupId = grpWin->groupId; } } @@ -5591,7 +5675,7 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock* forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { - saveWinResultRow(pResult, tableGroupId, pUpdatedMap); + saveWinResultInfo(pResult->win.skey, tableGroupId, pUpdatedMap); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, @@ -5600,7 +5684,7 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock* .ts = nextWin.skey, .groupId = tableGroupId, }; - saveOutput(pTaskInfo, &key, pResult, pInfo->aggSup.resultRowSize); + saveOutputBuf(pTaskInfo, &key, pResult, pInfo->aggSup.resultRowSize); releaseOutputBuf(pTaskInfo, &key, pResult); int32_t prevEndPos = (forwardRows - 1) * step + startPos; ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); @@ -5645,7 +5729,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo); + // doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; qDebug("===stream===single interval is done"); @@ -5671,13 +5756,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { printDataBlock(pBlock, "single interval recv"); if (pBlock->info.type == STREAM_CLEAR) { - doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, - NULL); + doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, NULL); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); continue; } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { - doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, pInfo->pDelWins, &pInfo->interval, - pUpdatedMap); + // doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, pInfo->pDelWins, &pInfo->interval, + // pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pInfo->pDelWins, + pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); @@ -5704,9 +5790,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { maxTs = TMAX(maxTs, pBlock->info.window.ekey); minTs = TMIN(minTs, pBlock->info.window.skey); - doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); + // doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); // new disc buf - /*doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);*/ + doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); @@ -5741,8 +5827,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { #endif pOperator->status = OP_RES_TO_RETURN; - closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, - pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); + closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, + pOperator); void* pIte = NULL; while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { @@ -5751,7 +5837,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { taosArraySort(pUpdated, resultrowComparAsc); // new disc buf - finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); + // finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, + // pSup->rowEntryInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); removeDeleteResults(pUpdatedMap, pInfo->pDelWins); @@ -5762,9 +5849,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + // doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); // new disc buf - // doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo); + doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo); printDataBlock(pInfo->binfo.pRes, "single interval"); return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } @@ -5809,6 +5896,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys .calTrigger = pIntervalPhyNode->window.triggerType, .maxTs = INT64_MIN, .minTs = INT64_MAX, + .deleteMark = INT64_MAX, }; ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY); pOperator->pTaskInfo = pTaskInfo; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index a45e4585e8..636f006d6e 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -41,6 +41,8 @@ typedef struct SUdfdContext { uv_mutex_t udfsMutex; SHashObj * udfsHash; + SArray* residentFuncs; + bool printVersion; } SUdfdContext; @@ -67,6 +69,7 @@ typedef struct SUdf { EUdfState state; uv_mutex_t lock; uv_cond_t condReady; + bool resident; char name[TSDB_FUNC_NAME_LEN]; int8_t funcType; @@ -200,6 +203,14 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { if (udf->initFunc) { udf->initFunc(); } + udf->resident = false; + for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { + char* funcName = taosArrayGet(global.residentFuncs, i); + if (strcmp(setup->udfName, funcName) == 0) { + udf->resident = true; + break; + } + } udf->state = UDF_STATE_READY; uv_cond_broadcast(&udf->condReady); uv_mutex_unlock(&udf->lock); @@ -345,7 +356,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { uv_mutex_lock(&global.udfsMutex); udf->refCount--; - if (udf->refCount == 0) { + if (udf->refCount == 0 && !udf->resident) { unloadUdf = true; taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); } @@ -576,9 +587,9 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc)); char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; char *mergeSuffix = "_merge"; - strncpy(finishFuncName, processFuncName, sizeof(finishFuncName)); - strncat(finishFuncName, mergeSuffix, strlen(mergeSuffix)); - uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggMergeFunc)); + strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName)); + strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix)); + uv_dlsym(&udf->lib, mergeFuncName, (void **)(&udf->aggMergeFunc)); } return 0; } @@ -919,8 +930,6 @@ static int32_t udfdRun() { uv_run(global.loop, UV_RUN_DEFAULT); uv_loop_close(global.loop); - uv_mutex_destroy(&global.udfsMutex); - taosHashCleanup(global.udfsHash); return 0; } @@ -941,6 +950,47 @@ void udfdConnectMnodeThreadFunc(void *args) { } } +int32_t udfdInitResidentFuncs() { + if (strlen(tsUdfdResFuncs) == 0) { + return TSDB_CODE_SUCCESS; + } + + global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN); + char* pSave = tsUdfdResFuncs; + char* token; + while ((token = strtok_r(pSave, ",", &pSave)) != NULL) { + char func[TSDB_FUNC_NAME_LEN] = {0}; + strncpy(func, token, strlen(token)); + taosArrayPush(global.residentFuncs, func); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t udfdDeinitResidentFuncs() { + for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { + char* funcName = taosArrayGet(global.residentFuncs, i); + SUdf** udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName)); + if (udfInHash) { + taosHashRemove(global.udfsHash, funcName, strlen(funcName)); + SUdf* udf = *udfInHash; + if (udf->destroyFunc) { + (udf->destroyFunc)(); + } + uv_dlclose(&udf->lib); + taosMemoryFree(udf); + } + } + taosArrayDestroy(global.residentFuncs); + return TSDB_CODE_SUCCESS; +} + +int32_t udfdCleanup() { + uv_mutex_destroy(&global.udfsMutex); + taosHashCleanup(global.udfsHash); + return 0; +} + int main(int argc, char *argv[]) { if (!taosCheckSystemIsLittleEnd()) { printf("failed to start since on non-little-end machines\n"); @@ -978,6 +1028,8 @@ int main(int argc, char *argv[]) { return -5; } + udfdInitResidentFuncs(); + uv_thread_t mnodeConnectThread; uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL); @@ -986,5 +1038,7 @@ int main(int argc, char *argv[]) { removeListeningPipe(); udfdCloseClientRpc(); + udfdDeinitResidentFuncs(); + udfdCleanup(); return 0; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index beb938b161..a3cb0c2654 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -915,20 +915,30 @@ static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) } static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) { - int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE); + SLogicNode* pSplitNode = pInfo->pSplitNode; + if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) && + NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) { + pSplitNode = pInfo->pSplitNode->pParent; + } + int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, SUBPLAN_TYPE_MERGE); if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, - (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); + (SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); } ++(pCxt->groupId); return code; } static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) { - int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, NULL, pInfo->pSplitNode, true); + SLogicNode* pSplitNode = pInfo->pSplitNode; + if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) && + NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) { + pSplitNode = pInfo->pSplitNode->pParent; + } + int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true); if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, - (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); + (SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); } pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; ++(pCxt->groupId); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 5c7c5e7a6d..fde8bca77b 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -140,15 +140,9 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void* if (streamStateGet(pState, key, pVal, pVLen) == 0) { return 0; } - void* tmp = taosMemoryCalloc(1, size); - if (streamStatePut(pState, key, &tmp, size) == 0) { - taosMemoryFree(tmp); - int32_t code = streamStateGet(pState, key, pVal, pVLen); - ASSERT(code == 0); - return code; - } - taosMemoryFree(tmp); - return -1; + *pVal = tdbRealloc(NULL, size); + memset(*pVal, 0, size); + return 0; } int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) { @@ -196,9 +190,14 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key if (pCur == NULL) { return NULL; } + if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) { + taosMemoryFree(pCur); + return NULL; + } int32_t c; if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { + tdbTbcClose(pCur->pCur); taosMemoryFree(pCur); return NULL; } @@ -217,9 +216,14 @@ SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key if (pCur == NULL) { return NULL; } + if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) { + taosMemoryFree(pCur); + return NULL; + } int32_t c; if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { + tdbTbcClose(pCur->pCur); taosMemoryFree(pCur); return NULL; } diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 543ffc55b6..04711eb6a0 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -248,6 +248,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { return 0; } + // loop to write the dirty pages to file SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1); SRBTreeNode *pNode = NULL; while ((pNode = tRBTreeIterNext(&iter)) != NULL) { @@ -257,6 +258,15 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ASSERT(0); return -1; } + } + + tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize); + pPager->dbOrigSize = pPager->dbFileSize; + + // release the page + iter = tRBTreeIterCreate(&pPager->rbt, 1); + while ((pNode = tRBTreeIterNext(&iter)) != NULL) { + pPage = (SPage *)pNode; pPage->isDirty = 0; @@ -265,29 +275,6 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { } tRBTreeCreate(&pPager->rbt, pageCmpFn); - /* - // loop to write the dirty pages to file - for (pPage = pPager->pDirty; pPage; pPage = pPage->pDirtyNext) { - // TODO: update the page footer - ret = tdbPagerWritePageToDB(pPager, pPage); - if (ret < 0) { - ASSERT(0); - return -1; - } - } - - // release the page - for (pPage = pPager->pDirty; pPage; pPage = pPager->pDirty) { - pPager->pDirty = pPage->pDirtyNext; - pPage->pDirtyNext = NULL; - - pPage->isDirty = 0; - - tdbPCacheRelease(pPager->pCache, pPage, pTxn); - } - */ - tdbTrace("tdbttl commit:%p, %d", pPager, pPager->dbOrigSize); - pPager->dbOrigSize = pPager->dbFileSize; // sync the db file tdbOsFSync(pPager->fd); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index b7783d0629..5284aeff77 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -424,6 +424,7 @@ int walLoadMeta(SWal* pWal) { // find existing meta file int metaVer = walFindCurMetaVer(pWal); if (metaVer == -1) { + wDebug("vgId:%d wal find meta ver %d", pWal->cfg.vgId, metaVer); return -1; } char fnameStr[WAL_FILE_LEN]; diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index ba877915b1..62b9d87628 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -50,6 +50,7 @@ #define _DEFAULT_SOURCE #include "tcompression.h" #include "lz4.h" +#include "tRealloc.h" #include "tlog.h" #ifdef TD_TSZ @@ -814,24 +815,24 @@ int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, cha uint32_t predicted = prev_value; uint32_t diff = curr.bits ^ predicted; - int32_t leading_zeros = FLOAT_BYTES * BITS_PER_BYTE; - int32_t trailing_zeros = leading_zeros; + int32_t clz = FLOAT_BYTES * BITS_PER_BYTE; + int32_t ctz = clz; if (diff) { - trailing_zeros = BUILDIN_CTZ(diff); - leading_zeros = BUILDIN_CLZ(diff); + ctz = BUILDIN_CTZ(diff); + clz = BUILDIN_CLZ(diff); } uint8_t nbytes = 0; uint8_t flag; - if (trailing_zeros > leading_zeros) { - nbytes = (uint8_t)(FLOAT_BYTES - trailing_zeros / BITS_PER_BYTE); + if (ctz > clz) { + nbytes = (uint8_t)(FLOAT_BYTES - ctz / BITS_PER_BYTE); if (nbytes > 0) nbytes--; flag = ((uint8_t)1 << 3) | nbytes; } else { - nbytes = (uint8_t)(FLOAT_BYTES - leading_zeros / BITS_PER_BYTE); + nbytes = (uint8_t)(FLOAT_BYTES - clz / BITS_PER_BYTE); if (nbytes > 0) nbytes--; flag = nbytes; } @@ -994,3 +995,621 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co return tdszDecompress(SZ_DOUBLE, input + 1, compressedSize - 1, nelements, output); } #endif + +/************************************************************************* + * STREAM COMPRESSION + *************************************************************************/ +#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a))) +typedef struct SCompressor SCompressor; + +static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData); +static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData); +static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData); +static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData); +static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t nData); +static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData); +static struct { + int8_t type; + int32_t bytes; + int8_t isVarLen; + int32_t (*cmprFn)(SCompressor *, const void *, int32_t nData); +} DATA_TYPE_INFO[] = { + {TSDB_DATA_TYPE_NULL, 0, 0, NULL}, // TSDB_DATA_TYPE_NULL + {TSDB_DATA_TYPE_BOOL, 1, 0, tCompBool}, // TSDB_DATA_TYPE_BOOL + {TSDB_DATA_TYPE_TINYINT, 1, 0, tCompInt}, // TSDB_DATA_TYPE_TINYINT + {TSDB_DATA_TYPE_SMALLINT, 2, 0, tCompInt}, // TSDB_DATA_TYPE_SMALLINT + {TSDB_DATA_TYPE_INT, 4, 0, tCompInt}, // TSDB_DATA_TYPE_INT + {TSDB_DATA_TYPE_BIGINT, 8, 0, tCompInt}, // TSDB_DATA_TYPE_BIGINT + {TSDB_DATA_TYPE_FLOAT, 4, 0, tCompFloat}, // TSDB_DATA_TYPE_FLOAT + {TSDB_DATA_TYPE_DOUBLE, 8, 0, tCompDouble}, // TSDB_DATA_TYPE_DOUBLE + {TSDB_DATA_TYPE_VARCHAR, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_VARCHAR + {TSDB_DATA_TYPE_TIMESTAMP, 8, 0, tCompTimestamp}, // pTSDB_DATA_TYPE_TIMESTAMP + {TSDB_DATA_TYPE_NCHAR, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_NCHAR + {TSDB_DATA_TYPE_UTINYINT, 1, 0, tCompInt}, // TSDB_DATA_TYPE_UTINYINT + {TSDB_DATA_TYPE_USMALLINT, 2, 0, tCompInt}, // TSDB_DATA_TYPE_USMALLINT + {TSDB_DATA_TYPE_UINT, 4, 0, tCompInt}, // TSDB_DATA_TYPE_UINT + {TSDB_DATA_TYPE_UBIGINT, 8, 0, tCompInt}, // TSDB_DATA_TYPE_UBIGINT + {TSDB_DATA_TYPE_JSON, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_JSON + {TSDB_DATA_TYPE_VARBINARY, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_VARBINARY + {TSDB_DATA_TYPE_DECIMAL, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_DECIMAL + {TSDB_DATA_TYPE_BLOB, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_BLOB + {TSDB_DATA_TYPE_MEDIUMBLOB, 1, 1, tCompBinary}, // TSDB_DATA_TYPE_MEDIUMBLOB +}; + +struct SCompressor { + int8_t type; + int8_t cmprAlg; + int8_t autoAlloc; + int32_t nVal; + uint8_t *aBuf[2]; + int64_t nBuf[2]; + union { + // Timestamp ---- + struct { + int64_t ts_prev_val; + int64_t ts_prev_delta; + uint8_t *ts_flag_p; + }; + // Integer ---- + struct { + int64_t i_prev; + int32_t i_selector; + int32_t i_start; + int32_t i_end; + uint64_t i_aZigzag[241]; + int8_t i_aBitN[241]; + }; + // Float ---- + struct { + uint32_t f_prev; + uint8_t *f_flag_p; + }; + // Double ---- + struct { + uint64_t d_prev; + uint8_t *d_flag_p; + }; + }; +}; + +// Timestamp ===================================================== +static int32_t tCompSetCopyMode(SCompressor *pCmprsor) { + int32_t code = 0; + + if (pCmprsor->nVal) { + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * pCmprsor->nVal); + if (code) return code; + } + pCmprsor->nBuf[1] = 0; + + int64_t n = 1; + int64_t value; + int64_t delta; + uint64_t vZigzag; + while (n < pCmprsor->nBuf[0]) { + uint8_t aN[2]; + aN[0] = pCmprsor->aBuf[0][n] & 0xf; + aN[1] = pCmprsor->aBuf[0][n] >> 4; + + n++; + + for (int32_t i = 0; i < 2; i++) { + vZigzag = 0; + for (uint8_t j = 0; j < aN[i]; j++) { + vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (8 * j)); + n++; + } + + int64_t delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag); + if (pCmprsor->nBuf[1] == 0) { + delta = 0; + value = delta_of_delta; + } else { + delta = delta_of_delta + delta; + value = delta + value; + } + + memcpy(pCmprsor->aBuf[1] + pCmprsor->nBuf[1], &value, sizeof(int64_t)); + pCmprsor->nBuf[1] += sizeof(int64_t); + + if (n >= pCmprsor->nBuf[0]) break; + } + } + + ASSERT(n == pCmprsor->nBuf[0]); + + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[1] + 1); + if (code) return code; + } + memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->aBuf[1], pCmprsor->nBuf[1]); + pCmprsor->nBuf[0] = 1 + pCmprsor->nBuf[1]; + } + pCmprsor->aBuf[0][0] = 0; + + return code; +} +static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t nData) { + int32_t code = 0; + + int64_t ts = *(int64_t *)pData; + ASSERT(pCmprsor->type == TSDB_DATA_TYPE_TIMESTAMP); + ASSERT(nData == 8); + + if (pCmprsor->aBuf[0][0] == 1) { + if (pCmprsor->nVal == 0) { + pCmprsor->ts_prev_val = ts; + pCmprsor->ts_prev_delta = -ts; + } + + if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) { + code = tCompSetCopyMode(pCmprsor); + if (code) return code; + goto _copy_cmpr; + } + int64_t delta = ts - pCmprsor->ts_prev_val; + + if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) { + code = tCompSetCopyMode(pCmprsor); + if (code) return code; + goto _copy_cmpr; + } + int64_t delta_of_delta = delta - pCmprsor->ts_prev_delta; + uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, delta_of_delta); + + pCmprsor->ts_prev_val = ts; + pCmprsor->ts_prev_delta = delta; + + if ((pCmprsor->nVal & 0x1) == 0) { + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17); + if (code) return code; + } + + pCmprsor->ts_flag_p = pCmprsor->aBuf[0] + pCmprsor->nBuf[0]; + pCmprsor->nBuf[0]++; + pCmprsor->ts_flag_p[0] = 0; + while (vZigzag) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff); + pCmprsor->nBuf[0]++; + pCmprsor->ts_flag_p[0]++; + vZigzag >>= 8; + } + } else { + while (vZigzag) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff); + pCmprsor->nBuf[0]++; + pCmprsor->ts_flag_p[0] += 0x10; + vZigzag >>= 8; + } + } + } else { + _copy_cmpr: + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(ts)); + if (code) return code; + } + + memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts)); + pCmprsor->nBuf[0] += sizeof(ts); + } + pCmprsor->nVal++; + + return code; +} + +// Integer ===================================================== +#define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL) +static const uint8_t BIT_PER_INTEGER[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; +static const int32_t SELECTOR_TO_ELEMS[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; +static const uint8_t BIT_TO_SELECTOR[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 12, + 13, 13, 13, 13, 13, 14, 14, 14, 14, 14, 14, 14, 14, 14, 14, 15, + 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, + 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; + +static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) { + int32_t code = 0; + + ASSERT(nData == DATA_TYPE_INFO[pCmprsor->type].bytes); + + if (pCmprsor->aBuf[0][0] == 0) { + int64_t val; + + switch (pCmprsor->type) { + case TSDB_DATA_TYPE_TINYINT: + val = *(int8_t *)pData; + break; + case TSDB_DATA_TYPE_SMALLINT: + val = *(int16_t *)pData; + break; + case TSDB_DATA_TYPE_INT: + val = *(int32_t *)pData; + break; + case TSDB_DATA_TYPE_BIGINT: + val = *(int64_t *)pData; + break; + case TSDB_DATA_TYPE_UTINYINT: + val = *(uint8_t *)pData; + break; + case TSDB_DATA_TYPE_USMALLINT: + val = *(uint16_t *)pData; + break; + case TSDB_DATA_TYPE_UINT: + val = *(uint32_t *)pData; + break; + case TSDB_DATA_TYPE_UBIGINT: + val = *(int64_t *)pData; + break; + default: + ASSERT(0); + break; + } + + if (!I64_SAFE_ADD(val, -pCmprsor->i_prev)) { + // TODO + goto _copy_cmpr; + } + + int64_t diff = val - pCmprsor->i_prev; + uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, diff); + if (vZigzag >= SIMPLE8B_MAX) { + // TODO + goto _copy_cmpr; + } + + int8_t nBit = (vZigzag) ? (64 - BUILDIN_CLZL(vZigzag)) : 0; + pCmprsor->i_prev = val; + + while (1) { + int32_t nEle = (pCmprsor->i_end + 241 - pCmprsor->i_start) % 241; + + if (nEle + 1 <= SELECTOR_TO_ELEMS[pCmprsor->i_selector] && nEle + 1 <= SELECTOR_TO_ELEMS[BIT_TO_SELECTOR[nBit]]) { + if (pCmprsor->i_selector < BIT_TO_SELECTOR[nBit]) { + pCmprsor->i_selector = BIT_TO_SELECTOR[nBit]; + } + pCmprsor->i_end = (pCmprsor->i_end + 1) % 241; + pCmprsor->i_aZigzag[pCmprsor->i_end] = vZigzag; + pCmprsor->i_aBitN[pCmprsor->i_end] = nBit; + break; + } else { + while (nEle < SELECTOR_TO_ELEMS[pCmprsor->i_selector]) { + pCmprsor->i_selector++; + } + nEle = SELECTOR_TO_ELEMS[pCmprsor->i_selector]; + + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t)); + if (code) return code; + } + + uint64_t *bp = (uint64_t *)(pCmprsor->aBuf[0] + pCmprsor->nBuf[0]); + pCmprsor->nBuf[0] += sizeof(uint64_t); + bp[0] = pCmprsor->i_selector; + uint8_t bits = BIT_PER_INTEGER[pCmprsor->i_selector]; + for (int32_t iVal = 0; iVal < nEle; iVal++) { + bp[0] |= ((pCmprsor->i_aZigzag[pCmprsor->i_start] & ((((uint64_t)1) << bits) - 1)) << (bits * iVal + 4)); + pCmprsor->i_start = (pCmprsor->i_start + 1) % 241; + } + + // reset and continue + pCmprsor->i_selector = 0; + for (int32_t iVal = pCmprsor->i_start; iVal < pCmprsor->i_end; iVal = (iVal + 1) % 241) { + if (pCmprsor->i_selector < BIT_TO_SELECTOR[pCmprsor->i_aBitN[iVal]]) { + pCmprsor->i_selector = BIT_TO_SELECTOR[pCmprsor->i_aBitN[iVal]]; + } + } + } + } + } else { + _copy_cmpr: + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData); + if (code) return code; + + memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData); + pCmprsor->nBuf[0] += nData; + } + pCmprsor->nVal++; + + return code; +} + +// Float ===================================================== +static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData) { + int32_t code = 0; + + ASSERT(nData == sizeof(float)); + + union { + float f; + uint32_t u; + } val = {.f = *(float *)pData}; + + uint32_t diff = val.u ^ pCmprsor->f_prev; + pCmprsor->f_prev = val.u; + + int32_t clz, ctz; + if (diff) { + clz = BUILDIN_CLZ(diff); + ctz = BUILDIN_CTZ(diff); + } else { + clz = 32; + ctz = 32; + } + + uint8_t nBytes; + if (clz < ctz) { + nBytes = sizeof(uint32_t) - ctz / BITS_PER_BYTE; + if (nBytes) diff >>= (32 - nBytes * BITS_PER_BYTE); + } else { + nBytes = sizeof(uint32_t) - clz / BITS_PER_BYTE; + } + if (nBytes == 0) nBytes++; + + if ((pCmprsor->nVal & 0x1) == 0) { + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 9); + if (code) return code; + } + + pCmprsor->f_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; + pCmprsor->nBuf[0]++; + + if (clz < ctz) { + pCmprsor->f_flag_p[0] = (0x08 | (nBytes - 1)); + } else { + pCmprsor->f_flag_p[0] = nBytes - 1; + } + } else { + if (clz < ctz) { + pCmprsor->f_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4); + } else { + pCmprsor->f_flag_p[0] |= ((nBytes - 1) << 4); + } + } + for (; nBytes; nBytes--) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff); + pCmprsor->nBuf[0]++; + diff >>= BITS_PER_BYTE; + } + pCmprsor->nVal++; + + return code; +} + +// Double ===================================================== +static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData) { + int32_t code = 0; + + ASSERT(nData == sizeof(double)); + + union { + double d; + uint64_t u; + } val = {.d = *(double *)pData}; + + uint64_t diff = val.u ^ pCmprsor->d_prev; + pCmprsor->d_prev = val.u; + + int32_t clz, ctz; + if (diff) { + clz = BUILDIN_CLZL(diff); + ctz = BUILDIN_CTZL(diff); + } else { + clz = 64; + ctz = 64; + } + + uint8_t nBytes; + if (clz < ctz) { + nBytes = sizeof(uint64_t) - ctz / BITS_PER_BYTE; + if (nBytes) diff >>= (64 - nBytes * BITS_PER_BYTE); + } else { + nBytes = sizeof(uint64_t) - clz / BITS_PER_BYTE; + } + if (nBytes == 0) nBytes++; + + if ((pCmprsor->nVal & 0x1) == 0) { + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17); + if (code) return code; + } + + pCmprsor->d_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; + pCmprsor->nBuf[0]++; + + if (clz < ctz) { + pCmprsor->d_flag_p[0] = (0x08 | (nBytes - 1)); + } else { + pCmprsor->d_flag_p[0] = nBytes - 1; + } + } else { + if (clz < ctz) { + pCmprsor->d_flag_p[0] |= ((0x08 | (nBytes - 1)) << 4); + } else { + pCmprsor->d_flag_p[0] |= ((nBytes - 1) << 4); + } + } + for (; nBytes; nBytes--) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff); + pCmprsor->nBuf[0]++; + diff >>= BITS_PER_BYTE; + } + pCmprsor->nVal++; + + return code; +} + +// Binary ===================================================== +static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData) { + int32_t code = 0; + + if (nData) { + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData); + if (code) return code; + } + + memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData); + pCmprsor->nBuf[0] += nData; + } + pCmprsor->nVal++; + + return code; +} + +// Bool ===================================================== +static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000}; + +static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData) { + int32_t code = 0; + + bool vBool = *(int8_t *)pData; + + int32_t mod4 = pCmprsor->nVal & 3; + if (mod4 == 0) { + pCmprsor->nBuf[0]++; + + if (pCmprsor->autoAlloc) { + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0]); + if (code) return code; + } + + pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] = 0; + } + if (vBool) { + pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] |= BOOL_CMPR_TABLE[mod4]; + } + pCmprsor->nVal++; + + return code; +} + +// SCompressor ===================================================== +int32_t tCompressorCreate(SCompressor **ppCmprsor) { + int32_t code = 0; + + *ppCmprsor = (SCompressor *)taosMemoryCalloc(1, sizeof(SCompressor)); + if ((*ppCmprsor) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + code = tRealloc(&(*ppCmprsor)->aBuf[0], 1024); + if (code) { + taosMemoryFree(*ppCmprsor); + *ppCmprsor = NULL; + goto _exit; + } + +_exit: + return code; +} + +int32_t tCompressorDestroy(SCompressor *pCmprsor) { + int32_t code = 0; + + if (pCmprsor) { + int32_t nBuf = sizeof(pCmprsor->aBuf) / sizeof(pCmprsor->aBuf[0]); + for (int32_t iBuf = 0; iBuf < nBuf; iBuf++) { + tFree(pCmprsor->aBuf[iBuf]); + } + + taosMemoryFree(pCmprsor); + } + + return code; +} + +int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg, int8_t autoAlloc) { + int32_t code = 0; + + pCmprsor->type = type; + pCmprsor->cmprAlg = cmprAlg; + pCmprsor->autoAlloc = autoAlloc; + pCmprsor->nVal = 0; + + switch (type) { + case TSDB_DATA_TYPE_TIMESTAMP: + pCmprsor->ts_prev_val = 0; + pCmprsor->ts_prev_delta = 0; + pCmprsor->ts_flag_p = NULL; + pCmprsor->aBuf[0][0] = 1; // For timestamp, 1 means compressed, 0 otherwise + pCmprsor->nBuf[0] = 1; + break; + case TSDB_DATA_TYPE_BOOL: + pCmprsor->nBuf[0] = 0; + break; + case TSDB_DATA_TYPE_BINARY: + pCmprsor->nBuf[0] = 0; + break; + case TSDB_DATA_TYPE_FLOAT: + pCmprsor->f_prev = 0; + pCmprsor->f_flag_p = NULL; + pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) + pCmprsor->nBuf[0] = 1; + break; + case TSDB_DATA_TYPE_DOUBLE: + pCmprsor->d_prev = 0; + pCmprsor->d_flag_p = NULL; + pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) + pCmprsor->nBuf[0] = 1; + break; + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UTINYINT: + case TSDB_DATA_TYPE_USMALLINT: + case TSDB_DATA_TYPE_UINT: + case TSDB_DATA_TYPE_UBIGINT: + pCmprsor->i_prev = 0; + pCmprsor->i_selector = 0; + pCmprsor->i_start = 0; + pCmprsor->i_end = 0; + pCmprsor->aBuf[0][0] = 0; // 0 means compressed, 1 otherwise (for backward compatibility) + pCmprsor->nBuf[0] = 1; + break; + default: + break; + } + + return code; +} + +int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int64_t *nData) { + int32_t code = 0; + + if (pCmprsor->nVal == 0) { + *ppData = NULL; + *nData = 0; + return code; + } + + if (pCmprsor->cmprAlg == TWO_STAGE_COMP /*|| IS_VAR_DATA_TYPE(pCmprsor->type)*/) { + code = tRealloc(&pCmprsor->aBuf[1], pCmprsor->nBuf[0] + 1); + if (code) return code; + + int64_t ret = LZ4_compress_default(pCmprsor->aBuf[0], pCmprsor->aBuf[1] + 1, pCmprsor->nBuf[0], pCmprsor->nBuf[0]); + if (ret) { + pCmprsor->aBuf[1][0] = 0; + pCmprsor->nBuf[1] = ret + 1; + } else { + pCmprsor->aBuf[1][0] = 1; + memcpy(pCmprsor->aBuf[1] + 1, pCmprsor->aBuf[0], pCmprsor->nBuf[0]); + pCmprsor->nBuf[1] = pCmprsor->nBuf[0] + 1; + } + + *ppData = pCmprsor->aBuf[1]; + *nData = pCmprsor->nBuf[1]; + } else { + *ppData = pCmprsor->aBuf[0]; + *nData = pCmprsor->nBuf[0]; + } + + return code; +} + +int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData) { + return DATA_TYPE_INFO[pCmprsor->type].cmprFn(pCmprsor, pData, nData); +} \ No newline at end of file diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 50c42ff170..1906a77127 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -621,6 +621,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is m TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REGEX_MATCH, "Rsma regex match") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_OPEN, "Rsma stream state open") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_COMMIT, "Rsma stream state commit") //index TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding") diff --git a/tests/script/sh/gpd.c b/tests/script/sh/gpd.c new file mode 100644 index 0000000000..8d69bacb5e --- /dev/null +++ b/tests/script/sh/gpd.c @@ -0,0 +1,74 @@ +#include +#include +#include +#ifdef LINUX +#include +#endif +#ifdef WINDOWS +#include +#endif +#include "taosudf.h" + +TAOS* taos = NULL; + +DLL_EXPORT int32_t gpd_init() { + taos = taos_connect("localhost", "root", "taosdata", "", 7100); + return 0; +} + +DLL_EXPORT int32_t gpd_destroy() { + taos_close(taos); + taos_cleanup(); + return 0; +} + +DLL_EXPORT int32_t gpd(SUdfDataBlock* block, SUdfColumn *resultCol) { + SUdfColumnMeta *meta = &resultCol->colMeta; + meta->bytes = 4; + meta->type = TSDB_DATA_TYPE_INT; + meta->scale = 0; + meta->precision = 0; + + SUdfColumnData *resultData = &resultCol->colData; + resultData->numOfRows = block->numOfRows; + for (int32_t i = 0; i < resultData->numOfRows; ++i) { + int j = 0; + for (; j < block->numOfCols; ++j) { + if (udfColDataIsNull(block->udfCols[j], i)) { + udfColDataSetNull(resultCol, i); + break; + } + } + if ( j == block->numOfCols) { + int32_t luckyNum = 88; + udfColDataSet(resultCol, i, (char *)&luckyNum, false); + } + } + TAOS_RES* res = taos_query(taos, "create database if not exists gpd"); + if (taos_errno(res) != 0) { + char* errstr = taos_errstr(res); + } + res = taos_query(taos, "create table gpd.st (ts timestamp, f int) tags(t int)"); + if (taos_errno(res) != 0) { + char* errstr = taos_errstr(res); + } + + taos_query(taos, "insert into gpd.t using gpd.st tags(1) values(now, 1) "); + if (taos_errno(res) != 0) { + char* errstr = taos_errstr(res); + } + + taos_query(taos, "select * from gpd.t"); + if (taos_errno(res) != 0) { + char* errstr = taos_errstr(res); + } + + //to simulate actual processing delay by udf +#ifdef LINUX + usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second) +#endif +#ifdef WINDOWS + Sleep(1); +#endif + return 0; +} diff --git a/tests/script/tsim/query/udf.sim b/tests/script/tsim/query/udf.sim index 7f8b1044ef..0b48a815e2 100644 --- a/tests/script/tsim/query/udf.sim +++ b/tests/script/tsim/query/udf.sim @@ -144,18 +144,18 @@ if $data20 != 8.000000000 then return -1 endi -sql drop function bit_and; -sql show functions; -if $rows != 1 then - return -1 -endi -if $data00 != @l2norm@ then - return -1 - endi -sql drop function l2norm; -sql show functions; -if $rows != 0 then - return -1 -endi +#sql drop function bit_and; +#sql show functions; +#if $rows != 1 then +# return -1 +#endi +#if $data00 != @l2norm@ then +# return -1 +# endi +#sql drop function l2norm; +#sql show functions; +#if $rows != 0 then +# return -1 +#endi system sh/exec.sh -n dnode1 -s stop -x SIGINT