diff --git a/docs/en/07-develop/01-connect.md b/docs/en/07-develop/01-connect.md index 97e96f98a4..66bd9b6b16 100644 --- a/docs/en/07-develop/01-connect.md +++ b/docs/en/07-develop/01-connect.md @@ -164,9 +164,6 @@ If you are using Maven to manage your project, simply add the following dependen pip3 install taospy[ws] ``` - - - - **Installation Verification** @@ -199,8 +196,8 @@ import taosws + - Edit `go.mod` to add the `driver-go` dependency. diff --git a/packaging/delete_ref_lock.py b/packaging/delete_ref_lock.py index cf0e4cdd05..7200246a8a 100644 --- a/packaging/delete_ref_lock.py +++ b/packaging/delete_ref_lock.py @@ -1,59 +1,80 @@ import subprocess import re -# 执行 git fetch 命令并捕获输出 def git_fetch(): result = subprocess.run(['git', 'fetch'], capture_output=True, text=True) return result -# 解析分支名称 +def git_prune(): + # git remote prune origin + print("git remote prune origin") + result = subprocess.run(['git', 'remote', 'prune', 'origin'], capture_output=True, text=True) + return result + def parse_branch_name_type1(error_output): - # 使用正则表达式匹配 'is at' 前的分支名称 + # error: cannot lock ref 'refs/remotes/origin/fix/3.0/TD-32817': is at 7af5 but expected eaba + # match the branch name before ‘is at’ with a regular expression match = re.search(r"error: cannot lock ref '(refs/remotes/origin/[^']+)': is at", error_output) if match: return match.group(1) return None -# 解析第二种错误中的分支名称 def parse_branch_name_type2(error_output): - # 使用正则表达式匹配 'exists' 前的第一个引号内的分支名称 + # match the branch name before ‘exists; cannot create’ with a regular expression match = re.search(r"'(refs/remotes/origin/[^']+)' exists;", error_output) if match: return match.group(1) return None -# 执行 git update-ref -d 命令 +# parse branch name from error output of git remote prune origin +def parse_branch_name_type3(error_output): + # match the branch name before the first single quote before 'Unable to' with a regular expression + # git error: could not delete references: cannot lock ref 'refs/remotes/origin/test/3.0/TS-4893': Unable to create 'D:/workspace/main/TDinternal/community/.git/refs/remotes/origin/test/3.0/TS-4893.lock': File exists + match = re.search(r"references: cannot lock ref '(refs/remotes/origin/[^']+)': Unable to", error_output) + if match: + return match.group(1) + return None + + +# execute git update-ref -d to delete the ref def git_update_ref(branch_name): if branch_name: subprocess.run(['git', 'update-ref', '-d', f'{branch_name}'], check=True) -# 解析错误类型并执行相应的修复操作 +# parse error type and execute corresponding repair operation def handle_error(error_output): - # 错误类型1:本地引用的提交ID与远程不一致 - if "is at" in error_output and "but expected" in error_output: - branch_name = parse_branch_name_type1(error_output) - if branch_name: - print(f"Detected error type 1, attempting to delete ref for branch: {branch_name}") - git_update_ref(branch_name) - else: - print("Error parsing branch name for type 1.") - # 错误类型2:尝试创建新的远程引用时,本地已经存在同名的引用 - elif "exists; cannot create" in error_output: - branch_name = parse_branch_name_type2(error_output) - if branch_name: - print(f"Detected error type 2, attempting to delete ref for branch: {branch_name}") - git_update_ref(branch_name) - else: - print("Error parsing branch name for type 2.") + error_types = [ + ("is at", "but expected", parse_branch_name_type1, "type 1"), + ("exists; cannot create", None, parse_branch_name_type2, "type 2"), + ("Unable to create", "File exists", parse_branch_name_type3, "type 3") + ] + + for error_type in error_types: + if error_type[0] in error_output and 
(error_type[1] is None or error_type[1] in error_output): + branch_name = error_type[2](error_output) + if branch_name: + print(f"Detected error {error_type[3]}, attempting to delete ref for branch: {branch_name}") + git_update_ref(branch_name) + else: + print(f"Error parsing branch name for {error_type[3]}.") + break -# 主函数 def main(): fetch_result = git_fetch() - if fetch_result.returncode != 0: # 如果 git fetch 命令失败 + if fetch_result.returncode != 0: error_output = fetch_result.stderr handle_error(error_output) else: print("Git fetch successful.") + prune_result = git_prune() + print(prune_result.returncode) + if prune_result.returncode != 0: + error_output = prune_result.stderr + print(error_output) + handle_error(error_output) + else: + print("Git prune successful.") + if __name__ == "__main__": main() \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5d41e1506c..0a107518df 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1931,11 +1931,6 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange sdbCancelFetch(pSdb, pIter); return terrno = code; } - - code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid); - if (code) { - mError("failed to register trans, transId:%d, and continue", pTrans->id); - } } if (!includeAllNodes) { @@ -1951,6 +1946,12 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid, pStream->name, pTrans->id); + // NOTE: for each stream, we register one trans entry for task update + code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid); + if (code) { + mError("failed to register trans, transId:%d, and continue", pTrans->id); + } + code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans); // todo: not continue, drop all and retry again diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 905a73ad48..a1e104aeca 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -35,7 +35,11 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) size_t keyLen = 0; void *pIter = NULL; SArray *pList = taosArrayInit(4, sizeof(SKeyInfo)); - int32_t num = 0; + int32_t numOfChkpt = 0; + + if (pNumOfActiveChkpt != NULL) { + *pNumOfActiveChkpt = 0; + } if (pList == NULL) { return terrno; @@ -50,15 +54,15 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) void *pKey = taosHashGetKey(pEntry, &keyLen); // key is the name of src/dst db name SKeyInfo info = {.pKey = pKey, .keyLen = keyLen}; - mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name, - pEntry->startTime); + mDebug("transId:%d stream:0x%" PRIx64 " %s startTs:%" PRId64 " cleared since finished", pEntry->transId, + pEntry->streamId, pEntry->name, pEntry->startTime); void* p = taosArrayPush(pList, &info); if (p == NULL) { return terrno; } } else { if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) { - num++; + numOfChkpt++; } mndReleaseTrans(pMnode, pTrans); } @@ -78,48 +82,34 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) } } - mDebug("clear %d finished stream-trans, remained:%d, active checkpoint trans:%d", size, - 
taosHashGetSize(execInfo.transMgmt.pDBTrans), num); + mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d", size, + taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt); taosArrayDestroy(pList); if (pNumOfActiveChkpt != NULL) { - *pNumOfActiveChkpt = num; + *pNumOfActiveChkpt = numOfChkpt; } return 0; } -// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream. -// For a given stream: -// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans. -// 2. create/drop/reset/update trans are conflict with any other trans. -int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) { - if (lock) { - streamMutexLock(&execInfo.lock); - } - +static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName) { int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); if (num <= 0) { - if (lock) { - streamMutexUnlock(&execInfo.lock); - } return 0; } + // if any task updates exist, any other stream trans are not allowed to be created int32_t code = mndStreamClearFinishedTrans(pMnode, NULL); if (code) { - mError("failed to clear finish trans, code:%s", tstrerror(code)); + mError("failed to clear finish trans, code:%s, and continue", tstrerror(code)); } SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId)); if (pEntry != NULL) { SStreamTransInfo tInfo = *pEntry; - if (lock) { - streamMutexUnlock(&execInfo.lock); - } - if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) { if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0) && (strcmp(pTransName, MND_STREAM_RESTART_NAME) != 0)) { @@ -141,11 +131,25 @@ int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId); } + return TSDB_CODE_SUCCESS; +} + +// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream. +// For a given stream: +// 1. checkpoint trans conflicts with any other trans except for the drop and reset trans. +// 2. create/drop/reset/update trans conflict with any other trans. 
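The hunk above splits the conflict check into a lock-free helper plus a thin locking wrapper, so the early returns inside the helper no longer have to unlock by hand. A minimal sketch of that shape follows, assuming nothing beyond pthreads; all names here are illustrative placeholders, not TDengine APIs:

```c
/* Illustrative sketch (not TDengine code): early returns live in a static
 * helper, so the public entry point locks and unlocks on exactly one path. */
#include <pthread.h>

static pthread_mutex_t gLock = PTHREAD_MUTEX_INITIALIZER;

static int doConflictCheck(long long streamId, const char *transName) {
  if (transName == NULL) return -1; /* early return: no unlock needed here */
  (void)streamId;
  return 0;
}

int conflictCheck(long long streamId, const char *transName, int lock) {
  if (lock) pthread_mutex_lock(&gLock);
  int code = doConflictCheck(streamId, transName); /* single locked call site */
  if (lock) pthread_mutex_unlock(&gLock);
  return code;
}
```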
+int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) { + if (lock) { + streamMutexLock(&execInfo.lock); + } + + int32_t code = doStreamTransConflictCheck(pMnode, streamId, pTransName); + if (lock) { streamMutexUnlock(&execInfo.lock); } - return 0; + return code; } int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 80c04a3276..bcacac20a2 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -254,11 +254,12 @@ static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTask } static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) { + streamMetaWLock(pMeta); + int32_t code = streamMetaUnregisterTask(pMeta, streamId, taskId); if (code != 0) { smaError("vgId:%d, rsma task:%" PRIi64 ",%d drop failed since %s", pMeta->vgId, streamId, taskId, tstrerror(code)); } - streamMetaWLock(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); if (streamMetaCommit(pMeta) < 0) { // persist to disk diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bd78f62cae..937bc6e268 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1108,91 +1108,76 @@ _OVER: return code; } +// always return success to mnode +//todo: handle failure of build and send msg to mnode +static void doSendChkptSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, int32_t code, + int32_t taskId) { + SRpcMsg rsp = {0}; + int32_t ret = streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &rsp, code); + if (ret) { // suppress the error in build checkpoint source rsp + tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", taskId, tstrerror(ret)); + } + tmsgSendRsp(&rsp); // error occurs +} + // no matter what kinds of error happened, make sure the mnode will receive the success execution code. 
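The new doSendChkptSourceRsp helper below collapses several copies of build-rsp-then-send into one call, and every rejection path still acknowledges the mnode with a success code so the transaction never hangs waiting for a reply. A hedged sketch of that guard-path shape, with placeholder names rather than the actual tq types:

```c
/* Illustrative guard-path shape: reject early, but always acknowledge,
 * so the requester never stalls on a reply that will not come.
 * Req and sendAck are placeholders, not TDengine symbols. */
typedef struct { int taskId; long long checkpointId; } Req;

static void sendAck(const Req *req, int code) {
  (void)req; (void)code; /* stands in for building and sending the rsp */
}

int processCheckpointReq(const Req *req, int isLeader, int restored) {
  if (!isLeader) { sendAck(req, 0); return 0; } /* not leader: ack, done */
  if (!restored) { sendAck(req, 0); return 0; } /* restoring: ack, done  */
  /* ... real checkpoint handling ... */
  sendAck(req, 0);
  return 0; /* success back to the mnode in every case */
}
```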
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) { - int32_t vgId = TD_VID(pTq->pVnode); - SStreamMeta* pMeta = pTq->pStreamMeta; - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - int32_t code = 0; + int32_t vgId = TD_VID(pTq->pVnode); + SStreamMeta* pMeta = pTq->pStreamMeta; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + int32_t code = 0; + SStreamCheckpointSourceReq req = {0}; + SDecoder decoder = {0}; + SStreamTask* pTask = NULL; + int64_t checkpointId = 0; // disable auto rsp to mnode pRsp->info.handle = NULL; - SStreamCheckpointSourceReq req = {0}; - SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) { code = TSDB_CODE_MSG_DECODE_ERROR; tDecoderClear(&decoder); tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); - - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code)); - } - - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); + return TSDB_CODE_SUCCESS; // always return success to mnode, } + tDecoderClear(&decoder); if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code)); - } - - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); + return TSDB_CODE_SUCCESS; // always return success to mnode } if (!pTq->pVnode->restored) { tqDebug("vgId:%d checkpoint-source msg received during restoring, checkpointId:%" PRId64 ", transId:%d s-task:0x%x ignore it", vgId, req.checkpointId, req.transId, req.taskId); - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code)); - } - - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); + return TSDB_CODE_SUCCESS; // always return success to mnode } - SStreamTask* pTask = NULL; code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); if (pTask == NULL || code != 0) { tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. 
checkpointId:%" PRId64 " transId:%d it may have been destroyed", vgId, req.taskId, req.checkpointId, req.transId); - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); - } - tmsgSendRsp(&rsp); // error occurs + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); return TSDB_CODE_SUCCESS; } if (pTask->status.downstreamReady != 1) { - streamTaskSetFailedChkptInfo(pTask, req.transId, req.checkpointId); // record the latest failed checkpoint id + // record the latest failed checkpoint id + streamTaskSetFailedChkptInfo(pTask, req.transId, req.checkpointId); tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64 ", transId:%d set it failed", pTask->id.idStr, req.checkpointId, req.transId); + streamMetaReleaseTask(pMeta, pTask); - - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); - } - - tmsgSendRsp(&rsp); // error occurs + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); return TSDB_CODE_SUCCESS; // todo retry handle error } @@ -1207,14 +1192,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); - - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); - } - - tmsgSendRsp(&rsp); // error occurs + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); return TSDB_CODE_SUCCESS; } } else { @@ -1226,7 +1204,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) // check if the checkpoint msg already sent or not. 
if (status == TASK_STATUS__CK) { - int64_t checkpointId = 0; streamTaskGetActiveCheckpointInfo(pTask, NULL, &checkpointId); tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 @@ -1235,7 +1212,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); - + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SYN_PROPOSE_NOT_READY, req.taskId); return TSDB_CODE_SUCCESS; } else { // checkpoint already finished, and not in checkpoint status if (req.checkpointId <= pTask->chkInfo.checkpointId) { @@ -1245,15 +1222,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); - - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); - } - - tmsgSendRsp(&rsp); // error occurs - + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); return TSDB_CODE_SUCCESS; } } @@ -1264,7 +1233,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) if (code) { qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId, tstrerror(code)); - return code; + streamMetaReleaseTask(pMeta, pTask); + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); + return TSDB_CODE_SUCCESS; } if (req.mndTrigger) { @@ -1279,13 +1250,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); if (code != TSDB_CODE_SUCCESS) { - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); - } - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; + streamTaskSetCheckpointFailed(pTask); // set the checkpoint failed + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); } streamMetaReleaseTask(pMeta, pTask); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index be41f7e99e..51ef02de56 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -13,9 +13,7 @@ * along with this program. If not, see . 
*/ -#include #include "tcommon.h" -#include "tmsg.h" #include "tq.h" #define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1)) @@ -50,7 +48,7 @@ static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkI static bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo); static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id); static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode); -static void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs); +static void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs); static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode, int64_t earlyTs); @@ -1062,7 +1060,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { return; } - reubuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs); + rebuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs); } } @@ -1165,7 +1163,7 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* return TSDB_CODE_SUCCESS; } -void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) { +void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) { int32_t code = 0; const char* id = pTask->id.idStr; int32_t vgId = pTask->pMeta->vgId; diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 29372c5da7..bc7e2e28e3 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -17,19 +17,20 @@ #include "vnd.h" #define MAX_REPEAT_SCAN_THRESHOLD 3 -#define SCAN_WAL_IDLE_DURATION 100 +#define SCAN_WAL_IDLE_DURATION 500 // idle for 500ms to do next wal scan typedef struct SBuildScanWalMsgParam { int64_t metaId; int32_t numOfTasks; } SBuildScanWalMsgParam; -static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); +static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool taskReadyForDataFromWal(SStreamTask* pTask); static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc); static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration); +static int32_t doScanWalAsync(STQ* pTq, bool ckPause); // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. 
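The wal-scan path below is driven by a capped counter: any number of triggers while a scan is pending collapse into at most MAX_REPEAT_SCAN_THRESHOLD rescans, only the 0-to-1 transition actually schedules the worker, and the worker now idles SCAN_WAL_IDLE_DURATION = 500 ms between rounds. A sketch of the coalescing logic under those assumptions (the struct is a placeholder, not the real SStreamMeta):

```c
/* Illustrative coalescing trigger: many requests, at most a few scans. */
#define MAX_REPEAT_SCAN 3
#define IDLE_MS         500

typedef struct { int scanCounter; } ScanInfo;

/* returns 1 only when the caller should actually schedule a scan task */
static int requestWalScan(ScanInfo *si) {
  si->scanCounter += 1;
  if (si->scanCounter > MAX_REPEAT_SCAN) si->scanCounter = MAX_REPEAT_SCAN;
  return si->scanCounter == 1; /* a scan is already queued otherwise */
}
```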
int32_t tqScanWal(STQ* pTq) { @@ -37,12 +38,11 @@ int32_t tqScanWal(STQ* pTq) { int32_t vgId = pMeta->vgId; int64_t st = taosGetTimestampMs(); int32_t numOfTasks = 0; - bool shouldIdle = true; tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter); // check all tasks - int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle); + int32_t code = doScanWalForAllTasks(pMeta); if (code) { tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code)); return code; @@ -133,10 +133,9 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { } int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { - int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; bool alreadyRestored = pTq->pVnode->restored; - int32_t numOfTasks = 0; + int32_t code = 0; // do not launch the stream tasks, if it is a follower or not restored vnode. if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) { @@ -144,47 +143,8 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { } streamMetaWLock(pMeta); - - numOfTasks = taosArrayGetSize(pMeta->pTaskList); - if (numOfTasks == 0) { - tqDebug("vgId:%d no stream tasks existed to run", vgId); - streamMetaWUnLock(pMeta); - return 0; - } - - if (pMeta->startInfo.startAllTasks) { - tqTrace("vgId:%d in restart procedure, not scan wal", vgId); - streamMetaWUnLock(pMeta); - return 0; - } - - pMeta->scanInfo.scanCounter += 1; - if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) { - pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD; - } - - if (pMeta->scanInfo.scanCounter > 1) { - tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter); - streamMetaWUnLock(pMeta); - return 0; - } - - int32_t numOfPauseTasks = pMeta->numOfPausedTasks; - if (ckPause && numOfTasks == numOfPauseTasks) { - tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId); - - // reset the counter value, since we do not launch the scan wal operation. 
- pMeta->scanInfo.scanCounter = 0; - streamMetaWUnLock(pMeta); - return 0; - } - - tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId, - numOfTasks, alreadyRestored); - - int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); + code = doScanWalAsync(pTq, ckPause); streamMetaWUnLock(pMeta); - return code; } @@ -368,11 +328,8 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt return code; } -int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { - *pScanIdle = true; - bool noDataInWal = true; +int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) { int32_t vgId = pStreamMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList); if (numOfTasks == 0) { return TSDB_CODE_SUCCESS; @@ -410,8 +367,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - *pScanIdle = false; - // seek the stored version and extract data from WAL code = setWalReaderStartOffset(pTask, vgId); if (code != TSDB_CODE_SUCCESS) { @@ -437,7 +392,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { streamMutexUnlock(&pTask->lock); if ((numOfItems > 0) || hasNewData) { - noDataInWal = false; code = streamTrySchedExec(pTask); if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); @@ -449,11 +403,47 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { streamMetaReleaseTask(pStreamMeta, pTask); } - // all wal are checked, and no new data available in wal. - if (noDataInWal) { - *pScanIdle = true; - } - taosArrayDestroy(pTaskList); return TSDB_CODE_SUCCESS; } + +int32_t doScanWalAsync(STQ* pTq, bool ckPause) { + SStreamMeta* pMeta = pTq->pStreamMeta; + bool alreadyRestored = pTq->pVnode->restored; + int32_t vgId = pMeta->vgId; + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + + if (numOfTasks == 0) { + tqDebug("vgId:%d no stream tasks existed to run", vgId); + return 0; + } + + if (pMeta->startInfo.startAllTasks) { + tqTrace("vgId:%d in restart procedure, not scan wal", vgId); + return 0; + } + + pMeta->scanInfo.scanCounter += 1; + if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) { + pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD; + } + + if (pMeta->scanInfo.scanCounter > 1) { + tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter); + return 0; + } + + int32_t numOfPauseTasks = pMeta->numOfPausedTasks; + if (ckPause && numOfTasks == numOfPauseTasks) { + tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId); + + // reset the counter value, since we do not launch the scan wal operation. 
+ pMeta->scanInfo.scanCounter = 0; + return 0; + } + + tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId, + numOfTasks, alreadyRestored); + + return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); +} diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 1ea524dc78..68fb651566 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -718,8 +718,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen } } - streamMetaWUnLock(pMeta); - // drop the related fill-history task firstly if (hTaskId.taskId != 0 && hTaskId.streamId != 0) { tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId); @@ -737,7 +735,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen } // commit the update - streamMetaWLock(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index d75f39f376..08fa18ab4b 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -2950,17 +2950,16 @@ int32_t nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) { case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_GEOMETRY: - pVal->pz = taosMemoryMalloc(pVal->nLen + 1); + pVal->pz = taosMemoryCalloc(1, pVal->nLen + 1); if (pVal->pz) { - memcpy(pVal->pz, pNode->datum.p, pVal->nLen); - pVal->pz[pVal->nLen] = 0; + memcpy(pVal->pz, pNode->datum.p, varDataTLen(pNode->datum.p)); } else { code = terrno; } break; case TSDB_DATA_TYPE_JSON: pVal->nLen = getJsonValueLen(pNode->datum.p); - pVal->pz = taosMemoryMalloc(pVal->nLen); + pVal->pz = taosMemoryCalloc(1, pVal->nLen); if (pVal->pz) { memcpy(pVal->pz, pNode->datum.p, pVal->nLen); } else { diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 209110b014..b3610d035f 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -1241,7 +1241,6 @@ EDealRes sclRewriteFunction(SNode **pNode, SScalarCtx *ctx) { ctx->code = TSDB_CODE_OUT_OF_MEMORY; return DEAL_RES_ERROR; } - res->node.resType.bytes = varDataTLen(output.columnData->pData); (void)memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData)); } else { ctx->code = nodesSetValueNodeValue(res, output.columnData->pData); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 9be0e3fc40..37249b5418 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -1129,7 +1129,11 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { // SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); if (NULL == schMgmt.queryMgmt) { - SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL)); + void* p = NULL; + SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, &p, NULL)); + if (atomic_val_compare_exchange_ptr(&schMgmt.queryMgmt, NULL, p)) { + qWorkerDestroy(&p); + } } SArray *explainRes = NULL; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 863bc76c79..8f9e4a311c 100644 --- 
a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -37,7 +37,7 @@ extern "C" { #define META_HB_CHECK_INTERVAL 200 #define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec #define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) -#define STREAM_TASK_QUEUE_CAPACITY 20480 +#define STREAM_TASK_QUEUE_CAPACITY 5120 #define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30) // clang-format off diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 3ca283ce98..641f41daa9 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -591,7 +591,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV { // destroy the related fill-history tasks // drop task should not in the meta-lock, and drop the related fill-history task now - streamMetaWUnLock(pMeta); if (pReq->dropRelHTask) { code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); @@ -599,7 +598,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV id, vgId, pReq->taskId, numOfTasks); } - streamMetaWLock(pMeta); if (pReq->dropRelHTask) { code = streamMetaCommit(pMeta); } @@ -675,8 +673,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV return TSDB_CODE_SUCCESS; } - streamMetaWUnLock(pMeta); - // drop task should not in the meta-lock, and drop the related fill-history task now if (pReq->dropRelHTask) { code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); @@ -685,9 +681,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV (int32_t)pReq->hTaskId, numOfTasks); } - streamMetaWLock(pMeta); code = streamMetaCommit(pMeta); - return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 318720b5b0..85f287f301 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -522,7 +522,10 @@ static int32_t doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int if (pItem->type == STREAM_INPUT__GET_RES) { const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput; code = qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); - + if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + stDebug("s-task:%s set force_window_close as source block, skey:%"PRId64, id, pTrigger->pBlock->info.window.skey); + (*pVer) = pTrigger->pBlock->info.window.skey; + } } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; code = qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); @@ -671,7 +674,7 @@ static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr); - // update the currentVer if processing the submit blocks. + // update the currentVer if processing the submitted blocks. 
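For STREAM_TRIGGER_FORCE_WINDOW_CLOSE sources, the hunk above starts using the trigger block's window start key as the processed-version cursor, so progress appears to be tracked by window rather than by wal offset for this trigger type. A sketch of that cursor rule; the types are illustrative only:

```c
/* Illustrative version-cursor rule for force-window-close sources:
 * trigger blocks advance the cursor to their window start key. */
#include <stdint.h>

typedef struct { int64_t skey, ekey; } Window;

static int64_t nextProcessVer(int64_t cur, const Window *w, int isTriggerBlock) {
  return isTriggerBlock ? w->skey /* window-driven input: cursor = skey   */
                        : cur;    /* wal input: cursor advances elsewhere */
}
```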
if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) { stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id, pInfo->checkpointVer, pInfo->nextProcessVer, ver); @@ -688,6 +691,34 @@ static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock return code; } +// do nothing after sync executor state to storage backend, until checkpoint is completed. +static int32_t doHandleChkptBlock(SStreamTask* pTask) { + int32_t code = 0; + const char* id = pTask->id.idStr; + + streamMutexLock(&pTask->lock); + SStreamTaskState pState = streamTaskGetStatus(pTask); + if (pState.state == TASK_STATUS__CK) { // todo other thread may change the status + stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name); + code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue + } else { // todo refactor + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + code = streamTaskSendCheckpointSourceRsp(pTask); + } else { + code = streamTaskSendCheckpointReadyMsg(pTask); + } + + if (code != TSDB_CODE_SUCCESS) { + // todo: let's retry send rsp to upstream/mnode + stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0, + tstrerror(code)); + } + } + + streamMutexUnlock(&pTask->lock); + return code; +} + int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) { const char* id = pTask->id.idStr; @@ -832,36 +863,16 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } } - if (type != STREAM_INPUT__CHECKPOINT) { + if (type == STREAM_INPUT__CHECKPOINT) { + code = doHandleChkptBlock(pTask); + streamFreeQitem(pInput); + return code; + } else { code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks); streamFreeQitem(pInput); if (code) { return code; } - } else { // todo other thread may change the status - // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. 
- streamMutexLock(&pTask->lock); - SStreamTaskState pState = streamTaskGetStatus(pTask); - if (pState.state == TASK_STATUS__CK) { - stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name); - code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue - } else { // todo refactor - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - code = streamTaskSendCheckpointSourceRsp(pTask); - } else { - code = streamTaskSendCheckpointReadyMsg(pTask); - } - - if (code != TSDB_CODE_SUCCESS) { - // todo: let's retry send rsp to upstream/mnode - stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0, - tstrerror(code)); - } - } - - streamMutexUnlock(&pTask->lock); - streamFreeQitem(pInput); - return code; } } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a6f87711bf..23a98ef3ae 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -501,8 +501,6 @@ _err: void streamMetaInitBackend(SStreamMeta* pMeta) { pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); if (pMeta->streamBackend == NULL) { - streamMetaWUnLock(pMeta); - while (1) { streamMetaWLock(pMeta); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); @@ -908,8 +906,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t int32_t code = 0; STaskId id = {.streamId = streamId, .taskId = taskId}; - streamMetaWLock(pMeta); - code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); if (code == 0) { // desc the paused task counter @@ -958,10 +954,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t } streamMetaReleaseTask(pMeta, pTask); - streamMetaWUnLock(pMeta); } else { stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId); - streamMetaWUnLock(pMeta); } return 0; diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 468c5f2139..1d0c882003 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -192,6 +192,7 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig const char* id = pTask->id.idStr; int8_t precision = pTask->info.interval.precision; SStreamTrigger* pTrigger = NULL; + bool isFull = false; while (1) { code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval, @@ -214,7 +215,6 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig // check whether the time window gaps exist or not int64_t now = taosGetTimestamp(precision); - int64_t ekey = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval; // there are gaps, needs to be filled STimeWindow w = pTrigger->pBlock->info.window; @@ -226,13 +226,18 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig } pTask->status.latestForceWindow = w; - if (ekey + pTask->info.watermark + pTask->info.interval.interval > now) { - int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI); + isFull = streamQueueIsFull(pTask->inputq.queue); + + if ((w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) || isFull) { + int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI); + if (!isFull) { + *pNextTrigger = w.ekey + pTask->info.watermark + pTask->info.interval.interval - now; + } - 
*pNextTrigger = ekey + pTask->info.watermark + pTask->info.interval.interval - now; *pNextTrigger = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI); - stDebug("s-task:%s generate %d time window(s), trigger delay adjust from %" PRId64 " to %d", id, num, prev, - *pNextTrigger); + pTask->chkInfo.nextProcessVer = w.ekey + pTask->info.interval.interval; + stDebug("s-task:%s generate %d time window(s), trigger delay adjust from %" PRId64 " to %d, set ver:%" PRId64, id, + num, prev, *pNextTrigger, pTask->chkInfo.nextProcessVer); return code; } else { stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey); @@ -289,7 +294,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { } if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) { - nextTrigger = TRIGGER_RECHECK_INTERVAL; // retry in 10 seec + nextTrigger = TRIGGER_RECHECK_INTERVAL; // retry in 10 sec stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, TRIGGER_RECHECK_INTERVAL); } else { if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 32b1023ed7..9433a76469 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -233,15 +233,19 @@ int32_t taosBlockSIGPIPE() { } int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t *ip) { + int32_t code = 0; OS_PARAM_CHECK(fqdn); OS_PARAM_CHECK(ip); + int64_t limitMs = 1000; + int64_t st = taosGetTimestampMs(), cost = 0; #ifdef WINDOWS // Initialize Winsock WSADATA wsaData; int iResult; iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); if (iResult != 0) { - return TAOS_SYSTEM_WINSOCKET_ERROR(WSAGetLastError()); + code = TAOS_SYSTEM_WINSOCKET_ERROR(WSAGetLastError()); + goto _err; } #endif @@ -260,12 +264,12 @@ int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t *ip) { inRetry = true; continue; } else if (EAI_SYSTEM == ret) { - terrno = TAOS_SYSTEM_ERROR(errno); - return terrno; + code = TAOS_SYSTEM_ERROR(errno); + goto _err; } - terrno = TAOS_SYSTEM_ERROR(errno); - return terrno; + code = TAOS_SYSTEM_ERROR(errno); + goto _err; } struct sockaddr *sa = result->ai_addr; @@ -275,8 +279,7 @@ int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t *ip) { *ip = ia.s_addr; freeaddrinfo(result); - - return 0; + goto _err; } #else struct addrinfo hints = {0}; @@ -292,7 +295,7 @@ int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t *ip) { struct in_addr ia = si->sin_addr; *ip = ia.s_addr; freeaddrinfo(result); - return 0; + goto _err; } else { #ifdef EAI_SYSTEM if (ret == EAI_SYSTEM) { @@ -305,9 +308,16 @@ int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t *ip) { #endif *ip = 0xFFFFFFFF; - return TSDB_CODE_RPC_FQDN_ERROR; + code = TSDB_CODE_RPC_FQDN_ERROR; + goto _err; } #endif +_err: + cost = taosGetTimestampMs() - st; + if (cost >= limitMs) { + uWarn("get ip from fqdn:%s, cost:%" PRId64 "ms", fqdn, cost); + } + return code; } int32_t taosGetFqdn(char *fqdn) { diff --git a/tests/system-test/2-query/union.py b/tests/system-test/2-query/union.py index 5104489592..fc6dd4fb32 100644 --- a/tests/system-test/2-query/union.py +++ b/tests/system-test/2-query/union.py @@ -426,6 +426,15 @@ class TDTestCase: tdLog.printNoPrefix("==========step4:after wal, all check again ") self.all_test() + self.test_TD_33137() + + def test_TD_33137(self): + sql = "select 'asd' union all select 'asdasd'" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(2) + sql = "select db_name 
`TABLE_CAT`, '' `TABLE_SCHEM`, stable_name `TABLE_NAME`, 'TABLE' `TABLE_TYPE`, table_comment `REMARKS` from information_schema.ins_stables union all select db_name `TABLE_CAT`, '' `TABLE_SCHEM`, table_name `TABLE_NAME`, case when `type`='SYSTEM_TABLE' then 'TABLE' when `type`='NORMAL_TABLE' then 'TABLE' when `type`='CHILD_TABLE' then 'TABLE' else 'UNKNOWN' end `TABLE_TYPE`, table_comment `REMARKS` from information_schema.ins_tables union all select db_name `TABLE_CAT`, '' `TABLE_SCHEM`, view_name `TABLE_NAME`, 'VIEW' `TABLE_TYPE`, NULL `REMARKS` from information_schema.ins_views" + tdSql.query(sql, queryTimes=1) + tdSql.checkRows(47) def stop(self): tdSql.close()
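test_TD_33137 above pins the UNION ALL regression where the result column took its varchar width from the first branch only, so longer rows from later branches could come back truncated. A standalone sketch of the same check outside the test harness, assuming a local server reachable with the taospy client's defaults (connection parameters are illustrative):

```python
# Standalone sketch of the TD-33137 check, using the taospy client.
import taos

conn = taos.connect(host="localhost")  # assumes a local TDengine server
try:
    rows = conn.query("select 'asd' union all select 'asdasd'").fetch_all()
    # before the fix the second row could come back truncated to 3 bytes
    assert sorted(r[0] for r in rows) == ["asd", "asdasd"], rows
finally:
    conn.close()
```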