refresh meta
This commit is contained in:
parent
40a3f02463
commit
a353b1a1fb
|
@ -299,6 +299,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
|||
schedulerFreeJob(pRequest->body.queryJob);
|
||||
}
|
||||
|
||||
*pRes = res.res;
|
||||
|
||||
pRequest->code = code;
|
||||
terrno = code;
|
||||
return pRequest->code;
|
||||
|
|
|
@ -2885,7 +2885,7 @@ _return:
|
|||
|
||||
|
||||
|
||||
int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* sver) {
|
||||
int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t* sver, int32_t *tbType, uint64_t *suid, char* stbName) {
|
||||
*sver = -1;
|
||||
|
||||
if (NULL == pCtg->dbCache) {
|
||||
|
@ -2903,14 +2903,12 @@ int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tbType = 0;
|
||||
uint64_t suid = 0;
|
||||
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
|
||||
STableMeta* tbMeta = taosHashGet(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname));
|
||||
if (tbMeta) {
|
||||
tbType = tbMeta->tableType;
|
||||
suid = tbMeta->suid;
|
||||
if (tbType != TSDB_CHILD_TABLE) {
|
||||
*tbType = tbMeta->tableType;
|
||||
*suid = tbMeta->suid;
|
||||
if (*tbType != TSDB_CHILD_TABLE) {
|
||||
*sver = tbMeta->sversion;
|
||||
}
|
||||
}
|
||||
|
@ -2921,44 +2919,49 @@ int32_t ctgGetTbSverFromCache(SCatalog* pCtg, const SName* pTableName, int32_t*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (tbType != TSDB_CHILD_TABLE) {
|
||||
if (*tbType != TSDB_CHILD_TABLE) {
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, tbType, dbFName, pTableName->tname);
|
||||
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tbType, dbFName, pTableName->tname);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
ctgDebug("Got subtable meta from cache, dbFName:%s, tbName:%s, suid:%" PRIx64, dbFName, pTableName->tname, suid);
|
||||
ctgDebug("Got subtable meta from cache, dbFName:%s, tbName:%s, suid:%" PRIx64, dbFName, pTableName->tname, *suid);
|
||||
|
||||
CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock);
|
||||
|
||||
STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &suid, sizeof(suid));
|
||||
STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, suid, sizeof(*suid));
|
||||
if (NULL == stbMeta || NULL == *stbMeta) {
|
||||
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
ctgDebug("stb not in stbCache, suid:%"PRIx64, suid);
|
||||
ctgDebug("stb not in stbCache, suid:%"PRIx64, *suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if ((*stbMeta)->suid != suid) {
|
||||
if ((*stbMeta)->suid != *suid) {
|
||||
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, suid, (*stbMeta)->suid);
|
||||
ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, *suid, (*stbMeta)->suid);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
size_t nameLen = 0;
|
||||
char* name = taosHashGetKey(*stbMeta, &nameLen);
|
||||
|
||||
strncpy(stbName, name, nameLen);
|
||||
stbName[nameLen] = 0;
|
||||
|
||||
*sver = (*stbMeta)->sversion;
|
||||
|
||||
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
|
||||
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
|
||||
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, tbType, dbFName, pTableName->tname);
|
||||
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tbType, dbFName, pTableName->tname);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
|
@ -2977,9 +2980,26 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
|
|||
continue;
|
||||
}
|
||||
|
||||
ctgGetTbSverFromCache(pCtg, &name, &sver);
|
||||
int32_t tbType = 0;
|
||||
uint64_t suid = 0;
|
||||
char stbName[TSDB_TABLE_FNAME_LEN];
|
||||
ctgGetTbSverFromCache(pCtg, &name, &sver, &tbType, &suid, stbName);
|
||||
if (sver >= 0 && sver < pTb->sver) {
|
||||
catalogRemoveTableMeta(pCtg, &name); //TODO REMOVE STB FROM CACHE
|
||||
switch (tbType) {
|
||||
case TSDB_CHILD_TABLE: {
|
||||
SName stb = name;
|
||||
strcpy(stb.tname, stbName);
|
||||
catalogRemoveTableMeta(pCtg, &stb);
|
||||
break;
|
||||
}
|
||||
case TSDB_SUPER_TABLE:
|
||||
case TSDB_NORMAL_TABLE:
|
||||
catalogRemoveTableMeta(pCtg, &name);
|
||||
break;
|
||||
default:
|
||||
ctgError("ignore table type %d", tbType);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4576,10 +4576,11 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
return pExprs;
|
||||
}
|
||||
|
||||
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model) {
|
||||
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
|
||||
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||
|
||||
pTaskInfo->schemaVer.dbname = strdup(dbFName);
|
||||
pTaskInfo->cost.created = taosGetTimestampMs();
|
||||
pTaskInfo->id.queryId = queryId;
|
||||
pTaskInfo->execModel = model;
|
||||
|
@ -4994,16 +4995,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
|||
return NULL;
|
||||
}
|
||||
|
||||
const char* tname = pTaskInfo->schemaVer.tablename;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
|
||||
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
|
||||
|
||||
if (tname != NULL && (pTaskInfo->schemaVer.dbname == NULL) &&
|
||||
strncmp(pColNode->tableName, tname, tListLen(pColNode->tableName)) == 0) {
|
||||
pTaskInfo->schemaVer.dbname = strdup(pColNode->dbName);
|
||||
}
|
||||
|
||||
SColMatchInfo c = {0};
|
||||
c.output = true;
|
||||
c.colId = pColNode->colId;
|
||||
|
@ -5099,7 +5094,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
|
|||
uint64_t queryId = pPlan->id.queryId;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
*pTaskInfo = createExecTaskInfo(queryId, taskId, model);
|
||||
*pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
|
||||
if (*pTaskInfo == NULL) {
|
||||
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
goto _complete;
|
||||
|
|
|
@ -58,7 +58,7 @@ static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen
|
|||
|
||||
// 3. valid column names
|
||||
for (int32_t j = i + 1; j < numOfCols; ++j) {
|
||||
if (strncasecmp(pSchema[i].name, pSchema[j].name, sizeof(pSchema[i].name) - 1) == 0) {
|
||||
if (strncmp(pSchema[i].name, pSchema[j].name, sizeof(pSchema[i].name) - 1) == 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1161,8 +1161,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
|
||||
|
||||
if (NULL == rsp) {
|
||||
atomic_store_ptr(&ctx->dataConnInfo.handle, qwMsg->connInfo.handle);
|
||||
atomic_store_ptr(&ctx->dataConnInfo.ahandle, qwMsg->connInfo.ahandle);
|
||||
ctx->dataConnInfo = qwMsg->connInfo;
|
||||
|
||||
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
|
||||
} else {
|
||||
|
|
|
@ -2565,24 +2565,32 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
*pJob = 0;
|
||||
|
||||
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
|
||||
SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
|
||||
} else {
|
||||
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
|
||||
SCH_ERR_JRET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
|
||||
}
|
||||
|
||||
SSchJob *job = schAcquireJob(*pJob);
|
||||
_return:
|
||||
|
||||
pRes->code = atomic_load_32(&job->errCode);
|
||||
pRes->numOfRows = job->resNumOfRows;
|
||||
if (SCH_RES_TYPE_QUERY == job->resType) {
|
||||
pRes->res = job->resData;
|
||||
job->resData = NULL;
|
||||
if (*pJob) {
|
||||
SSchJob *job = schAcquireJob(*pJob);
|
||||
|
||||
pRes->code = atomic_load_32(&job->errCode);
|
||||
pRes->numOfRows = job->resNumOfRows;
|
||||
if (SCH_RES_TYPE_QUERY == job->resType) {
|
||||
pRes->res = job->resData;
|
||||
job->resData = NULL;
|
||||
}
|
||||
|
||||
schReleaseJob(*pJob);
|
||||
}
|
||||
|
||||
schReleaseJob(*pJob);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) {
|
||||
|
|
Loading…
Reference in New Issue