Merge pull request #13534 from taosdata/feature/3_liaohj
refactor: do some internal refactor.
This commit is contained in:
commit
0d634d5242
|
@ -71,16 +71,21 @@ typedef struct SCatalogReq {
|
|||
bool forceUpdate;
|
||||
} SCatalogReq;
|
||||
|
||||
typedef struct SMetaRes {
|
||||
int32_t code;
|
||||
void* pRes;
|
||||
} SMetaRes;
|
||||
|
||||
typedef struct SMetaData {
|
||||
SArray *pDbVgroup; // SArray<SArray<SVgroupInfo>*>
|
||||
SArray *pDbCfg; // SArray<SDbCfgInfo>
|
||||
SArray *pDbInfo; // SArray<SDbInfo>
|
||||
SArray *pTableMeta; // SArray<STableMeta*>
|
||||
SArray *pTableHash; // SArray<SVgroupInfo>
|
||||
SArray *pUdfList; // SArray<SFuncInfo>
|
||||
SArray *pIndex; // SArray<SIndexInfo>
|
||||
SArray *pUser; // SArray<bool>
|
||||
SArray *pQnodeList; // SArray<SQueryNodeAddr>
|
||||
SArray *pDbVgroup; // pRes = SArray<SVgroupInfo>*
|
||||
SArray *pDbCfg; // pRes = SDbCfgInfo*
|
||||
SArray *pDbInfo; // pRes = SDbInfo*
|
||||
SArray *pTableMeta; // pRes = STableMeta*
|
||||
SArray *pTableHash; // pRes = SVgroupInfo*
|
||||
SArray *pUdfList; // pRes = SFuncInfo*
|
||||
SArray *pIndex; // pRes = SIndexInfo*
|
||||
SArray *pUser; // pRes = bool*
|
||||
SArray *pQnodeList; // pRes = SQueryNodeAddr*
|
||||
} SMetaData;
|
||||
|
||||
typedef struct SCatalogCfg {
|
||||
|
|
|
@ -210,8 +210,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|
|||
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) \
|
||||
((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST || \
|
||||
(_code) == TSDB_CODE_PAR_INVALID_COLUMNS_NUM || (_code) == TSDB_CODE_PAR_INVALID_COLUMN || \
|
||||
(_code) == TSDB_CODE_PAR_TAGS_NOT_MATCHED || (_code == TSDB_CODE_PAR_VALUE_TOO_LONG) || \
|
||||
(_code == TSDB_CODE_PAR_INVALID_DROP_COL))
|
||||
(_code) == TSDB_CODE_PAR_TAGS_NOT_MATCHED || (_code) == TSDB_CODE_PAR_VALUE_TOO_LONG || \
|
||||
(_code) == TSDB_CODE_PAR_INVALID_DROP_COL || ((_code) == TSDB_CODE_TDB_INVALID_TABLE_ID))
|
||||
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) \
|
||||
((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
|
||||
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED)
|
||||
|
|
|
@ -339,9 +339,9 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE TAOS_DEF_ERROR_CODE(0, 0x060D)
|
||||
#define TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x060E)
|
||||
#define TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x060F)
|
||||
#define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x0600)
|
||||
#define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x0601)
|
||||
#define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x0602)
|
||||
#define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x0610)
|
||||
#define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x0611)
|
||||
#define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x0612)
|
||||
#define TSDB_CODE_TDB_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0613)
|
||||
#define TSDB_CODE_TDB_TABLE_RECONFIGURE TAOS_DEF_ERROR_CODE(0, 0x0614)
|
||||
#define TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO TAOS_DEF_ERROR_CODE(0, 0x0615)
|
||||
|
|
|
@ -45,7 +45,7 @@ extern "C" {
|
|||
|
||||
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
|
||||
#define HEARTBEAT_INTERVAL 1500 // ms
|
||||
#define SYNC_ON_TOP_OF_ASYNC 0
|
||||
#define SYNC_ON_TOP_OF_ASYNC 1
|
||||
|
||||
enum {
|
||||
RES_TYPE__QUERY = 1,
|
||||
|
@ -144,7 +144,7 @@ typedef struct STscObj {
|
|||
int32_t numOfReqs; // number of sqlObj bound to this connection
|
||||
SAppInstInfo* pAppInfo;
|
||||
SHashObj* pRequests;
|
||||
int8_t schemalessType;
|
||||
int8_t schemalessType; // todo remove it, this attribute should be move to request
|
||||
} STscObj;
|
||||
|
||||
typedef struct SResultColumn {
|
||||
|
|
|
@ -580,7 +580,8 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
|
|||
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) {
|
||||
tscDebug("0x%"PRIx64" client retry to handle the error, code:%s, reqId:0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId);
|
||||
tscDebug("0x%"PRIx64" client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%"PRIx64, pRequest->self, code, tstrerror(code),
|
||||
pRequest->retry, pRequest->requestId);
|
||||
pRequest->prevCode = code;
|
||||
doAsyncQuery(pRequest, true);
|
||||
return;
|
||||
|
@ -592,6 +593,7 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
|
|||
pRequest->code = code;
|
||||
}
|
||||
|
||||
tscDebug("schedulerExecCb request type %s", TMSG_INFO(pRequest->type));
|
||||
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
|
||||
removeMeta(pTscObj, pRequest->tableList);
|
||||
}
|
||||
|
@ -695,6 +697,8 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
schedulerAsyncExecJob(pAppInfo->pTransporter, pNodeList, pRequest->body.pDag, &pRequest->body.queryJob,
|
||||
pRequest->sqlstr, pRequest->metric.start, schedulerExecCb, pRequest);
|
||||
} else {
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||
}
|
||||
|
||||
//todo not to be released here
|
||||
|
@ -702,7 +706,8 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
|||
break;
|
||||
}
|
||||
case QUERY_EXEC_MODE_EMPTY_RESULT:
|
||||
pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||
pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
|
|
@ -500,7 +500,11 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
doFetchRows(pRequest, false, true);
|
||||
#if SYNC_ON_TOP_OF_ASYNC
|
||||
doAsyncFetchRow(pRequest, false, true);
|
||||
#else
|
||||
doFetchRows(pRequest, true, true);
|
||||
#endif
|
||||
|
||||
// TODO refactor
|
||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||
|
@ -625,8 +629,10 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) {
|
|||
taosMemoryFree(pWrapper);
|
||||
launchAsyncQuery(pRequest, pQuery);
|
||||
} else {
|
||||
tscDebug("error happens, code:%d", code);
|
||||
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
||||
tscDebug("0x%"PRIx64" client retry to handle the error, code:%s, reqId:0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId);
|
||||
tscDebug("0x%"PRIx64" client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%"PRIx64, pRequest->self, code, tstrerror(code),
|
||||
pRequest->retry, pRequest->requestId);
|
||||
pRequest->prevCode = code;
|
||||
doAsyncQuery(pRequest, true);
|
||||
return;
|
||||
|
@ -691,6 +697,7 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt) {
|
|||
.pTransporter = pTscObj->pAppInfo->pTransporter,
|
||||
.pStmtCb = NULL,
|
||||
.pUser = pTscObj->user,
|
||||
.schemalessType = pTscObj->schemalessType,
|
||||
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
|
||||
.async = true,};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -699,13 +706,14 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt) {
|
|||
void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) {
|
||||
SParseContext* pCxt = NULL;
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
|
||||
pRequest->code = pRequest->prevCode;
|
||||
code = pRequest->prevCode;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
int32_t code = createParseContext(pRequest, &pCxt);
|
||||
code = createParseContext(pRequest, &pCxt);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -742,7 +750,7 @@ void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) {
|
|||
}
|
||||
|
||||
_error:
|
||||
tscError("0x%"PRIx64" error happens, code:%s, reqId:0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId);
|
||||
tscError("0x%"PRIx64" error happens, code:%d - %s, reqId:0x%"PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId);
|
||||
terrno = code;
|
||||
pRequest->code = code;
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||
|
|
|
@ -92,6 +92,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
|
|||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
|
||||
ASSERT(pConsumer);
|
||||
|
||||
|
||||
mInfo("receive consumer lost msg, consumer id %ld, status %s", pLostMsg->consumerId,
|
||||
mndConsumerStatusName(pConsumer->status));
|
||||
|
||||
|
|
|
@ -173,7 +173,6 @@ typedef struct SCtgJob {
|
|||
SArray* pTasks;
|
||||
int32_t taskDone;
|
||||
SMetaData jobRes;
|
||||
int32_t rspCode;
|
||||
|
||||
uint64_t queryId;
|
||||
SCatalog* pCtg;
|
||||
|
@ -201,11 +200,12 @@ typedef struct SCtgMsgCtx {
|
|||
|
||||
typedef struct SCtgTask {
|
||||
CTG_TASK_TYPE type;
|
||||
int32_t taskId;
|
||||
SCtgJob *pJob;
|
||||
void* taskCtx;
|
||||
SCtgMsgCtx msgCtx;
|
||||
void* res;
|
||||
int32_t taskId;
|
||||
SCtgJob* pJob;
|
||||
void* taskCtx;
|
||||
SCtgMsgCtx msgCtx;
|
||||
int32_t code;
|
||||
void* res;
|
||||
} SCtgTask;
|
||||
|
||||
typedef int32_t (*ctgLanchTaskFp)(SCtgTask*);
|
||||
|
|
|
@ -437,13 +437,14 @@ _return:
|
|||
int32_t ctgDumpTbMetaRes(SCtgTask* pTask) {
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
if (NULL == pJob->jobRes.pTableMeta) {
|
||||
pJob->jobRes.pTableMeta = taosArrayInit(pJob->tbMetaNum, POINTER_BYTES);
|
||||
pJob->jobRes.pTableMeta = taosArrayInit(pJob->tbMetaNum, sizeof(SMetaRes));
|
||||
if (NULL == pJob->jobRes.pTableMeta) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayPush(pJob->jobRes.pTableMeta, &pTask->res);
|
||||
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
|
||||
taosArrayPush(pJob->jobRes.pTableMeta, &res);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -451,14 +452,14 @@ int32_t ctgDumpTbMetaRes(SCtgTask* pTask) {
|
|||
int32_t ctgDumpDbVgRes(SCtgTask* pTask) {
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
if (NULL == pJob->jobRes.pDbVgroup) {
|
||||
pJob->jobRes.pDbVgroup = taosArrayInit(pJob->dbVgNum, POINTER_BYTES);
|
||||
pJob->jobRes.pDbVgroup = taosArrayInit(pJob->dbVgNum, sizeof(SMetaRes));
|
||||
if (NULL == pJob->jobRes.pDbVgroup) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayPush(pJob->jobRes.pDbVgroup, &pTask->res);
|
||||
pTask->res = NULL;
|
||||
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
|
||||
taosArrayPush(pJob->jobRes.pDbVgroup, &res);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -466,13 +467,14 @@ int32_t ctgDumpDbVgRes(SCtgTask* pTask) {
|
|||
int32_t ctgDumpTbHashRes(SCtgTask* pTask) {
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
if (NULL == pJob->jobRes.pTableHash) {
|
||||
pJob->jobRes.pTableHash = taosArrayInit(pJob->tbHashNum, sizeof(SVgroupInfo));
|
||||
pJob->jobRes.pTableHash = taosArrayInit(pJob->tbHashNum, sizeof(SMetaRes));
|
||||
if (NULL == pJob->jobRes.pTableHash) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayPush(pJob->jobRes.pTableHash, pTask->res);
|
||||
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
|
||||
taosArrayPush(pJob->jobRes.pTableHash, &res);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -480,21 +482,29 @@ int32_t ctgDumpTbHashRes(SCtgTask* pTask) {
|
|||
int32_t ctgDumpIndexRes(SCtgTask* pTask) {
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
if (NULL == pJob->jobRes.pIndex) {
|
||||
pJob->jobRes.pIndex = taosArrayInit(pJob->indexNum, sizeof(SIndexInfo));
|
||||
pJob->jobRes.pIndex = taosArrayInit(pJob->indexNum, sizeof(SMetaRes));
|
||||
if (NULL == pJob->jobRes.pIndex) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayPush(pJob->jobRes.pIndex, pTask->res);
|
||||
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
|
||||
taosArrayPush(pJob->jobRes.pIndex, &res);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgDumpQnodeRes(SCtgTask* pTask) {
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
if (NULL == pJob->jobRes.pQnodeList) {
|
||||
pJob->jobRes.pQnodeList = taosArrayInit(1, sizeof(SMetaRes));
|
||||
if (NULL == pJob->jobRes.pQnodeList) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
TSWAP(pJob->jobRes.pQnodeList, pTask->res);
|
||||
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
|
||||
taosArrayPush(pJob->jobRes.pQnodeList, &res);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -502,13 +512,14 @@ int32_t ctgDumpQnodeRes(SCtgTask* pTask) {
|
|||
int32_t ctgDumpDbCfgRes(SCtgTask* pTask) {
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
if (NULL == pJob->jobRes.pDbCfg) {
|
||||
pJob->jobRes.pDbCfg = taosArrayInit(pJob->dbCfgNum, sizeof(SDbCfgInfo));
|
||||
pJob->jobRes.pDbCfg = taosArrayInit(pJob->dbCfgNum, sizeof(SMetaRes));
|
||||
if (NULL == pJob->jobRes.pDbCfg) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayPush(pJob->jobRes.pDbCfg, pTask->res);
|
||||
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
|
||||
taosArrayPush(pJob->jobRes.pDbCfg, &res);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -516,13 +527,14 @@ int32_t ctgDumpDbCfgRes(SCtgTask* pTask) {
|
|||
int32_t ctgDumpDbInfoRes(SCtgTask* pTask) {
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
if (NULL == pJob->jobRes.pDbInfo) {
|
||||
pJob->jobRes.pDbInfo = taosArrayInit(pJob->dbInfoNum, sizeof(SDbInfo));
|
||||
pJob->jobRes.pDbInfo = taosArrayInit(pJob->dbInfoNum, sizeof(SMetaRes));
|
||||
if (NULL == pJob->jobRes.pDbInfo) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayPush(pJob->jobRes.pDbInfo, pTask->res);
|
||||
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
|
||||
taosArrayPush(pJob->jobRes.pDbInfo, &res);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -530,13 +542,14 @@ int32_t ctgDumpDbInfoRes(SCtgTask* pTask) {
|
|||
int32_t ctgDumpUdfRes(SCtgTask* pTask) {
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
if (NULL == pJob->jobRes.pUdfList) {
|
||||
pJob->jobRes.pUdfList = taosArrayInit(pJob->udfNum, sizeof(SFuncInfo));
|
||||
pJob->jobRes.pUdfList = taosArrayInit(pJob->udfNum, sizeof(SMetaRes));
|
||||
if (NULL == pJob->jobRes.pUdfList) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayPush(pJob->jobRes.pUdfList, pTask->res);
|
||||
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
|
||||
taosArrayPush(pJob->jobRes.pUdfList, &res);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -544,13 +557,14 @@ int32_t ctgDumpUdfRes(SCtgTask* pTask) {
|
|||
int32_t ctgDumpUserRes(SCtgTask* pTask) {
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
if (NULL == pJob->jobRes.pUser) {
|
||||
pJob->jobRes.pUser = taosArrayInit(pJob->userNum, sizeof(bool));
|
||||
pJob->jobRes.pUser = taosArrayInit(pJob->userNum, sizeof(SMetaRes));
|
||||
if (NULL == pJob->jobRes.pUser) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayPush(pJob->jobRes.pUser, pTask->res);
|
||||
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
|
||||
taosArrayPush(pJob->jobRes.pUser, &res);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -559,20 +573,13 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
|
|||
SCtgJob* pJob = pTask->pJob;
|
||||
int32_t code = 0;
|
||||
|
||||
qDebug("QID:%" PRIx64 " task %d end with rsp %s", pJob->queryId, pTask->taskId, tstrerror(rspCode));
|
||||
qDebug("QID:0x%" PRIx64 " task %d end with rsp %s", pJob->queryId, pTask->taskId, tstrerror(rspCode));
|
||||
|
||||
if (rspCode) {
|
||||
int32_t lastCode = atomic_val_compare_exchange_32(&pJob->rspCode, 0, rspCode);
|
||||
if (0 == lastCode) {
|
||||
CTG_ERR_JRET(rspCode);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
pTask->code = rspCode;
|
||||
|
||||
int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1);
|
||||
if (taskDone < taosArrayGetSize(pJob->pTasks)) {
|
||||
qDebug("task done: %d, total: %d", taskDone, (int32_t)taosArrayGetSize(pJob->pTasks));
|
||||
qDebug("QID:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, taskDone, (int32_t)taosArrayGetSize(pJob->pTasks));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -47,11 +47,13 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
|
|||
void ctgFreeSMetaData(SMetaData* pData) {
|
||||
taosArrayDestroy(pData->pTableMeta);
|
||||
pData->pTableMeta = NULL;
|
||||
|
||||
|
||||
/*
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pData->pDbVgroup); ++i) {
|
||||
SArray** pArray = taosArrayGet(pData->pDbVgroup, i);
|
||||
taosArrayDestroy(*pArray);
|
||||
}
|
||||
*/
|
||||
taosArrayDestroy(pData->pDbVgroup);
|
||||
pData->pDbVgroup = NULL;
|
||||
|
||||
|
@ -61,10 +63,12 @@ void ctgFreeSMetaData(SMetaData* pData) {
|
|||
taosArrayDestroy(pData->pUdfList);
|
||||
pData->pUdfList = NULL;
|
||||
|
||||
/*
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pData->pDbCfg); ++i) {
|
||||
SDbCfgInfo* pInfo = taosArrayGet(pData->pDbCfg, i);
|
||||
taosArrayDestroy(pInfo->pRetensions);
|
||||
}
|
||||
*/
|
||||
taosArrayDestroy(pData->pDbCfg);
|
||||
pData->pDbCfg = NULL;
|
||||
|
||||
|
@ -320,8 +324,12 @@ void ctgFreeTask(SCtgTask* pTask) {
|
|||
break;
|
||||
}
|
||||
case CTG_TASK_GET_DB_CFG: {
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
taosMemoryFreeClear(pTask->res);
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
if (pTask->res) {
|
||||
SDbCfgInfo* pInfo = (SDbCfgInfo*)pTask->res;
|
||||
taosArrayDestroy(pInfo->pRetensions);
|
||||
taosMemoryFreeClear(pTask->res);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_DB_INFO: {
|
||||
|
|
|
@ -108,9 +108,10 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt);
|
|||
|
||||
static EDealRes collectMetaKeyFromFunction(SCollectMetaKeyFromExprCxt* pCxt, SFunctionNode* pFunc) {
|
||||
if (fmIsBuiltinFunc(pFunc->functionName)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
return reserveUdfInCache(pFunc->functionName, pCxt->pComCxt->pMetaCache);
|
||||
pCxt->errCode = reserveUdfInCache(pFunc->functionName, pCxt->pComCxt->pMetaCache);
|
||||
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_CONTINUE : DEAL_RES_ERROR;
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, SRealTableNode* pRealTable,
|
||||
|
@ -179,6 +180,10 @@ static int32_t collectMetaKeyFromSelect(SCollectMetaKeyCxt* pCxt, SSelectStmt* p
|
|||
return cxt.errCode;
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromAlterDatabase(SCollectMetaKeyCxt* pCxt, SAlterDatabaseStmt* pStmt) {
|
||||
return reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromCreateTable(SCollectMetaKeyCxt* pCxt, SCreateTableStmt* pStmt) {
|
||||
int32_t code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL == pStmt->pTags) {
|
||||
|
@ -376,6 +381,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
|
|||
return collectMetaKeyFromSetOperator(pCxt, (SSetOperator*)pStmt);
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
return collectMetaKeyFromSelect(pCxt, (SSelectStmt*)pStmt);
|
||||
case QUERY_NODE_ALTER_DATABASE_STMT:
|
||||
return collectMetaKeyFromAlterDatabase(pCxt, (SAlterDatabaseStmt*)pStmt);
|
||||
case QUERY_NODE_CREATE_TABLE_STMT:
|
||||
return collectMetaKeyFromCreateTable(pCxt, (SCreateTableStmt*)pStmt);
|
||||
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
|
||||
|
|
|
@ -1006,7 +1006,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
|
|||
}
|
||||
|
||||
SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]];
|
||||
char* tmpTokenBuf = taosMemoryCalloc(1, sToken.n); //todo this can be optimize with parse column
|
||||
char* tmpTokenBuf = taosMemoryCalloc(1, sToken.n); // todo this can be optimize with parse column
|
||||
code = checkAndTrimValue(&sToken, tmpTokenBuf, &pCxt->msg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFree(tmpTokenBuf);
|
||||
|
@ -1018,7 +1018,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
|
|||
taosMemoryFree(tmpTokenBuf);
|
||||
goto end;
|
||||
}
|
||||
if(isNullStr(&sToken)) {
|
||||
if (isNullStr(&sToken)) {
|
||||
code = tTagNew(pTagVals, 1, true, &pTag);
|
||||
} else {
|
||||
code = parseJsontoTagData(sToken.z, pTagVals, &pTag, &pCxt->msg);
|
||||
|
@ -1530,10 +1530,13 @@ typedef struct SInsertParseSyntaxCxt {
|
|||
} SInsertParseSyntaxCxt;
|
||||
|
||||
static int32_t skipParentheses(SInsertParseSyntaxCxt* pCxt) {
|
||||
SToken sToken;
|
||||
SToken sToken;
|
||||
int32_t expectRightParenthesis = 1;
|
||||
while (1) {
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
if (TK_NK_RP == sToken.type) {
|
||||
if (TK_NK_LP == sToken.type) {
|
||||
++expectRightParenthesis;
|
||||
} else if (TK_NK_RP == sToken.type && 0 == --expectRightParenthesis) {
|
||||
break;
|
||||
}
|
||||
if (0 == sToken.n) {
|
||||
|
|
|
@ -103,15 +103,14 @@ static int32_t collectUseTable(const SName* pName, SHashObj* pDbs) {
|
|||
|
||||
static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STableMeta** pMeta) {
|
||||
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (pParCxt->async) {
|
||||
code = getTableMetaFromCache(pCxt->pMetaCache, pName, pMeta);
|
||||
} else {
|
||||
code = collectUseDatabase(pName, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = collectUseTable(pName, pCxt->pTables);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
int32_t code = collectUseDatabase(pName, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = collectUseTable(pName, pCxt->pTables);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (pParCxt->async) {
|
||||
code = getTableMetaFromCache(pCxt->pMetaCache, pName, pMeta);
|
||||
} else {
|
||||
code = catalogGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pMeta);
|
||||
}
|
||||
}
|
||||
|
@ -150,12 +149,11 @@ static int32_t getDBVgInfoImpl(STranslateContext* pCxt, const SName* pName, SArr
|
|||
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||
char fullDbName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pName, fullDbName);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (pParCxt->async) {
|
||||
code = getDbVgInfoFromCache(pCxt->pMetaCache, fullDbName, pVgInfo);
|
||||
} else {
|
||||
code = collectUseDatabaseImpl(fullDbName, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
int32_t code = collectUseDatabaseImpl(fullDbName, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (pParCxt->async) {
|
||||
code = getDbVgInfoFromCache(pCxt->pMetaCache, fullDbName, pVgInfo);
|
||||
} else {
|
||||
code = catalogGetDBVgInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, fullDbName, pVgInfo);
|
||||
}
|
||||
}
|
||||
|
@ -175,15 +173,14 @@ static int32_t getDBVgInfo(STranslateContext* pCxt, const char* pDbName, SArray*
|
|||
|
||||
static int32_t getTableHashVgroupImpl(STranslateContext* pCxt, const SName* pName, SVgroupInfo* pInfo) {
|
||||
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (pParCxt->async) {
|
||||
code = getTableVgroupFromCache(pCxt->pMetaCache, pName, pInfo);
|
||||
} else {
|
||||
code = collectUseDatabase(pName, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = collectUseTable(pName, pCxt->pTables);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
int32_t code = collectUseDatabase(pName, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = collectUseTable(pName, pCxt->pTables);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (pParCxt->async) {
|
||||
code = getTableVgroupFromCache(pCxt->pMetaCache, pName, pInfo);
|
||||
} else {
|
||||
code = catalogGetTableHashVgroup(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pInfo);
|
||||
}
|
||||
}
|
||||
|
@ -203,12 +200,11 @@ static int32_t getTableHashVgroup(STranslateContext* pCxt, const char* pDbName,
|
|||
static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int32_t* pVersion, int64_t* pDbId,
|
||||
int32_t* pTableNum) {
|
||||
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (pParCxt->async) {
|
||||
code = getDbVgVersionFromCache(pCxt->pMetaCache, pDbFName, pVersion, pDbId, pTableNum);
|
||||
} else {
|
||||
code = collectUseDatabaseImpl(pDbFName, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
int32_t code = collectUseDatabaseImpl(pDbFName, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (pParCxt->async) {
|
||||
code = getDbVgVersionFromCache(pCxt->pMetaCache, pDbFName, pVersion, pDbId, pTableNum);
|
||||
} else {
|
||||
code = catalogGetDBVgVersion(pParCxt->pCatalog, pDbFName, pVersion, pDbId, pTableNum);
|
||||
}
|
||||
}
|
||||
|
@ -224,12 +220,11 @@ static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo
|
|||
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pDbName, strlen(pDbName));
|
||||
char dbFname[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(&name, dbFname);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (pParCxt->async) {
|
||||
code = getDbCfgFromCache(pCxt->pMetaCache, dbFname, pInfo);
|
||||
} else {
|
||||
code = collectUseDatabaseImpl(dbFname, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
int32_t code = collectUseDatabaseImpl(dbFname, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (pParCxt->async) {
|
||||
code = getDbCfgFromCache(pCxt->pMetaCache, dbFname, pInfo);
|
||||
} else {
|
||||
code = catalogGetDBCfg(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, dbFname, pInfo);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -484,20 +484,6 @@ static int32_t buildDbReq(SHashObj* pDbsHash, SArray** pDbs) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildTableMetaReq(SHashObj* pTableMetaHash, SArray** pTableMeta) {
|
||||
return buildTableReq(pTableMetaHash, pTableMeta);
|
||||
}
|
||||
|
||||
static int32_t buildDbVgroupReq(SHashObj* pDbVgroupHash, SArray** pDbVgroup) {
|
||||
return buildDbReq(pDbVgroupHash, pDbVgroup);
|
||||
}
|
||||
|
||||
static int32_t buildTableVgroupReq(SHashObj* pTableVgroupHash, SArray** pTableVgroup) {
|
||||
return buildTableReq(pTableVgroupHash, pTableVgroup);
|
||||
}
|
||||
|
||||
static int32_t buildDbCfgReq(SHashObj* pDbCfgHash, SArray** pDbCfg) { return buildDbReq(pDbCfgHash, pDbCfg); }
|
||||
|
||||
static int32_t buildUserAuthReq(SHashObj* pUserAuthHash, SArray** pUserAuth) {
|
||||
if (NULL != pUserAuthHash) {
|
||||
*pUserAuth = taosArrayInit(taosHashGetSize(pUserAuthHash), sizeof(SUserAuthInfo));
|
||||
|
@ -537,15 +523,18 @@ static int32_t buildUdfReq(SHashObj* pUdfHash, SArray** pUdf) {
|
|||
}
|
||||
|
||||
int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalogReq) {
|
||||
int32_t code = buildTableMetaReq(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
|
||||
int32_t code = buildTableReq(pMetaCache->pTableMeta, &pCatalogReq->pTableMeta);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildDbVgroupReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup);
|
||||
code = buildDbReq(pMetaCache->pDbVgroup, &pCatalogReq->pDbVgroup);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildTableVgroupReq(pMetaCache->pTableVgroup, &pCatalogReq->pTableHash);
|
||||
code = buildTableReq(pMetaCache->pTableVgroup, &pCatalogReq->pTableHash);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildDbCfgReq(pMetaCache->pDbCfg, &pCatalogReq->pDbCfg);
|
||||
code = buildDbReq(pMetaCache->pDbCfg, &pCatalogReq->pDbCfg);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildDbReq(pMetaCache->pDbInfo, &pCatalogReq->pDbInfo);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildUserAuthReq(pMetaCache->pUserAuth, &pCatalogReq->pUser);
|
||||
|
@ -556,51 +545,39 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t putTableMetaToCache(const SArray* pTableMetaReq, const SArray* pTableMetaData, SHashObj* pTableMeta) {
|
||||
int32_t ntables = taosArrayGetSize(pTableMetaReq);
|
||||
static int32_t putMetaDataToHash(const char* pKey, int32_t len, const SArray* pData, int32_t index, SHashObj* pHash) {
|
||||
SMetaRes* pRes = taosArrayGet(pData, index);
|
||||
return taosHashPut(pHash, pKey, len, &pRes, POINTER_BYTES);
|
||||
}
|
||||
|
||||
static int32_t getMetaDataFromHash(const char* pKey, int32_t len, SHashObj* pHash, void** pOutput) {
|
||||
SMetaRes** pRes = taosHashGet(pHash, pKey, len);
|
||||
if (NULL == pRes || NULL == *pRes) {
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == (*pRes)->code) {
|
||||
*pOutput = (*pRes)->pRes;
|
||||
}
|
||||
return (*pRes)->code;
|
||||
}
|
||||
|
||||
static int32_t putTableDataToCache(const SArray* pTableReq, const SArray* pTableData, SHashObj* pTable) {
|
||||
int32_t ntables = taosArrayGetSize(pTableReq);
|
||||
for (int32_t i = 0; i < ntables; ++i) {
|
||||
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(taosArrayGet(pTableMetaReq, i), fullName);
|
||||
if (TSDB_CODE_SUCCESS !=
|
||||
taosHashPut(pTableMeta, fullName, strlen(fullName), taosArrayGet(pTableMetaData, i), POINTER_BYTES)) {
|
||||
tNameExtractFullName(taosArrayGet(pTableReq, i), fullName);
|
||||
if (TSDB_CODE_SUCCESS != putMetaDataToHash(fullName, strlen(fullName), pTableData, i, pTable)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t putDbVgroupToCache(const SArray* pDbVgroupReq, const SArray* pDbVgroupData, SHashObj* pDbVgroup) {
|
||||
int32_t nvgs = taosArrayGetSize(pDbVgroupReq);
|
||||
static int32_t putDbDataToCache(const SArray* pDbReq, const SArray* pDbData, SHashObj* pDb) {
|
||||
int32_t nvgs = taosArrayGetSize(pDbReq);
|
||||
for (int32_t i = 0; i < nvgs; ++i) {
|
||||
char* pDbFName = taosArrayGet(pDbVgroupReq, i);
|
||||
if (TSDB_CODE_SUCCESS !=
|
||||
taosHashPut(pDbVgroup, pDbFName, strlen(pDbFName), taosArrayGet(pDbVgroupData, i), POINTER_BYTES)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t putTableVgroupToCache(const SArray* pTableVgroupReq, const SArray* pTableVgroupData,
|
||||
SHashObj* pTableVgroup) {
|
||||
int32_t ntables = taosArrayGetSize(pTableVgroupReq);
|
||||
for (int32_t i = 0; i < ntables; ++i) {
|
||||
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(taosArrayGet(pTableVgroupReq, i), fullName);
|
||||
SVgroupInfo* pInfo = taosArrayGet(pTableVgroupData, i);
|
||||
if (TSDB_CODE_SUCCESS != taosHashPut(pTableVgroup, fullName, strlen(fullName), &pInfo, POINTER_BYTES)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t putDbCfgToCache(const SArray* pDbCfgReq, const SArray* pDbCfgData, SHashObj* pDbCfg) {
|
||||
int32_t nvgs = taosArrayGetSize(pDbCfgReq);
|
||||
for (int32_t i = 0; i < nvgs; ++i) {
|
||||
char* pDbFName = taosArrayGet(pDbCfgReq, i);
|
||||
SDbCfgInfo* pInfo = taosArrayGet(pDbCfgData, i);
|
||||
if (TSDB_CODE_SUCCESS != taosHashPut(pDbCfg, pDbFName, strlen(pDbFName), &pInfo, POINTER_BYTES)) {
|
||||
char* pDbFName = taosArrayGet(pDbReq, i);
|
||||
if (TSDB_CODE_SUCCESS != putMetaDataToHash(pDbFName, strlen(pDbFName), pDbData, i, pDb)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
@ -613,7 +590,7 @@ static int32_t putUserAuthToCache(const SArray* pUserAuthReq, const SArray* pUse
|
|||
SUserAuthInfo* pUser = taosArrayGet(pUserAuthReq, i);
|
||||
char key[USER_AUTH_KEY_MAX_LEN] = {0};
|
||||
int32_t len = userAuthToStringExt(pUser->user, pUser->dbFName, pUser->type, key);
|
||||
if (TSDB_CODE_SUCCESS != taosHashPut(pUserAuth, key, len, taosArrayGet(pUserAuthData, i), sizeof(bool))) {
|
||||
if (TSDB_CODE_SUCCESS != putMetaDataToHash(key, len, pUserAuthData, i, pUserAuth)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
@ -623,9 +600,8 @@ static int32_t putUserAuthToCache(const SArray* pUserAuthReq, const SArray* pUse
|
|||
static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHashObj* pUdf) {
|
||||
int32_t num = taosArrayGetSize(pUdfReq);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
char* pFunc = taosArrayGet(pUdfReq, i);
|
||||
SFuncInfo* pInfo = taosArrayGet(pUdfData, i);
|
||||
if (TSDB_CODE_SUCCESS != taosHashPut(pUdf, pFunc, strlen(pFunc), &pInfo, POINTER_BYTES)) {
|
||||
char* pFunc = taosArrayGet(pUdfReq, i);
|
||||
if (TSDB_CODE_SUCCESS != putMetaDataToHash(pFunc, strlen(pFunc), pUdfData, i, pUdf)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
@ -633,15 +609,18 @@ static int32_t putUdfToCache(const SArray* pUdfReq, const SArray* pUdfData, SHas
|
|||
}
|
||||
|
||||
int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMetaData, SParseMetaCache* pMetaCache) {
|
||||
int32_t code = putTableMetaToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, pMetaCache->pTableMeta);
|
||||
int32_t code = putTableDataToCache(pCatalogReq->pTableMeta, pMetaData->pTableMeta, pMetaCache->pTableMeta);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = putDbVgroupToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, pMetaCache->pDbVgroup);
|
||||
code = putDbDataToCache(pCatalogReq->pDbVgroup, pMetaData->pDbVgroup, pMetaCache->pDbVgroup);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = putTableVgroupToCache(pCatalogReq->pTableHash, pMetaData->pTableHash, pMetaCache->pTableVgroup);
|
||||
code = putTableDataToCache(pCatalogReq->pTableHash, pMetaData->pTableHash, pMetaCache->pTableVgroup);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = putDbCfgToCache(pCatalogReq->pDbCfg, pMetaData->pDbCfg, pMetaCache->pDbCfg);
|
||||
code = putDbDataToCache(pCatalogReq->pDbCfg, pMetaData->pDbCfg, pMetaCache->pDbCfg);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = putDbDataToCache(pCatalogReq->pDbInfo, pMetaData->pDbInfo, pMetaCache->pDbInfo);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = putUserAuthToCache(pCatalogReq->pUser, pMetaData->pUser, pMetaCache->pUserAuth);
|
||||
|
@ -681,16 +660,15 @@ int32_t reserveTableMetaInCacheExt(const SName* pName, SParseMetaCache* pMetaCac
|
|||
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) {
|
||||
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(pName, fullName);
|
||||
STableMeta** pRes = taosHashGet(pMetaCache->pTableMeta, fullName, strlen(fullName));
|
||||
if (NULL == pRes || NULL == *pRes) {
|
||||
parserError("getTableMetaFromCache error: %s", fullName);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
STableMeta* pTableMeta = NULL;
|
||||
int32_t code = getMetaDataFromHash(fullName, strlen(fullName), pMetaCache->pTableMeta, (void**)&pTableMeta);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pMeta = tableMetaDup(pTableMeta);
|
||||
if (NULL == *pMeta) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
*pMeta = tableMetaDup(*pRes);
|
||||
if (NULL == *pMeta) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t reserveDbReqInCache(int32_t acctId, const char* pDb, SHashObj** pDbs) {
|
||||
|
@ -710,19 +688,16 @@ int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache*
|
|||
}
|
||||
|
||||
int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo) {
|
||||
SArray** pRes = taosHashGet(pMetaCache->pDbVgroup, pDbFName, strlen(pDbFName));
|
||||
if (NULL == pRes) {
|
||||
parserError("getDbVgInfoFromCache error: %s", pDbFName);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
}
|
||||
// *pRes is null, which is a legal value, indicating that the user DB has not been created
|
||||
if (NULL != *pRes) {
|
||||
*pVgInfo = taosArrayDup(*pRes);
|
||||
SArray* pVgList = NULL;
|
||||
int32_t code = getMetaDataFromHash(pDbFName, strlen(pDbFName), pMetaCache->pDbVgroup, (void**)&pVgList);
|
||||
// pVgList is null, which is a legal value, indicating that the user DB has not been created
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pVgList) {
|
||||
*pVgInfo = taosArrayDup(pVgList);
|
||||
if (NULL == *pVgInfo) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t reserveTableVgroupInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
|
||||
|
@ -738,30 +713,28 @@ int32_t reserveTableVgroupInCacheExt(const SName* pName, SParseMetaCache* pMetaC
|
|||
int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup) {
|
||||
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(pName, fullName);
|
||||
SVgroupInfo** pRes = taosHashGet(pMetaCache->pTableVgroup, fullName, strlen(fullName));
|
||||
if (NULL == pRes || NULL == *pRes) {
|
||||
parserError("getTableVgroupFromCache error: %s", fullName);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
SVgroupInfo* pVg = NULL;
|
||||
int32_t code = getMetaDataFromHash(fullName, strlen(fullName), pMetaCache->pTableVgroup, (void**)&pVg);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
memcpy(pVgroup, pVg, sizeof(SVgroupInfo));
|
||||
}
|
||||
memcpy(pVgroup, *pRes, sizeof(SVgroupInfo));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t reserveDbVgVersionInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache) {
|
||||
return reserveDbReqInCache(acctId, pDb, &pMetaCache->pDbCfg);
|
||||
return reserveDbReqInCache(acctId, pDb, &pMetaCache->pDbInfo);
|
||||
}
|
||||
|
||||
int32_t getDbVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, int32_t* pVersion, int64_t* pDbId,
|
||||
int32_t* pTableNum) {
|
||||
SDbInfo** pRes = taosHashGet(pMetaCache->pDbCfg, pDbFName, strlen(pDbFName));
|
||||
if (NULL == pRes || NULL == *pRes) {
|
||||
parserError("getDbVgVersionFromCache error: %s", pDbFName);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
SDbInfo* pDbInfo = NULL;
|
||||
int32_t code = getMetaDataFromHash(pDbFName, strlen(pDbFName), pMetaCache->pDbInfo, (void**)&pDbInfo);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pVersion = pDbInfo->vgVer;
|
||||
*pDbId = pDbInfo->dbId;
|
||||
*pTableNum = pDbInfo->tbNum;
|
||||
}
|
||||
*pVersion = (*pRes)->vgVer;
|
||||
*pDbId = (*pRes)->dbId;
|
||||
*pTableNum = (*pRes)->tbNum;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t reserveDbCfgInCache(int32_t acctId, const char* pDb, SParseMetaCache* pMetaCache) {
|
||||
|
@ -769,13 +742,12 @@ int32_t reserveDbCfgInCache(int32_t acctId, const char* pDb, SParseMetaCache* pM
|
|||
}
|
||||
|
||||
int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo) {
|
||||
SDbCfgInfo** pRes = taosHashGet(pMetaCache->pDbCfg, pDbFName, strlen(pDbFName));
|
||||
if (NULL == pRes || NULL == *pRes) {
|
||||
parserError("getDbCfgFromCache error: %s", pDbFName);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
SDbCfgInfo* pDbCfg = NULL;
|
||||
int32_t code = getMetaDataFromHash(pDbFName, strlen(pDbFName), pMetaCache->pDbCfg, (void**)&pDbCfg);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
memcpy(pInfo, pDbCfg, sizeof(SDbCfgInfo));
|
||||
}
|
||||
memcpy(pInfo, *pRes, sizeof(SDbCfgInfo));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t reserveUserAuthInCacheImpl(const char* pKey, int32_t len, SParseMetaCache* pMetaCache) {
|
||||
|
@ -808,13 +780,12 @@ int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, con
|
|||
bool* pPass) {
|
||||
char key[USER_AUTH_KEY_MAX_LEN] = {0};
|
||||
int32_t len = userAuthToStringExt(pUser, pDbFName, type, key);
|
||||
bool* pRes = taosHashGet(pMetaCache->pUserAuth, key, len);
|
||||
if (NULL == pRes) {
|
||||
parserError("getUserAuthFromCache error: %s, %s, %d", pUser, pDbFName, type);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
bool* pRes = NULL;
|
||||
int32_t code = getMetaDataFromHash(key, len, pMetaCache->pUserAuth, (void**)&pRes);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pPass = *pRes;
|
||||
}
|
||||
*pPass = *pRes;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache) {
|
||||
|
@ -828,11 +799,10 @@ int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache) {
|
|||
}
|
||||
|
||||
int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo) {
|
||||
SFuncInfo** pRes = taosHashGet(pMetaCache->pUdf, pFunc, strlen(pFunc));
|
||||
if (NULL == pRes || NULL == *pRes) {
|
||||
parserError("getUdfInfoFromCache error: %s", pFunc);
|
||||
return TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
SFuncInfo* pFuncInfo = NULL;
|
||||
int32_t code = getMetaDataFromHash(pFunc, strlen(pFunc), pMetaCache->pUdf, (void**)&pFuncInfo);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
memcpy(pInfo, pFuncInfo, sizeof(SFuncInfo));
|
||||
}
|
||||
memcpy(pInfo, *pRes, sizeof(SFuncInfo));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -367,49 +367,40 @@ class MockCatalogServiceImpl {
|
|||
}
|
||||
|
||||
int32_t getAllTableMeta(SArray* pTableMetaReq, SArray** pTableMetaData) const {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (NULL != pTableMetaReq) {
|
||||
int32_t ntables = taosArrayGetSize(pTableMetaReq);
|
||||
*pTableMetaData = taosArrayInit(ntables, POINTER_BYTES);
|
||||
*pTableMetaData = taosArrayInit(ntables, sizeof(SMetaRes));
|
||||
for (int32_t i = 0; i < ntables; ++i) {
|
||||
STableMeta* pMeta = NULL;
|
||||
code = catalogGetTableMeta((const SName*)taosArrayGet(pTableMetaReq, i), &pMeta);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
taosArrayPush(*pTableMetaData, &pMeta);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
SMetaRes res = {0};
|
||||
res.code = catalogGetTableMeta((const SName*)taosArrayGet(pTableMetaReq, i), (STableMeta**)&res.pRes);
|
||||
taosArrayPush(*pTableMetaData, &res);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getAllTableVgroup(SArray* pTableVgroupReq, SArray** pTableVgroupData) const {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (NULL != pTableVgroupReq) {
|
||||
int32_t ntables = taosArrayGetSize(pTableVgroupReq);
|
||||
*pTableVgroupData = taosArrayInit(ntables, sizeof(SVgroupInfo));
|
||||
*pTableVgroupData = taosArrayInit(ntables, sizeof(SMetaRes));
|
||||
for (int32_t i = 0; i < ntables; ++i) {
|
||||
SVgroupInfo vgInfo = {0};
|
||||
code = catalogGetTableHashVgroup((const SName*)taosArrayGet(pTableVgroupReq, i), &vgInfo);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
taosArrayPush(*pTableVgroupData, &vgInfo);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
SMetaRes res = {0};
|
||||
res.pRes = taosMemoryCalloc(1, sizeof(SVgroupInfo));
|
||||
res.code = catalogGetTableHashVgroup((const SName*)taosArrayGet(pTableVgroupReq, i), (SVgroupInfo*)res.pRes);
|
||||
taosArrayPush(*pTableVgroupData, &res);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getAllDbVgroup(SArray* pDbVgroupReq, SArray** pDbVgroupData) const {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (NULL != pDbVgroupReq) {
|
||||
int32_t ndbs = taosArrayGetSize(pDbVgroupReq);
|
||||
*pDbVgroupData = taosArrayInit(ndbs, POINTER_BYTES);
|
||||
*pDbVgroupData = taosArrayInit(ndbs, sizeof(SMetaRes));
|
||||
for (int32_t i = 0; i < ndbs; ++i) {
|
||||
int64_t zeroVg = 0;
|
||||
taosArrayPush(*pDbVgroupData, &zeroVg);
|
||||
SMetaRes res = {0};
|
||||
taosArrayPush(*pDbVgroupData, &res);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
|
@ -419,10 +410,11 @@ class MockCatalogServiceImpl {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (NULL != pDbCfgReq) {
|
||||
int32_t ndbs = taosArrayGetSize(pDbCfgReq);
|
||||
*pDbCfgData = taosArrayInit(ndbs, sizeof(SDbCfgInfo));
|
||||
*pDbCfgData = taosArrayInit(ndbs, sizeof(SMetaRes));
|
||||
for (int32_t i = 0; i < ndbs; ++i) {
|
||||
SDbCfgInfo dbCfg = {0};
|
||||
taosArrayPush(*pDbCfgData, &dbCfg);
|
||||
SMetaRes res = {0};
|
||||
res.pRes = taosMemoryCalloc(1, sizeof(SDbCfgInfo));
|
||||
taosArrayPush(*pDbCfgData, &res);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
|
@ -432,10 +424,11 @@ class MockCatalogServiceImpl {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (NULL != pDbInfoReq) {
|
||||
int32_t ndbs = taosArrayGetSize(pDbInfoReq);
|
||||
*pDbInfoData = taosArrayInit(ndbs, sizeof(SDbCfgInfo));
|
||||
*pDbInfoData = taosArrayInit(ndbs, sizeof(SMetaRes));
|
||||
for (int32_t i = 0; i < ndbs; ++i) {
|
||||
SDbInfo dbInfo = {0};
|
||||
taosArrayPush(*pDbInfoData, &dbInfo);
|
||||
SMetaRes res = {0};
|
||||
res.pRes = taosMemoryCalloc(1, sizeof(SDbInfo));
|
||||
taosArrayPush(*pDbInfoData, &res);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
|
@ -445,31 +438,29 @@ class MockCatalogServiceImpl {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (NULL != pUserAuthReq) {
|
||||
int32_t num = taosArrayGetSize(pUserAuthReq);
|
||||
*pUserAuthData = taosArrayInit(num, sizeof(bool));
|
||||
*pUserAuthData = taosArrayInit(num, sizeof(SMetaRes));
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
bool pass = true;
|
||||
taosArrayPush(*pUserAuthData, &pass);
|
||||
SMetaRes res = {0};
|
||||
res.pRes = taosMemoryCalloc(1, sizeof(bool));
|
||||
*(bool*)(res.pRes) = true;
|
||||
taosArrayPush(*pUserAuthData, &res);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t getAllUdf(SArray* pUdfReq, SArray** pUdfData) const {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (NULL != pUdfReq) {
|
||||
int32_t num = taosArrayGetSize(pUdfReq);
|
||||
*pUdfData = taosArrayInit(num, sizeof(SFuncInfo));
|
||||
*pUdfData = taosArrayInit(num, sizeof(SMetaRes));
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SFuncInfo info = {0};
|
||||
code = catalogGetUdfInfo((char*)taosArrayGet(pUdfReq, i), &info);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
taosArrayPush(*pUdfData, &info);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
SMetaRes res = {0};
|
||||
res.pRes = taosMemoryCalloc(1, sizeof(SFuncInfo));
|
||||
res.code = catalogGetUdfInfo((char*)taosArrayGet(pUdfReq, i), (SFuncInfo*)res.pRes);
|
||||
taosArrayPush(*pUdfData, &res);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
uint64_t id_;
|
||||
|
|
|
@ -39,6 +39,8 @@ TEST_F(ParserInitialATest, alterDatabase) {
|
|||
useDb("root", "test");
|
||||
|
||||
run("ALTER DATABASE wxy_db CACHELAST 1 FSYNC 200 WAL 1");
|
||||
|
||||
run("ALTER DATABASE wxy_db KEEP 2400");
|
||||
}
|
||||
|
||||
// todo ALTER local
|
||||
|
|
|
@ -312,10 +312,6 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta *
|
|||
pTableMeta->sversion = msg->sversion;
|
||||
pTableMeta->tversion = msg->tversion;
|
||||
|
||||
if (isStb) {
|
||||
qDebug("stable %s meta returned, suid:%" PRIx64, msg->stbName, pTableMeta->suid);
|
||||
}
|
||||
|
||||
pTableMeta->tableInfo.numOfTags = msg->numOfTags;
|
||||
pTableMeta->tableInfo.precision = msg->precision;
|
||||
pTableMeta->tableInfo.numOfColumns = msg->numOfColumns;
|
||||
|
@ -326,6 +322,12 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta *
|
|||
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
|
||||
}
|
||||
|
||||
qDebug("table %s uid %" PRIx64 " meta returned, type %d vgId %d db %s stb %s suid %" PRIx64 " sver %d tver %d" PRIx64
|
||||
" tagNum %d colNum %d precision %d rowSize %d",
|
||||
msg->tbName, pTableMeta->uid, pTableMeta->tableType, pTableMeta->vgId, msg->dbFName, msg->stbName, pTableMeta->suid,
|
||||
pTableMeta->sversion, pTableMeta->tversion, pTableMeta->tableInfo.numOfTags, pTableMeta->tableInfo.numOfColumns,
|
||||
pTableMeta->tableInfo.precision, pTableMeta->tableInfo.rowSize);
|
||||
|
||||
*pMeta = pTableMeta;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -545,7 +545,7 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("set %dth condidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port);
|
||||
SCH_TASK_DLOG("set %dth candidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port);
|
||||
|
||||
++addNum;
|
||||
}
|
||||
|
@ -897,6 +897,7 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
|
|||
if (SCH_EXEC_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_EXEC_CB, 0)) {
|
||||
schNotifyUserQueryRes(pJob);
|
||||
} else if (SCH_FETCH_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_FETCH_CB, 0)) {
|
||||
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
|
||||
schNotifyUserFetchRes(pJob);
|
||||
}
|
||||
}
|
||||
|
@ -925,6 +926,7 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
|
|||
} else if (SCH_EXEC_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_EXEC_CB, 0)) {
|
||||
schNotifyUserQueryRes(pJob);
|
||||
} else if (SCH_FETCH_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_FETCH_CB, 0)) {
|
||||
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
|
||||
schNotifyUserFetchRes(pJob);
|
||||
}
|
||||
|
||||
|
@ -945,6 +947,8 @@ void schProcessOnDataFetched(SSchJob *job) {
|
|||
if (job->attr.syncSchedule) {
|
||||
tsem_post(&job->rspSem);
|
||||
} else if (SCH_FETCH_CB == atomic_val_compare_exchange_32(&job->userCb, SCH_FETCH_CB, 0)) {
|
||||
atomic_val_compare_exchange_8(&job->userFetch, 1, 0);
|
||||
|
||||
schNotifyUserFetchRes(job);
|
||||
}
|
||||
}
|
||||
|
@ -1680,9 +1684,9 @@ int32_t schAsyncFetchRows(SSchJob *pJob) {
|
|||
}
|
||||
|
||||
if (pJob->attr.explainMode == EXPLAIN_MODE_STATIC) {
|
||||
SCH_ERR_JRET(schNotifyUserFetchRes(pJob));
|
||||
|
||||
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
|
||||
|
||||
SCH_ERR_JRET(schNotifyUserFetchRes(pJob));
|
||||
} else {
|
||||
pJob->userCb = SCH_FETCH_CB;
|
||||
|
||||
|
|
|
@ -105,7 +105,7 @@ endi
|
|||
sql select sum(c1) ,count(c1) from ct4 group by c1 having count(c7) < 1 or sum(c1) > 2 ;
|
||||
print ====> sql : select sum(c1) ,count(c1) from ct4 group by c1 having count(c7) < 1 or sum(c1) > 2 ;
|
||||
print ====> rows: $rows
|
||||
if $rows != 7 then
|
||||
if $rows != 8 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -214,9 +214,9 @@ print =================== count all rows
|
|||
sql select count(c1) from stb1
|
||||
print ====> sql : select count(c1) from stb1
|
||||
print ====> rows: $data00
|
||||
if $data00 != 20 then
|
||||
print expect 20, actual: $data00
|
||||
return -1
|
||||
if $data00 != 17 then
|
||||
print expect 17, actual: $data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
#=================================================
|
||||
|
@ -246,7 +246,7 @@ print =================== count all rows
|
|||
sql select count(c1) from stb1
|
||||
print ====> sql : select count(c1) from stb1
|
||||
print ====> rows: $data00
|
||||
if $data00 != 20 then
|
||||
if $data00 != 17 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -279,7 +279,7 @@ endi
|
|||
sql select sum(c1) ,count(c1) from ct4 group by c1 having count(c7) < 1 or sum(c1) > 2 ;
|
||||
print ====> sql : select sum(c1) ,count(c1) from ct4 group by c1 having count(c7) < 1 or sum(c1) > 2 ;
|
||||
print ====> rows: $rows
|
||||
if $rows != 7 then
|
||||
if $rows != 8 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ sql connect
|
|||
print =============== create database
|
||||
sql create database db
|
||||
sql show databases
|
||||
if $rows != 2 then
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -96,8 +96,6 @@ sql insert into ct4 values ( '2022-05-21 01:01:01.000', NULL, NULL, NULL, NULL,
|
|||
|
||||
print ================ start query ======================
|
||||
print ================ SQL used to cause taosd or taos shell crash
|
||||
sql select sum(c1) ,count(c1) from ct4 group by c1 having sum(c10) between 0 and 1 ;
|
||||
|
||||
|
||||
sql_error select sum(c1) ,count(c1) from ct4 group by c1 having sum(c10) between 0 and 1 ;
|
||||
|
||||
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue