[tbase-875]
This commit is contained in:
parent
46ecd3b2b8
commit
0bef668cfb
|
@ -352,7 +352,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
|
|||
assert(incNumber <= pQueryMsg->numOfSids);
|
||||
pthread_mutex_unlock(&pVnode->vmutex);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (code != TSDB_CODE_SUCCESS || pQueryMsg->numOfSids == 0) { // all the meters may have been dropped.
|
||||
goto _query_over;
|
||||
}
|
||||
|
||||
|
|
|
@ -539,10 +539,12 @@ bool vnodeIsProjectionQuery(SSqlFunctionExpr* pExpr, int32_t numOfOutput) {
|
|||
* 3. insert has nothing to do with the query processing.
|
||||
*/
|
||||
int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSids, SMeterObj** pMeterObjList,
|
||||
int32_t* numOfInc) {
|
||||
int32_t* numOfIncTables) {
|
||||
SVnodeObj* pVnode = &vnodeList[pQueryMsg->vnode];
|
||||
|
||||
int32_t num = 0;
|
||||
int32_t index = 0;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) {
|
||||
|
@ -551,6 +553,7 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid
|
|||
/*
|
||||
* If table is missing or is in dropping status, config it from management node, and ignore it
|
||||
* during query processing. The error code of TSDB_CODE_NOT_ACTIVE_TABLE will never return to client.
|
||||
* The missing table needs to be removed from pSids list
|
||||
*/
|
||||
if (pMeter == NULL || vnodeIsMeterState(pMeter, TSDB_METER_STATE_DROPPING)) {
|
||||
dWarn("qmsg:%p, vid:%d sid:%d, not there or will be dropped, ignore this table in query", pQueryMsg,
|
||||
|
@ -576,9 +579,11 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid
|
|||
* vnodeIsSafeToDeleteMeter will wait for this function complete, and then it can
|
||||
* check if the numOfQueries is 0 or not.
|
||||
*/
|
||||
pMeterObjList[(*numOfInc)++] = pMeter;
|
||||
pMeterObjList[(*numOfIncTables)++] = pMeter;
|
||||
atomic_fetch_add_32(&pMeter->numOfQueries, 1);
|
||||
|
||||
|
||||
pSids[index++] = pSids[i];
|
||||
|
||||
// output for meter more than one query executed
|
||||
if (pMeter->numOfQueries > 1) {
|
||||
dTrace("qmsg:%p, vid:%d sid:%d id:%s, inc query ref, numOfQueries:%d", pQueryMsg, pMeter->vnode, pMeter->sid,
|
||||
|
@ -587,16 +592,19 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid
|
|||
}
|
||||
}
|
||||
|
||||
dTrace("qmsg:%p, query meters: %d, inc query ref %d, numOfQueries on %d meters are 1", pQueryMsg,
|
||||
pQueryMsg->numOfSids, *numOfInc, (*numOfInc) - num);
|
||||
dTrace("qmsg:%p, query meters: %d, inc query ref %d, numOfQueries on %d meters are 1, queried meters:%d after "
|
||||
"filter missing meters", pQueryMsg, pQueryMsg->numOfSids, *numOfIncTables, (*numOfIncTables) - num, index);
|
||||
|
||||
assert(pQueryMsg->numOfSids >= (*numOfIncTables) && pQueryMsg->numOfSids >= index);
|
||||
|
||||
pQueryMsg->numOfSids = index;
|
||||
return code;
|
||||
}
|
||||
|
||||
void vnodeDecQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterObj** pMeterObjList, int32_t numOfInc) {
|
||||
void vnodeDecQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterObj** pMeterObjList, int32_t numOfIncTables) {
|
||||
int32_t num = 0;
|
||||
|
||||
for (int32_t i = 0; i < numOfInc; ++i) {
|
||||
for (int32_t i = 0; i < numOfIncTables; ++i) {
|
||||
SMeterObj* pMeter = pMeterObjList[i];
|
||||
|
||||
if (pMeter != NULL) { // here, do not need to lock to perform operations
|
||||
|
@ -610,7 +618,7 @@ void vnodeDecQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterObj** pMeterObjList,
|
|||
}
|
||||
}
|
||||
|
||||
dTrace("qmsg:%p, dec query ref for %d meters, numOfQueries on %d meters are 0", pQueryMsg, numOfInc, numOfInc - num);
|
||||
dTrace("qmsg:%p, dec query ref for %d meters, numOfQueries on %d meters are 0", pQueryMsg, numOfIncTables, numOfIncTables - num);
|
||||
}
|
||||
|
||||
void vnodeUpdateQueryColumnIndex(SQuery* pQuery, SMeterObj* pMeterObj) {
|
||||
|
|
Loading…
Reference in New Issue