other: merge 3.0

This commit is contained in:
Haojun Liao 2022-11-24 17:24:24 +08:00
parent 8dd780b486
commit e5e817ae55
3 changed files with 11 additions and 9 deletions

View File

@ -153,17 +153,17 @@ typedef struct {
SSchemaWrapper* qsw; SSchemaWrapper* qsw;
} SSchemaInfo; } SSchemaInfo;
typedef struct { typedef struct SExchangeOpStopInfo {
int32_t operatorType; int32_t operatorType;
int64_t refId; int64_t refId;
} SExchangeOpStopInfo; } SExchangeOpStopInfo;
typedef struct { typedef struct STaskStopInfo {
SRWLatch lock; SRWLatch lock;
SArray* pStopInfo; SArray* pStopInfo;
} STaskStopInfo; } STaskStopInfo;
typedef struct SExecTaskInfo { struct SExecTaskInfo {
STaskIdInfo id; STaskIdInfo id;
uint32_t status; uint32_t status;
STimeWindow window; STimeWindow window;
@ -260,7 +260,7 @@ typedef struct SExchangeInfo {
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that // SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator // passed by downstream operator
SArray* pReadyBlocks; SArray* pResultBlockList;
SArray* pRecycledBlocks;// build a pool for small data block to avoid to repeatly create and then destroy. SArray* pRecycledBlocks;// build a pool for small data block to avoid to repeatly create and then destroy.
SSDataBlock* pDummyBlock; // dummy block, not keep data SSDataBlock* pDummyBlock; // dummy block, not keep data
bool seqLoadData; // sequential load data or not, false by default bool seqLoadData; // sequential load data or not, false by default

View File

@ -182,7 +182,7 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
} }
// we have buffered retrieved datablock, return it directly // we have buffered retrieved datablock, return it directly
SSDataBlock** p = taosArrayPop(pExchangeInfo->pReadyBlocks); SSDataBlock** p = taosArrayPop(pExchangeInfo->pResultBlockList);
if (p != NULL) { if (p != NULL) {
taosArrayPush(pExchangeInfo->pRecycledBlocks, p); taosArrayPush(pExchangeInfo->pRecycledBlocks, p);
return *p; return *p;
@ -193,10 +193,10 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
} }
if (taosArrayGetSize(pExchangeInfo->pReadyBlocks) == 0) { if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
return NULL; return NULL;
} else { } else {
p = taosArrayPop(pExchangeInfo->pReadyBlocks); p = taosArrayPop(pExchangeInfo->pResultBlockList);
taosArrayPush(pExchangeInfo->pRecycledBlocks, p); taosArrayPush(pExchangeInfo->pRecycledBlocks, p);
return *p; return *p;
} }
@ -298,7 +298,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
tsem_init(&pInfo->ready, 0, 0); tsem_init(&pInfo->ready, 0, 0);
pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc); pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
pInfo->pReadyBlocks = taosArrayInit(64, POINTER_BYTES); pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES); pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self}; SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
@ -346,7 +346,7 @@ void doDestroyExchangeOperatorInfo(void* param) {
taosArrayDestroy(pExInfo->pSources); taosArrayDestroy(pExInfo->pSources);
taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo); taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
taosArrayDestroyEx(pExInfo->pReadyBlocks, freeBlock); taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock); taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
blockDataDestroy(pExInfo->pDummyBlock); blockDataDestroy(pExInfo->pDummyBlock);

View File

@ -30,6 +30,8 @@ typedef struct SSumRes {
double dsum; double dsum;
}; };
int16_t type; int16_t type;
int64_t prevTs;
bool isPrevTsSet;
} SSumRes; } SSumRes;
typedef struct SMinmaxResInfo { typedef struct SMinmaxResInfo {