enh: optimize oprator param usage

This commit is contained in:
dapan1121 2023-07-07 16:49:35 +08:00
parent 7d8ec8a45e
commit c19c59d5e6
12 changed files with 201 additions and 173 deletions

View File

@ -2052,25 +2052,16 @@ typedef struct {
typedef struct SOperatorParam {
SArray* pOpParams; //SArray<SOperatorSpecParam>
int32_t opType;
int32_t downstreamIdx;
void* value;
SArray* pChildren; //SArray<SOperatorParam*>
} SOperatorParam;
typedef struct SOperatorSpecParam {
int32_t opType;
void* value;
} SOperatorSpecParam;
typedef struct SOperatorBaseParam {
SOperatorParam* pChild;
} SOperatorBaseParam;
typedef struct STableScanOperatorParam {
SOperatorParam* pChild;
SArray* pUidList;
} STableScanOperatorParam;
typedef struct {
SMsgHead header;
uint64_t sId;

View File

@ -5500,71 +5500,70 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) {
}
int32_t tSerializeSOperatorParam(SEncoder* pEncoder, SOperatorParam* pOpParam) {
int32_t n = taosArrayGetSize(pOpParam->pOpParams);
if (tEncodeI32(pEncoder, pOpParam->opType) < 0) return -1;
switch (pOpParam->opType) {
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
STableScanOperatorParam* pScan = (STableScanOperatorParam*)pOpParam->value;
int32_t uidNum = taosArrayGetSize(pScan->pUidList);
if (tEncodeI32(pEncoder, uidNum) < 0) return -1;
for (int32_t m = 0; m < uidNum; ++m) {
int64_t* pUid = taosArrayGet(pScan->pUidList, m);
if (tEncodeI64(pEncoder, *pUid) < 0) return -1;
}
break;
}
default:
return TSDB_CODE_INVALID_PARA;
}
int32_t n = taosArrayGetSize(pOpParam->pChildren);
if (tEncodeI32(pEncoder, n) < 0) return -1;
for (int32_t i = 0; i < n; ++i) {
SOperatorSpecParam* pSpec = (SOperatorSpecParam*)taosArrayGet(pOpParam->pOpParams, i);
if (tEncodeI32(pEncoder, pSpec->opType) < 0) return -1;
switch (pSpec->opType) {
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
STableScanOperatorParam* pScan = (STableScanOperatorParam*)pSpec->value;
if (pScan->pChild) {
if (tSerializeSOperatorParam(pEncoder, pScan->pChild) < 0) return -1;
} else {
if (tEncodeI32(pEncoder, 0) < 0) return -1;
}
int32_t uidNum = taosArrayGetSize(pScan->pUidList);
if (tEncodeI32(pEncoder, uidNum) < 0) return -1;
for (int32_t m = 0; m < uidNum; ++m) {
int64_t* pUid = taosArrayGet(pScan->pUidList, m);
if (tEncodeI64(pEncoder, *pUid) < 0) return -1;
}
break;
}
default:
return TSDB_CODE_INVALID_PARA;
}
SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet(pOpParam->pChildren, i);
if (tSerializeSOperatorParam(pEncoder, pChild) < 0) return -1;
}
return 0;
}
int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam, int32_t specNum) {
pOpParam->pOpParams = taosArrayInit(specNum, sizeof(SOperatorSpecParam));
if (NULL == pOpParam->pOpParams) return -1;
SOperatorSpecParam specParam;
for (int32_t i = 0; i < specNum; ++i) {
if (tDecodeI32(pDecoder, &specParam.opType) < 0) return -1;
switch (specParam.opType) {
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
if (NULL == pScan) return -1;
int32_t childSpecNum = 0;
if (tDecodeI32(pDecoder, &childSpecNum) < 0) return -1;
if (childSpecNum > 0) {
pScan->pChild = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == pScan->pChild) return -1;
if (tDeserializeSOperatorParam(pDecoder, pScan->pChild, childSpecNum) < 0) return -1;
int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam) {
if (tDecodeI32(pDecoder, &pOpParam->opType) < 0) return -1;
switch (pOpParam->opType) {
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
if (NULL == pScan) return -1;
int32_t uidNum = 0;
int64_t uid = 0;
if (tDecodeI32(pDecoder, &uidNum) < 0) return -1;
if (uidNum > 0) {
pScan->pUidList = taosArrayInit(uidNum, sizeof(int64_t));
if (NULL == pScan->pUidList) return -1;
for (int32_t m = 0; m < uidNum; ++m) {
if (tDecodeI64(pDecoder, &uid) < 0) return -1;
taosArrayPush(pScan->pUidList, &uid);
}
int32_t uidNum = 0;
int64_t uid = 0;
if (tDecodeI32(pDecoder, &uidNum) < 0) return -1;
if (uidNum > 0) {
pScan->pUidList = taosArrayInit(uidNum, sizeof(int64_t));
if (NULL == pScan->pUidList) return -1;
for (int32_t m = 0; m < uidNum; ++m) {
if (tDecodeI64(pDecoder, &uid) < 0) return -1;
taosArrayPush(pScan->pUidList, &uid);
}
}
specParam.value = pScan;
break;
} else {
pScan->pUidList = NULL;
}
default:
return TSDB_CODE_INVALID_PARA;
break;
}
taosArrayPush(pOpParam->pOpParams, &specParam);
default:
return TSDB_CODE_INVALID_PARA;
}
int32_t childrenNum = 0;
if (tDecodeI32(pDecoder, &childrenNum) < 0) return -1;
if (childrenNum > 0) {
pOpParam->pChildren = taosArrayInit(childrenNum, POINTER_BYTES);
if (NULL == pOpParam->pChildren) return -1;
for (int32_t i = 0; i < childrenNum; ++i) {
SOperatorParam* pChild = taosMemoryCalloc(1, sizeof(SOperatorParam));
if (NULL == pChild) return -1;
if (tDeserializeSOperatorParam(pDecoder, pChild) < 0) return -1;
taosArrayPush(pOpParam->pChildren, &pChild);
}
} else {
pOpParam->pChildren = NULL;
}
return 0;
@ -5587,6 +5586,7 @@ int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) {
if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->execId) < 0) return -1;
if (pReq->pOpParam) {
if (tEncodeI32(&encoder, 1) < 0) return -1;
if (tSerializeSOperatorParam(&encoder, pReq->pOpParam) < 0) return -1;
} else {
if (tEncodeI32(&encoder, 0) < 0) return -1;
@ -5623,12 +5623,12 @@ int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq)
if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1;
int32_t specNum = 0;
if (tDecodeI32(&decoder, &specNum) < 0) return -1;
if (specNum > 0) {
int32_t paramNum = 0;
if (tDecodeI32(&decoder, &paramNum) < 0) return -1;
if (paramNum > 0) {
pReq->pOpParam = taosMemoryMalloc(sizeof(*pReq->pOpParam));
if (NULL == pReq->pOpParam) return -1;
if (tDeserializeSOperatorParam(&decoder, pReq->pOpParam, specNum) < 0) return -1;
if (tDeserializeSOperatorParam(&decoder, pReq->pOpParam) < 0) return -1;
}
tEndDecode(&decoder);

View File

@ -149,11 +149,9 @@ typedef struct SLimitInfo {
} SLimitInfo;
typedef struct SSortMergeJoinOperatorParam {
SOperatorParam* pChild;
} SSortMergeJoinOperatorParam;
typedef struct SExchangeOperatorParam {
SOperatorParam* pChild;
int32_t vgId;
int32_t srcOpType;
SArray* uidList;

View File

@ -62,6 +62,11 @@ typedef struct SGcSessionCtx {
SGcBlkBufInfo* pLastBlk;
} SGcSessionCtx;
typedef struct SGcExecInfo {
int32_t downstreamNum;
int64_t* pDownstreamBlkNum;
} SGcExecInfo;
typedef struct SGroupCacheOperatorInfo {
SSHashObj* pSessionHash;
SGroupColsInfo groupColsInfo;
@ -69,6 +74,7 @@ typedef struct SGroupCacheOperatorInfo {
SSHashObj* pBlkHash;
int64_t pCurrentId;
SGcSessionCtx* pCurrent;
SGcExecInfo execInfo;
} SGroupCacheOperatorInfo;
#ifdef __cplusplus

View File

@ -73,7 +73,7 @@ typedef struct SOperatorInfo {
SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost;
SResultInfo resultInfo;
SOperatorBaseParam* pOperatorParam;
SOperatorParam* pOperatorParam;
SOperatorParam** pDownstreamParams;
struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator

View File

@ -41,48 +41,43 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes,
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->pOpParams = taosArrayInit(1, sizeof(SOperatorSpecParam));
(*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
if (NULL == pGc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pGc->pChild = pChild;
pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
pGc->downstreamIdx = downstreamIdx;
pGc->needCache = needCache;
pGc->pGroupValue = pGrpValue;
pGc->groupValueSize = grpValSize;
SOperatorSpecParam specParam;
specParam.opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
specParam.value = pGc;
taosArrayPush((*ppRes)->pOpParams, &specParam);
(*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
(*ppRes)->value = pGc;
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t* pVgId, int64_t* pUid, SOperatorParam* pChild) {
static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid, SOperatorParam* pChild) {
*ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->pOpParams = taosArrayInit(1, sizeof(SOperatorSpecParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->pChildren = NULL;
SExchangeOperatorParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
if (NULL == pExc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExc->pChild = pChild;
pExc->vgId = *pVgId;
pExc->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pExc->uidList = taosArrayInit(1, sizeof(int64_t));
@ -92,12 +87,10 @@ static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, i
}
taosArrayPush(pExc->uidList, pUid);
SOperatorSpecParam specParam;
specParam.opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
specParam.value = pExc;
taosArrayPush((*ppRes)->pOpParams, &specParam);
(*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
(*ppRes)->downstreamIdx = downstreamIdx;
(*ppRes)->value = pExc;
return TSDB_CODE_SUCCESS;
}
@ -106,28 +99,24 @@ static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes,
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->pOpParams = taosArrayInit(2, sizeof(SOperatorSpecParam));
(*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSortMergeJoinOperatorParam* pJoin0 = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
SSortMergeJoinOperatorParam* pJoin1 = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
if (NULL == pJoin0 || NULL == pJoin1) {
taosMemoryFree(pJoin0);
taosMemoryFree(pJoin1);
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pJoin0->pChild = pChild0;
pJoin1->pChild = pChild1;
SOperatorSpecParam specParam;
specParam.opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
specParam.value = pJoin0;
taosArrayPush((*ppRes)->pOpParams, &specParam);
specParam.value = pJoin1;
taosArrayPush((*ppRes)->pOpParams, &specParam);
SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
if (NULL == pJoin) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
(*ppRes)->value = pJoin;
return TSDB_CODE_SUCCESS;
}
@ -142,11 +131,11 @@ static int32_t buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJ
SOperatorParam* pExcParam0 = NULL;
SOperatorParam* pExcParam1 = NULL;
SOperatorParam* pGcParam0 = NULL;
SOperatorParam* pGcParam1 = NULL;
SOperatorParam* pGcParam1 = NULL;
int32_t code = buildExchangeOperatorParam(&pExcParam0, (int32_t*)(pVg0->pData + pVg0->info.bytes * rowIdx), (int64_t*)(pUid0->pData + pUid0->info.bytes * rowIdx), NULL);
int32_t code = buildExchangeOperatorParam(&pExcParam0, 0, (int32_t*)(pVg0->pData + pVg0->info.bytes * rowIdx), (int64_t*)(pUid0->pData + pUid0->info.bytes * rowIdx), NULL);
if (TSDB_CODE_SUCCESS == code) {
code = buildExchangeOperatorParam(&pExcParam1, (int32_t*)(pVg1->pData + pVg1->info.bytes * rowIdx), (int64_t*)(pUid1->pData + pUid1->info.bytes * rowIdx), NULL);
code = buildExchangeOperatorParam(&pExcParam1, 1, (int32_t*)(pVg1->pData + pVg1->info.bytes * rowIdx), (int64_t*)(pUid1->pData + pUid1->info.bytes * rowIdx), NULL);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, pUid0->pData + pUid0->info.bytes * rowIdx, pUid0->info.bytes, pExcParam0);

View File

@ -93,7 +93,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
}
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
// todo
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
@ -422,27 +422,20 @@ int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, in
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRes)->pOpParams = taosArrayInit(1, sizeof(SOperatorSpecParam));
if (NULL == *ppRes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pScan->pChild = NULL;
pScan->pUidList = taosArrayDup(pUidList, NULL);
if (NULL == pScan->pUidList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SOperatorSpecParam specParam;
specParam.opType = srcOpType;
specParam.value = pScan;
taosArrayPush((*ppRes)->pOpParams, &specParam);
(*ppRes)->opType = srcOpType;
(*ppRes)->value = pScan;
(*ppRes)->pChildren = NULL;
return TSDB_CODE_SUCCESS;
}
@ -753,7 +746,7 @@ _error:
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorParam;
SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorParam->value;
int32_t* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pParam->vgId, sizeof(pParam->vgId));
if (NULL == pIdx) {
qError("No exchange source for vgId: %d", pParam->vgId);

View File

@ -66,9 +66,23 @@ static void freeGroupCacheBufPage(void* param) {
taosMemoryFree(pInfo->data);
}
static void logGroupCacheExecInfo(SGroupCacheOperatorInfo* pGrpCacheOperator) {
char* buf = taosMemoryMalloc(pGrpCacheOperator->execInfo.downstreamNum * 32 + 100);
if (NULL == buf) {
return;
}
int32_t offset = sprintf(buf, "groupCache exec info, downstreamBlkNum:");
for (int32_t i = 0; i < pGrpCacheOperator->execInfo.downstreamNum; ++i) {
offset += sprintf(buf + offset, " %" PRId64 , pGrpCacheOperator->execInfo.pDownstreamBlkNum[i]);
}
qDebug("%s", buf);
}
static void destroyGroupCacheOperator(void* param) {
SGroupCacheOperatorInfo* pGrpCacheOperator = (SGroupCacheOperatorInfo*)param;
logGroupCacheExecInfo(pGrpCacheOperator);
taosMemoryFree(pGrpCacheOperator->groupColsInfo.pColsInfo);
taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf);
taosArrayDestroyEx(pGrpCacheOperator->pBlkBufs, freeGroupCacheBufPage);
@ -191,7 +205,7 @@ static void addBlkToGroupCache(struct SOperatorInfo* pOperator, SSDataBlock* pBl
SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGcOperatorParam* pParam = (SGcOperatorParam*)pOperator->pOperatorParam;
SGcOperatorParam* pParam = (SGcOperatorParam*)pOperator->pOperatorParam->value;
SGcSessionCtx* pSession = NULL;
SSDataBlock* pRes = NULL;
int32_t code = TSDB_CODE_SUCCESS;
@ -211,8 +225,12 @@ SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) {
break;
}
pGCache->execInfo.pDownstreamBlkNum[pSession->downstreamIdx]++;
if (pGCache->pCurrent->needCache) {
addBlkToGroupCache(pOperator, pBlock, &pRes);
} else {
pRes = pBlock;
}
break;
}
@ -220,6 +238,15 @@ SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) {
return pRes;
}
static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) {
SGroupCacheOperatorInfo* pInfo = pOperator->info;
pInfo->execInfo.downstreamNum = pOperator->numOfDownstream;
pInfo->execInfo.pDownstreamBlkNum = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(int64_t));
if (NULL == pInfo->execInfo.pDownstreamBlkNum) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
@ -263,6 +290,11 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
goto _error;
}
code = initGroupCacheExecInfo(pOperator);
if (TSDB_CODE_SUCCESS != code) {
goto _error;
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getFromGroupCache, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
return pOperator;

View File

@ -605,26 +605,14 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
return TSDB_CODE_SUCCESS;
}
void *getOperatorParam(int32_t opType, SOperatorParam* param, int32_t idx) {
if (NULL == param) {
return NULL;
}
int32_t opNum = taosArrayGetSize(param->pOpParams);
if (idx >= opNum) {
return NULL;
}
SOperatorSpecParam *pSpecParam = taosArrayGet(param->pOpParams, idx);
if (opType == pSpecParam->opType) {
return pSpecParam->value;
}
return NULL;
}
int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
if (NULL == pParam) {
pOperator->pOperatorParam = NULL;
taosMemoryFreeClear(pOperator->pDownstreamParams);
return TSDB_CODE_SUCCESS;
}
pOperator->pOperatorParam = (pParam->opType == pOperator->operatorType) ? pParam : NULL;
if (NULL == pOperator->pDownstreamParams) {
pOperator->pDownstreamParams = taosMemoryCalloc(pOperator->numOfDownstream, POINTER_BYTES);
@ -632,16 +620,24 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pPara
return TSDB_CODE_OUT_OF_MEMORY;
}
}
SOperatorBaseParam* pBaseParam = NULL;
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
pBaseParam = getOperatorParam(pOperator->operatorType, pParam, i);
if (pBaseParam) {
pOperator->pDownstreamParams[i] = pBaseParam->pChild;
} else {
if (NULL == pOperator->pOperatorParam) {
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
pOperator->pDownstreamParams[i] = pParam;
}
return TSDB_CODE_SUCCESS;
}
memset(pOperator->pDownstreamParams, 0, pOperator->numOfDownstream * POINTER_BYTES);
int32_t childrenNum = taosArrayGetSize(pOperator->pOperatorParam->pChildren);
if (childrenNum <= 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < childrenNum; ++i) {
SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet(pOperator->pOperatorParam->pChildren, i);
pOperator->pDownstreamParams[pChild->downstreamIdx] = pChild;
}
return TSDB_CODE_SUCCESS;
@ -671,8 +667,7 @@ SSDataBlock* getNextBlockFromDownstreamOnce(struct SOperatorInfo* pOperator, int
SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
pOperator->pOperatorParam = getOperatorParam(pOperator->operatorType, pParam, 0);
int32_t code = setOperatorParams(pOperator, pOperator->pOperatorParam ? pOperator->pOperatorParam->pChild : pParam);
int32_t code = setOperatorParams(pOperator, pParam);
if (TSDB_CODE_SUCCESS != code) {
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);

View File

@ -789,7 +789,7 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = 0;
STableListInfo* pListInfo = pInfo->base.pTableListInfo;
STableScanOperatorParam* pParam = (STableScanOperatorParam*)pOperator->pOperatorParam;
STableScanOperatorParam* pParam = (STableScanOperatorParam*)pOperator->pOperatorParam->value;
int32_t num = taosArrayGetSize(pParam->pUidList);
if (num <= 0) {
qError("empty table scan uid list");
@ -797,6 +797,9 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
}
qDebug("add total %d dynamic tables to scan", num);
taosArrayClear(pListInfo->pTableList);
taosHashClear(pListInfo->map);
for (int32_t i = 0; i < num; ++i) {
uint64_t* pUid = taosArrayGet(pParam->pUidList, i);
@ -816,6 +819,32 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
return code;
}
static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
STableScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
// reset value for the next group data output
pOperator->status = OP_OPENED;
resetLimitInfoForNextGroup(&pInfo->base.limitInfo);
int32_t num = 0;
STableKeyInfo* pList = NULL;
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num);
pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
pInfo->scanTimes = 0;
SSDataBlock* result = doGroupedTableScan(pOperator);
if (result != NULL) {
return result;
}
setOperatorCompleted(pOperator);
return NULL;
}
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STableScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -827,6 +856,10 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
if (pInfo->currentGroupId != -1) {
return startNextGroupScan(pOperator);
}
}
// scan table one by one sequentially
@ -895,25 +928,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return NULL;
}
// reset value for the next group data output
pOperator->status = OP_OPENED;
resetLimitInfoForNextGroup(&pInfo->base.limitInfo);
int32_t num = 0;
STableKeyInfo* pList = NULL;
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num);
pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
pInfo->scanTimes = 0;
result = doGroupedTableScan(pOperator);
if (result != NULL) {
return result;
}
setOperatorCompleted(pOperator);
return NULL;
return startNextGroupScan(pOperator);
}
}

