fix bugs and refactor code
This commit is contained in:
parent
7021d3f640
commit
114dd6ea13
|
@ -390,7 +390,7 @@ static void doQuitSubquery(SSqlObj* pParentSql) {
|
|||
}
|
||||
|
||||
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) {
|
||||
int32_t numOfTotal = pSupporter->pState->numOfCompleted;
|
||||
int32_t numOfTotal = pSupporter->pState->numOfTotal;
|
||||
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
|
||||
|
||||
if (finished >= numOfTotal) {
|
||||
|
@ -480,7 +480,12 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
|||
}
|
||||
}
|
||||
|
||||
if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
|
||||
int32_t numOfTotal = pSupporter->pState->numOfTotal;
|
||||
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
|
||||
|
||||
if (finished >= numOfTotal) {
|
||||
assert(finished == numOfTotal);
|
||||
|
||||
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
|
||||
tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, tres,
|
||||
pSupporter->subqueryIndex);
|
||||
|
@ -539,9 +544,11 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
|||
}
|
||||
}
|
||||
|
||||
if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
|
||||
assert(pSupporter->pState->numOfCompleted == pSupporter->pState->numOfTotal);
|
||||
int32_t numOfTotal = pSupporter->pState->numOfTotal;
|
||||
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
|
||||
|
||||
if (finished >= numOfTotal) {
|
||||
assert(finished == numOfTotal);
|
||||
tscTrace("%p all %d secondary retrieves are completed, global code:%d", tres, pSupporter->pState->numOfTotal,
|
||||
pParentSql->res.code);
|
||||
|
||||
|
@ -756,7 +763,12 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
|
|||
|
||||
quitAllSubquery(pParentSql, pSupporter);
|
||||
} else {
|
||||
if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
|
||||
int32_t numOfTotal = pSupporter->pState->numOfTotal;
|
||||
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
|
||||
|
||||
if (finished >= numOfTotal) {
|
||||
assert(finished == numOfTotal);
|
||||
|
||||
tscSetupOutputColumnIndex(pParentSql);
|
||||
|
||||
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
|
||||
|
|
|
@ -2006,9 +2006,9 @@ int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeter
|
|||
// no files left, abort
|
||||
if (fileIndex < 0) {
|
||||
if (step == QUERY_ASC_FORWARD_STEP) {
|
||||
dTrace("QInfo:%p no file to access, try data in cache", GET_QINFO_ADDR(pQuery));
|
||||
dTrace("QInfo:%p no more file to access, try data in cache", GET_QINFO_ADDR(pQuery));
|
||||
} else {
|
||||
dTrace("QInfo:%p no file to access in desc order, query completed", GET_QINFO_ADDR(pQuery));
|
||||
dTrace("QInfo:%p no more file to access in desc order, query completed", GET_QINFO_ADDR(pQuery));
|
||||
}
|
||||
|
||||
vnodeFreeFieldsEx(pRuntimeEnv);
|
||||
|
@ -2596,6 +2596,7 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t *slo
|
|||
// cache block has been flushed to disk, no required data block in cache.
|
||||
SCacheBlock* pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
|
||||
if (pBlock == NULL) {
|
||||
pQuery->skey = rawskey; // restore the skey
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -2868,8 +2869,8 @@ static bool doGetQueryPos(TSKEY key, SMeterQuerySupportObj *pSupporter, SPointIn
|
|||
}
|
||||
}
|
||||
|
||||
static bool doSetDataInfo(SMeterQuerySupportObj *pSupporter,
|
||||
SPointInterpoSupporter *pPointInterpSupporter, SMeterObj *pMeterObj,TSKEY nextKey) {
|
||||
static bool doSetDataInfo(SMeterQuerySupportObj *pSupporter, SPointInterpoSupporter *pPointInterpSupporter,
|
||||
SMeterObj *pMeterObj,TSKEY nextKey) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
|
@ -5442,6 +5443,9 @@ static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus
|
|||
pStatus->overStatus = pQuery->over;
|
||||
pStatus->lastKey = pQuery->lastKey;
|
||||
|
||||
pStatus->skey = pQuery->skey;
|
||||
pStatus->ekey = pQuery->ekey;
|
||||
|
||||
pStatus->start = pRuntimeEnv->startPos;
|
||||
pStatus->next = pRuntimeEnv->nextPos;
|
||||
pStatus->end = pRuntimeEnv->endPos;
|
||||
|
@ -5465,6 +5469,8 @@ static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pSta
|
|||
SWAP(pQuery->skey, pQuery->ekey, TSKEY);
|
||||
|
||||
pQuery->lastKey = pStatus->lastKey;
|
||||
pQuery->skey = pStatus->skey;
|
||||
pQuery->ekey = pStatus->ekey;
|
||||
|
||||
pQuery->over = pStatus->overStatus;
|
||||
|
||||
|
|
|
@ -339,13 +339,11 @@ static void doAddToHashTable(HashObj *pObj, SHashNode *pNode) {
|
|||
pNode->prev1 = pEntry;
|
||||
|
||||
pEntry->num++;
|
||||
|
||||
pObj->size++;
|
||||
|
||||
char key[512] = {0};
|
||||
memcpy(key, pNode->key, MIN(512, pNode->keyLen));
|
||||
|
||||
pTrace("key:%s %p add to hash table", key, pNode);
|
||||
// char key[512] = {0};
|
||||
// memcpy(key, pNode->key, MIN(512, pNode->keyLen));
|
||||
// pTrace("key:%s %p add to hash table", key, pNode);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue