feature/scheduler

This commit is contained in:
dapan1121 2022-03-10 19:36:30 +08:00
parent 9f140ceca4
commit 9359d738eb
2 changed files with 54 additions and 28 deletions

View File

@ -2623,6 +2623,40 @@ int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *
void tFreeSSchedulerHbRsp(SSchedulerHbRsp *pRsp) { taosArrayDestroy(pRsp->taskStatus); }
int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->code) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->tableName.type) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->tableName.acctId) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->tableName.dbname) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->tableName.tname) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->code) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->tableName.type) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->tableName.acctId) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->tableName.dbname) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->tableName.tname) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) {
int32_t tlen = 0;
@ -2655,4 +2689,3 @@ void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) {
return buf;
}

View File

@ -8098,8 +8098,7 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId);
int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SQueryErrorInfo *errInfo) {
int32_t code = 0;
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SQueryErrorInfo *errInfo) {
if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node
pPhyNode = nodesListGetNode(pPhyNode->pChildren, 0);
}
@ -8111,18 +8110,18 @@ int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
char tableFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&pScanPhyNode->tableName, tableFName);
code = vnodeValidateTableHash(pHandle->config, tableFName);
int32_t code = vnodeValidateTableHash(pHandle->config, tableFName);
if (code) {
errInfo->code = code;
errInfo->tableName = pScanPhyNode->tableName;
return code;
return NULL;
}
size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols);
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId);
code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
pTaskInfo->pRoot = createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
pScanPhyNode->reverse, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
@ -8132,11 +8131,7 @@ int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableGroupInfo groupInfo = {0};
code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId);
if (code) {
return code;
}
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId);
SArray* idList = NULL;
if (groupInfo.numOfTables > 0) {
@ -8155,11 +8150,9 @@ int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
idList = taosArrayInit(4, sizeof(uint64_t));
}
// SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pScanPhyNode->pScanCols, idList, pTaskInfo);
// taosArrayDestroy(idList);
// //TODO destroy groupInfo
// return pOperator;
// SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pScanPhyNode->pScanCols, idList, pTaskInfo);
taosArrayDestroy(idList);
// return pOperator;
}
}
@ -8169,14 +8162,14 @@ int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
code = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo, errInfo);
if (code) {
return code;
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo, errInfo);
if (errInfo->code) {
return NULL;
}
SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
pTaskInfo->pRoot = createAggregateOperatorInfo(pTaskInfo->pRoot, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo);
return createAggregateOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo);
}
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
size_t size = taosArrayGetSize(pPhyNode->pChildren);
@ -8188,12 +8181,6 @@ int32_t doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo);
}
}*/
if (pTaskInfo->pRoot == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
}
return code;
}
static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STableGroupInfo* pGroupInfo, void* readHandle, uint64_t queryId, uint64_t taskId) {
@ -8267,8 +8254,14 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
}
STableGroupInfo group = {0};
code = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group, errInfo);
if (code) {
(*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group, errInfo);
if (errInfo->code) {
code = errInfo->code;
goto _complete;
}
if ((*pTaskInfo)->pRoot == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _complete;
}