View File

@ -124,6 +124,7 @@ typedef struct SQWTaskCtx {
int32_t queryMsgType;
int32_t fetchMsgType;
int32_t level;
int32_t dynExecId;
uint64_t sId;
bool queryGotData;

View File

@ -488,14 +488,22 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i
}
int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg) {
if (!ctx->queryExecDone || !ctx->queryEnd) {
QW_TASK_ELOG("dynamic task prev exec not finished, execDone:%d, queryEnd:%d", ctx->queryExecDone, ctx->queryEnd);
if (!atomic_val_compare_exchange_8((int8_t*)&ctx->queryExecDone, true, false)) {
QW_TASK_ELOG("dynamic task prev exec not finished, execDone:%d", ctx->queryExecDone);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
if (!atomic_val_compare_exchange_8((int8_t*)&ctx->queryEnd, true, false)) {
QW_TASK_ELOG("dynamic task prev exec not finished, queryEnd:%d", ctx->queryEnd);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
qUpdateOperatorParam(ctx->taskHandle, qwMsg->msg);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
atomic_store_8((int8_t *)&ctx->queryInQueue, 1);
QW_TASK_DLOG("the %dth dynamic task exec started", ctx->dynExecId++);
QW_ERR_RET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
@ -515,7 +523,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_SET_PHASE(ctx, phase);
if (atomic_load_8((int8_t *)&ctx->queryEnd)) {
if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) {
QW_TASK_ELOG_E("query already end");
QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
}