From c0aed5989c9ae99ed58c4f932192ee76515c020c Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 9 Jun 2020 10:14:20 +0000 Subject: [PATCH 1/7] report broken link to server --- src/inc/trpc.h | 1 + src/rpc/src/rpcMain.c | 53 +++++++++++++++++++++---------------------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 5c5c77c251..3be304e29b 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -83,6 +83,7 @@ void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp); +void rpcReportProgress(void *pConn, char *pCont, int contLen); #ifdef __cplusplus } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 6044061127..b31edd319c 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -482,6 +482,15 @@ void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pMsg, SRpcMsg return; } +// this API is used by server app to keep an APP context in case connection is broken +void rpcReportProgress(void *handle, char *pCont, int contLen) { + SRpcConn *pConn = (SRpcConn *)handle; + + // pReqMsg and reqMsgLen is re-used to store the context from app server + pConn->pReqMsg = pCont; + pConn->reqMsgLen = contLen; +} + static void rpcFreeMsg(void *msg) { if ( msg ) { char *temp = (char *)msg - sizeof(SRpcReqContext); @@ -846,6 +855,21 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { return pConn; } +static void rpcReportBrokenLinkToServer(SRpcConn *pConn) { + SRpcInfo *pRpc = pConn->pRpc; + + // if there are pending request, notify the app + tTrace("%s, notify the server app, connection is gone", pConn->info); + + SRpcMsg rpcMsg; + rpcMsg.pCont = pConn->pReqMsg; // pReqMsg is re-used to store the APP context from server + rpcMsg.contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length + rpcMsg.handle = pConn; + rpcMsg.msgType = pConn->inType; + rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + if (pRpc->cfp) (*(pRpc->cfp))(&rpcMsg, NULL); +} + static void rpcProcessBrokenLink(SRpcConn *pConn) { if (pConn == NULL) return; SRpcInfo *pRpc = pConn->pRpc; @@ -859,19 +883,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); } - if (pConn->inType) { - // if there are pending request, notify the app - tTrace("%s, connection is gone, notify the app", pConn->info); -/* - SRpcMsg rpcMsg; - rpcMsg.pCont = NULL; - rpcMsg.contLen = 0; - rpcMsg.handle = pConn; - rpcMsg.msgType = pConn->inType; - rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - (*(pRpc->cfp))(&rpcMsg); -*/ - } + if (pConn->inType) rpcReportBrokenLinkToServer(pConn); rpcUnlockConn(pConn); rpcCloseConn(pConn); @@ -1210,23 +1222,10 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { static void rpcProcessIdleTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; - SRpcInfo *pRpc = pConn->pRpc; if (pConn->user[0]) { tTrace("%s, close the connection since no activity", pConn->info); - if (pConn->inType && pRpc->cfp) { - // if there are pending request, notify the app - tTrace("%s, notify the app, connection is gone", pConn->info); -/* - SRpcMsg rpcMsg; - rpcMsg.pCont = NULL; - rpcMsg.contLen = 0; - rpcMsg.handle = pConn; - rpcMsg.msgType = pConn->inType; - rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - (*(pRpc->cfp))(&rpcMsg); -*/ - } + if (pConn->inType) rpcReportBrokenLinkToServer(pConn); rpcCloseConn(pConn); } else { tTrace("%s, idle timer:%p not processed", pConn->info, tmrId); From 2756c730eca7f777786cc583755912760f43f09d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 9 Jun 2020 10:46:58 +0000 Subject: [PATCH 2/7] change the parameters in vread queue --- src/dnode/src/dnodeVRead.c | 9 +-------- src/inc/vnode.h | 9 ++++++++- src/vnode/src/vnodeRead.c | 22 +++++++++++++++------- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 2f9e9a0af9..cd18ae6dda 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -26,13 +26,6 @@ #include "dnodeVRead.h" #include "vnode.h" -typedef struct { - SRspRet rspRet; - void *pCont; - int32_t contLen; - SRpcMsg rpcMsg; -} SReadMsg; - typedef struct { pthread_t thread; // thread int32_t workerId; // worker ID @@ -218,7 +211,7 @@ static void *dnodeProcessReadQueue(void *param) { } dTrace("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); - int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet); + int32_t code = vnodeProcessRead(pVnode, pReadMsg); dnodeSendRpcReadRsp(pVnode, pReadMsg, code); taosFreeQitem(pReadMsg); } diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 069f99263d..0da1f51e27 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -34,6 +34,13 @@ typedef struct { void *qhandle; //used by query and retrieve msg } SRspRet; +typedef struct { + SRspRet rspRet; + void *pCont; + int32_t contLen; + SRpcMsg rpcMsg; +} SReadMsg; + int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeDrop(int32_t vgId); int32_t vnodeOpen(int32_t vgId, char *rootDir); @@ -52,7 +59,7 @@ void* vnodeGetWal(void *pVnode); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); void vnodeBuildStatusMsg(void * param); -int32_t vnodeProcessRead(void *pVnode, int msgType, void *pCont, int32_t contLen, SRspRet *ret); +int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg); #ifdef __cplusplus } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index f198c2ffe4..974697d057 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -27,17 +27,18 @@ #include "vnodeLog.h" #include "query.h" -static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet); -static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); -static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); +static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg); +static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); +static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); void vnodeInitReadFp(void) { vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; } -int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) { +int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { SVnodeObj *pVnode = (SVnodeObj *)param; + int msgType = pReadMsg->rpcMsg.msgType; if (vnodeProcessReadMsgFp[msgType] == NULL) { vTrace("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]); @@ -55,10 +56,14 @@ int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, return TSDB_CODE_RPC_NOT_READY; } - return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret); + return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); } -static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { +static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { + void * pCont = pReadMsg->pCont; + int32_t contLen = pReadMsg->contLen; + SRspRet *pRet = &pReadMsg->rspRet; + SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont; memset(pRet, 0, sizeof(SRspRet)); @@ -91,7 +96,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont return code; } -static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { +static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { + void * pCont = pReadMsg->pCont; + SRspRet *pRet = &pReadMsg->rspRet; + SRetrieveTableMsg *pRetrieve = pCont; void *pQInfo = (void*) htobe64(pRetrieve->qhandle); memset(pRet, 0, sizeof(SRspRet)); From 06d6b18a242c9c77fa421cf05ec4e07c68eb4d66 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 9 Jun 2020 11:48:00 +0000 Subject: [PATCH 3/7] put a check there --- src/dnode/src/dnodePeer.c | 2 ++ src/dnode/src/dnodeShell.c | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index eb017c335e..0adaad7740 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -88,6 +88,8 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { rspMsg.pCont = NULL; rspMsg.contLen = 0; + if (pMsg->pCont == NULL) return; + if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { rspMsg.code = TSDB_CODE_RPC_NOT_READY; rpcSendResponse(&rspMsg); diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 8eba1f3775..d60bd2b6c6 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -113,6 +113,8 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { rpcMsg.pCont = NULL; rpcMsg.contLen = 0; + if (pMsg->pCont == NULL) return; + if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { dError("RPC %p, shell msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]); rpcMsg.code = TSDB_CODE_RPC_NOT_READY; From 8f38c121c9a69dce89b4e5818d72770f98b327d2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Jun 2020 11:10:16 +0800 Subject: [PATCH 4/7] [td-225] support query cancel. --- src/query/src/qExecutor.c | 54 +++++++++++++++++++++++++++++++-------- src/vnode/src/vnodeRead.c | 32 +++++++++++++++++++++-- 2 files changed, 74 insertions(+), 12 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index cdb05d2288..97d9efa618 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -12,8 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "qfill.h" #include "os.h" +#include "qfill.h" #include "hash.h" #include "hashfunc.h" @@ -5822,20 +5822,39 @@ _over: //pQInfo already freed in initQInfo, but *pQInfo may not pointer to null; if (code != TSDB_CODE_SUCCESS) { *pQInfo = NULL; + } else { + SQInfo* pq = (SQInfo*) (*pQInfo); + + T_REF_INC(pq); + T_REF_INC(pq); } // if failed to add ref for all meters in this query, abort current query return code; } -void qDestroyQueryInfo(qinfo_t pQInfo) { +static void doDestoryQueryInfo(SQInfo* pQInfo) { + assert(pQInfo != NULL); qTrace("QInfo:%p query completed", pQInfo); - - // print the query cost summary - queryCostStatis(pQInfo); + queryCostStatis(pQInfo); // print the query cost summary freeQInfo(pQInfo); } +void qDestroyQueryInfo(qinfo_t qHandle) { + SQInfo* pQInfo = (SQInfo*) qHandle; + if (!isValidQInfo(pQInfo)) { + return; + } + + // set the query is cancelled + setQueryKilled(pQInfo); + + int16_t ref = T_REF_DEC(pQInfo); + if (ref == 0) { + doDestoryQueryInfo(pQInfo); + } +} + void qTableQuery(qinfo_t qinfo) { SQInfo *pQInfo = (SQInfo *)qinfo; @@ -5846,6 +5865,11 @@ void qTableQuery(qinfo_t qinfo) { if (isQueryKilled(pQInfo)) { qTrace("QInfo:%p it is already killed, abort", pQInfo); + + int16_t ref = T_REF_DEC(pQInfo); + if (ref == 0) { + doDestoryQueryInfo(pQInfo); + } return; } @@ -5861,7 +5885,10 @@ void qTableQuery(qinfo_t qinfo) { } sem_post(&pQInfo->dataReady); - // vnodeDecRefCount(pQInfo); + int16_t ref = T_REF_DEC(pQInfo); + if (ref == 0) { + doDestoryQueryInfo(pQInfo); + } } int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { @@ -5887,20 +5914,27 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { SQInfo *pQInfo = (SQInfo *)qinfo; - if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) { + if (isValidQInfo(pQInfo) || pQInfo->code != TSDB_CODE_SUCCESS) { return false; } SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + bool ret = false; if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { - return false; + ret = false; } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { - return true; + ret = true; } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - return true; + ret = true; } else { assert(0); } + + if (ret) { + T_REF_INC(pQInfo); + } + + return ret; } int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen) { diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 974697d057..4c36703e6b 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -59,6 +59,18 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); } +// notify connection(handle) that current qhandle is created, if current connection from +// client is broken, the query needs to be killed immediately. +static void vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) { + SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); + killQueryMsg->qhandle = htobe64((uint64_t) qhandle); + killQueryMsg->free = htons(1); + killQueryMsg->header.vgId = htonl(vgId); + killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); + + rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg)); +} + static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { void * pCont = pReadMsg->pCont; int32_t contLen = pReadMsg->contLen; @@ -67,9 +79,23 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont; memset(pRet, 0, sizeof(SRspRet)); - int32_t code = TSDB_CODE_SUCCESS; + // qHandle needs to be freed correctly + if (pReadMsg->rpcMsg.code != TSDB_CODE_SUCCESS) { + assert(pReadMsg->rpcMsg.contLen > 0); + SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont; + killQueryMsg->free = htons(killQueryMsg->free); + killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); + + assert(killQueryMsg->free == 1); + qDestroyQueryInfo((qinfo_t) killQueryMsg->qhandle); + + return TSDB_CODE_SUCCESS; + } + + int32_t code = TSDB_CODE_SUCCESS; qinfo_t pQInfo = NULL; + if (contLen != 0) { code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); @@ -79,7 +105,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pRet->len = sizeof(SQueryTableRsp); pRet->rsp = pRsp; - + + vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId); + vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); } else { assert(pCont != NULL); From 2c3f94cdd854f9e8f5c7416fe1d1298115285f2f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Jun 2020 15:17:55 +0800 Subject: [PATCH 5/7] [td-225] support kill query --- src/inc/query.h | 6 ++++++ src/query/src/qExecutor.c | 31 ++++++++++++++++++------------- src/vnode/src/vnodeRead.c | 29 ++++++++++++++++++++++------- tests/examples/c/demo.c | 8 ++++---- 4 files changed, 50 insertions(+), 24 deletions(-) diff --git a/src/inc/query.h b/src/inc/query.h index cdadd4759f..10ee0249b6 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -77,6 +77,12 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* co */ bool qHasMoreResultsToRetrieve(qinfo_t qinfo); +/** + * kill current ongoing query and free query handle automatically + * @param qinfo + */ +int32_t qKillQuery(qinfo_t qinfo); + #ifdef __cplusplus } #endif diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 97d9efa618..85d4acdfed 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5846,9 +5846,6 @@ void qDestroyQueryInfo(qinfo_t qHandle) { return; } - // set the query is cancelled - setQueryKilled(pQInfo); - int16_t ref = T_REF_DEC(pQInfo); if (ref == 0) { doDestoryQueryInfo(pQInfo); @@ -5865,11 +5862,7 @@ void qTableQuery(qinfo_t qinfo) { if (isQueryKilled(pQInfo)) { qTrace("QInfo:%p it is already killed, abort", pQInfo); - - int16_t ref = T_REF_DEC(pQInfo); - if (ref == 0) { - doDestoryQueryInfo(pQInfo); - } + qDestroyQueryInfo(pQInfo); return; } @@ -5885,10 +5878,7 @@ void qTableQuery(qinfo_t qinfo) { } sem_post(&pQInfo->dataReady); - int16_t ref = T_REF_DEC(pQInfo); - if (ref == 0) { - doDestoryQueryInfo(pQInfo); - } + qDestroyQueryInfo(pQInfo); } int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { @@ -5914,7 +5904,8 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { SQInfo *pQInfo = (SQInfo *)qinfo; - if (isValidQInfo(pQInfo) || pQInfo->code != TSDB_CODE_SUCCESS) { + if (!isValidQInfo(pQInfo) || pQInfo->code != TSDB_CODE_SUCCESS) { + qTrace("QInfo:%p invalid qhandle or error occurs, abort query, code:%x", pQInfo, pQInfo->code); return false; } @@ -5932,6 +5923,7 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { if (ret) { T_REF_INC(pQInfo); + qTrace("QInfo:%p has more results waits for client retrieve", pQInfo); } return ret; @@ -5979,6 +5971,19 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co return code; } +int32_t qKillQuery(qinfo_t qinfo) { + SQInfo *pQInfo = (SQInfo *)qinfo; + + if (pQInfo == NULL || !isValidQInfo(pQInfo)) { + return TSDB_CODE_QRY_INVALID_QHANDLE; + } + + setQueryKilled(pQInfo); + qDestroyQueryInfo(pQInfo); + + return TSDB_CODE_SUCCESS; +} + static void buildTagQueryResult(SQInfo* pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 4c36703e6b..9dce61704f 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -80,14 +80,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { memset(pRet, 0, sizeof(SRspRet)); // qHandle needs to be freed correctly - if (pReadMsg->rpcMsg.code != TSDB_CODE_SUCCESS) { - assert(pReadMsg->rpcMsg.contLen > 0); - + if (pReadMsg->rpcMsg.code != TSDB_CODE_RPC_NETWORK_UNAVAIL) { SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont; killQueryMsg->free = htons(killQueryMsg->free); killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); - assert(killQueryMsg->free == 1); + assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1); qDestroyQueryInfo((qinfo_t) killQueryMsg->qhandle); return TSDB_CODE_SUCCESS; @@ -130,10 +128,28 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { SRetrieveTableMsg *pRetrieve = pCont; void *pQInfo = (void*) htobe64(pRetrieve->qhandle); + pRetrieve->free = htons(pRetrieve->free); + memset(pRet, 0, sizeof(SRspRet)); + if (pRetrieve->free == 1) { + vTrace("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo); + int32_t ret = qKillQuery(pQInfo); + + pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + pRet->len = sizeof(SRetrieveTableRsp); + + memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); + SRetrieveTableRsp* pRsp = pRet->rsp; + pRsp->numOfRows = 0; + pRsp->completed = true; + pRsp->useconds = 0; + + return ret; + } + vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo); - + int32_t code = qRetrieveQueryResultInfo(pQInfo); if (code != TSDB_CODE_SUCCESS) { //TODO @@ -146,8 +162,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (qHasMoreResultsToRetrieve(pQInfo)) { pRet->qhandle = pQInfo; code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED; - } else { - // no further execution invoked, release the ref to vnode + } else { // no further execution invoked, release the ref to vnode qDestroyQueryInfo(pQInfo); vnodeRelease(pVnode); } diff --git a/tests/examples/c/demo.c b/tests/examples/c/demo.c index 55a19eb5f9..5b54efe22d 100644 --- a/tests/examples/c/demo.c +++ b/tests/examples/c/demo.c @@ -115,15 +115,15 @@ int main(int argc, char *argv[]) { printf("success to connect to server\n"); // doQuery(taos, "select c1,count(*) from group_db0.group_mt0 where c1<8 group by c1"); - doQuery(taos, "select * from test.m1"); +// doQuery(taos, "select * from test.m1"); // multiThreadTest(1, taos); // doQuery(taos, "select tbname from test.m1"); // doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0') interval(1s) group by t1"); // doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0', 'lm2_tb1', 'lm2_tb2') interval(1s)"); -// for(int32_t i = 0; i < 100000; ++i) { -// doQuery(taos, "insert into t1 values(now, 2)"); -// } + for(int32_t i = 0; i < 200; ++i) { + doQuery(taos, "select * from lm2_db0.lm2_stb0"); + } // doQuery(taos, "create table t1(ts timestamp, k binary(12), f nchar(2))"); taos_close(taos); From ac5a2827b0be6fdadeb23d2c9ef25c971903f8b0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Jun 2020 11:55:49 +0800 Subject: [PATCH 6/7] [td-582] fix crash --- src/vnode/src/vnodeRead.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 9dce61704f..29f8d887d1 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -80,7 +80,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { memset(pRet, 0, sizeof(SRspRet)); // qHandle needs to be freed correctly - if (pReadMsg->rpcMsg.code != TSDB_CODE_RPC_NETWORK_UNAVAIL) { + if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont; killQueryMsg->free = htons(killQueryMsg->free); killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); From 5b1f11f1ff010ecc241cf3bc1f9f4e6f5932477d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Jun 2020 12:06:45 +0800 Subject: [PATCH 7/7] [td-592] --- src/client/src/tscSQLParser.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 372e81d737..9c6ec39d35 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4380,9 +4380,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { return TSDB_CODE_TSC_INVALID_SQL; } - if (index.columnIndex < tscGetNumOfColumns(pTableMeta)) { + int32_t numOfCols = tscGetNumOfColumns(pTableMeta); + if (index.columnIndex < numOfCols) { return invalidSqlErrMsg(pQueryInfo->msg, msg10); - } else if (index.columnIndex == 0) { + } else if (index.columnIndex == numOfCols) { return invalidSqlErrMsg(pQueryInfo->msg, msg11); }