diff --git a/source/dnode/mgmt/container/src/dndExec.c b/source/dnode/mgmt/container/src/dndExec.c index 61868588da..412fa388b6 100644 --- a/source/dnode/mgmt/container/src/dndExec.c +++ b/source/dnode/mgmt/container/src/dndExec.c @@ -120,7 +120,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t ProcFuncType ftype) { SRpcMsg *pRpc = &pMsg->rpcMsg; pRpc->pCont = pCont; - dTrace("msg:%p, get from child process queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle, + dTrace("msg:%p, get from child queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle, pRpc->ahandle); NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; @@ -142,8 +142,8 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, ProcFuncType ftype) { pMsg->pCont = pCont; - dTrace("msg:%p, get from parent process queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pMsg->msgType), - pMsg->handle, pMsg->ahandle); + dTrace("msg:%p, get from parent queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pMsg->msgType), pMsg->handle, + pMsg->ahandle); switch (ftype) { case PROC_REGISTER: diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 3314211437..d59ed3e605 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -54,7 +54,7 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); static void mndTransExecute(SMnode *pMnode, STrans *pTrans); -static void mndTransSendRpcRsp(STrans *pTrans); +static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans); static int32_t mndProcessTransReq(SNodeMsg *pReq); static int32_t mndProcessKillTransReq(SNodeMsg *pReq); @@ -737,7 +737,7 @@ static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { return 0; } -static void mndTransSendRpcRsp(STrans *pTrans) { +static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { bool sendRsp = false; if (pTrans->stage == TRN_STAGE_FINISHED) { @@ -771,7 +771,7 @@ static void mndTransSendRpcRsp(STrans *pTrans) { .ahandle = pTrans->rpcAHandle, .pCont = rpcCont, .contLen = pTrans->rpcRspLen}; - rpcSendResponse(&rspMsg); + tmsgSendRsp(&pMnode->msgCb, &rspMsg); pTrans->rpcHandle = NULL; pTrans->rpcRsp = NULL; pTrans->rpcRspLen = 0; @@ -1158,7 +1158,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { } } - mndTransSendRpcRsp(pTrans); + mndTransSendRpcRsp(pMnode, pTrans); } static int32_t mndProcessTransReq(SNodeMsg *pReq) { diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index be1d47a189..2ccd255909 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -30,21 +30,20 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); -int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code); -int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code); -int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); -void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); +int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); +int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); +int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, + int32_t code); +void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn); -int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code); -int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code); -void qwFreeFetchRsp(void *msg); +int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); +int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); +void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); -int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); +int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); - - #ifdef __cplusplus } #endif diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 71e7415ea5..133a80b154 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -763,8 +763,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu dropConnection = &ctx->connInfo; QW_ERR_JRET(qwDropTask(QW_FPARAMS())); dropConnection = NULL; - - qwBuildAndSendDropRsp(&ctx->connInfo, code); + + qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); @@ -802,9 +802,9 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_ERR_JRET(qwDropTask(QW_FPARAMS())); dropConnection = NULL; - qwBuildAndSendDropRsp(&ctx->connInfo, code); + qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); - + QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } @@ -830,12 +830,12 @@ _return: } if (dropConnection) { - qwBuildAndSendDropRsp(dropConnection, code); + qwBuildAndSendDropRsp(&mgmt->msgCb, dropConnection, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code)); } if (cancelConnection) { - qwBuildAndSendCancelRsp(cancelConnection, code); + qwBuildAndSendCancelRsp(&mgmt->msgCb, cancelConnection, code); QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code)); } @@ -886,9 +886,9 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - qwBuildAndSendDropRsp(&ctx->connInfo, code); + qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); - + QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } @@ -918,7 +918,7 @@ _return: } if (readyConnection) { - qwBuildAndSendReadyRsp(readyConnection, code); + qwBuildAndSendReadyRsp(&mgmt->msgCb, readyConnection, code); QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code)); } @@ -970,7 +970,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code)); + QW_ERR_JRET(qwBuildAndSendQueryRsp(&mgmt->msgCb, &qwMsg->connInfo, code)); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); queryRsped = true; @@ -986,9 +986,9 @@ _return: input.code = code; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); - + if (!queryRsped) { - qwBuildAndSendQueryRsp(&qwMsg->connInfo, code); + qwBuildAndSendQueryRsp(&mgmt->msgCb, &qwMsg->connInfo, code); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); } @@ -1051,7 +1051,7 @@ _return: } if (needRsp) { - qwBuildAndSendReadyRsp(&qwMsg->connInfo, code); + qwBuildAndSendReadyRsp(&mgmt->msgCb, &qwMsg->connInfo, code); QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); } @@ -1095,7 +1095,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { qwMsg->connInfo = ctx->connInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); + qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, dataLen, code); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen); } else { atomic_store_8((int8_t*)&ctx->queryContinue, 1); @@ -1114,7 +1114,7 @@ _return: rsp = NULL; qwMsg->connInfo = ctx->connInfo; - qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code); + qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, 0, code); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0); } @@ -1195,7 +1195,7 @@ _return: } if (code || rsp) { - qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); + qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, dataLen, code); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen); } @@ -1226,9 +1226,9 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING); } else if (ctx->phase > 0) { - qwBuildAndSendDropRsp(&qwMsg->connInfo, code); + qwBuildAndSendDropRsp(&mgmt->msgCb, &qwMsg->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); - + QW_ERR_JRET(qwDropTask(QW_FPARAMS())); rsped = true; } else { @@ -1241,7 +1241,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); } - + _return: if (code) { @@ -1261,7 +1261,7 @@ _return: } if (TSDB_CODE_SUCCESS != code) { - qwBuildAndSendDropRsp(&qwMsg->connInfo, code); + qwBuildAndSendDropRsp(&mgmt->msgCb, &qwMsg->connInfo, code); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); } @@ -1297,7 +1297,7 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { _return: - qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); + qwBuildAndSendHbRsp(&mgmt->msgCb, &qwMsg->connInfo, &rsp, code); QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_RET(TSDB_CODE_SUCCESS); @@ -1351,8 +1351,9 @@ _return: QW_UNLOCK(QW_READ, &mgmt->schLock); for (int32_t j = 0; j < i; ++j) { - qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); - QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code), (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0)); + qwBuildAndSendHbRsp(&mgmt->msgCb, &rspList[j].connInfo, &rspList[j].rsp, code); + QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code), + (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0)); tFreeSSchedulerHbRsp(&rspList[j].rsp); } diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 76c42581f1..78553462b8 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -46,7 +46,7 @@ void qwFreeFetchRsp(void *msg) { } } -int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { SQueryTableRsp rsp = {.code = code}; int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); @@ -62,12 +62,12 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { .code = code, }; - rpcSendResponse(&rpcRsp); + tmsgSendRsp(pMsgCb, &rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp)); pRsp->code = code; @@ -80,12 +80,12 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { .code = code, }; - rpcSendResponse(&rpcRsp); + tmsgSendRsp(pMsgCb, &rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { +int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); void *pRsp = rpcMallocCont(contLen); tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); @@ -99,12 +99,12 @@ int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_ .code = code, }; - rpcSendResponse(&rpcRsp); + tmsgSendRsp(pMsgCb, &rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { +int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { if (NULL == pRsp) { pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp)); @@ -120,12 +120,12 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3 .code = code, }; - rpcSendResponse(&rpcRsp); + tmsgSendRsp(pMsgCb, &rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); pRsp->code = code; @@ -138,11 +138,11 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) { .code = code, }; - rpcSendResponse(&rpcRsp); + tmsgSendRsp(pMsgCb, &rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) { +int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp)); pRsp->code = code; @@ -155,11 +155,11 @@ int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) { .code = code, }; - rpcSendResponse(&rpcRsp); + tmsgSendRsp(pMsgCb, &rpcRsp); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { +int32_t qwBuildAndSendShowRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, int32_t code) { int32_t numOfCols = 6; SVShowTablesRsp showRsp = {0}; @@ -210,11 +210,11 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { .code = code, }; - rpcSendResponse(&rpcMsg); + tmsgSendRsp(pMsgCb, &rpcMsg); return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) { +int32_t qwBuildAndSendShowFetchRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) { SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp)); int32_t handle = htonl(pFetchReq->id); @@ -227,7 +227,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe .code = 0, }; - rpcSendResponse(&rpcMsg); + tmsgSendRsp(pMsgCb, &rpcMsg); return TSDB_CODE_SUCCESS; } @@ -498,7 +498,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { _return: - QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code)); + QW_ERR_RET(qwBuildAndSendCancelRsp(&mgmt->msgCb, &qwMsg.connInfo, code)); QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code)); return TSDB_CODE_SUCCESS; @@ -579,15 +579,16 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } - int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } + SQWorkerMgmt *pMgmt = qWorkerMgmt; + int32_t code = 0; SVShowTablesReq *pReq = pMsg->pCont; - QW_RET(qwBuildAndSendShowRsp(pMsg, code)); + QW_RET(qwBuildAndSendShowRsp(&pMgmt->msgCb, pMsg, code)); } int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { @@ -595,8 +596,8 @@ int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) return TSDB_CODE_QRY_INVALID_INPUT; } + SQWorkerMgmt *pMgmt = qWorkerMgmt; + SVShowTablesFetchReq *pFetchReq = pMsg->pCont; - QW_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq)); + QW_RET(qwBuildAndSendShowFetchRsp(&pMgmt->msgCb, pMsg, pFetchReq)); } - - diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index fcc5a0bcb8..4dce5f560f 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -297,7 +297,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4); } else { headLen = *(int16_t *)(pQueue->pBuffer); - headLen = *(int8_t *)(pQueue->pBuffer + 2); + ftype = *(int8_t *)(pQueue->pBuffer + 2); bodyLen = *(int32_t *)(pQueue->pBuffer + 4); } diff --git a/tests/script/tsim/insert/basic1.sim b/tests/script/tsim/insert/basic1.sim index 653a44a18a..3a3f8d000e 100644 --- a/tests/script/tsim/insert/basic1.sim +++ b/tests/script/tsim/insert/basic1.sim @@ -41,7 +41,7 @@ print =============== insert data, mode1: one row one table in sql print =============== insert data, mode1: mulit rows one table in sql print =============== insert data, mode1: one rows mulit table in sql print =============== insert data, mode1: mulit rows mulit table in sql -sql insert into c1 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) +sql insert into c1 values(now-1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) sql insert into c1 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+2s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) print =============== query data