diff --git a/cmake/cmake.options b/cmake/cmake.options index b00ae14715..555b72cbdf 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -80,7 +80,7 @@ ENDIF () option( BUILD_GEOS "If build geos on Windows" - OFF + ON ) option( diff --git a/cmake/cmake.platform b/cmake/cmake.platform index f9faf7316c..ba747c6134 100644 --- a/cmake/cmake.platform +++ b/cmake/cmake.platform @@ -56,17 +56,7 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin SET(TD_DARWIN TRUE) SET(OSTYPE "macOS") - execute_process(COMMAND geos-config --cflags OUTPUT_VARIABLE GEOS_CFLAGS) - execute_process(COMMAND geos-config --ldflags OUTPUT_VARIABLE GEOS_LDFLAGS) - string(SUBSTRING ${GEOS_CFLAGS} 2 -1 GEOS_CFLAGS) - string(REGEX REPLACE "\n" "" GEOS_CFLAGS ${GEOS_CFLAGS}) - string(SUBSTRING ${GEOS_LDFLAGS} 2 -1 GEOS_LDFLAGS) - string(REGEX REPLACE "\n" "" GEOS_LDFLAGS ${GEOS_LDFLAGS}) - MESSAGE("GEOS_CFLAGS "${GEOS_CFLAGS}) - MESSAGE("GEOS_LDFLAGS "${GEOS_LDFLAGS}) ADD_DEFINITIONS("-DDARWIN -Wno-tautological-pointer-compare") - INCLUDE_DIRECTORIES(${GEOS_CFLAGS}) - LINK_DIRECTORIES(${GEOS_LDFLAGS}) IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64") MESSAGE("Current system arch is arm64") diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b4674aba54..d78e771fcf 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2807,39 +2807,49 @@ typedef struct { int64_t suid; } SMqRebVgReq; -static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pReq) { - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pReq->leftForVer); - tlen += taosEncodeFixedI32(buf, pReq->vgId); - tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); - tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); - tlen += taosEncodeString(buf, pReq->subKey); - tlen += taosEncodeFixedI8(buf, pReq->subType); - tlen += taosEncodeFixedI8(buf, pReq->withMeta); +static FORCE_INLINE int tEncodeSMqRebVgReq(SEncoder *pCoder, const SMqRebVgReq* pReq) { + if (tStartEncode(pCoder) < 0) return -1; + if (tEncodeI64(pCoder, pReq->leftForVer) < 0) return -1; + if (tEncodeI32(pCoder, pReq->vgId) < 0) return -1; + if (tEncodeI64(pCoder, pReq->oldConsumerId) < 0) return -1; + if (tEncodeI64(pCoder, pReq->newConsumerId) < 0) return -1; + if (tEncodeCStr(pCoder, pReq->subKey) < 0) return -1; + if (tEncodeI8(pCoder, pReq->subType) < 0) return -1; + if (tEncodeI8(pCoder, pReq->withMeta) < 0) return -1; + if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { - tlen += taosEncodeString(buf, pReq->qmsg); + if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1; } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { - tlen += taosEncodeFixedI64(buf, pReq->suid); - tlen += taosEncodeString(buf, pReq->qmsg); + if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; + if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1; } - return tlen; + tEndEncode(pCoder); + return 0; } -static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) { - buf = taosDecodeFixedI64(buf, &pReq->leftForVer); - buf = taosDecodeFixedI32(buf, &pReq->vgId); - buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); - buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); - buf = taosDecodeStringTo(buf, pReq->subKey); - buf = taosDecodeFixedI8(buf, &pReq->subType); - buf = taosDecodeFixedI8(buf, &pReq->withMeta); +static FORCE_INLINE int tDecodeSMqRebVgReq(SDecoder *pCoder, SMqRebVgReq* pReq) { + if (tStartDecode(pCoder) < 0) return -1; + + if (tDecodeI64(pCoder, &pReq->leftForVer) < 0) return -1; + + if (tDecodeI32(pCoder, &pReq->vgId) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->oldConsumerId) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->newConsumerId) < 0) return -1; + if (tDecodeCStrTo(pCoder, pReq->subKey) < 0) return -1; + if (tDecodeI8(pCoder, &pReq->subType) < 0) return -1; + if (tDecodeI8(pCoder, &pReq->withMeta) < 0) return -1; + if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { - buf = taosDecodeString(buf, &pReq->qmsg); + if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1; } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { - buf = taosDecodeFixedI64(buf, &pReq->suid); - buf = taosDecodeString(buf, &pReq->qmsg); + if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; + if (!tDecodeIsEnd(pCoder)){ + if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1; + } } - return (void*)buf; + + tEndDecode(pCoder); + return 0; } typedef struct { diff --git a/packaging/deb/DEBIAN/preinst b/packaging/deb/DEBIAN/preinst index 904a946e20..d6558d5b3b 100644 --- a/packaging/deb/DEBIAN/preinst +++ b/packaging/deb/DEBIAN/preinst @@ -80,4 +80,5 @@ fi # there can not libtaos.so*, otherwise ln -s error ${csudo}rm -f ${install_main_dir}/driver/libtaos.* || : +[ -f ${install_main_dir}/driver/librocksdb.* ] && ${csudo}rm -f ${install_main_dir}/driver/librocksdb.* || : [ -f ${install_main_dir}/driver/libtaosws.so ] && ${csudo}rm -f ${install_main_dir}/driver/libtaosws.so || : diff --git a/packaging/deb/DEBIAN/prerm b/packaging/deb/DEBIAN/prerm index 0d63115a04..8f8d472867 100644 --- a/packaging/deb/DEBIAN/prerm +++ b/packaging/deb/DEBIAN/prerm @@ -40,6 +40,7 @@ else ${csudo}rm -f ${inc_link_dir}/taosudf.h || : [ -f ${inc_link_dir}/taosws.h ] && ${csudo}rm -f ${inc_link_dir}/taosws.h || : ${csudo}rm -f ${lib_link_dir}/libtaos.* || : + [ -f ${lib_link_dir}/librocksdb.* ] && ${csudo}rm -f ${lib_link_dir}/librocksdb.* || : [ -f ${lib_link_dir}/libtaosws.so ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.so || : ${csudo}rm -f ${log_link_dir} || : diff --git a/packaging/deb/makedeb.sh b/packaging/deb/makedeb.sh index 9f49cf345a..024c69deb1 100755 --- a/packaging/deb/makedeb.sh +++ b/packaging/deb/makedeb.sh @@ -31,6 +31,7 @@ cd ${pkg_dir} libfile="libtaos.so.${tdengine_ver}" wslibfile="libtaosws.so" +rocksdblib="librocksdb.so.8" # create install dir install_home_path="/usr/local/taos" @@ -94,6 +95,7 @@ fi cp ${compile_dir}/build/bin/taos ${pkg_dir}${install_home_path}/bin cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver +[ -f ${compile_dir}/build/lib/${rocksdblib} ] && cp ${compile_dir}/build/lib/${rocksdblib} ${pkg_dir}${install_home_path}/driver ||: [ -f ${compile_dir}/build/lib/${wslibfile} ] && cp ${compile_dir}/build/lib/${wslibfile} ${pkg_dir}${install_home_path}/driver ||: cp ${compile_dir}/../include/client/taos.h ${pkg_dir}${install_home_path}/include cp ${compile_dir}/../include/common/taosdef.h ${pkg_dir}${install_home_path}/include diff --git a/packaging/rpm/tdengine.spec b/packaging/rpm/tdengine.spec index 52d5335003..2b056c376a 100644 --- a/packaging/rpm/tdengine.spec +++ b/packaging/rpm/tdengine.spec @@ -45,6 +45,7 @@ echo buildroot: %{buildroot} libfile="libtaos.so.%{_version}" wslibfile="libtaosws.so" +rocksdblib="librocksdb.so.8" # create install path, and cp file mkdir -p %{buildroot}%{homepath}/bin @@ -92,6 +93,7 @@ if [ -f %{_compiledir}/build/bin/taosadapter ]; then fi cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver [ -f %{_compiledir}/build/lib/${wslibfile} ] && cp %{_compiledir}/build/lib/${wslibfile} %{buildroot}%{homepath}/driver ||: +[ -f %{_compiledir}/build/lib/${rocksdblib} ] && cp %{_compiledir}/build/lib/${rocksdblib} %{buildroot}%{homepath}/driver ||: cp %{_compiledir}/../include/client/taos.h %{buildroot}%{homepath}/include cp %{_compiledir}/../include/common/taosdef.h %{buildroot}%{homepath}/include cp %{_compiledir}/../include/util/taoserror.h %{buildroot}%{homepath}/include @@ -174,6 +176,7 @@ fi # there can not libtaos.so*, otherwise ln -s error ${csudo}rm -f %{homepath}/driver/libtaos* || : +${csudo}rm -f %{homepath}/driver/librocksdb* || : #Scripts executed after installation %post @@ -219,6 +222,7 @@ if [ $1 -eq 0 ];then ${csudo}rm -f ${inc_link_dir}/taoserror.h || : ${csudo}rm -f ${inc_link_dir}/taosudf.h || : ${csudo}rm -f ${lib_link_dir}/libtaos.* || : + ${csudo}rm -f ${lib_link_dir}/librocksdb.* || : ${csudo}rm -f ${log_link_dir} || : ${csudo}rm -f ${data_link_dir} || : diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index 1b47b10520..9452e48d33 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -250,18 +250,27 @@ function install_lib() { # Remove links ${csudo}rm -f ${lib_link_dir}/libtaos.* || : ${csudo}rm -f ${lib64_link_dir}/libtaos.* || : + ${csudo}rm -f ${lib_link_dir}/librocksdb.* || : + ${csudo}rm -f ${lib64_link_dir}/librocksdb.* || : #${csudo}rm -rf ${v15_java_app_dir} || : ${csudo}cp -rf ${script_dir}/driver/* ${install_main_dir}/driver && ${csudo}chmod 777 ${install_main_dir}/driver/* ${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1 ${csudo}ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so + ${csudo}ln -sf ${install_main_dir}/driver/librocksdb.* ${lib_link_dir}/librocksdb.so.8 + ${csudo}ln -sf ${lib_link_dir}/librocksdb.so.8 ${lib_link_dir}/librocksdb.so + + [ -f ${install_main_dir}/driver/libtaosws.so ] && ${csudo}ln -sf ${install_main_dir}/driver/libtaosws.so ${lib_link_dir}/libtaosws.so || : if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.so ]]; then ${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1 || : ${csudo}ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so || : + ${csudo}ln -sf ${install_main_dir}/driver/librocksdb.* ${lib64_link_dir}/librocksdb.so.8 || : + ${csudo}ln -sf ${lib64_link_dir}/librocksdb.so.8 ${lib64_link_dir}/librocksdb.so || : + [ -f ${install_main_dir}/libtaosws.so ] && ${csudo}ln -sf ${install_main_dir}/libtaosws.so ${lib64_link_dir}/libtaosws.so || : fi diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh index b0537e8bcf..ab45c684c4 100755 --- a/packaging/tools/makepkg.sh +++ b/packaging/tools/makepkg.sh @@ -111,9 +111,11 @@ fi if [ "$osType" == "Darwin" ]; then lib_files="${build_dir}/lib/libtaos.${version}.dylib" wslib_files="${build_dir}/lib/libtaosws.dylib" + rocksdb_lib_files="${build_dir}/lib/librocksdb.dylib.8.1.1" else lib_files="${build_dir}/lib/libtaos.so.${version}" wslib_files="${build_dir}/lib/libtaosws.so" + rocksdb_lib_files="${build_dir}/lib/librocksdb.so.8.1.1" fi header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h ${code_dir}/include/libs/function/taosudf.h" @@ -336,6 +338,7 @@ fi # Copy driver mkdir -p ${install_dir}/driver && cp ${lib_files} ${install_dir}/driver && echo "${versionComp}" >${install_dir}/driver/vercomp.txt [ -f ${wslib_files} ] && cp ${wslib_files} ${install_dir}/driver || : +[ -f ${rocksdb_lib_files} ] && cp ${rocksdb_lib_files} ${install_dir}/driver || : # Copy connector if [ "$verMode" == "cluster" ]; then diff --git a/packaging/tools/post.sh b/packaging/tools/post.sh index fc392c9684..4a8dda1d30 100755 --- a/packaging/tools/post.sh +++ b/packaging/tools/post.sh @@ -202,10 +202,15 @@ function install_lib() { log_print "start install lib from ${lib_dir} to ${lib_link_dir}" ${csudo}rm -f ${lib_link_dir}/libtaos* || : ${csudo}rm -f ${lib64_link_dir}/libtaos* || : + + #rocksdb + [ -f ${lib_link_dir}/librocksdb* ] && ${csudo}rm -f ${lib_link_dir}/librocksdb* || : + [ -f ${lib64_link_dir}/librocksdb* ] && ${csudo}rm -f ${lib64_link_dir}/librocksdb* || : [ -f ${lib_link_dir}/libtaosws.${lib_file_ext} ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.${lib_file_ext} || : [ -f ${lib64_link_dir}/libtaosws.${lib_file_ext} ] && ${csudo}rm -f ${lib64_link_dir}/libtaosws.${lib_file_ext} || : + ${csudo}ln -s ${lib_dir}/librocksdb.* ${lib_link_dir}/librocksdb.${lib_file_ext_1} 2>>${install_log_path} || return 1 ${csudo}ln -s ${lib_dir}/libtaos.* ${lib_link_dir}/libtaos.${lib_file_ext_1} 2>>${install_log_path} || return 1 ${csudo}ln -s ${lib_link_dir}/libtaos.${lib_file_ext_1} ${lib_link_dir}/libtaos.${lib_file_ext} 2>>${install_log_path} || return 1 @@ -214,6 +219,7 @@ function install_lib() { if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.${lib_file_ext} ]]; then ${csudo}ln -s ${lib_dir}/libtaos.* ${lib64_link_dir}/libtaos.${lib_file_ext_1} 2>>${install_log_path} || return 1 ${csudo}ln -s ${lib64_link_dir}/libtaos.${lib_file_ext_1} ${lib64_link_dir}/libtaos.${lib_file_ext} 2>>${install_log_path} || return 1 + ${csudo}ln -s ${lib_dir}/librocksdb.* ${lib64_link_dir}/librocksdb.${lib_file_ext_1} 2>>${install_log_path} || return 1 [ -f ${lib_dir}/libtaosws.${lib_file_ext} ] && ${csudo}ln -sf ${lib_dir}/libtaosws.${lib_file_ext} ${lib64_link_dir}/libtaosws.${lib_file_ext} 2>>${install_log_path} fi diff --git a/packaging/tools/remove.sh b/packaging/tools/remove.sh index 6c671473bf..a17b29983c 100755 --- a/packaging/tools/remove.sh +++ b/packaging/tools/remove.sh @@ -142,11 +142,14 @@ function clean_local_bin() { function clean_lib() { # Remove link ${csudo}rm -f ${lib_link_dir}/libtaos.* || : + ${csudo}rm -f ${lib_link_dir}/librocksdb.* || : [ -f ${lib_link_dir}/libtaosws.* ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.* || : ${csudo}rm -f ${lib64_link_dir}/libtaos.* || : + ${csudo}rm -f ${lib64_link_dir}/librocksdb.* || : [ -f ${lib64_link_dir}/libtaosws.* ] && ${csudo}rm -f ${lib64_link_dir}/libtaosws.* || : #${csudo}rm -rf ${v15_java_app_dir} || : + } function clean_header() { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index ccb09e3584..e1b2b9c48b 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1868,7 +1868,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { // update the local offset value only for the returned values, only when the local offset is NOT updated // by tmq_offset_seek function if (!pVg->seekUpdated) { + tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId); pVg->offsetInfo.currentOffset = pDataRsp->rspOffset; + } else { + tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId); } // update the status @@ -1952,8 +1955,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return NULL; } - if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate - pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset; + // update the local offset value only for the returned values, only when the local offset is NOT updated + // by tmq_offset_seek function + if (!pVg->seekUpdated) { + if(pollRspWrapper->taosxRsp.rspOffset.type != 0) { // if offset is validate + tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId); + pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset; + } + } else { + tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId); } atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index e62102fa77..74421afa33 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -111,7 +111,14 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri req.suid = pSub->stbUid; tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); - int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req); + int32_t tlen = 0; + int32_t ret = 0; + tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret); + if (ret < 0) { + return -1; + } + + tlen += sizeof(SMsgHead); void *buf = taosMemoryMalloc(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -123,8 +130,14 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri pMsgHead->contLen = htonl(tlen); pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId); - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncodeSMqRebVgReq(&abuf, &req); + SEncoder encoder = {0}; + tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen); + if (tEncodeSMqRebVgReq(&encoder, &req) < 0) { + taosMemoryFreeClear(buf); + tEncoderClear(&encoder); + return -1; + } + tEncoderClear(&encoder); *pBuf = buf; *pLen = tlen; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f30fa03d62..aa6bbbe9df 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -23,8 +23,8 @@ static int32_t tqInitialize(STQ* pTq); static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; } -static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_EXEC;} -static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_IDLE;} +static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; } +static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; } int32_t tqInit() { int8_t old; @@ -78,7 +78,7 @@ static void destroyTqHandle(void* data) { taosMemoryFreeClear(pData->execHandle.execTb.qmsg); nodesDestroyNode(pData->execHandle.execTb.node); } - if(pData->msg != NULL) { + if (pData->msg != NULL) { rpcFreeCont(pData->msg->pCont); taosMemoryFree(pData->msg); pData->msg = NULL; @@ -240,14 +240,15 @@ int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { int64_t sver = 0, ever = 0; walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); - tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, ever); + tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, + ever); char buf1[80] = {0}; char buf2[80] = {0}; tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", - vgId, dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); + tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId, + dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); return 0; } @@ -263,8 +264,8 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, - vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); + tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId, + pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); return 0; } @@ -336,8 +337,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey)); if (pHandle == NULL) { - tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, - pOffset->subKey); + tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey); terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -353,7 +353,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) } taosRUnLockLatch(&pTq->lock); - //3. check the offset info + // 3. check the offset info STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); if (pSavedOffset != NULL) { if (pSavedOffset->val.type != TMQ_OFFSET__LOG) { @@ -381,7 +381,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version); } else { - tqDebug("vgId:%d sub:%s seek to:%"PRId64" not saved yet", vgId, pOffset->subKey, pOffset->val.version); + tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version); } if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) { @@ -423,7 +423,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SMqPollReq req = {0}; - int code = 0; + int code = 0; if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen); terrno = TSDB_CODE_INVALID_MSG; @@ -449,7 +449,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { // 2. check re-balance status if (pHandle->consumerId != consumerId) { - tqError("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, + tqError("ERROR tmq poll: consumer:0x%" PRIx64 + " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; taosWUnLockLatch(&pTq->lock); @@ -457,22 +458,26 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } bool exec = tqIsHandleExec(pHandle); - if(!exec) { + if (!exec) { tqSetHandleExec(pHandle); -// qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle); + // qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); + tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, + req.subKey, pHandle); taosWUnLockLatch(&pTq->lock); break; } taosWUnLockLatch(&pTq->lock); - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", consumerId, vgId, req.subKey, pHandle); + tqDebug("tmq poll: consumer:0x%" PRIx64 + "vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", + consumerId, vgId, req.subKey, pHandle); taosMsleep(10); } // 3. update the epoch value if (pHandle->epoch < reqEpoch) { - tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch, reqEpoch); + tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch, + reqEpoch); pHandle->epoch = reqEpoch; } @@ -484,7 +489,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); tqSetHandleIdle(pHandle); - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle); + tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, + req.subKey, pHandle); return code; } @@ -548,7 +554,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { if (reqOffset.type == TMQ_OFFSET__LOG) { int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader); - if (currentVer == -1) { // not start to read data from wal yet, return req offset directly + if (currentVer == -1) { // not start to read data from wal yet, return req offset directly dataRsp.rspOffset.version = reqOffset.version; } else { dataRsp.rspOffset.version = currentVer; // return current consume offset value @@ -572,7 +578,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; - int32_t vgId = TD_VID(pTq->pVnode); + int32_t vgId = TD_VID(pTq->pVnode); tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey); int32_t code = 0; @@ -581,7 +587,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (pHandle) { while (tqIsHandleExec(pHandle)) { - tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle); + tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, + pHandle->subKey, pHandle); taosMsleep(10); } @@ -641,9 +648,18 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t } int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { - int ret = 0; + int ret = 0; SMqRebVgReq req = {0}; - tDecodeSMqRebVgReq(msg, &req); + SDecoder dc = {0}; + + tDecoderInit(&dc, msg, msgLen); + + // decode req + if (tDecodeSMqRebVgReq(&dc, &req) < 0) { + terrno = TSDB_CODE_INVALID_MSG; + tDecoderClear(&dc); + return -1; + } SVnode* pVnode = pTq->pVnode; int32_t vgId = TD_VID(pVnode); @@ -689,8 +705,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->snapshotVer = ver; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - pHandle->execHandle.execCol.qmsg = req.qmsg; - req.qmsg = NULL; + pHandle->execHandle.execCol.qmsg = taosStrdup(req.qmsg); pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, req.newConsumerId); @@ -710,10 +725,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); pHandle->execHandle.execTb.suid = req.suid; - pHandle->execHandle.execTb.qmsg = req.qmsg; - req.qmsg = NULL; + pHandle->execHandle.execTb.qmsg = taosStrdup(req.qmsg); - if(strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) { + if (strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) { if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) { tqError("nodesStringToNode error in sub stable, since %s, vgId:%d, subkey:%s consumer:0x%" PRIx64, terrstr(), pVnode->config.vgId, req.subKey, pHandle->consumerId); @@ -727,45 +741,47 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SArray* tbUidList = NULL; ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList, pHandle->execHandle.task); - if(ret != TDB_CODE_SUCCESS) { - tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, pHandle->consumerId); + if (ret != TDB_CODE_SUCCESS) { + tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, + pHandle->consumerId); taosArrayDestroy(tbUidList); goto end; } - tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64, pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid); + tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64, + pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid); pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList, NULL); taosArrayDestroy(tbUidList); } taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); - tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, - pHandle->consumerId); + tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, pHandle->consumerId); ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); goto end; } else { taosWLockLatch(&pTq->lock); if (pHandle->consumerId == req.newConsumerId) { // do nothing - tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId, req.newConsumerId); + tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId, + req.newConsumerId); } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); } -// atomic_add_fetch_32(&pHandle->epoch, 1); + // atomic_add_fetch_32(&pHandle->epoch, 1); // kill executing task -// if(tqIsHandleExec(pHandle)) { -// qTaskInfo_t pTaskInfo = pHandle->execHandle.task; -// if (pTaskInfo != NULL) { -// qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); -// } + // if(tqIsHandleExec(pHandle)) { + // qTaskInfo_t pTaskInfo = pHandle->execHandle.task; + // if (pTaskInfo != NULL) { + // qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); + // } -// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { -// qStreamCloseTsdbReader(pTaskInfo); -// } -// } + // if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + // qStreamCloseTsdbReader(pTaskInfo); + // } + // } // remove if it has been register in the push manager, and return one empty block to consumer tqUnregisterPushHandle(pTq, pHandle); taosWUnLockLatch(&pTq->lock); @@ -773,13 +789,11 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } end: - taosMemoryFree(req.qmsg); + tDecoderClear(&dc); return ret; } -void freePtr(void *ptr) { - taosMemoryFree(*(void**)ptr); -} +void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t vgId = TD_VID(pTq->pVnode); @@ -802,7 +816,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->chkInfo.currentVer = ver; // expand executor - pTask->status.taskStatus = (pTask->fillHistory)? TASK_STATUS__WAIT_DOWNSTREAM:TASK_STATUS__NORMAL; + pTask->status.taskStatus = (pTask->fillHistory) ? TASK_STATUS__WAIT_DOWNSTREAM : TASK_STATUS__NORMAL; if (pTask->taskLevel == TASK_LEVEL__SOURCE) { pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -868,8 +882,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { streamSetupTrigger(pTask); - tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr, - pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); + tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, + pTask->id.idStr, pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); // next valid version will add one pTask->chkInfo.version += 1; @@ -982,7 +996,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, (int32_t) sizeof(SStreamTask)); + tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, + (int32_t)sizeof(SStreamTask)); return -1; } @@ -1097,7 +1112,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t // do recovery step 2 int64_t st = taosGetTimestampMs(); - tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, st); + tqDebug("s-task:%s start step2 recover, ts:%" PRId64, pTask->id.idStr, st); code = streamSourceRecoverScanStep2(pTask, sversion); if (code < 0) { @@ -1106,6 +1121,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t } qDebug("s-task:%s set start wal scan start ver:%"PRId64, pTask->id.idStr, sversion); + walReaderSeekVer(pTask->exec.pWalReader, sversion); pTask->chkInfo.currentVer = sversion; @@ -1129,7 +1145,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } - double el = (taosGetTimestampMs() - st)/ 1000.0; + double el = (taosGetTimestampMs() - st) / 1000.0; tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el); // dispatch recover finish req to all related downstream task @@ -1245,8 +1261,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask != NULL) { if (pTask->status.taskStatus == TASK_STATUS__NORMAL) { - tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, - pTask->id.idStr, pTask->chkInfo.version); + tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, pTask->id.idStr, + pTask->chkInfo.version); streamProcessRunReq(pTask); } else { if (streamTaskShouldPause(&pTask->status)) { @@ -1265,9 +1281,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { - char* msgStr = pMsg->pCont; - char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); SStreamDispatchReq req = {0}; @@ -1313,7 +1329,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask) { tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr); atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 79391bc1c5..ffc63a22a8 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -456,6 +456,7 @@ typedef struct SStreamIntervalOperatorInfo { SSHashObj* pUpdatedMap; int64_t dataVersion; SStateStore statestore; + bool recvGetAll; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 66014ff12f..c93b9f4c73 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +// clang-format off + #include "executorInt.h" #include "filter.h" #include "function.h" @@ -2454,6 +2456,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys int32_t len = 0; pAPI->stateStore.streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), &buff, &len); streamScanOperatorDecode(buff, len, pInfo); + taosMemoryFree(buff); } setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, @@ -3468,3 +3471,5 @@ static void destoryTableCountScanOperator(void* param) { taosArrayDestroy(pTableCountScanInfo->stbUidList); taosMemoryFreeClear(param); } + +// clang-format on diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fc08f827a5..3a83472079 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2439,6 +2439,15 @@ static inline int winPosCmprImpl(const void* pKey1, const void* pKey2) { return 0; } +static void resetUnCloseWinInfo(SSHashObj* winMap) { + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(winMap, pIte, &iter)) != NULL) { + SRowBuffPos* pPos = *(SRowBuffPos**)pIte; + pPos->beUsed = true; + } +} + static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2472,6 +2481,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->binfo.pRes; } + if (pInfo->recvGetAll) { + pInfo->recvGetAll = false; + resetUnCloseWinInfo(pInfo->aggSup.pResultRowHashTable); + } + setOperatorCompleted(pOperator); if (!IS_FINAL_OP(pInfo)) { clearFunctionContext(&pOperator->exprSupp); @@ -2565,6 +2579,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { break; } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) { + pInfo->recvGetAll = true; getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { @@ -2773,6 +2788,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); pInfo->dataVersion = 0; pInfo->statestore = pTaskInfo->storageAPI.stateStore; + pInfo->recvGetAll = false; pOperator->operatorType = pPhyNode->type; pOperator->blocking = true; @@ -4751,6 +4767,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { printDataBlock(pInfo->binfo.pRes, "single interval"); return pInfo->binfo.pRes; } + + if (pInfo->recvGetAll) { + pInfo->recvGetAll = false; + resetUnCloseWinInfo(pInfo->aggSup.pResultRowHashTable); + } + setOperatorCompleted(pOperator); if (pInfo->twAggSup.maxTs > 0 && pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { @@ -4790,6 +4812,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_GET_ALL) { qDebug("===stream===single interval recv|block type STREAM_GET_ALL"); + pInfo->recvGetAll = true; getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { @@ -4960,6 +4983,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); pInfo->statestore = pTaskInfo->storageAPI.stateStore; + pInfo->recvGetAll = false; + initIntervalDownStream(downstream, pPhyNode->type, pInfo); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c743ecf7f7..b3995f020b 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -87,8 +87,6 @@ void* streamBackendInit(const char* path) { pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); - rocksdb_env_set_low_priority_background_threads(env, 4); - rocksdb_env_set_high_priority_background_threads(env, 2); rocksdb_cache_t* cache = rocksdb_cache_create_lru(64 << 20); @@ -574,9 +572,14 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) { *dest = NULL; return -1; } - int64_t now = taosGetTimestampMs(); p = taosDecodeFixedI64(p, &key.unixTimestamp); p = taosDecodeFixedI32(p, &key.len); + if (vlen != (sizeof(int64_t) + sizeof(int32_t) + key.len)) { + if (dest != NULL) *dest = NULL; + qError("vlen: %d, read len: %d", vlen, key.len); + return -1; + } + if (key.len == 0) { key.data = NULL; } else { @@ -584,6 +587,7 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) { } if (ttl != NULL) { + int64_t now = taosGetTimestampMs(); *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now; } if (dest != NULL) { @@ -1005,35 +1009,35 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[idx]); } -#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamGetInit(pState, funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = \ - ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ - char* ttlV = NULL; \ - int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ - rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ - if (err != NULL) { \ - taosMemoryFree(err); \ - qError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ - code = -1; \ - } else { \ - qTrace("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \ - } \ - taosMemoryFree(ttlV); \ +#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(pState, funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = \ + ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ + char* ttlV = NULL; \ + int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ + rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ + if (err != NULL) { \ + taosMemoryFree(err); \ + qError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ + code = -1; \ + } else { \ + qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \ + } \ + taosMemoryFree(ttlV); \ } while (0); #define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ @@ -1056,7 +1060,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ size_t len = 0; \ char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ - if (val == NULL) { \ + if (val == NULL || len == 0) { \ if (err == NULL) { \ qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \ funcname); \ @@ -1068,17 +1072,17 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa code = -1; \ } else { \ char* p = NULL; \ - int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ - if (len < 0) { \ + int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ + if (tlen <= 0) { \ qError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \ funcname); \ code = -1; \ } else { \ qTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \ - len); \ + tlen); \ } \ taosMemoryFree(val); \ - if (vLen != NULL) *vLen = len; \ + if (vLen != NULL) *vLen = tlen; \ } \ if (code == 0) \ qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \ @@ -1924,17 +1928,17 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "default", &key, pVal, pVLen); + STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen); return code; } int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { int code = 0; - STREAM_STATE_GET_ROCKSDB(pState, "default", &key, pVal, pVLen); + STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen); return code; } int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { int code = 0; - STREAM_STATE_DEL_ROCKSDB(pState, "default", &key); + STREAM_STATE_DEL_ROCKSDB(pState, "default", key); return code; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c28cd6f334..46290c306f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -20,7 +20,7 @@ #define MIN_STREAM_EXEC_BATCH_NUM 4 #define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 -static int32_t updateCheckPointInfo (SStreamTask* pTask); +static int32_t updateCheckPointInfo(SStreamTask* pTask); bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); @@ -49,10 +49,11 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return -1; } - qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0); + qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, + size / 1048576.0); code = streamTaskOutputResultBlock(pTask, pStreamBlocks); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position + if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position destroyStreamDataBlock(pStreamBlocks); return -1; } @@ -66,7 +67,8 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return TSDB_CODE_SUCCESS; } -static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { +static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, + int32_t* totalBlocks) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; @@ -83,7 +85,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i } if (streamTaskShouldStop(&pTask->status)) { - taosArrayDestroy(pRes); // memory leak + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return 0; } @@ -100,9 +102,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i if (output == NULL) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { - SSDataBlock block = {0}; - - const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem; + SSDataBlock block = {0}; + const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem; ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1); assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); @@ -154,7 +155,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ASSERT(numOfBlocks == taosArrayGetSize(pRes)); code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); } else { - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); } return code; @@ -287,7 +288,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { } #endif -int32_t updateCheckPointInfo (SStreamTask* pTask) { +int32_t updateCheckPointInfo(SStreamTask* pTask) { int64_t ckId = 0; int64_t dataVer = 0; qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); @@ -295,7 +296,8 @@ int32_t updateCheckPointInfo (SStreamTask* pTask) { SCheckpointInfo* pCkInfo = &pTask->chkInfo; if (ckId > pCkInfo->id) { // save it since the checkpoint is updated qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 - ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId); + ", checkPoint id:%" PRId64 " -> %" PRId64, + pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId); pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer}; diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index fff666ec9f..85be120dbd 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -20,9 +20,9 @@ #include "ttime.h" #define DEFAULT_FALSE_POSITIVE 0.01 -#define DEFAULT_BUCKET_SIZE 1310720 -#define DEFAULT_MAP_CAPACITY 1310720 -#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10) +#define DEFAULT_BUCKET_SIZE 131072 +#define DEFAULT_MAP_CAPACITY 131072 +#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 100) #define ROWS_PER_MILLISECOND 1 #define MAX_NUM_SCALABLE_BF 100000 #define MIN_NUM_SCALABLE_BF 10 @@ -44,8 +44,8 @@ static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { } } -static void clearItemHelper(void* p) { - SScalableBf** pBf = p; +static void clearItemHelper(void *p) { + SScalableBf **pBf = p; tScalableBfDestroy(*pBf); } @@ -274,7 +274,7 @@ void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) { } int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) { - if(!pInfo) { + if (!pInfo) { return 0; } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index f531f65565..bc84509728 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -16,12 +16,12 @@ #include "tstreamFileState.h" #include "query.h" +#include "storageapi.h" #include "streamBackendRocksdb.h" #include "taos.h" #include "tcommon.h" #include "thash.h" #include "tsimplehash.h" -#include "storageapi.h" #define FLUSH_RATIO 0.5 #define FLUSH_NUM 4 @@ -137,7 +137,7 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { SListNode* pNode = NULL; while ((pNode = tdListNext(&iter)) != NULL) { SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data); - if (all || (pFileState->getTs(pPos->pKey) < ts)) { + if (all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed)) { ASSERT(pPos->pRowBuff != NULL); tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff)); pPos->pRowBuff = NULL; @@ -416,10 +416,13 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t len = 0; memcpy(buf, taskKey, strlen(taskKey)); code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len); - if (code != 0) { + if (code != 0 || len == 0 || val == NULL) { return TSDB_CODE_FAILED; } - sscanf(val, "%" PRId64 "", &maxCheckPointId); + memcpy(val, buf, len); + buf[len] = 0; + maxCheckPointId = atol((char*)buf); + taosMemoryFree(val); } for (int64_t i = maxCheckPointId; i > 0; i--) { char buf[128] = {0}; @@ -430,13 +433,16 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { if (code != 0) { return TSDB_CODE_FAILED; } + memcpy(val, buf, len); + buf[len] = 0; + taosMemoryFree(val); + TSKEY ts; - sscanf(val, "%" PRId64 "", &ts); + ts = atol((char*)buf); if (ts < mark) { // statekey winkey.ts < mark forceRemoveCheckpoint(pFileState, i); break; - } else { } } return code; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 74b0dddbd9..c56852b88c 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -564,7 +564,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_privilege.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/multilevel.py -#,,n,system-test,python3 ./test.py -f 0-others/compatibility.py +,,n,system-test,python3 ./test.py -f 0-others/compatibility.py ,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py ,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py ,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py diff --git a/tests/system-test/1-insert/manyVgroups.json b/tests/system-test/1-insert/manyVgroups.json index 3b0fa96b08..8c6f39cf96 100644 --- a/tests/system-test/1-insert/manyVgroups.json +++ b/tests/system-test/1-insert/manyVgroups.json @@ -11,7 +11,7 @@ "confirm_parameter_prompt": "no", "insert_interval": 0, "interlace_rows": 0, - "num_of_records_per_req": 100000, + "num_of_records_per_req": 10000, "databases": [ { "dbinfo": { @@ -73,4 +73,4 @@ ] } ] -} \ No newline at end of file +}