Merge pull request #9644 from taosdata/feature/3.0_liaohj
Feature/3.0 liaohj
This commit is contained in:
commit
84a1e6e73c
|
@ -311,6 +311,14 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName
|
|||
|
||||
if (NULL == vgInfo) {
|
||||
ctgError("no hash range found for hashvalue[%u]", hashValue);
|
||||
|
||||
void *pIter1 = taosHashIterate(dbInfo->vgInfo, NULL);
|
||||
while (pIter1) {
|
||||
vgInfo = pIter1;
|
||||
ctgError("valid range:[%d, %d], vgId:%d", vgInfo->hashBegin, vgInfo->hashEnd, vgInfo->vgId);
|
||||
pIter1 = taosHashIterate(dbInfo->vgInfo, pIter1);
|
||||
}
|
||||
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
|
@ -773,7 +781,6 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter,
|
|||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pTableName, pVgroup));
|
||||
|
||||
_return:
|
||||
|
||||
if (dbInfo) {
|
||||
CTG_UNLOCK(CTG_READ, &dbInfo->lock);
|
||||
taosHashRelease(pCatalog->dbCache.cache, dbInfo);
|
||||
|
|
|
@ -339,7 +339,6 @@ static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputT
|
|||
code = parseValueToken(&endPtr, pItem, pSchema, tsPrecision, tmpTokenBuf, KvRowAppend, ¶m, pMsgBuf);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tdDestroyKVRowBuilder(pKvRowBuilder);
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||
}
|
||||
}
|
||||
|
@ -393,6 +392,9 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
|
|||
const char* msg3 = "tag value too long";
|
||||
const char* msg4 = "illegal value or data overflow";
|
||||
|
||||
int32_t code = 0;
|
||||
STableMeta* pSuperTableMeta = NULL;
|
||||
|
||||
SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
|
||||
// super table name, create table by using dst
|
||||
|
@ -401,29 +403,30 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
|
|||
SCreatedTableInfo* pCreateTableInfo = taosArrayGet(pCreateTable->childTableInfo, j);
|
||||
|
||||
SToken* pSTableNameToken = &pCreateTableInfo->stbName;
|
||||
int32_t code = parserValidateNameToken(pSTableNameToken);
|
||||
code = parserValidateNameToken(pSTableNameToken);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||
code = buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SName name = {0};
|
||||
code = createSName(&name, pSTableNameToken, pCtx, pMsgBuf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SKVRowBuilder kvRowBuilder = {0};
|
||||
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SArray* pValList = pCreateTableInfo->pTagVals;
|
||||
size_t numOfInputTag = taosArrayGetSize(pValList);
|
||||
|
||||
STableMeta* pSuperTableMeta = NULL;
|
||||
code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
assert(pSuperTableMeta != NULL);
|
||||
|
@ -442,8 +445,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
|
|||
|
||||
if (numOfInputTag != numOfBoundTags || schemaSize < numOfInputTag) {
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
tfree(pSuperTableMeta);
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg2);
|
||||
code = buildInvalidOperationMsg(pMsgBuf, msg2);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
bool findColumnIndex = false;
|
||||
|
@ -475,8 +478,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
|
|||
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (pItem->pVar.nLen > pSchema->bytes) {
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
tfree(pSuperTableMeta);
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg3);
|
||||
code = buildInvalidOperationMsg(pMsgBuf, msg3);
|
||||
goto _error;
|
||||
}
|
||||
} else if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
if (pItem->pVar.nType == TSDB_DATA_TYPE_BINARY) {
|
||||
|
@ -492,19 +495,19 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
|
|||
code = taosVariantDump(&(pItem->pVar), tagVal, pSchema->type, true);
|
||||
|
||||
// check again after the convert since it may be converted from binary to nchar.
|
||||
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (IS_VAR_DATA_TYPE(pSchema->type)) {
|
||||
int16_t len = varDataTLen(tagVal);
|
||||
if (len > pSchema->bytes) {
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
tfree(pSuperTableMeta);
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg3);
|
||||
code = buildInvalidOperationMsg(pMsgBuf, msg3);
|
||||
goto _error;
|
||||
}
|
||||
}
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
tfree(pSuperTableMeta);
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg4);
|
||||
code = buildInvalidOperationMsg(pMsgBuf, msg4);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
|
||||
|
@ -522,23 +525,22 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
|
|||
} else {
|
||||
if (schemaSize != numOfInputTag) {
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
tfree(pSuperTableMeta);
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg2);
|
||||
code = buildInvalidOperationMsg(pMsgBuf, msg2);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
code = doParseSerializeTagValue(pTagSchema, numOfInputTag, &kvRowBuilder, pValList, tinfo.precision, pMsgBuf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
tfree(pSuperTableMeta);
|
||||
return code;
|
||||
goto _error;
|
||||
}
|
||||
}
|
||||
|
||||
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
if (row == NULL) {
|
||||
tfree(pSuperTableMeta);
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
tdSortKVRowByColIdx(row);
|
||||
|
@ -546,22 +548,34 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
|
|||
SName tableName = {0};
|
||||
code = createSName(&tableName, &pCreateTableInfo->name, pCtx, pMsgBuf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tfree(pSuperTableMeta);
|
||||
return code;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
// Find a appropriate vgroup to accommodate this table , according to the table name
|
||||
SVgroupInfo info = {0};
|
||||
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info);
|
||||
code = catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
addCreateTbReqIntoVgroup(pVgroupHashmap, &tableName, row, pSuperTableMeta->uid, &info);
|
||||
tfree(pSuperTableMeta);
|
||||
}
|
||||
|
||||
*pBufArray = doSerializeVgroupCreateTableInfo(pVgroupHashmap);
|
||||
if (*pBufArray == NULL) {
|
||||
code = terrno;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
taosHashCleanup(pVgroupHashmap);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_error:
|
||||
taosHashCleanup(pVgroupHashmap);
|
||||
tfree(pSuperTableMeta);
|
||||
terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
|
||||
|
|
|
@ -67,8 +67,8 @@ typedef struct SSchTask {
|
|||
int32_t msgLen; // msg length
|
||||
int8_t status; // task status
|
||||
SQueryNodeAddr execAddr; // task actual executed node address
|
||||
int8_t condidateIdx; // current try condidation index
|
||||
SArray *condidateAddrs; // condidate node addresses, element is SQueryNodeAddr
|
||||
int8_t candidateIdx; // current try condidation index
|
||||
SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr
|
||||
SQueryProfileSummary summary; // task execution summary
|
||||
int32_t childReady; // child task ready number
|
||||
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
|
||||
|
|
|
@ -109,7 +109,7 @@ static SSchTask initTask(SSchJob* pJob, SSubplan* plan, SSchLevel *pLevel) {
|
|||
}
|
||||
|
||||
static void cleanupTask(SSchTask* pTask) {
|
||||
taosArrayDestroy(pTask->condidateAddrs);
|
||||
taosArrayDestroy(pTask->candidateAddrs);
|
||||
}
|
||||
|
||||
int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) {
|
||||
|
@ -226,20 +226,20 @@ _return:
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) {
|
||||
if (task->condidateAddrs) {
|
||||
int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) {
|
||||
if (task->candidateAddrs) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
task->condidateIdx = 0;
|
||||
task->condidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
|
||||
if (NULL == task->condidateAddrs) {
|
||||
task->candidateIdx = 0;
|
||||
task->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
|
||||
if (NULL == task->candidateAddrs) {
|
||||
qError("taosArrayInit failed");
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
if (task->plan->execNode.numOfEps > 0) {
|
||||
if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) {
|
||||
if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) {
|
||||
qError("taosArrayPush failed");
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
@ -253,7 +253,7 @@ int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) {
|
|||
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
|
||||
SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i);
|
||||
|
||||
if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) {
|
||||
if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) {
|
||||
qError("taosArrayPush failed");
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
@ -798,7 +798,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
|
|||
}
|
||||
|
||||
SEpSet epSet;
|
||||
SQueryNodeAddr *addr = taosArrayGet(task->condidateAddrs, task->condidateIdx);
|
||||
SQueryNodeAddr *addr = taosArrayGet(task->candidateAddrs, task->candidateIdx);
|
||||
|
||||
schConvertAddrToEpSet(addr, &epSet);
|
||||
|
||||
|
@ -816,9 +816,9 @@ _return:
|
|||
int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
|
||||
SSubplan *plan = task->plan;
|
||||
SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));
|
||||
SCH_ERR_RET(schSetTaskCondidateAddrs(job, task));
|
||||
SCH_ERR_RET(schSetTaskCandidateAddrs(job, task));
|
||||
|
||||
if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) {
|
||||
if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) {
|
||||
SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue