From 588f05c7dda0ced63daba9767e22570f3a638b20 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 2 Apr 2024 08:49:48 +0000 Subject: [PATCH 01/14] port main to 30 --- source/libs/scheduler/src/schRemote.c | 2 +- source/libs/scheduler/src/schTask.c | 23 ++- source/libs/transport/inc/transComm.h | 13 +- source/libs/transport/src/transCli.c | 195 +++++++++++++++++--------- source/libs/transport/src/transSvr.c | 7 +- 5 files changed, 155 insertions(+), 85 deletions(-) diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index aa33d6f2fc..980b9aa6bf 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1181,7 +1181,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, qMsg.queryId = pJob->queryId; qMsg.taskId = pTask->taskId; qMsg.refId = pJob->refId; - qMsg.execId = pTask->execId; + qMsg.execId = *(int32_t*)param; msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg); if (msgSize < 0) { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index d96c01fc76..97c3c7d276 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -371,14 +371,13 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, pCtx->roundTotal = pEpSet->numOfEps; } - if (pCtx->roundTimes >= pCtx->roundTotal) { int64_t nowTs = taosGetTimestampMs(); int64_t lastTime = nowTs - pCtx->startTs; if (lastTime > tsMaxRetryWaitTime) { SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d", nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes); - pJob->noMoreRetry = true; + pJob->noMoreRetry = true; SCH_ERR_RET(SCH_GET_REDIRECT_CODE(pJob, rspCode)); } @@ -418,7 +417,7 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) { taosMemoryFreeClear(pTask->msg); pTask->msgLen = 0; pTask->lastMsgType = 0; - pTask->childReady = 0; + pTask->childReady = 0; memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); } @@ -505,11 +504,11 @@ int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, i pLevel->taskExecDoneNum = 0; pLevel->taskLaunchedNum = 0; } - + SCH_RESET_JOB_LEVEL_IDX(pJob); - + code = schDoTaskRedirect(pJob, pTask, pData, rspCode); - + taosMemoryFreeClear(pData->pData); taosMemoryFreeClear(pData->pEpSet); @@ -627,7 +626,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo pTask->maxRetryTimes); return TSDB_CODE_SUCCESS; } - + if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) { pTask->maxExecTimes++; pTask->maxRetryTimes++; @@ -862,7 +861,9 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { while (nodeInfo) { if (nodeInfo->handle) { SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); - schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, NULL); + void *pExecId = taosHashGetKey(nodeInfo, NULL); + schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId); + SCH_TASK_DLOG("start to drop task's %dth execNode", i); } else { SCH_TASK_DLOG("no need to drop task %dth execNode", i); @@ -901,7 +902,6 @@ int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType return TSDB_CODE_SUCCESS; } - int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList); SSchTask *pTask = NULL; @@ -1269,7 +1269,7 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t int32_t code = TSDB_CODE_SUCCESS; SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type)); - + void *pIter = taosHashIterate(list, NULL); while (pIter) { SSchTask *pTask = *(SSchTask **)pIter; @@ -1277,7 +1277,7 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t SCH_LOCK_TASK(pTask); code = schNotifyTaskOnExecNode(pJob, pTask, type); SCH_UNLOCK_TASK(pTask); - + if (TSDB_CODE_SUCCESS != code) { break; } @@ -1289,7 +1289,6 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t SCH_RET(code); } - int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) { SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask), NULL)); } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index c010e31320..da6d71e07b 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -19,16 +19,12 @@ extern "C" { #endif #include -#include "os.h" -#include "taoserror.h" #include "theap.h" -#include "tmisce.h" #include "tmsg.h" #include "transLog.h" #include "transportInt.h" #include "trpc.h" #include "ttrace.h" -#include "tutil.h" typedef bool (*FilteFunc)(void* arg); @@ -115,9 +111,12 @@ typedef SRpcConnInfo STransHandleInfo; // ref mgt handle typedef struct SExHandle { - void* handle; - int64_t refId; - void* pThrd; + void* handle; + int64_t refId; + void* pThrd; + queue q; + int8_t inited; + SRWLatch latch; } SExHandle; typedef struct { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2d8f4ed3c2..062609baac 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -92,6 +92,7 @@ typedef struct SCliMsg { int64_t refId; uint64_t st; int sent; //(0: no send, 1: alread sent) + queue seqq; } SCliMsg; typedef struct SCliThrd { @@ -121,11 +122,7 @@ typedef struct SCliThrd { SHashObj* batchCache; SCliMsg* stopMsg; - - bool quit; - - int newConnCount; - SHashObj* msgCount; + bool quit; } SCliThrd; typedef struct SCliObj { @@ -262,10 +259,8 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); } \ if (i == sz) { \ pMsg = NULL; \ - tDebug("msg not found, %" PRIu64 "", ahandle); \ } else { \ pMsg = transQueueRm(&conn->cliMsgs, i); \ - tDebug("msg found, %" PRIu64 "", ahandle); \ } \ } while (0) @@ -343,6 +338,34 @@ bool cliMaySendCachedMsg(SCliConn* conn) { _RETURN: return false; } +bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) { + if (refId == 0) return false; + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); + if (exh == NULL) { + tDebug("release conn %p, refId: %" PRId64 "", conn, refId); + return false; + } + taosWLockLatch(&exh->latch); + if (exh->handle == NULL) exh->handle = conn; + exh->inited = 1; + if (!QUEUE_IS_EMPTY(&exh->q)) { + queue* h = QUEUE_HEAD(&exh->q); + QUEUE_REMOVE(h); + taosWUnLockLatch(&exh->latch); + SCliMsg* t = QUEUE_DATA(h, SCliMsg, seqq); + transCtxMerge(&conn->ctx, &t->ctx->appCtx); + transQueuePush(&conn->cliMsgs, t); + tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId); + transReleaseExHandle(transGetRefMgt(), refId); + cliSend(conn); + return true; + } + taosWUnLockLatch(&exh->latch); + tDebug("empty conn %p, refId: %" PRId64 "", conn, refId); + transReleaseExHandle(transGetRefMgt(), refId); + return false; +} + void cliHandleResp(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -439,8 +462,14 @@ void cliHandleResp(SCliConn* conn) { return; } } + int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle)); + tDebug("conn %p msg refId: %" PRId64 "", conn, refId); destroyCmsg(pMsg); + if (cliConnSendSeqMsg(refId, conn)) { + return; + } + if (cliMaySendCachedMsg(conn) == true) { return; } @@ -451,6 +480,21 @@ void cliHandleResp(SCliConn* conn) { uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); } +static void cliDestroyMsgInExhandle(int64_t refId) { + if (refId == 0) return; + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); + if (exh) { + taosWLockLatch(&exh->latch); + while (!QUEUE_IS_EMPTY(&exh->q)) { + queue* h = QUEUE_HEAD(&exh->q); + QUEUE_REMOVE(h); + SCliMsg* t = QUEUE_DATA(h, SCliMsg, seqq); + destroyCmsg(t); + } + taosWUnLockLatch(&exh->latch); + transReleaseExHandle(transGetRefMgt(), refId); + } +} void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { if (transQueueEmpty(&pConn->cliMsgs)) { @@ -510,6 +554,8 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { } if (pMsg == NULL || (pMsg && pMsg->type != Release)) { + int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle)); + cliDestroyMsgInExhandle(refId); if (cliAppCb(pConn, &transMsg, pMsg) != 0) { return; } @@ -678,7 +724,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { } list->numOfConn++; } - tTrace("%s numOfConn: %d, limit: %d", pTransInst->label, list->numOfConn, pTransInst->connLimitNum); + tDebug("%s numOfConn: %d, limit: %d, dst:%s", pTransInst->label, list->numOfConn, pTransInst->connLimitNum, key); return NULL; } @@ -742,13 +788,13 @@ static void addConnToPool(void* pool, SCliConn* conn) { QUEUE_PUSH(&conn->list->conns, &conn->q); conn->list->size += 1; - if (conn->list->size >= 20) { + if (conn->list->size >= 10) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); arg->param1 = conn; arg->param2 = thrd; STrans* pTransInst = thrd->pTransInst; - conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); + conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, 10 * CONN_PERSIST_TIME(pTransInst->idleTime)); } } static int32_t allocConnRef(SCliConn* conn, bool update) { @@ -761,8 +807,10 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { exh->handle = conn; exh->pThrd = conn->hostThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); - conn->refId = exh->refId; + QUEUE_INIT(&exh->q); + taosInitRWLatch(&exh->latch); + conn->refId = exh->refId; if (conn->refId == -1) { taosMemoryFree(exh); } @@ -779,9 +827,11 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { if (exh == NULL) { return -1; } + taosWLockLatch(&exh->latch); exh->handle = conn; exh->pThrd = conn->hostThrd; conn->refId = exh->refId; + taosWUnLockLatch(&exh->latch); transReleaseExHandle(transGetRefMgt(), handle); return 0; @@ -882,7 +932,6 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { } conn->list = NULL; - pThrd->newConnCount--; transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); @@ -1190,7 +1239,6 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { addr.sin_port = (uint16_t)htons(pList->port); tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst); - pThrd->newConnCount++; int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd == -1) { tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, @@ -1392,7 +1440,10 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { return; } + taosRLockLatch(&exh->latch); SCliConn* conn = exh->handle; + taosRUnLockLatch(&exh->latch); + transReleaseExHandle(transGetRefMgt(), refId); tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); @@ -1425,7 +1476,9 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) *ignore = true; return NULL; } else { + taosRLockLatch(&exh->latch); conn = exh->handle; + taosRUnLockLatch(&exh->latch); if (conn == NULL) { conn = getConnFromPool2(pThrd, addr, pMsg); if (conn != NULL) specifyConnRef(conn, true, refId); @@ -1439,7 +1492,7 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) if (conn != NULL) { tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool); } else { - tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool); + tTrace("%s not found conn in conn pool:%p, dst:%s", ((STrans*)pThrd->pTransInst)->label, pThrd->pool, addr); } return conn; } @@ -1598,7 +1651,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { addr.sin_port = (uint16_t)htons(port); tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr); - pThrd->newConnCount++; int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd == -1) { tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, @@ -1858,9 +1910,10 @@ void cliIteraConnMsgs(SCliConn* conn) { bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { uint64_t ahandle = pHead->ahandle; - tDebug("ahandle = %" PRIu64 "", ahandle); SCliMsg* pMsg = NULL; CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); + tDebug("%s conn %p receive release request, refId:%" PRId64 ", may ignore", CONN_GET_INST_LABEL(conn), conn, + conn->refId); transClearBuffer(&conn->readBuf); transFreeMsg(transContFromHead((char*)pHead)); @@ -1869,6 +1922,9 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i); if (cliMsg->type == Release) { ASSERTS(pMsg == NULL, "trans-cli recv invaid release-req"); + tDebug("%s conn %p receive release request, refId:%" PRId64 ", ignore msg", CONN_GET_INST_LABEL(conn), conn, + conn->refId); + cliDestroyConn(conn, true); return true; } } @@ -1984,11 +2040,9 @@ static SCliThrd* createThrdObj(void* trans) { taosMemoryFree(pThrd); return NULL; } - if (pTransInst->supportBatch) { - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 4, pThrd, cliAsyncCb); - } else { - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb); - } + int32_t nSync = pTransInst->supportBatch ? 4 : 8; + pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb); + if (pThrd->asyncPool == NULL) { tError("failed to init async pool"); uv_loop_close(pThrd->loop); @@ -2029,8 +2083,6 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->quit = false; - pThrd->newConnCount = 0; - pThrd->msgCount = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); return pThrd; } static void destroyThrdObj(SCliThrd* pThrd) { @@ -2076,7 +2128,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); } taosHashCleanup(pThrd->batchCache); - taosHashCleanup(pThrd->msgCount); taosMemoryFree(pThrd); } @@ -2095,14 +2146,7 @@ void cliSendQuit(SCliThrd* thrd) { void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { if (uv_handle_get_type(handle) == UV_TIMER) { - // SCliConn* pConn = handle->data; - // if (pConn != NULL && pConn->timer != NULL) { - // SCliThrd* pThrd = pConn->hostThrd; - // uv_timer_stop((uv_timer_t*)handle); - // handle->data = NULL; - // taosArrayPush(pThrd->timerList, &pConn->timer); - // pConn->timer = NULL; - // } + // do nothing } else { uv_read_stop((uv_stream_t*)handle); } @@ -2137,18 +2181,23 @@ static void doCloseIdleConn(void* param) { cliDestroyConn(conn, true); taosMemoryFree(arg); } +static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) { + if (!(rpcDebugFlag & DEBUG_DEBUG)) { + return; + } + STransConnCtx* pCtx = pMsg->ctx; + STraceId* trace = &pMsg->msg.info.traceId; + char tbuf[512] = {0}; + EPSET_TO_STR(&pCtx->epSet, tbuf); + tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep, + pCtx->retryNextInterval); + return; +} static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; STransConnCtx* pCtx = pMsg->ctx; - - if (rpcDebugFlag & DEBUG_DEBUG) { - STraceId* trace = &pMsg->msg.info.traceId; - char tbuf[512] = {0}; - EPSET_TO_STR(&pCtx->epSet, tbuf); - tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf, - pCtx->retryStep, pCtx->retryNextInterval); - } + cliSchedMsgToDebug(pMsg, transLabel(pThrd->pTransInst)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); arg->param1 = pMsg; @@ -2157,12 +2206,6 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval); } -FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { - if (*val != exp) { - *val = newVal; - } -} - FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { if ((pResp == NULL || pResp->info.hasEpSet == 0)) { return false; @@ -2504,21 +2547,7 @@ int transReleaseCliHandle(void* handle) { } return 0; } - -int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - if (pTransInst == NULL) { - transFreeMsg(pReq->pCont); - return TSDB_CODE_RPC_BROKEN_LINK; - } - - SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); - if (pThrd == NULL) { - transFreeMsg(pReq->pCont); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_BROKEN_LINK; - } - +static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); epsetAssign(&pCtx->epSet, pEpSet); @@ -2535,12 +2564,48 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; cliMsg->refId = (int64_t)shandle; + QUEUE_INIT(&cliMsg->seqq); + return cliMsg; +} + +int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pTransInst == NULL) { + transFreeMsg(pReq->pCont); + return TSDB_CODE_RPC_BROKEN_LINK; + } + + int64_t handle = (int64_t)pReq->info.handle; + SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle); + if (pThrd == NULL) { + transFreeMsg(pReq->pCont); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return TSDB_CODE_RPC_BROKEN_LINK; + } + if (handle != 0) { + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); + if (exh != NULL) { + taosWLockLatch(&exh->latch); + if (exh->handle == NULL && exh->inited != 0) { + SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx); + QUEUE_PUSH(&exh->q, &pCliMsg->seqq); + taosWUnLockLatch(&exh->latch); + tDebug("msg refId: %" PRId64 "", handle); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return 0; + } + exh->inited = 1; + taosWUnLockLatch(&exh->latch); + transReleaseExHandle(transGetRefMgt(), handle); + } + } + SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx); STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, - EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) { - destroyCmsg(cliMsg); + EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle); + if (0 != transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) { + destroyCmsg(pCliMsg); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_BROKEN_LINK; } @@ -2726,6 +2791,8 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { int64_t transAllocHandle() { SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); exh->refId = transAddExHandle(transGetRefMgt(), exh); + QUEUE_INIT(&exh->q); + taosInitRWLatch(&exh->latch); tDebug("pre alloc refId %" PRId64 "", exh->refId); return exh->refId; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index f47a688e6f..21ad5be869 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -761,9 +761,12 @@ static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { tTrace("conn %p received release request", pConn); STraceId traceId = pHead->traceId; - pConn->status = ConnRelease; transClearBuffer(&pConn->readBuf); transFreeMsg(transContFromHead((char*)pHead)); + if (pConn->status != ConnAcquire) { + return true; + } + pConn->status = ConnRelease; STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527}; SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); @@ -1090,6 +1093,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { STrans* pTransInst = pThrd->pTransInst; pConn->refId = exh->refId; + QUEUE_INIT(&exh->q); transRefSrvHandle(pConn); tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId); return pConn; @@ -1121,6 +1125,7 @@ static int reallocConnRef(SSvrConn* conn) { exh->handle = conn; exh->pThrd = conn->hostThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); + QUEUE_INIT(&exh->q); transAcquireExHandle(transGetRefMgt(), exh->refId); conn->refId = exh->refId; From c2afd95c4e7dc273ae506eb8fc015c41a277c485 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Wed, 3 Apr 2024 10:07:02 +0800 Subject: [PATCH 02/14] fix taosd crash query ins_tags with empty nchar tag --- source/libs/executor/src/sysscanoperator.c | 5 ++++- tests/system-test/0-others/information_schema.py | 13 ++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 628aacf3c3..903e2f85aa 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -983,7 +983,10 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, : (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE); tagVarChar = taosMemoryCalloc(1, bufSize + 1); int32_t len = -1; - convertTagDataToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len); + if (tagLen > 0) + convertTagDataToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len); + else + len = 0; varDataSetLen(tagVarChar, len); } } diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 63718417b4..c3d65482fc 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -50,10 +50,11 @@ class TDTestCase: self.tbnum = 20 self.rowNum = 10 self.tag_dict = { - 't0':'int' + 't0':'int', + 't1':f'nchar({self.nchar_length})' } self.tag_values = [ - f'1' + f'1', '""' ] self.binary_str = 'taosdata' self.nchar_str = '涛思数据' @@ -72,7 +73,7 @@ class TDTestCase: tdSql.execute(f'use {self.dbname}') tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict)) for i in range(self.tbnum): - tdSql.execute(f"create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[0]})") + tdSql.execute(f"create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[0]}, {self.tag_values[1]})") self.insert_data(self.column_dict,f'{self.stbname}_{i}',self.rowNum) def count_check(self): tdSql.query('select count(*) from information_schema.ins_tables') @@ -313,6 +314,11 @@ class TDTestCase: tdSql.error('alter cluster "activeCode" ""') tdSql.execute('alter cluster "activeCode" "revoked"') + def test_query_ins_tags(self): + sql = f'select tag_name, tag_value from information_schema.ins_tags where table_name = "{self.stbname}_0"' + tdSql.query(sql) + tdSql.checkRows(2) + def run(self): self.prepare_data() self.count_check() @@ -322,6 +328,7 @@ class TDTestCase: self.ins_stable_check2() self.ins_dnodes_check() self.ins_grants_check() + self.test_query_ins_tags() def stop(self): From 94f703889df3c56e89249546319854438c6aa3c3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 3 Apr 2024 12:42:53 +0800 Subject: [PATCH 03/14] fix(stream): remove invalid assert. --- source/libs/stream/src/streamStart.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index cb340ade32..0161f382ba 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -119,7 +119,11 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) // add ref for task SStreamTask* p = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId); - ASSERT(p != NULL); + if (p == NULL) { + stError("s-task:0x%x failed to acquire task, status:%s, not exec scan-history data", pTask->id.taskId, + streamTaskGetStatus(pTask)->name); + return TSDB_CODE_SUCCESS; + } pTask->schedHistoryInfo.numOfTicks = numOfTicks; From a0bf82878d8fe2e5720f2cfbbe3e71a6180c58bf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 3 Apr 2024 09:11:49 +0000 Subject: [PATCH 04/14] free handle --- source/libs/executor/src/executil.c | 49 +++++++++++++++------------ source/libs/scheduler/src/schRemote.c | 2 ++ 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 9a480359ed..add2955a2b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -45,8 +45,8 @@ static FilterCondType checkTagCond(SNode* cond); static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI); static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI); -static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, - STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI); +static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, + STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI); static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; } static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; } @@ -642,7 +642,8 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf info->groupId = calcGroupId(keyBuf, len); if (initRemainGroups) { // groupId ~ table uid - taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid), sizeof(info->uid)); + taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid), + sizeof(info->uid)); } } @@ -858,7 +859,7 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S } SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode, - SStorageAPI* pStorageAPI) { + SStorageAPI* pStorageAPI) { SSDataBlock* pResBlock = createDataBlock(); if (pResBlock == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -939,11 +940,12 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S return pResBlock; } -static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList, bool* pResultList, bool addUid) { +static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList, + bool* pResultList, bool addUid) { taosArrayClear(pUidList); STableKeyInfo info = {.uid = 0, .groupId = 0}; - int32_t numOfTables = taosArrayGetSize(pUidTagList); + int32_t numOfTables = taosArrayGetSize(pUidTagList); for (int32_t i = 0; i < numOfTables; ++i) { if (pResultList[i]) { uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid; @@ -1143,7 +1145,7 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake qDebug("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid); } else { - qInfo("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList)); + qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList)); } } } @@ -1165,7 +1167,8 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S memcpy(pPayload + sizeof(int32_t), taosArrayGet(pUidList, 0), numOfTables * sizeof(uint64_t)); } - pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, size, 1); + pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest), + pPayload, size, 1); digest[0] = 1; memcpy(digest + 1, context.digest, tListLen(context.digest)); } @@ -1725,7 +1728,8 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode) { return c; } -int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, const SReadHandle* readHandle) { +int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, + const SReadHandle* readHandle) { pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols); @@ -1748,8 +1752,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi // allowed read stt file optimization mode pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) && - (pTableScanNode->scan.node.pConditions == NULL) && - (pTableScanNode->interval == 0); + (pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0); int32_t j = 0; for (int32_t i = 0; i < pCond->numOfCols; ++i) { @@ -1891,7 +1894,8 @@ void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t orde int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order); slidingStart = taosTimeAdd(slidingStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision); tw->skey = taosTimeAdd(slidingStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision); - int64_t slidingEnd = taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + int64_t slidingEnd = + taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; tw->ekey = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision); } @@ -2136,7 +2140,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* if (groupSort && groupByTbname) { taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); pTableListInfo->numOfOuputGroups = numOfTables; - } else if (groupByTbname && pScanNode->groupOrderScan){ + } else if (groupByTbname && pScanNode->groupOrderScan) { pTableListInfo->numOfOuputGroups = numOfTables; } else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) { pTableListInfo->numOfOuputGroups = numOfTables; @@ -2147,7 +2151,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* bool initRemainGroups = false; if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode; - if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable && !(groupSort || pScanNode->groupOrderScan)) { + if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable && + !(groupSort || pScanNode->groupOrderScan)) { initRemainGroups = true; } } @@ -2271,7 +2276,7 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr } if (qDebugFlag & DEBUG_DEBUG) { char* pBuf = NULL; - char flagBuf[64]; + char flagBuf[64]; snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr); qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr)); taosMemoryFree(pBuf); @@ -2280,7 +2285,7 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; } -void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) { +void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) { int64_t* ts = (int64_t*)pColData->pData; int64_t duration = pWin->ekey - pWin->skey + delta; @@ -2289,13 +2294,14 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t ts[4] = pWin->ekey + delta; // window end key } -int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, int32_t rowIndex) { +int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, + int32_t rowIndex) { SColumnDataAgg* pColAgg = NULL; const char* isNull = oldkeyBuf; const char* p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size; for (int32_t i = 0; i < pSortGroupCols->size; ++i) { - const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i); + const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i); const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId); if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; @@ -2321,8 +2327,7 @@ int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t ol return 0; } -int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, - int32_t rowIndex) { +int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) { uint32_t colNum = pSortGroupCols->size; SColumnDataAgg* pColAgg = NULL; char* isNull = keyBuf; @@ -2370,7 +2375,7 @@ uint64_t calcGroupId(char* pData, int32_t len) { } SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) { - SNode* node; + SNode* node; SNodeList* ret = NULL; FOREACH(node, pSortKeys) { SOrderByExprNode* pSortKey = (SOrderByExprNode*)node; @@ -2386,6 +2391,6 @@ int32_t extractKeysLen(const SArray* keys) { SColumn* pCol = (SColumn*)taosArrayGet(keys, i); len += pCol->bytes; } - len += sizeof(int8_t) * keyNum; //null flag + len += sizeof(int8_t) * keyNum; // null flag return len; } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 980b9aa6bf..079fd7d29d 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -486,6 +486,7 @@ int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, pParam->taskId, code); + rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); if (pMsg) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); @@ -526,6 +527,7 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) { if (code) { qError("hb rsp error:%s", tstrerror(code)); + rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); SCH_ERR_JRET(code); } From ae0a95b27ec2dcecef99826283c10412a49fd246 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Sun, 7 Apr 2024 09:49:52 +0800 Subject: [PATCH 05/14] Update 14-stream.md --- docs/zh/12-taos-sql/14-stream.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index 2ed3c9afae..125f868758 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -201,9 +201,9 @@ TDengine 对于过期数据提供两种处理方式,由 IGNORE EXPIRED 选项 TDengine 对于修改数据提供两种处理方式,由 IGNORE UPDATE 选项指定: -1. 检查数据是否被修改,即 IGNORE UPDATE 0:默认配置,如果被修改,则重新计算对应窗口。 +1. 检查数据是否被修改,即 IGNORE UPDATE 0,如果数据被修改,则重新计算对应窗口。 -2. 不检查数据是否被修改,全部按增量数据计算,即 IGNORE UPDATE 1。 +2. 不检查数据是否被修改,全部按增量数据计算,即 IGNORE UPDATE 1,默认配置。 ## 写入已存在的超级表 From f65a7a7afaab839ad8e46e33a1cc49fb1afb21b9 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Sun, 7 Apr 2024 10:01:27 +0800 Subject: [PATCH 06/14] Update 14-stream.md --- docs/en/12-taos-sql/14-stream.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md index c41839390f..337660a36c 100644 --- a/docs/en/12-taos-sql/14-stream.md +++ b/docs/en/12-taos-sql/14-stream.md @@ -148,7 +148,7 @@ T = latest event time - watermark The window closing time for each batch of data that arrives at the system is updated using the preceding formula, and all windows are closed whose closing time is less than T. If the triggering method is WINDOW_CLOSE or MAX_DELAY, the aggregate result for the window is pushed. -Stream processing strategy for expired data +## Stream processing strategy for expired data The data in expired windows is tagged as expired. TDengine stream processing provides two methods for handling such data: 1. Drop the data. This is the default and often only handling method for most stream processing engines. @@ -157,6 +157,14 @@ The data in expired windows is tagged as expired. TDengine stream processing pro In both of these methods, configuring the watermark is essential for obtaining accurate results (if expired data is dropped) and avoiding repeated triggers that affect system performance (if expired data is recalculated). +## Stream processing strategy for modifying data + +TDengine provides two ways to handle modified data, which are specified by the IGNORE UPDATE option: + +1. Check whether the data has been modified, i.e. IGNORE UPDATE 0, and recalculate the corresponding window if the data has been modified. + +2. Do not check whether the data has been modified, and calculate all the data as incremental data, i.e. IGNORE UPDATE 1, the default configuration. + ## Supported functions All [scalar functions](../function/#scalar-functions) are available in stream processing. All [Aggregate functions](../function/#aggregate-functions) and [Selection functions](../function/#selection-functions) are available in stream processing, except the followings: From 1b343e662c0ac1e74bd731935b77b67f4be9349e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 7 Apr 2024 10:18:01 +0800 Subject: [PATCH 07/14] fix(stream): retry stop timer for trigger tmr. --- source/libs/stream/src/streamTask.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c7a1a00a46..c610e86322 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -375,17 +375,22 @@ void tFreeStreamTask(SStreamTask* pTask) { } if (pTask->schedInfo.pDelayTimer != NULL) { - taosTmrStop(pTask->schedInfo.pDelayTimer); + while(!taosTmrStop(pTask->schedInfo.pDelayTimer)) { + stError("failed to stop the trigger sched timer, wait for 100ms and retry"); + taosMsleep(100); + } pTask->schedInfo.pDelayTimer = NULL; } if (pTask->hTaskInfo.pTimer != NULL) { - taosTmrStop(pTask->hTaskInfo.pTimer); + bool ret = taosTmrStop(pTask->hTaskInfo.pTimer); + ASSERT(ret); pTask->hTaskInfo.pTimer = NULL; } if (pTask->msgInfo.pTimer != NULL) { - taosTmrStop(pTask->msgInfo.pTimer); + bool ret = taosTmrStop(pTask->msgInfo.pTimer); + ASSERT(ret); pTask->msgInfo.pTimer = NULL; } From f82505ba1661d186394ae75163eab2dabf53a3a5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 7 Apr 2024 14:06:25 +0800 Subject: [PATCH 08/14] fix(stream): remove invalid assert --- source/libs/stream/src/streamTask.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c610e86322..113983806b 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -383,14 +383,12 @@ void tFreeStreamTask(SStreamTask* pTask) { } if (pTask->hTaskInfo.pTimer != NULL) { - bool ret = taosTmrStop(pTask->hTaskInfo.pTimer); - ASSERT(ret); + /*bool ret = */taosTmrStop(pTask->hTaskInfo.pTimer); pTask->hTaskInfo.pTimer = NULL; } if (pTask->msgInfo.pTimer != NULL) { - bool ret = taosTmrStop(pTask->msgInfo.pTimer); - ASSERT(ret); + /*bool ret = */taosTmrStop(pTask->msgInfo.pTimer); pTask->msgInfo.pTimer = NULL; } From ac6846d76c9d78b691ba1c162f72890626a0df27 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Sun, 7 Apr 2024 14:41:00 +0800 Subject: [PATCH 09/14] enh: check validity of parallel actions of trans --- source/dnode/mnode/impl/src/mndTrans.c | 41 ++++++++++++++++++++++++++ source/util/src/terror.c | 2 +- 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 0e4f4210fb..eb9df06894 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -857,6 +857,43 @@ int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) { return 0; } +static bool mndTransActionsOfSameType(SArray *pActions) { + int32_t size = taosArrayGetSize(pActions); + ETrnAct lastActType = TRANS_ACTION_NULL; + bool same = true; + for (int32_t i = 0; i < size; ++i) { + STransAction *pAction = taosArrayGet(pActions, i); + if (i > 0) { + if (lastActType != pAction->actionType) { + same = false; + break; + } + } + lastActType = pAction->actionType; + } + return same; +} + +static int32_t mndTransCheckParallelActions(SMnode *pMnode, STrans *pTrans) { + if (pTrans->exec == TRN_EXEC_PARALLEL) { + if (mndTransActionsOfSameType(pTrans->redoActions) == false) { + terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE; + mError("trans:%d, types of parallel redo actions are not the same", pTrans->id); + return -1; + } + + if (pTrans->policy == TRN_POLICY_ROLLBACK) { + if (mndTransActionsOfSameType(pTrans->undoActions) == false) { + terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE; + mError("trans:%d, types of parallel undo actions are not the same", pTrans->id); + return -1; + } + } + } + + return 0; +} + int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { if (pTrans == NULL) return -1; @@ -864,6 +901,10 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { return -1; } + if (mndTransCheckParallelActions(pMnode, pTrans) != 0) { + return -1; + } + if (!pTrans->changeless && taosArrayGetSize(pTrans->commitActions) <= 0) { terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL; mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8723f227dd..187588f399 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -285,7 +285,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_IN_DROPPING, "Dnode in dropping sta // mnode-trans TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid transaction stage") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT, "Conflict transaction not completed") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CLOG_IS_NULL, "Transaction commitlog is null") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL, "Unable to establish connection While execute transaction and will continue in the background") From ccc7a0bf08e69dbc9475a018d880a799fb08d862 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Sun, 7 Apr 2024 15:00:47 +0800 Subject: [PATCH 10/14] enh: skip trans context switch in error logging --- source/dnode/mnode/impl/src/mndTrans.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index eb9df06894..3b6043479c 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1324,24 +1324,25 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) { int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions, topHalf); - if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno); + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) { + mError("trans:%d, failed to execute redoActions since:%s, code:0x%x, topHalf:%d", pTrans->id, terrstr(), terrno, + topHalf); } return code; } static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) { int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions, topHalf); - if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("failed to execute undoActions since %s", terrstr()); + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) { + mError("trans:%d, failed to execute undoActions since %s. topHalf:%d", pTrans->id, terrstr(), topHalf); } return code; } static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf) { int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions, topHalf); - if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("failed to execute commitActions since %s", terrstr()); + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) { + mError("trans:%d, failed to execute commitActions since %s. topHalf:%d", pTrans->id, terrstr(), topHalf); } return code; } From 2fa55f04e13995c7c7366b28c8692e33c5057137 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Sun, 7 Apr 2024 15:08:04 +0800 Subject: [PATCH 11/14] enh: check validity of commit actions of trans --- source/dnode/mnode/impl/src/mndTrans.c | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 3b6043479c..41ff45038f 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -894,6 +894,21 @@ static int32_t mndTransCheckParallelActions(SMnode *pMnode, STrans *pTrans) { return 0; } +static int32_t mndTransCheckCommitActions(SMnode *pMnode, STrans *pTrans) { + if (!pTrans->changeless && taosArrayGetSize(pTrans->commitActions) <= 0) { + terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL; + mError("trans:%d, commit actions of non-changeless trans are empty", pTrans->id); + return -1; + } + if (mndTransActionsOfSameType(pTrans->commitActions) == false) { + terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE; + mError("trans:%d, types of commit actions are not the same", pTrans->id); + return -1; + } + + return 0; +} + int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { if (pTrans == NULL) return -1; @@ -905,9 +920,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { return -1; } - if (!pTrans->changeless && taosArrayGetSize(pTrans->commitActions) <= 0) { - terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL; - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + if (mndTransCheckCommitActions(pMnode, pTrans) != 0) { return -1; } From fb58ca8592724b106aa1d322b769b79227fdd385 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 7 Apr 2024 18:53:25 +0800 Subject: [PATCH 12/14] fix(stream): not wait for the timer stopped. --- source/libs/stream/src/streamTask.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 113983806b..44f70f8b19 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -375,10 +375,7 @@ void tFreeStreamTask(SStreamTask* pTask) { } if (pTask->schedInfo.pDelayTimer != NULL) { - while(!taosTmrStop(pTask->schedInfo.pDelayTimer)) { - stError("failed to stop the trigger sched timer, wait for 100ms and retry"); - taosMsleep(100); - } + taosTmrStop(pTask->schedInfo.pDelayTimer); pTask->schedInfo.pDelayTimer = NULL; } From 2a6c40e8c5ee5dd7c8d3c4ff56586e05c14296a6 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 8 Apr 2024 16:53:13 +0800 Subject: [PATCH 13/14] set winkey for selectivity function --- source/libs/function/src/builtinsimpl.c | 14 +- tests/script/tsim/stream/basic5.sim | 230 ++++++++++++++++++++++++ 2 files changed, 237 insertions(+), 7 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 5ab6d5e075..b709abc30b 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3201,15 +3201,15 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { prepareBuf(pCtx); - SWinKey key; + SWinKey key = {0}; if (pCtx->saveHandle.pBuf == NULL) { - SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0); - if (pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { - int64_t skey = *(int64_t*)colDataGetData(pColInfo, rowIndex); - - key.groupId = pSrcBlock->info.id.groupId; - key.ts = skey; + SColumnInfoData* pColInfo = pCtx->input.pPTS; + if (!pColInfo || pColInfo->info.type != TSDB_DATA_TYPE_TIMESTAMP) { + pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0); } + ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); + key.groupId = pSrcBlock->info.id.groupId; + key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);; } char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); diff --git a/tests/script/tsim/stream/basic5.sim b/tests/script/tsim/stream/basic5.sim index 583c803e4e..f507ab7d3b 100644 --- a/tests/script/tsim/stream/basic5.sim +++ b/tests/script/tsim/stream/basic5.sim @@ -15,6 +15,8 @@ sql use test3; sql create table t1(ts timestamp, a int, b int , c int, d double); sql create stream streams3 trigger at_once ignore expired 0 ignore update 0 into streamt3 as select _wstart, count(*) c1 from t1 state_window(a); +sleep 1000 + sql insert into t1 values(1648791211000,1,2,3,1.0); sql insert into t1 values(1648791213000,2,2,3,1.1); sql insert into t1 values(1648791215000,3,2,3,1.1); @@ -214,4 +216,232 @@ if $data[29][1] != 2 then goto loop11 endi +print step2============= + +sql create database test4 vgroups 4; +sql use test4; +sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams4 trigger at_once ignore expired 0 ignore update 0 into streamt4 as select _wstart, first(a), b, c, ta, tb from st interval(1s); + +sleep 1000 + +sql insert into t1 values(1648791211000,1,2,3,1.0); +sql insert into t1 values(1648791213000,2,3,4,1.1); +sql insert into t2 values(1648791215000,3,4,5,1.1); +sql insert into t2 values(1648791217000,4,5,6,1.1); + +$loop_count = 0 + +loop12: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt4 order by 1; +sql select * from streamt4 order by 1; + +if $rows != 4 then + print ======rows=$rows + goto loop12 +endi + +if $data02 != 2 then + print ======data02=$data02 + goto loop12 +endi + +if $data03 != 3 then + print ======data03=$data03 + goto loop12 +endi + +if $data04 != 1 then + print ======data04=$data04 + goto loop12 +endi + +if $data05 != 1 then + print ======data05=$data05 + goto loop12 +endi + + +if $data22 != 4 then + print ======data22=$data22 + goto loop12 +endi + +if $data23 != 5 then + print ======data23=$data23 + goto loop12 +endi + +if $data24 != 2 then + print ======data24=$data24 + goto loop12 +endi + +if $data25 != 2 then + print ======data25=$data25 + goto loop12 +endi + +print step3============= + +sql create database test5 vgroups 4; +sql use test5; +sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams5 trigger at_once ignore expired 0 ignore update 0 into streamt5 as select _wstart, b, c, ta, tb, max(b) from t1 interval(1s); + +sleep 1000 + +sql insert into t1 values(1648791211000,1,2,3,1.0); +sql insert into t1 values(1648791213000,2,3,4,1.1); +sql insert into t1 values(1648791215000,3,4,5,1.1); +sql insert into t1 values(1648791217000,4,5,6,1.1); + +$loop_count = 0 + +loop13: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt5 order by 1; +sql select * from streamt5 order by 1; + +if $rows != 4 then + print ======rows=$rows + goto loop13 +endi + +if $data01 != 2 then + print ======data02=$data02 + goto loop13 +endi + +if $data02 != 3 then + print ======data03=$data03 + goto loop13 +endi + +if $data03 != 1 then + print ======data04=$data04 + goto loop13 +endi + +if $data04 != 1 then + print ======data05=$data05 + goto loop13 +endi + + +if $data21 != 4 then + print ======data22=$data22 + goto loop13 +endi + +if $data22 != 5 then + print ======data23=$data23 + goto loop13 +endi + +if $data23 != 1 then + print ======data24=$data24 + goto loop13 +endi + +if $data24 != 1 then + print ======data25=$data25 + goto loop13 +endi + +print step4============= + +sql create database test6 vgroups 4; +sql use test6; +sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams6 trigger at_once ignore expired 0 ignore update 0 into streamt6 as select _wstart, b, c,min(c), ta, tb from st interval(1s); + +sleep 1000 + +sql insert into t1 values(1648791211000,1,2,3,1.0); +sql insert into t1 values(1648791213000,2,3,4,1.1); +sql insert into t2 values(1648791215000,3,4,5,1.1); +sql insert into t2 values(1648791217000,4,5,6,1.1); + +$loop_count = 0 + +loop14: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt6 order by 1; +sql select * from streamt6 order by 1; + +if $rows != 4 then + print ======rows=$rows + goto loop14 +endi + +if $data01 != 2 then + print ======data02=$data02 + goto loop14 +endi + +if $data02 != 3 then + print ======data03=$data03 + goto loop14 +endi + +if $data04 != 1 then + print ======data04=$data04 + goto loop14 +endi + +if $data05 != 1 then + print ======data05=$data05 + goto loop14 +endi + + +if $data21 != 4 then + print ======data22=$data22 + goto loop14 +endi + +if $data22 != 5 then + print ======data23=$data23 + goto loop14 +endi + +if $data24 != 2 then + print ======data24=$data24 + goto loop14 +endi + +if $data25 != 2 then + print ======data25=$data25 + goto loop14 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file From ccc3491cdd92d9f775ccc046606c35ca05ef7026 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 8 Apr 2024 18:39:35 +0800 Subject: [PATCH 14/14] fix:send data to large in tmq if subscribe stable --- source/dnode/vnode/src/tq/tqScan.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 9940164ee2..cc9e8c0136 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -311,7 +311,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR SSDataBlock* pBlock = taosArrayGet(pBlocks, i); tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision); - totalRows += pBlock->info.rows; + *totalRows += pBlock->info.rows; blockDataFreeRes(pBlock); SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); taosArrayPush(pRsp->blockSchema, &pSW);