Merge pull request #9581 from taosdata/feature/3.0_liaohj
[td-11818] refactor.
This commit is contained in:
commit
542e061d27
|
@ -100,13 +100,13 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, co
|
|||
/**
|
||||
* Force renew a table's local cached meta data and get the new one.
|
||||
* @param pCatalog (input, got with catalogGetHandle)
|
||||
* @param pRpc (input, rpc object)
|
||||
* @param pTransporter (input, rpc object)
|
||||
* @param pMgmtEps (input, mnode EPs)
|
||||
* @param pTableName (input, table name, NOT including db name)
|
||||
* @param pTableMeta(output, table meta data, NEED to free it by calller)
|
||||
* @return error code
|
||||
*/
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
@ -54,7 +54,7 @@ bool taosGetSysMemory(float *memoryUsedMB);
|
|||
void taosPrintOsInfo();
|
||||
int taosSystem(const char *cmd);
|
||||
void taosKillSystem();
|
||||
int32_t taosGetSystemUid(char *uid, int32_t uidlen);
|
||||
int32_t taosGetSystemUUID(char *uid, int32_t uidlen);
|
||||
char * taosGetCmdlineByPID(int pid);
|
||||
void taosSetCoreDump(bool enable);
|
||||
|
||||
|
|
|
@ -419,17 +419,20 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
|||
*+------------+-----+-----------+---------------+
|
||||
* @return
|
||||
*/
|
||||
static int32_t requestSerialId = 0;
|
||||
uint64_t generateRequestId() {
|
||||
uint64_t hashId = 0;
|
||||
static uint64_t hashId = 0;
|
||||
static int32_t requestSerialId = 0;
|
||||
|
||||
char uid[64] = {0};
|
||||
int32_t code = taosGetSystemUid(uid, tListLen(uid));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead", tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
if (hashId == 0) {
|
||||
char uid[64] = {0};
|
||||
int32_t code = taosGetSystemUUID(uid, tListLen(uid));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
|
||||
tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
|
||||
} else {
|
||||
hashId = MurmurHash3_32(uid, strlen(uid));
|
||||
} else {
|
||||
hashId = MurmurHash3_32(uid, strlen(uid));
|
||||
}
|
||||
}
|
||||
|
||||
int64_t ts = taosGetTimestampUs();
|
||||
|
|
|
@ -140,7 +140,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj*
|
|||
(*pRequest)->sqlstr[sqlLen] = 0;
|
||||
(*pRequest)->sqlLen = sqlLen;
|
||||
|
||||
tscDebugL("0x%"PRIx64" SQL: %s", (*pRequest)->requestId, (*pRequest)->sqlstr);
|
||||
tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -443,10 +443,10 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
|||
*/
|
||||
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
|
||||
if (pMsg->code == TSDB_CODE_SUCCESS) {
|
||||
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->requestId,
|
||||
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self,
|
||||
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
|
||||
} else {
|
||||
tscError("reqId:0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x"PRIx64, pRequest->requestId,
|
||||
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self,
|
||||
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
|
||||
}
|
||||
|
||||
|
|
|
@ -496,6 +496,17 @@ TEST(testCase, create_multiple_tables) {
|
|||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
|
||||
// for(int32_t i = 0; i < 10000; ++i) {
|
||||
// char sql[512] = {0};
|
||||
// snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i);
|
||||
// TAOS_RES* pres = taos_query(pConn, sql);
|
||||
// if (taos_errno(pres) != 0) {
|
||||
// printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres));
|
||||
// }
|
||||
// taos_free_result(pres);
|
||||
// }
|
||||
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
|
@ -506,7 +517,6 @@ TEST(testCase, generated_request_id_test) {
|
|||
uint64_t v = generateRequestId();
|
||||
void* result = taosHashGet(phash, &v, sizeof(v));
|
||||
ASSERT_EQ(result, nullptr);
|
||||
|
||||
taosHashPut(phash, &v, sizeof(v), NULL, 0);
|
||||
}
|
||||
|
||||
|
|
|
@ -145,7 +145,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
|
|||
clusterObj.createdTime = taosGetTimestampMs();
|
||||
clusterObj.updateTime = clusterObj.createdTime;
|
||||
|
||||
int32_t code = taosGetSystemUid(clusterObj.name, TSDB_CLUSTER_ID_LEN);
|
||||
int32_t code = taosGetSystemUUID(clusterObj.name, TSDB_CLUSTER_ID_LEN);
|
||||
if (code != 0) {
|
||||
strcpy(clusterObj.name, "tdengine2.0");
|
||||
mError("failed to get name from system, set to default val %s", clusterObj.name);
|
||||
|
|
|
@ -698,8 +698,8 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
|
||||
return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, true, pTableMeta);
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
|
||||
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta);
|
||||
}
|
||||
|
||||
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) {
|
||||
|
|
|
@ -326,6 +326,31 @@ typedef struct SVgroupTablesBatch {
|
|||
SVgroupInfo info;
|
||||
} SVgroupTablesBatch;
|
||||
|
||||
static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputTag, SKVRowBuilder* pKvRowBuilder,
|
||||
SArray* pTagValList, int32_t tsPrecision, SMsgBuf* pMsgBuf) {
|
||||
const char* msg1 = "illegal value or data overflow";
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
for (int32_t i = 0; i < numOfInputTag; ++i) {
|
||||
SSchema* pSchema = &pTagSchema[i];
|
||||
|
||||
char* endPtr = NULL;
|
||||
char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0};
|
||||
|
||||
SKvParam param = {.builder = pKvRowBuilder, .schema = pSchema};
|
||||
|
||||
SToken* pItem = taosArrayGet(pTagValList, i);
|
||||
code = parseValueToken(&endPtr, pItem, pSchema, tsPrecision, tmpTokenBuf, KvRowAppend, ¶m, pMsgBuf);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tdDestroyKVRowBuilder(pKvRowBuilder);
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) {
|
||||
const char* msg1 = "invalid table name";
|
||||
const char* msg2 = "tags number not matched";
|
||||
|
@ -354,10 +379,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
|
|||
}
|
||||
|
||||
SArray* pValList = pCreateTableInfo->pTagVals;
|
||||
size_t numOfInputTag = taosArrayGetSize(pValList);
|
||||
|
||||
size_t numOfInputTag = taosArrayGetSize(pValList);
|
||||
STableMeta* pSuperTableMeta = NULL;
|
||||
|
||||
code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -463,21 +487,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
|
|||
return buildInvalidOperationMsg(pMsgBuf, msg2);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfInputTag; ++i) {
|
||||
SSchema* pSchema = &pTagSchema[i];
|
||||
|
||||
char* endPtr = NULL;
|
||||
char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0};
|
||||
|
||||
SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema};
|
||||
|
||||
SToken* pItem = taosArrayGet(pValList, i);
|
||||
code = parseValueToken(&endPtr, pItem, pSchema, tinfo.precision, tmpTokenBuf, KvRowAppend, ¶m, pMsgBuf);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg4);
|
||||
}
|
||||
code = doParseSerializeTagValue(pTagSchema, numOfInputTag, &kvRowBuilder, pValList, tinfo.precision, pMsgBuf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -499,8 +511,8 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
|
|||
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info);
|
||||
|
||||
struct SVCreateTbReq req = {0};
|
||||
req.type = TD_CHILD_TABLE;
|
||||
req.name = strdup(tNameGetTableName(&tableName));
|
||||
req.type = TD_CHILD_TABLE;
|
||||
req.name = strdup(tNameGetTableName(&tableName));
|
||||
req.ctbCfg.suid = pSuperTableMeta->uid;
|
||||
req.ctbCfg.pTag = row;
|
||||
|
||||
|
|
|
@ -372,7 +372,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
|
|||
|
||||
SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
|
||||
if (!moved) {
|
||||
SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
|
||||
SCH_TASK_ERR_LOG(" task may already moved, status:%d", task->status);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -480,7 +480,6 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
|
|||
int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
||||
switch (msgType) {
|
||||
case TDMT_VND_CREATE_TABLE_RSP: {
|
||||
if (rspCode != TSDB_CODE_SUCCESS) {
|
||||
|
@ -492,6 +491,8 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
|
|||
goto _task_error;
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_SUBMIT_RSP: {
|
||||
if (rspCode != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -252,7 +252,7 @@ LONG WINAPI FlCrashDump(PEXCEPTION_POINTERS ep) {
|
|||
|
||||
void taosSetCoreDump() { SetUnhandledExceptionFilter(&FlCrashDump); }
|
||||
|
||||
int32_t taosGetSystemUid(char *uid, int32_t uidlen) {
|
||||
int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
|
||||
GUID guid;
|
||||
CoCreateGuid(&guid);
|
||||
|
||||
|
@ -452,7 +452,7 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t taosGetSystemUid(char *uid, int32_t uidlen) {
|
||||
int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
|
||||
uuid_t uuid = {0};
|
||||
uuid_generate(uuid);
|
||||
// it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null
|
||||
|
@ -1070,7 +1070,7 @@ void taosSetCoreDump(bool enable) {
|
|||
#endif
|
||||
}
|
||||
|
||||
int32_t taosGetSystemUid(char *uid, int32_t uidlen) {
|
||||
int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
|
||||
int fd;
|
||||
int len = 0;
|
||||
|
||||
|
|
Loading…
Reference in New Issue