feature/scheduler

This commit is contained in:
dapan1121 2022-03-14 19:42:16 +08:00
parent 8f1cc6ee91
commit bfe1a13640
5 changed files with 18 additions and 9 deletions

View File

@ -190,14 +190,20 @@ typedef struct SEp {
} SEp; } SEp;
typedef struct { typedef struct {
char dbFName[TSDB_DB_FNAME_LEN];
int32_t contLen; int32_t contLen;
int32_t vgId; int32_t vgId;
} SMsgHead; } SMsgHead;
typedef struct {
char dbFName[TSDB_DB_FNAME_LEN];
} SRspHead;
// Submit message for one table // Submit message for one table
typedef struct SSubmitBlk { typedef struct SSubmitBlk {
int64_t uid; // table unique id int64_t uid; // table unique id
int32_t tid; // table id int32_t tid; // table id
char tableName[TSDB_TABLE_NAME_LEN];
int32_t padding; // TODO just for padding here int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head int32_t dataLen; // data part length, not including the SSubmitBlk head
@ -2283,4 +2289,3 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
#endif #endif
#endif /*_TD_COMMON_TAOS_MSG_H_*/ #endif /*_TD_COMMON_TAOS_MSG_H_*/

View File

@ -968,7 +968,7 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
char *p = strchr(usedbReq.db, '.'); char *p = strchr(usedbReq.db, '.');
if (p && 0 == strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB)) { if (p && 0 == strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB)) {
memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN); memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN);
int32_t vgVersion = taosGetTimestampSec() / 300; static int32_t vgVersion = 1;
if (usedbReq.vgVersion < vgVersion) { if (usedbReq.vgVersion < vgVersion) {
usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo)); usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo));
if (usedbRsp.pVgroupInfos == NULL) { if (usedbRsp.pVgroupInfos == NULL) {
@ -977,12 +977,16 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
} }
mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos); mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos);
usedbRsp.vgVersion = vgVersion; usedbRsp.vgVersion = vgVersion++;
if (taosArrayGetSize(usedbRsp.pVgroupInfos) <= 0) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
}
} else { } else {
usedbRsp.vgVersion = usedbReq.vgVersion; usedbRsp.vgVersion = usedbReq.vgVersion;
code = 0;
} }
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos); usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
code = 0;
} else { } else {
pDb = mndAcquireDb(pMnode, usedbReq.db); pDb = mndAcquireDb(pMnode, usedbReq.db);
if (pDb == NULL) { if (pDb == NULL) {

View File

@ -5052,7 +5052,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows); pExchangeInfo->loadInfo.totalRows);
pDataInfo->status = DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
completed += 1; completed += 1;
continue; continue;
} }
@ -5206,8 +5206,6 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed/1000.0); pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed/1000.0);
@ -5605,7 +5603,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = pResBlock->info.numOfCols; pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->nextDataFn = doSysTableScan; pOperator->getNextFn = doSysTableScan;
pOperator->closeFn = destroySysTableScannerOperatorInfo; pOperator->closeFn = destroySysTableScannerOperatorInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
@ -7349,7 +7347,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp
pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->nextDataFn = doProjectOperation; pOperator->getNextFn = doProjectOperation;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->closeFn = destroyProjectOperatorInfo; pOperator->closeFn = destroyProjectOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);

View File

@ -840,3 +840,4 @@ int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan**
nodesDestroyNode(pLogicPlan); nodesDestroyNode(pLogicPlan);
return code; return code;
} }

View File

@ -1371,6 +1371,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
memcpy(pMsg->msg, pJob->sql, len); memcpy(pMsg->msg, pJob->sql, len);
memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen); memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);
break; break;
} }