diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index 9b35686576..6d5fe3aa0d 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -352,7 +352,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { assert(incNumber <= pQueryMsg->numOfSids); pthread_mutex_unlock(&pVnode->vmutex); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || pQueryMsg->numOfSids == 0) { // all the meters may have been dropped. goto _query_over; } diff --git a/src/system/detail/src/vnodeUtil.c b/src/system/detail/src/vnodeUtil.c index c28bd31f77..e2055b6709 100644 --- a/src/system/detail/src/vnodeUtil.c +++ b/src/system/detail/src/vnodeUtil.c @@ -539,10 +539,12 @@ bool vnodeIsProjectionQuery(SSqlFunctionExpr* pExpr, int32_t numOfOutput) { * 3. insert has nothing to do with the query processing. */ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSids, SMeterObj** pMeterObjList, - int32_t* numOfInc) { + int32_t* numOfIncTables) { SVnodeObj* pVnode = &vnodeList[pQueryMsg->vnode]; int32_t num = 0; + int32_t index = 0; + int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) { @@ -551,6 +553,7 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid /* * If table is missing or is in dropping status, config it from management node, and ignore it * during query processing. The error code of TSDB_CODE_NOT_ACTIVE_TABLE will never return to client. + * The missing table needs to be removed from pSids list */ if (pMeter == NULL || vnodeIsMeterState(pMeter, TSDB_METER_STATE_DROPPING)) { dWarn("qmsg:%p, vid:%d sid:%d, not there or will be dropped, ignore this table in query", pQueryMsg, @@ -576,9 +579,11 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid * vnodeIsSafeToDeleteMeter will wait for this function complete, and then it can * check if the numOfQueries is 0 or not. */ - pMeterObjList[(*numOfInc)++] = pMeter; + pMeterObjList[(*numOfIncTables)++] = pMeter; atomic_fetch_add_32(&pMeter->numOfQueries, 1); - + + pSids[index++] = pSids[i]; + // output for meter more than one query executed if (pMeter->numOfQueries > 1) { dTrace("qmsg:%p, vid:%d sid:%d id:%s, inc query ref, numOfQueries:%d", pQueryMsg, pMeter->vnode, pMeter->sid, @@ -587,16 +592,19 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid } } - dTrace("qmsg:%p, query meters: %d, inc query ref %d, numOfQueries on %d meters are 1", pQueryMsg, - pQueryMsg->numOfSids, *numOfInc, (*numOfInc) - num); + dTrace("qmsg:%p, query meters: %d, inc query ref %d, numOfQueries on %d meters are 1, queried meters:%d after " + "filter missing meters", pQueryMsg, pQueryMsg->numOfSids, *numOfIncTables, (*numOfIncTables) - num, index); + assert(pQueryMsg->numOfSids >= (*numOfIncTables) && pQueryMsg->numOfSids >= index); + + pQueryMsg->numOfSids = index; return code; } -void vnodeDecQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterObj** pMeterObjList, int32_t numOfInc) { +void vnodeDecQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterObj** pMeterObjList, int32_t numOfIncTables) { int32_t num = 0; - for (int32_t i = 0; i < numOfInc; ++i) { + for (int32_t i = 0; i < numOfIncTables; ++i) { SMeterObj* pMeter = pMeterObjList[i]; if (pMeter != NULL) { // here, do not need to lock to perform operations @@ -610,7 +618,7 @@ void vnodeDecQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterObj** pMeterObjList, } } - dTrace("qmsg:%p, dec query ref for %d meters, numOfQueries on %d meters are 0", pQueryMsg, numOfInc, numOfInc - num); + dTrace("qmsg:%p, dec query ref for %d meters, numOfQueries on %d meters are 0", pQueryMsg, numOfIncTables, numOfIncTables - num); } void vnodeUpdateQueryColumnIndex(SQuery* pQuery, SMeterObj* pMeterObj) {