|
|
|
@ -44,7 +44,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
|
|
|
|
|
|
|
|
|
|
taosArrayPush(pJob->pTasks, &task);
|
|
|
|
|
|
|
|
|
|
qDebug("QID:0x%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname);
|
|
|
|
|
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
@ -67,7 +67,7 @@ int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
|
|
|
|
|
|
|
|
|
|
taosArrayPush(pJob->pTasks, &task);
|
|
|
|
|
|
|
|
|
|
qDebug("QID:0x%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName);
|
|
|
|
|
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
@ -90,7 +90,7 @@ int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
|
|
|
|
|
|
|
|
|
|
taosArrayPush(pJob->pTasks, &task);
|
|
|
|
|
|
|
|
|
|
qDebug("QID:0x%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName);
|
|
|
|
|
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
@ -113,7 +113,7 @@ int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
|
|
|
|
|
|
|
|
|
|
taosArrayPush(pJob->pTasks, &task);
|
|
|
|
|
|
|
|
|
|
qDebug("QID:0x%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName);
|
|
|
|
|
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
@ -143,7 +143,7 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
|
|
|
|
|
|
|
|
|
|
taosArrayPush(pJob->pTasks, &task);
|
|
|
|
|
|
|
|
|
|
qDebug("QID:0x%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname);
|
|
|
|
|
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
@ -158,7 +158,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx) {
|
|
|
|
|
|
|
|
|
|
taosArrayPush(pJob->pTasks, &task);
|
|
|
|
|
|
|
|
|
|
qDebug("QID:%" PRIx64 " task %d type %d initialized", pJob->queryId, taskIdx, task.type);
|
|
|
|
|
qDebug("QID:%" PRIx64 " the %d task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
@ -181,7 +181,7 @@ int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, char *name) {
|
|
|
|
|
|
|
|
|
|
taosArrayPush(pJob->pTasks, &task);
|
|
|
|
|
|
|
|
|
|
qDebug("QID:%" PRIx64 " task %d type %d initialized, indexFName:%s", pJob->queryId, taskIdx, task.type, name);
|
|
|
|
|
qDebug("QID:%" PRIx64 " the %d task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
@ -204,7 +204,7 @@ int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, char *name) {
|
|
|
|
|
|
|
|
|
|
taosArrayPush(pJob->pTasks, &task);
|
|
|
|
|
|
|
|
|
|
qDebug("QID:%" PRIx64 " task %d type %d initialized, udfName:%s", pJob->queryId, taskIdx, task.type, name);
|
|
|
|
|
qDebug("QID:%" PRIx64 " the %d task type %s initialized, udfName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
@ -227,11 +227,96 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user)
|
|
|
|
|
|
|
|
|
|
taosArrayPush(pJob->pTasks, &task);
|
|
|
|
|
|
|
|
|
|
qDebug("QID:%" PRIx64 " task %d type %d initialized, user:%s", pJob->queryId, taskIdx, task.type, user->user);
|
|
|
|
|
qDebug("QID:%" PRIx64 " the %d task type %s initialized, user:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), user->user);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* pReq) {
|
|
|
|
|
int32_t dbNum = pJob->dbCfgNum + pJob->dbVgNum + pJob->dbInfoNum;
|
|
|
|
|
if (dbNum > 0) {
|
|
|
|
|
if (dbNum > pJob->dbCfgNum && dbNum > pJob->dbVgNum && dbNum > pJob->dbInfoNum) {
|
|
|
|
|
SHashObj* pDb = taosHashInit(dbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
|
|
|
|
if (NULL == pDb) {
|
|
|
|
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->dbVgNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
|
|
|
|
|
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->dbCfgNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbCfg, i);
|
|
|
|
|
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->dbInfoNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbInfo, i);
|
|
|
|
|
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
char* dbFName = taosHashIterate(pDb, NULL);
|
|
|
|
|
while (dbFName) {
|
|
|
|
|
ctgDropDbVgroupEnqueue(pCtg, dbFName, true);
|
|
|
|
|
dbFName = taosHashIterate(pDb, dbFName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosHashCleanup(pDb);
|
|
|
|
|
} else {
|
|
|
|
|
for (int32_t i = 0; i < pJob->dbVgNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
|
|
|
|
|
CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->dbCfgNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbCfg, i);
|
|
|
|
|
CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->dbInfoNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbInfo, i);
|
|
|
|
|
CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t tbNum = pJob->tbMetaNum + pJob->tbHashNum;
|
|
|
|
|
if (tbNum > 0) {
|
|
|
|
|
if (tbNum > pJob->tbMetaNum && tbNum > pJob->tbHashNum) {
|
|
|
|
|
SHashObj* pTb = taosHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
|
|
|
|
for (int32_t i = 0; i < pJob->tbMetaNum; ++i) {
|
|
|
|
|
SName* name = taosArrayGet(pReq->pTableMeta, i);
|
|
|
|
|
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->tbHashNum; ++i) {
|
|
|
|
|
SName* name = taosArrayGet(pReq->pTableHash, i);
|
|
|
|
|
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SName* name = taosHashIterate(pTb, NULL);
|
|
|
|
|
while (name) {
|
|
|
|
|
catalogRemoveTableMeta(pCtg, name);
|
|
|
|
|
name = taosHashIterate(pTb, name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosHashCleanup(pTb);
|
|
|
|
|
} else {
|
|
|
|
|
for (int32_t i = 0; i < pJob->tbMetaNum; ++i) {
|
|
|
|
|
SName* name = taosArrayGet(pReq->pTableMeta, i);
|
|
|
|
|
catalogRemoveTableMeta(pCtg, name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->tbHashNum; ++i) {
|
|
|
|
|
SName* name = taosArrayGet(pReq->pTableHash, i);
|
|
|
|
|
catalogRemoveTableMeta(pCtg, name);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) {
|
|
|
|
|
int32_t code = 0;
|
|
|
|
@ -283,12 +368,13 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
|
|
|
|
|
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pReq->forceUpdate) {
|
|
|
|
|
CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, pJob, pReq));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t taskIdx = 0;
|
|
|
|
|
for (int32_t i = 0; i < dbVgNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
|
|
|
|
|
if (pReq->forceUpdate) {
|
|
|
|
|
ctgDropDbVgroupEnqueue(pCtg, dbFName, true);
|
|
|
|
|
}
|
|
|
|
|
CTG_ERR_JRET(ctgInitGetDbVgTask(pJob, taskIdx++, dbFName));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -304,9 +390,6 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < tbMetaNum; ++i) {
|
|
|
|
|
SName* name = taosArrayGet(pReq->pTableMeta, i);
|
|
|
|
|
if (pReq->forceUpdate) {
|
|
|
|
|
catalogRemoveTableMeta(pCtg, name);
|
|
|
|
|
}
|
|
|
|
|
CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -342,7 +425,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
|
|
|
|
|
|
|
|
|
|
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
|
|
|
|
|
|
|
|
|
|
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d", pJob->queryId, pJob->refId, *taskNum);
|
|
|
|
|
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId, *taskNum, pReq->forceUpdate);
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|